Sources ​

Sources collect events from external systems and feed them into the CDviz pipeline.

[Diagram: inside a source]

Quick Reference ​

toml
[sources.my_source]
enabled = true
transformer_refs = ["my_transformer"]  # Optional processing

[sources.my_source.extractor]
type = "webhook"  # or "opendal", "sse", "noop"
# ... extractor-specific parameters

Common Parameters:

  • enabled - Enable/disable the source
  • extractor - How to collect events (required)
  • transformer_refs - Optional event processing chain

A complete skeleton, using placeholder names (aaaa, bbbb, xzy, abc):

toml
[sources.aaaa]
enabled = false
transformer_refs = ["bbbb", "log"]

[sources.aaaa.extractor]
type = "xzy"
parser = "json"
# ... parameters for `xzy` extractor

[transformers.bbbb]
type = "abc"
# ... parameters for `abc` transformer
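
As a rough mental model of transformer_refs, the chain can be pictured as a list of functions applied in the order listed. The sketch below is not the collector's implementation: it assumes each transformer receives one message (a dict with metadata, headers, and body keys) and returns a list of zero or more messages. The helpers apply_chain, add_team, and drop_empty_body are hypothetical names invented for this illustration.

```python
def apply_chain(message, transformers):
    """Run one message through the transformers in order.

    Assumption: each transformer maps one message to a list of messages,
    so a step can drop a message (empty list) or fan it out (several items).
    """
    messages = [message]
    for transform in transformers:
        next_messages = []
        for msg in messages:
            next_messages.extend(transform(msg))
        messages = next_messages
    return messages


def add_team(msg):
    """Hypothetical transformer: add a static metadata field."""
    return [{**msg, "metadata": {**msg["metadata"], "team": "platform"}}]


def drop_empty_body(msg):
    """Hypothetical transformer: discard messages without a body."""
    return [msg] if msg["body"] else []


kept = apply_chain(
    {"metadata": {}, "headers": {}, "body": {"a": 1}},
    [add_team, drop_empty_body],
)
dropped = apply_chain(
    {"metadata": {}, "headers": {}, "body": {}},
    [add_team, drop_empty_body],
)
```

Here kept holds a single message carrying the extra team field, while dropped is empty because the second step discarded the message.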

Messages ​

A Message is composed of:

  • metadata: a Map<String, JSON> - Includes base extractor metadata with automatic context.source population
  • headers: a Map<String, String>
  • body: a JSON-like structure, also called the payload
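
The three parts listed above can be pictured with a small data structure. This is only an illustrative stand-in written in Python, not the collector's own type. The class name Message mirrors the term used here, and the sample values (header name, body content) are made up for the example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Message:
    """Illustrative stand-in for a pipeline Message (not the collector's type)."""

    # Map<String, JSON>: extractor metadata, including context.source
    metadata: dict[str, Any] = field(default_factory=dict)
    # Map<String, String>
    headers: dict[str, str] = field(default_factory=dict)
    # JSON-like payload
    body: Any = None


msg = Message(
    metadata={"context": {"source": "/my-custom-source"}},
    headers={"X-GitHub-Event": "push"},
    body={"ref": "refs/heads/main"},
)
```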

Extractor Metadata Configuration ​

All extractors support a metadata field for injecting static metadata into every event. This is useful for adding context information without creating custom transformers.

Automatic context.source Population ​

If metadata.context.source is not explicitly set, it will be automatically populated using the pattern:

{http.root_url}/?source={source_name}

Where:

  • {http.root_url} is configured in the [http] section (default: http://cdviz-collector.example.com)
  • {source_name} is the configuration key for the source (e.g., github_webhook, file_source)
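
The fallback rule can be sketched in a few lines of Python. This is a minimal illustration of the pattern described above, not the collector's code. The function name build_metadata is hypothetical, and treating "not explicitly set" as "the key is absent" is an assumption.

```python
import copy


def build_metadata(static_metadata, root_url, source_name):
    """Return event metadata, filling context.source only when it is absent.

    static_metadata stands for the extractor's `metadata` table,
    root_url for `http.root_url`, source_name for the key under `[sources]`.
    """
    metadata = copy.deepcopy(static_metadata)  # leave the configuration untouched
    context = metadata.setdefault("context", {})
    if "source" not in context:
        context["source"] = f"{root_url}/?source={source_name}"
    return metadata


automatic = build_metadata({}, "https://cdviz.example.com", "github_webhook")
custom = build_metadata(
    {"environment": "production", "context": {"source": "/my-custom-source"}},
    "https://cdviz.example.com",
    "my_webhook",
)
```

With these inputs, automatic carries the generated URL, while custom keeps the value supplied in the configuration.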

Example: Custom Metadata ​

toml
[http]
root_url = "https://cdviz.example.com"

[sources.my_webhook]
enabled = true

[sources.my_webhook.extractor]
type = "webhook"
id = "custom-events"

# Custom metadata injected into all events
[sources.my_webhook.extractor.metadata]
environment = "production"
team = "platform"

[sources.my_webhook.extractor.metadata.context]
source = "/my-custom-source"  # Override automatic source URL

Example: Automatic Source URL ​

toml
[http]
root_url = "https://cdviz.example.com"

[sources.github_webhook]
enabled = true

[sources.github_webhook.extractor]
type = "webhook"
id = "github"

# No metadata.context.source specified
# Will automatically be set to: https://cdviz.example.com/?source=github_webhook

Use Cases ​

Avoid Creating Transformers for Static Metadata: Instead of creating a VRL transformer just to add static fields, use the metadata configuration:

toml
# Before: a transformer was required just to add static fields
# (commented out so the table is not defined twice)
# [sources.my_webhook]
# transformer_refs = ["add_metadata", "my_transform"]

# After: Use extractor metadata directly
[sources.my_webhook]
transformer_refs = ["my_transform"]

[sources.my_webhook.extractor]
type = "webhook"
id = "events"

[sources.my_webhook.extractor.metadata]
datacenter = "us-east-1"
version = "v2"

Environment-Specific Configuration: Tag events with environment information:

toml
[sources.production_webhook.extractor]
type = "webhook"
id = "prod"
metadata.environment_id = "/production/eu-1"
metadata.criticality = "high"

Extractors ​

CDviz Collector supports several types of extractors for different event sources. Each extractor type has its own configuration options and use cases.

Available Extractors ​

| Type    | Description                          | Use Cases                                               |
|---------|--------------------------------------|---------------------------------------------------------|
| noop    | No-operation extractor for testing   | Configuration testing, pipeline validation              |
| webhook | HTTP webhook endpoints               | CI/CD systems, GitHub/GitLab webhooks, API integrations |
| opendal | File system and cloud storage        | Log files, artifact monitoring, batch processing        |
| sse     | Server-Sent Events client            | Real-time event streams, SSE endpoints                  |

Quick Reference ​

Noop/Sleep Extractor ​

For testing and development:

toml
[sources.test.extractor]
type = "noop"  # or "sleep"

→ Full Documentation

Webhook Extractor ​

For HTTP-based event ingestion:

toml
[sources.github.extractor]
type = "webhook"
id = "github-events"
headers_to_keep = ["X-GitHub-Event"]

→ Full Documentation

OpenDAL Extractor ​

For file-based event sources:

toml
[sources.files.extractor]
type = "opendal"
kind = "fs"  # or "s3", "gcs", etc.
polling_interval = "30s"
path_patterns = ["**/*.json"]
parser = "json"

[sources.files.extractor.parameters]
root = "/events"

→ Full Documentation

SSE Extractor ​

For Server-Sent Events:

toml
[sources.events.extractor]
type = "sse"
url = "https://events.example.com/stream"
max_retries = 10

→ Full Documentation

Shared Configuration ​

Header Configuration ​

Headers are used differently by various components:

  • Header Validation: Validate incoming HTTP requests (Source webhook, Sink SSE)
  • Header Authentication: Authenticate outgoing HTTP requests (Source SSE, Sink webhook)

→ Header Validation Guide - For incoming request validation

→ Header Authentication Guide - For outgoing request authentication

Loader ​

No configuration is required.

The loader converts each Message into a CDEvent. It mainly:

  • computes a context.id from the Message's body when context.id is set to "0". A content-based context.id makes it possible to identify or filter duplicates later (the collector itself does not do this filtering).
  • serializes the Message's body into a CDEvent.
  • pushes the CDEvent to the queue, from which it is broadcast to every sink.
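
The first step, deriving an id from content, can be illustrated as follows. The hashing scheme here (SHA-256 over a canonical JSON dump of the body) is an assumption chosen for the sketch. The text above does not specify which algorithm the collector uses, and ensure_context_id is a hypothetical name.

```python
import copy
import hashlib
import json


def ensure_context_id(body):
    """Return a copy of body whose context.id is content-derived when it is "0"."""
    event = copy.deepcopy(body)
    context = event.get("context", {})
    if context.get("id") == "0":
        # Canonical form: sorted keys and fixed separators, so equal content
        # always produces the same bytes and therefore the same id.
        canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
        context["id"] = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return event


first = ensure_context_id({"context": {"id": "0"}, "subject": {"id": "app-a"}})
again = ensure_context_id({"subject": {"id": "app-a"}, "context": {"id": "0"}})
other = ensure_context_id({"context": {"id": "0"}, "subject": {"id": "app-b"}})
fixed = ensure_context_id({"context": {"id": "abc"}, "subject": {"id": "app-a"}})
```

Identical bodies yield identical ids (first and again), different bodies yield different ids (other), and an id that is already set is left alone (fixed). This is what lets a downstream consumer spot duplicates.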

Examples ​

Some of these examples come from the cdviz-collector repository (see cdviz-collector.toml there for an up-to-date configuration).

Read a CDEvent from json files ​

Read from one folder, with one JSON file already in CDEvents format.

toml
[sources.cdevents_local_json]
enabled = false

[sources.cdevents_local_json.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./source" }
recursive = true
path_patterns = ["**/*.json"]
parser = "json"

Because this source (with this name) is already part of the base configuration, you only need to either copy and rename it, or enable it and override the parameters you want to change.

toml
[sources.cdevents_local_json]
enabled = true

[sources.cdevents_local_json.extractor]
parameters = { root = "./inputs/cdevents_json" }

Read a CSV file ​

Read a CSV file from the local filesystem and convert each row into a CDEvent: 1 row/line -> 1 message -> 1 event

toml
[sources.cdevents_local_csv]
enabled = true
transformer_refs = ["service_deployed"]

[sources.cdevents_local_csv.extractor]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./inputs" }
recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[transformers.service_deployed]
type = "vrl"
template = """
[{
    "metadata": .metadata,
    "headers": .headers,
    "body": {
        "context": {
            "version": "0.4.0-draft",
            "id": "0",
            "source": "/event/source/123",
            "type": "dev.cdevents.service.deployed.0.1.1",
            "timestamp": .body.timestamp,
        },
        "subject": {
            "id": .body.id,
            "source": "/event/source/123",
            "type": "service",
            "content": {
                "environment": {
                    "id": .body.env,
                },
                "artifactId": .body.artifact_id,
            }
        }
    }
}]
"""
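
To make the "1 row -> 1 message -> 1 event" mapping concrete, here is a Python sketch that mirrors the body built by the template above. It only illustrates the data flow and is not how the collector runs it. It assumes the first CSV line is a header naming the columns timestamp, id, env, and artifact_id (the fields the template reads from .body). The function name rows_to_events and the sample rows are hypothetical.

```python
import csv
import io


def rows_to_events(csv_text):
    """Build one service.deployed event body per CSV row."""
    events = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        events.append({
            "context": {
                "version": "0.4.0-draft",
                "id": "0",  # placeholder, replaced later by a content-based id
                "source": "/event/source/123",
                "type": "dev.cdevents.service.deployed.0.1.1",
                "timestamp": row["timestamp"],
            },
            "subject": {
                "id": row["id"],
                "source": "/event/source/123",
                "type": "service",
                "content": {
                    "environment": {"id": row["env"]},
                    "artifactId": row["artifact_id"],
                },
            },
        })
    return events


# Hypothetical sample data: two rows, so two events.
sample = (
    "timestamp,id,env,artifact_id\n"
    "2024-01-01T00:00:00Z,app-a,prod,pkg:generic/app-a@1.0.0\n"
    "2024-01-02T00:00:00Z,app-b,staging,pkg:generic/app-b@2.1.0\n"
)
events = rows_to_events(sample)
```

Each event keeps context.id at "0", which leaves the content-based id computation to the loader.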