OpenDAL Extractor ​
Polls files from local filesystems, cloud storage (S3, GCS, Azure Blob), and other sources supported by OpenDAL. Processes matching files according to path patterns and parser configuration.
Configuration ​
toml
[sources.file_source.extractor]
type = "opendal"
kind = "fs"
polling_interval = "30s"
recursive = true
path_patterns = ["**/*.json"]
parser = "json"
parameters = { root = "/path/to/events" }Parameters ​
| Parameter | Type | Default | Description |
|---|---|---|---|
kind | string | — | Storage service type ("fs", "s3", "gcs", "azblob", etc.) |
parameters | object | — | Service-specific configuration (root, bucket, credentials, etc.) |
polling_interval | duration | — | Interval between polls (e.g. "10s", "1m") |
path_patterns | array | — | Glob patterns to match files |
parser | string | "auto" | How to parse file contents |
recursive | boolean | true | Search subdirectories recursively |
try_read_headers_json | boolean | false | Read headers from companion .headers.json files |
metadata | object | — | Static metadata for all events; context.source is auto-populated if unset |
Parsers ​
| Parser | Format | Output | Documentation |
|---|---|---|---|
auto | Auto-detect | Varies | Default — detects by extension |
json | JSON | 1 message | Single JSON object |
jsonl | JSON Lines | N messages | One per line |
csv_row | CSV | N messages | One per row |
text | Plain text | 1 message | Complete file |
text_line | Plain text | N messages | One per line |
xml | XML | 1 message | XML to JSON (parser_xml feature) |
tap | TAP | 1 message | Test results (parser_tap feature) |
metadata | Any | 1 message | File metadata only |
→ Complete Parsers Documentation
Examples ​
Local filesystem ​
toml
[sources.local_events.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
path_patterns = ["**/*.json"]
parser = "json"
parameters = { root = "/var/events" }AWS S3 ​
toml
[sources.s3_events.extractor]
type = "opendal"
kind = "s3"
polling_interval = "1m"
path_patterns = ["events/*.json", "logs/*.csv"]
parser = "auto"
[sources.s3_events.extractor.parameters]
bucket = "my-events-bucket"
region = "us-west-2"
access_key_id = "AKIA..."
secret_access_key = "..."