Kafka Source
Consumes events from Kafka topics. Supports Apache Kafka, Confluent Kafka, Redpanda, Amazon MSK, and other Kafka-compatible brokers.
Configuration
toml
[sources.kafka_events.extractor]
type = "kafka"
brokers = "localhost:9092"
topics = ["cdevents", "alerts"]
group_id = "cdviz-collector"
Parameters
Required Parameters
brokers (string): Kafka broker addresses (comma-separated)
topics (array of strings): Topics to consume from
group_id (string): Consumer group ID
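Because brokers is a single string rather than an array, a multi-broker cluster is written as one comma-separated value. The sketch below illustrates this; the source name kafka_cluster and the example.com host names are hypothetical placeholders, not values from this guide.

```toml
# Hypothetical source name and host names; replace with your own.
[sources.kafka_cluster.extractor]
type = "kafka"
# All brokers go in one comma-separated string, not a TOML array.
brokers = "kafka-1.example.com:9092,kafka-2.example.com:9092,kafka-3.example.com:9092"
topics = ["cdevents"]
group_id = "cdviz-collector"
```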
Optional Parameters
headers_to_keep (array of strings): Kafka header names to preserve through the pipeline
poll_timeout (duration): Polling timeout (default: 1s)
auto_commit (boolean): Commit offsets automatically (default: true)
rdkafka_config (object): Additional rdkafka consumer configuration
headers (array): Header validation rules for incoming messages
metadata (object): Static metadata for all events; context.source is auto-populated if unset
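As a rough illustration of how some of these optional parameters might be combined, the sketch below sets a longer polling timeout, spells out the default commit behaviour, and passes one extra consumer property through rdkafka_config. The source name kafka_tuned is hypothetical, the "5s" value assumes the same duration syntax as the documented 1s default, and auto.offset.reset is a standard librdkafka property chosen only as an example.

```toml
# Hypothetical source name, used for illustration only.
[sources.kafka_tuned.extractor]
type = "kafka"
brokers = "localhost:9092"
topics = ["cdevents"]
group_id = "cdviz-collector"
headers_to_keep = ["content-type"]
# Assumed duration syntax, mirroring the documented default of 1s.
poll_timeout = "5s"
# Explicitly states the documented default.
auto_commit = true

[sources.kafka_tuned.extractor.rdkafka_config]
# Standard librdkafka property: where to start when the group has no committed offset.
"auto.offset.reset" = "earliest"
```

The headers and metadata parameters are left out here because their exact structure is not described in this section.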
Security
toml
[sources.secure_kafka.extractor]
type = "kafka"
brokers = "secure-kafka.company.com:9093"
topics = ["events"]
group_id = "cdviz-secure"
[sources.secure_kafka.extractor.rdkafka_config]
"security.protocol" = "SASL_SSL"
"sasl.mechanisms" = "PLAIN"
"sasl.username" = "cdviz-user"
"sasl.password" = "secure-password"
"ssl.ca.location" = "/path/to/ca-cert.pem"→ Complete Header Validation Guide
Example
toml
[sources.kafka_events]
enabled = true
transformer_refs = ["process_events"]
[sources.kafka_events.extractor]
type = "kafka"
brokers = "localhost:9092"
topics = ["cdevents"]
group_id = "cdviz-collector"
headers_to_keep = ["content-type", "x-event-type"]