NATS Source
Consumes events from NATS subjects. Supports NATS Core (pub/sub with queue groups) and NATS JetStream (persistent, replay-capable messaging).
Configuration
toml
[sources.nats_events.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
Parameters
Required Parameters
url (string): NATS server URL (e.g. nats://localhost:4222, tls://secure-nats:4222)
subject (string): NATS subject to subscribe to (supports wildcards: * for a single token, > for multiple tokens)
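The difference between the two wildcards can be illustrated with a minimal sketch. The source names (nats_one_level, nats_all_levels) and the sample subjects in the comments are hypothetical, chosen only for illustration; the matching behavior described is standard NATS subject semantics.

```toml
# "*" matches exactly one token at that position.
# Matches "cdevents.pipeline"; does not match "cdevents.pipeline.finished".
[sources.nats_one_level.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.*"

# ">" matches one or more tokens and must be the last token of the subject.
# Matches both "cdevents.pipeline" and "cdevents.pipeline.finished".
[sources.nats_all_levels.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
```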
Optional Parameters
queue_group (string): Queue group for load-balanced consumption across multiple collector instances (NATS Core)
headers_to_keep (array of strings): NATS header names to preserve through the pipeline
credentials (string): Path to a NATS credentials file (.creds) for NKey/JWT authentication
token (string): Token for NATS token authentication
username / password (string): User/password authentication
headers (array): Header validation rules for incoming messages
metadata (object): Static metadata for all events; context.source is auto-populated if unset
jetstream (object): JetStream consumer configuration (see JetStream Parameters)
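As a rough sketch of the token and user/password options listed above, the two fragments below each use one authentication method. The source names (nats_token, nats_userpass) and all credential values are placeholders, not values from this guide. The text does not say whether several authentication options may be combined, so the sketch assumes one method per source.

```toml
# Token authentication (placeholder value, replace with a real token).
[sources.nats_token.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
token = "REPLACE_WITH_TOKEN"

# User/password authentication (placeholder values).
[sources.nats_userpass.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
username = "collector"
password = "REPLACE_WITH_PASSWORD"
```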
JetStream Parameters
When jetstream is present, the source subscribes as a JetStream consumer:
jetstream.stream (string): JetStream stream name
jetstream.durable (string): Durable consumer name (persists position across restarts)
jetstream.deliver_policy (string): "all", "last", "new", or "by_start_sequence" (default: "new")
jetstream.ack_policy (string): "explicit", "none", or "all" (default: "explicit")
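To replay events already stored in a stream rather than only receive new ones, deliver_policy can presumably be set to "all". The sketch below assumes a stream named CDEVENTS already exists; the source name nats_replay and the durable name cdviz-collector-replay are hypothetical. In JetStream a deliver policy normally applies when the consumer is first created, so an existing durable consumer would be expected to resume from its saved position instead of replaying again.

```toml
[sources.nats_replay.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"

[sources.nats_replay.extractor.jetstream]
stream = "CDEVENTS"                 # assumed to exist on the server
durable = "cdviz-collector-replay"  # hypothetical consumer name
deliver_policy = "all"              # start from the earliest message still in the stream
ack_policy = "explicit"
```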
Examples
NATS Core consumer
toml
[sources.nats_events]
enabled = true
transformer_refs = ["process_events"]
[sources.nats_events.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
queue_group = "cdviz-collector"
headers_to_keep = ["content-type", "x-event-type"]
JetStream consumer
toml
[sources.nats_persistent]
enabled = true
transformer_refs = ["process_events"]
[sources.nats_persistent.extractor]
type = "nats"
url = "nats://localhost:4222"
subject = "cdevents.>"
[sources.nats_persistent.extractor.jetstream]
stream = "CDEVENTS"
durable = "cdviz-collector"
deliver_policy = "new"
ack_policy = "explicit"
Secure with NKey credentials
toml
[sources.secure_nats.extractor]
type = "nats"
url = "tls://secure-nats.company.com:4222"
subject = "events.>"
credentials = "/etc/nats/cdviz.creds"
→ Complete Header Validation Guide