Sources
Sources collect events from external systems and feed them into the CDviz pipeline.
Quick Reference
[sources.my_source]
enabled = true
transformer_refs = ["my_transformer"] # Optional processing
[sources.my_source.extractor]
type = "webhook" # or "opendal", "sse", "noop"
# ... extractor-specific parameters
Common Parameters:
- enabled - Enable/disable the source
- extractor - How to collect events (required)
- transformer_refs - Optional event processing chain
[sources.aaaa]
enabled = false
transformer_refs = ["bbbb", "log"]
[sources.aaaa.extractor]
type = "xzy"
parser = "json"
# ... parameters for `xzy` extractor
[transformers.bbbb]
type = "abc"
# ... parameters for `abc` transformer
Messages
A Message is composed of:
- metadata: a Map<String, JSON> - Includes base extractor metadata with automatic context.source population
- headers: a Map<String, String>
- body: a JSON-like structure, also named payload
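As a rough mental model, the three parts of a Message can be pictured as the Python structure below. This is only an illustrative sketch: the `Message` class is a hypothetical name, not the collector's internal type, and the sample values are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Message:
    """Illustrative stand-in for a collector Message (hypothetical type)."""

    # Map<String, JSON>: arbitrary JSON values keyed by string
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Map<String, String>: for example, HTTP headers kept by an extractor
    headers: Dict[str, str] = field(default_factory=dict)
    # JSON-like structure, also named payload
    body: Any = None


# Made-up sample values, for illustration only
msg = Message(
    metadata={"context": {"source": "https://cdviz.example.com/?source=github_webhook"}},
    headers={"X-GitHub-Event": "push"},
    body={"ref": "refs/heads/main"},
)
```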
Extractor Metadata Configuration
All extractors support a metadata field for injecting static metadata into every event. This is useful for adding context information without creating custom transformers.
Automatic context.source Population
If metadata.context.source is not explicitly set, it will be automatically populated using the pattern:
{http.root_url}/?source={source_name}
Where:
- {http.root_url} is configured in the [http] section (default: http://cdviz-collector.example.com)
- {source_name} is the configuration key for the source (e.g., github_webhook, file_source)
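The population rule above can be sketched in a few lines of Python. The function name `resolve_metadata` is hypothetical and the code only mirrors the documented pattern; it is not the collector's implementation, and it does not normalize a trailing slash in the root URL.

```python
import copy

# Default root URL documented for the [http] section
DEFAULT_ROOT_URL = "http://cdviz-collector.example.com"


def resolve_metadata(source_name, static_metadata=None, root_url=DEFAULT_ROOT_URL):
    """Return extractor metadata with context.source filled in when it is absent."""
    # Work on a copy so the caller's configuration is not mutated
    metadata = copy.deepcopy(static_metadata) if static_metadata else {}
    context = metadata.setdefault("context", {})
    if "source" not in context:
        # Documented pattern: {http.root_url}/?source={source_name}
        context["source"] = f"{root_url}/?source={source_name}"
    return metadata


# No explicit source: the URL is derived from the source name
auto = resolve_metadata("github_webhook", root_url="https://cdviz.example.com")

# Explicit source: the configured value is kept as-is
custom = resolve_metadata(
    "my_webhook",
    {"environment": "production", "context": {"source": "/my-custom-source"}},
    root_url="https://cdviz.example.com",
)
```

Here `auto["context"]["source"]` is `https://cdviz.example.com/?source=github_webhook`, while `custom["context"]["source"]` stays `/my-custom-source`.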
Example: Custom Metadata
[http]
root_url = "https://cdviz.example.com"
[sources.my_webhook]
enabled = true
[sources.my_webhook.extractor]
type = "webhook"
id = "custom-events"
# Custom metadata injected into all events
[sources.my_webhook.extractor.metadata]
environment = "production"
team = "platform"
[sources.my_webhook.extractor.metadata.context]
source = "/my-custom-source" # Override automatic source URL
Example: Automatic Source URL
[http]
root_url = "https://cdviz.example.com"
[sources.github_webhook]
enabled = true
[sources.github_webhook.extractor]
type = "webhook"
id = "github"
# No metadata.context.source specified
# Will automatically be set to: https://cdviz.example.com/?source=github_webhook
Use Cases
Avoid Creating Transformers for Static Metadata: Instead of creating a VRL transformer just to add static fields, use the metadata configuration:
# Before: Required a transformer to add static fields
[sources.my_webhook]
transformer_refs = ["add_metadata", "my_transform"]
# After: Use extractor metadata directly
[sources.my_webhook]
transformer_refs = ["my_transform"]
[sources.my_webhook.extractor]
type = "webhook"
id = "events"
[sources.my_webhook.extractor.metadata]
datacenter = "us-east-1"
version = "v2"
Environment-Specific Configuration: Tag events with environment information:
[sources.production_webhook.extractor]
type = "webhook"
id = "prod"
metadata.environment_id = "/production/eu-1"
metadata.criticality = "high"
Extractors
CDviz Collector supports several types of extractors for different event sources. Each extractor type has its own configuration options and use cases.
Available Extractors
| Type | Description | Use Cases |
|---|---|---|
| noop | No-operation extractor for testing | Configuration testing, pipeline validation |
| webhook | HTTP webhook endpoints | CI/CD systems, GitHub/GitLab webhooks, API integrations |
| opendal | File system and cloud storage | Log files, artifact monitoring, batch processing |
| sse | Server-Sent Events client | Real-time event streams, SSE endpoints |
Quick Reference
Noop/Sleep Extractor
For testing and development:
[sources.test.extractor]
type = "noop" # or "sleep"
Webhook Extractor
For HTTP-based event ingestion:
[sources.github.extractor]
type = "webhook"
id = "github-events"
headers_to_keep = ["X-GitHub-Event"]
OpenDAL Extractor
For file-based event sources:
[sources.files.extractor]
type = "opendal"
kind = "fs" # or "s3", "gcs", etc.
polling_interval = "30s"
path_patterns = ["**/*.json"]
parser = "json"
[sources.files.extractor.parameters]
root = "/events"
SSE Extractor
For Server-Sent Events:
[sources.events.extractor]
type = "sse"
url = "https://events.example.com/stream"
max_retries = 10
Shared Configuration
Header Configuration
Headers are used differently by various components:
- Header Validation: Validate incoming HTTP requests (Source webhook, Sink SSE)
- Header Authentication: Authenticate outgoing HTTP requests (Source SSE, Sink webhook)
→ Header Validation Guide - For incoming request validation
→ Header Authentication Guide - For outgoing request authentication
Loader
No configuration is required.
The loader converts the Message into a CDEvent. It mainly:
- computes a context.id from the Message's body if context.id is set to "0". A content-based context.id makes it possible to identify and filter duplicates later (the collector itself does not do this filtering).
- serializes the Message's body into a CDEvent.
- pushes the CDEvent to the queue, from which it is broadcast to every sink.
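The first step, deriving an id from content, can be illustrated with the sketch below. The hashing scheme is an assumption made for illustration (SHA-256 over a sorted-key JSON serialization); this page does not state which algorithm or serialization the collector uses, and `ensure_context_id` is a hypothetical name.

```python
import hashlib
import json


def ensure_context_id(body):
    """Return body, with context.id replaced by a content hash when it is "0"."""
    context = body.get("context", {})
    if context.get("id") != "0":
        return body  # an explicit id is left untouched
    # Assumption: hash a canonical JSON form (sorted keys, compact separators)
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    # Build a new dict so the input body is not mutated
    return {**body, "context": {**context, "id": digest}}


first = ensure_context_id({"context": {"id": "0"}, "subject": {"id": "svc-a"}})
again = ensure_context_id({"context": {"id": "0"}, "subject": {"id": "svc-a"}})
other = ensure_context_id({"context": {"id": "0"}, "subject": {"id": "svc-b"}})
```

Identical bodies (`first`, `again`) end up with the same id and a different body (`other`) with a different one, which is the property that lets a downstream consumer spot duplicates.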
Examples
Some examples come from the cdviz-collector repository (see cdviz-collector.toml for an up-to-date configuration).
Read a CDEvent from JSON files
Read from one folder, with one JSON file already in CDEvents format.
[sources.cdevents_local_json]
enabled = false
[sources.cdevents_local_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./source" }
recursive = true
path_patterns = ["**/*.json"]
parser = "json"
As this source (with this name) is already part of the base configuration, you only need to either copy (and rename) it, or enable it and override the parameters you want.
[sources.cdevents_local_json]
enabled = true
[sources.cdevents_local_json.extractor]
parameters = { root = "./inputs/cdevents_json" }
Read a CSV file
Read a CSV file from the local filesystem and convert each row into a CDEvent: 1 row/line -> 1 message -> 1 event
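The "1 row -> 1 message" step can be pictured with the Python sketch below. It assumes the first CSV line is a header whose column names become the keys of the message body; the column names (timestamp, id, env, artifact_id) are the fields read by the transformer template in this example, while the sample row and the `csv_rows_to_messages` helper are made up for illustration.

```python
import csv
import io


def csv_rows_to_messages(csv_text, metadata=None):
    """Yield one message dict per CSV data row, keyed by the header line."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        yield {
            "metadata": dict(metadata or {}),
            "headers": {},  # assumption: a file-based source carries no headers
            "body": dict(row),
        }


# Made-up sample content for a file such as cdevents.csv
sample = (
    "timestamp,id,env,artifact_id\n"
    "2024-01-01T00:00:00Z,svc-a,prod,pkg:oci/svc-a\n"
    "2024-01-02T00:00:00Z,svc-b,staging,pkg:oci/svc-b\n"
)
messages = list(csv_rows_to_messages(sample))
```

Each resulting message is then handed to the transformers listed in transformer_refs, which build the CDEvent body from fields such as `.body.env`.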
[sources.cdevents_local_csv]
enabled = true
transformer_refs = ["service_deployed"]
[sources.cdevents_local_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./inputs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"
[transformers.service_deployed]
type = "vrl"
template = """
[{
    "metadata": .metadata,
    "headers": .headers,
    "body": {
        "context": {
            "version": "0.4.0-draft",
            "id": "0",
            "source": "/event/source/123",
            "type": "dev.cdevents.service.deployed.0.1.1",
            "timestamp": .body.timestamp,
        },
        "subject": {
            "id": .body.id,
            "source": "/event/source/123",
            "type": "service",
            "content": {
                "environment": {
                    "id": .body.env,
                },
                "artifactId": .body.artifact_id,
            }
        }
    }
}]
"""