
Sources

Sources collect events from external systems and feed them into the CDviz pipeline.

[Diagram: inside a source]

Quick Reference

toml
[sources.my_source]
enabled = true
transformer_refs = ["my_transformer"]

[sources.my_source.extractor]
type = "webhook"  # webhook | opendal | sse | kafka | nats | noop
# ... extractor-specific parameters

The opendal extractor supports parsers for different file formats (json, jsonl, csv, xml, tap, text, etc.). Other extractors parse their native protocol format directly.

→ Parsers Documentation

Messages

A Message is composed of:

  • metadata: a Map<String, JSON>. It includes the base extractor metadata, and context.source is populated automatically.
  • headers: a Map<String, String>
  • body: a JSON-like structure, also called the payload

Extractor Metadata Configuration

All extractors accept a metadata field to inject static key/value pairs into every event — useful for tagging events with environment, team, or region without a custom transformer.

If metadata.context.source is not set, it defaults to {http.root_url}/?source={source_name}.

toml
[sources.my_webhook.extractor]
type = "webhook"
id = "events"
metadata.environment = "production"
metadata.team = "platform"
# metadata.context.source = "/my-custom-source"  # override auto-generated URL
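The dotted metadata keys above are ordinary TOML, so the same values can also be written as a sub-table, which may read better when there are many entries. The sketch below is the equivalent form of the example above; it relies only on standard TOML semantics and introduces no new options.

```toml
[sources.my_webhook.extractor]
type = "webhook"
id = "events"

# Equivalent to the dotted `metadata.*` keys: TOML parses both forms
# into the same nested table.
[sources.my_webhook.extractor.metadata]
environment = "production"
team = "platform"
# context.source = "/my-custom-source"  # override auto-generated URL
```

Keys placed after the `[...metadata]` header belong to that sub-table, so keep other extractor parameters (such as `type` and `id`) above it.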

Available Extractors

| Type | Description | Use Cases |
|------|-------------|-----------|
| noop | No-operation extractor for testing | Configuration testing, pipeline validation |
| webhook | HTTP webhook endpoints | CI/CD systems, GitHub/GitLab webhooks, API integrations |
| opendal | File system and cloud storage | Log files, artifact monitoring, batch processing |
| sse | Server-Sent Events client | Real-time event streams, SSE endpoints |
| kafka | Apache Kafka consumer | Event streaming, message queues, Kafka-compatible brokers |
| nats | NATS Core / JetStream consumer | Cloud-native messaging, lightweight pub/sub, JetStream |
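As a starting point for configuration testing, a source can be declared with the noop extractor. This is a minimal sketch following the Quick Reference layout: the source name `smoke_test` is hypothetical, and it assumes the noop extractor needs no parameters beyond `type`.

```toml
# Hypothetical source name, used only for illustration.
[sources.smoke_test]
enabled = true

[sources.smoke_test.extractor]
# Assumption: noop requires no extractor-specific parameters.
type = "noop"
```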

Shared Configuration

Header Configuration

Headers serve a different purpose depending on the component:

  • Header Validation: Validate incoming HTTP requests (Source webhook, Sink SSE)
  • Header Authentication: Authenticate outgoing HTTP requests (Source SSE, Sink webhook)

→ Header Validation Guide - For incoming request validation

→ Header Authentication Guide - For outgoing request authentication

Loader

No configuration required. The loader converts each Message into a CDEvent:

  • Computes a content-based context.id (CID) when context.id is "0" or absent — enables downstream deduplication.
  • Serializes the message body as a CDEvent.
  • Pushes the CDEvent to all configured Sinks.

Examples

Some examples come from the cdviz-collector repository. See cdviz-collector.toml for an up-to-date configuration.

Read CDEvents from JSON files

Read JSON files from one folder, where each file is already in CDEvents format.

toml
[sources.cdevents_local_json]
enabled = false

[sources.cdevents_local_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./source" }
recursive = true
path_patterns = ["**/*.json"]
parser = "json"

Because a source with this name is already part of the base configuration, you only need to enable it and override the parameters you want to change. Alternatively, copy it under a new name.

toml
[sources.cdevents_local_json]
enabled = true

[sources.cdevents_local_json.extractor]
parameters = { root = "./inputs/cdevents_json" }

Read a CSV file

Read a CSV file from the local filesystem and convert each row into a CDEvent: 1 row -> 1 message -> 1 event. The transformer below reads the timestamp, id, env, and artifact_id fields from each message body.

toml
[sources.cdevents_local_csv]
enabled = true
transformer_refs = ["service_deployed"]

[sources.cdevents_local_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./inputs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[transformers.service_deployed]
type = "vrl"
template = """
[{
    "metadata": .metadata,
    "headers": .headers,
    "body": {
        "context": {
            "version": "0.4.1",
            "type": "dev.cdevents.service.deployed.0.1.1",
            "timestamp": .body.timestamp,
        },
        "subject": {
            "id": .body.id,
            "type": "service",
            "content": {
                "environment": {
                    "id": .body.env,
                },
                "artifactId": .body.artifact_id,
            }
        }
    }
}]
"""