OpenDAL Extractor

Polls files from local filesystems, cloud storage (S3, GCS, Azure Blob), and other sources supported by OpenDAL. Processes matching files according to path patterns and parser configuration.

Configuration

```toml
[sources.file_source.extractor]
type = "opendal"
kind = "fs"
polling_interval = "30s"
recursive = true
path_patterns = ["**/*.json"]
parser = "json"
parameters = { root = "/path/to/events" }
```

Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `kind` | string | | Storage service type (`"fs"`, `"s3"`, `"gcs"`, `"azblob"`, etc.) |
| `parameters` | object | | Service-specific configuration (root, bucket, credentials, etc.) |
| `polling_interval` | duration | | Interval between polls (e.g. `"10s"`, `"1m"`) |
| `path_patterns` | array | | Glob patterns to match files |
| `parser` | string | `"auto"` | How to parse file contents |
| `recursive` | boolean | `true` | Search subdirectories recursively |
| `try_read_headers_json` | boolean | `false` | Read headers from companion `.headers.json` files |
| `metadata` | object | | Static metadata for all events; `context.source` is auto-populated if unset |
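
The optional parameters can be combined in a single extractor. The following is only a sketch: the source name `annotated_events` and the `environment` key are hypothetical, and the key layout accepted under `metadata` is an assumption rather than something the table above specifies.

```toml
# Hypothetical source name; adjust to your own configuration.
[sources.annotated_events.extractor]
type = "opendal"
kind = "fs"
polling_interval = "30s"
recursive = false                # scan only the root directory, not subdirectories
path_patterns = ["*.json"]
parser = "json"
try_read_headers_json = true     # also read companion .headers.json files
parameters = { root = "/path/to/events" }

# Static metadata attached to every event (key layout is an assumption).
[sources.annotated_events.extractor.metadata]
environment = "staging"
```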

Parsers

| Parser | Format | Output | Documentation |
|--------|--------|--------|---------------|
| `auto` | Auto-detect | Varies | Default; detects by extension |
| `json` | JSON | 1 message | Single JSON object |
| `jsonl` | JSON Lines | N messages | One per line |
| `csv_row` | CSV | N messages | One per row |
| `text` | Plain text | 1 message | Complete file |
| `text_line` | Plain text | N messages | One per line |
| `xml` | XML | 1 message | XML to JSON (`parser_xml` feature) |
| `tap` | TAP | 1 message | Test results (`parser_tap` feature) |
| `metadata` | Any | 1 message | File metadata only |

→ Complete Parsers Documentation
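
As an illustration of the parser table, a directory of JSON Lines files could be read with the `jsonl` parser so that each line becomes its own message. The source name `jsonl_events`, the root path, and the `.jsonl` extension in the pattern are assumptions made for this sketch.

```toml
# Hypothetical source reading JSON Lines files from the local filesystem.
[sources.jsonl_events.extractor]
type = "opendal"
kind = "fs"
polling_interval = "1m"
path_patterns = ["**/*.jsonl"]
parser = "jsonl"                 # N messages per file, one per line
parameters = { root = "/path/to/events" }
```

Setting `parser` explicitly, rather than relying on `auto`, keeps the behavior independent of file extensions.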

Examples

Local filesystem

```toml
[sources.local_events.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
path_patterns = ["**/*.json"]
parser = "json"
parameters = { root = "/var/events" }
```

AWS S3

```toml
[sources.s3_events.extractor]
type = "opendal"
kind = "s3"
polling_interval = "1m"
path_patterns = ["events/*.json", "logs/*.csv"]
parser = "auto"

[sources.s3_events.extractor.parameters]
bucket = "my-events-bucket"
region = "us-west-2"
access_key_id = "AKIA..."
secret_access_key = "..."
```
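
Keys written into a configuration file are easy to leak. A possible alternative is to leave them out of the file. This assumes that `parameters` are passed unchanged to OpenDAL's S3 service and that the service's default credential loading (for example from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables) remains enabled. It is a sketch under those assumptions and has not been verified against the extractor itself.

```toml
[sources.s3_events.extractor]
type = "opendal"
kind = "s3"
polling_interval = "1m"
path_patterns = ["events/*.json"]
parser = "json"

[sources.s3_events.extractor.parameters]
bucket = "my-events-bucket"
region = "us-west-2"
# access_key_id and secret_access_key are intentionally omitted here;
# they are assumed to be resolved from the process environment.
```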