# Sources

Sources collect events from external systems and feed them into the CDviz pipeline.

## Quick Reference

```toml
[sources.my_source]
enabled = true
transformer_refs = ["my_transformer"]

[sources.my_source.extractor]
type = "webhook" # webhook | opendal | sse | kafka | nats | noop
# ... extractor-specific parameters
```

The opendal extractor supports parsers for different file formats (json, jsonl, csv, xml, tap, text, etc.). Other extractors parse their native protocol format directly.
## Messages

A Message is composed of:

- metadata: a `Map<String, JSON>` - includes base extractor metadata with automatic `context.source` population
- headers: a `Map<String, String>`
- body: a `JSON`-like structure, also named payload
## Extractor Metadata Configuration

All extractors accept a `metadata` field to inject static key/value pairs into every event. This is useful for tagging events with environment, team, or region without writing a custom transformer.

If `metadata.context.source` is not set, it defaults to `{http.root_url}/?source={source_name}`.

```toml
[sources.my_webhook.extractor]
type = "webhook"
id = "events"
metadata.environment = "production"
metadata.team = "platform"
# metadata.context.source = "/my-custom-source" # override auto-generated URL
```

## Available Extractors
| Type | Description | Use Cases |
|---|---|---|
| `noop` | No-operation extractor for testing | Configuration testing, pipeline validation |
| `webhook` | HTTP webhook endpoints | CI/CD systems, GitHub/GitLab webhooks, API integrations |
| `opendal` | File system and cloud storage | Log files, artifact monitoring, batch processing |
| `sse` | Server-Sent Events client | Real-time event streams, SSE endpoints |
| `kafka` | Apache Kafka consumer | Event streaming, message queues, Kafka-compatible brokers |
| `nats` | NATS Core / JetStream consumer | Cloud-native messaging, lightweight pub/sub, JetStream |
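As a minimal sketch (the source name `pipeline_test` is illustrative), a `noop` source can be used to validate that the pipeline wiring loads without ingesting real events:

```toml
# Hypothetical source name, for configuration testing only
[sources.pipeline_test]
enabled = true

[sources.pipeline_test.extractor]
type = "noop"
```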
## Shared Configuration

### Header Configuration

Headers are used differently by various components:

- Header Validation: Validate incoming HTTP requests (Source webhook, Sink SSE)
- Header Authentication: Authenticate outgoing HTTP requests (Source SSE, Sink webhook)

→ Header Validation Guide - for incoming request validation
→ Header Authentication Guide - for outgoing request authentication
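As a hypothetical sketch only (the `headers` key and rule shape below are assumptions for illustration, not the collector's documented schema; see the Header Validation Guide for the actual field names), validating a shared-secret header on an incoming webhook might look like:

```toml
# Illustrative only: field names here are assumptions, not the documented schema.
[sources.my_webhook.extractor]
type = "webhook"
id = "events"
# Reject incoming requests whose "x-api-key" header does not match the secret.
headers = [{ header = "x-api-key", rule = { type = "equals", value = "my-secret" } }]
```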
## Loader

No configuration is required. The loader converts each Message into a CDEvent:

- Computes a content-based `context.id` (CID) when `context.id` is `"0"` or absent, which enables downstream deduplication.
- Serializes the message body as a CDEvent.
- Pushes the CDEvent to all configured Sinks.
## Examples

Some examples come from the cdviz-collector repository (see cdviz-collector.toml for an up-to-date configuration).

### Read CDEvents from JSON files

Read from a folder containing JSON files that are already in CDEvents format.

```toml
[sources.cdevents_local_json]
enabled = false

[sources.cdevents_local_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./source" }
recursive = true
path_patterns = ["**/*.json"]
parser = "json"
```

Since this source (with this name) is already part of the base configuration, you only need to copy (and rename) it, or to enable it and override the parameters you want:
```toml
[sources.cdevents_local_json]
enabled = true

[sources.cdevents_local_json.extractor]
parameters = { root = "./inputs/cdevents_json" }
```

### Read a CSV file
Read a CSV file from the local filesystem and convert each row into a CDEvent: 1 row/line -> 1 message -> 1 event.
```toml
[sources.cdevents_local_csv]
enabled = true
transformer_refs = ["service_deployed"]

[sources.cdevents_local_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./inputs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[transformers.service_deployed]
type = "vrl"
template = """
[{
  "metadata": .metadata,
  "headers": .headers,
  "body": {
    "context": {
      "version": "0.4.1",
      "type": "dev.cdevents.service.deployed.0.1.1",
      "timestamp": .body.timestamp,
    },
    "subject": {
      "id": .body.id,
      "type": "service",
      "content": {
        "environment": {
          "id": .body.env,
        },
        "artifactId": .body.artifact_id,
      }
    }
  }
}]
"""
```