
NATS Integration

FraiseQL integrates with NATS to publish database change events to a message bus, enabling event-driven architectures where downstream services react to data mutations in real time. This integration is configured through the observers system.

Before configuring NATS integration, you need:

  • A running NATS server (version 2.2 or later)
  • JetStream enabled on the server if you require durable, replayable streams
  • Network access from the machine running fraiseql to the NATS server
  1. Install and start the NATS server

    The quickest way to run NATS locally is with the official Docker image:

    docker run -d --name nats \
    -p 4222:4222 \
    -p 8222:8222 \
    nats:latest -js

    The -js flag enables JetStream. Port 4222 is the client port; port 8222 exposes the HTTP monitoring endpoint.

  2. Configure observers to use NATS transport

    In fraiseql.toml, set backend = "nats" in the [observers] section and provide a connection URL:

    [observers]
    backend = "nats"
    nats_url = "${NATS_URL}" # e.g. nats://localhost:4222

    Advanced JetStream settings (stream name, consumer name, deduplication window, retention) go in the separate observer runtime config (fraiseql-observer.toml):

    [transport]
    transport = "nats"
    run_bridge = false
    run_executors = true
    [transport.nats]
    url = "${NATS_URL}"
    stream_name = "fraiseql_events"
    subject_prefix = "fraiseql.mutation"
    consumer_name = "fraiseql_observer_worker"
    [transport.nats.jetstream]
    dedup_window_minutes = 5
    max_age_days = 7
    max_msgs = 10_000_000
    max_bytes = 10_737_418_240 # 10 GB
    ack_wait_secs = 30
    max_deliver = 3
  3. Start FraiseQL

    fraiseql run

    On startup, FraiseQL connects to NATS and logs a confirmation:

    [observers] transport: nats (nats://localhost:4222)
    [observers] JetStream enabled: true
    [observers] stream: fraiseql_events

The observers system supports three transport backends:

| Transport | Use Case | Configuration |
| --- | --- | --- |
| postgres | Default, single-instance | PostgreSQL LISTEN/NOTIFY |
| nats | Multi-instance, distributed | NATS JetStream |
| in_memory | Testing only | In-memory event bus |

NATS Transport Settings (fraiseql-observer.toml)


The detailed NATS transport settings belong in fraiseql-observer.toml, the observer runtime config. This is a separate file from fraiseql.toml.

[transport.nats]
url = "nats://localhost:4222" # Supports multiple: "nats://n1:4222,nats://n2:4222"
subject_prefix = "fraiseql.mutation"
consumer_name = "fraiseql_observer_worker"
stream_name = "fraiseql_events"

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | nats://localhost:4222 | NATS server URL(s) |
| subject_prefix | string | fraiseql.mutation | Prefix for published subjects |
| consumer_name | string | fraiseql_observer_worker | Durable consumer name |
| stream_name | string | fraiseql_events | JetStream stream name |
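
The url setting accepts a comma-separated list of servers. A consumer that reads the same value may need to split it before connecting. The following is a minimal sketch; split_nats_urls is a hypothetical helper, not part of FraiseQL or nats-py.

```python
def split_nats_urls(raw: str) -> list[str]:
    """Split a comma-separated NATS URL string into individual server URLs.

    Hypothetical helper for illustration; surrounding whitespace and empty
    entries are dropped.
    """
    return [part.strip() for part in raw.split(",") if part.strip()]


# Example value in the same form as the url setting
servers = split_nats_urls("nats://n1:4222, nats://n2:4222")
print(servers)
```

With nats-py, the resulting list can be passed as `await nats.connect(servers=servers)`, so the client can fail over between the listed servers.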

JetStream Settings (fraiseql-observer.toml)

[transport.nats.jetstream]
dedup_window_minutes = 5 # Message deduplication window
max_age_days = 7 # Maximum message retention
max_msgs = 10000000 # Maximum messages in stream
max_bytes = 10737418240 # Maximum stream size (10 GB)
ack_wait_secs = 30 # Acknowledgment timeout
max_deliver = 3 # Maximum delivery attempts
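
The JetStream settings mix minutes, days, seconds, and bytes. When comparing them against values reported by NATS tooling, it can help to normalize the units. The sketch below mirrors the documented keys and values in a hypothetical JetStreamLimits dataclass; it is only an illustration of the unit conversions, not FraiseQL's internal representation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JetStreamLimits:
    """Hypothetical mirror of the [transport.nats.jetstream] keys."""

    dedup_window_minutes: int = 5
    max_age_days: int = 7
    max_msgs: int = 10_000_000
    max_bytes: int = 10_737_418_240
    ack_wait_secs: int = 30
    max_deliver: int = 3

    @property
    def dedup_window_secs(self) -> int:
        # Deduplication window expressed in seconds
        return self.dedup_window_minutes * 60

    @property
    def max_age_secs(self) -> int:
        # Retention period expressed in seconds
        return self.max_age_days * 24 * 60 * 60

    @property
    def max_bytes_gib(self) -> float:
        # Stream size limit in binary gigabytes (1 GiB = 1024**3 bytes)
        return self.max_bytes / 1024**3


limits = JetStreamLimits()
print(limits.dedup_window_secs, limits.max_age_secs, limits.max_bytes_gib)
```

Note that 10737418240 bytes is exactly 10 × 1024³, so the "10 GB" in the config comment is a binary gigabyte figure.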

When NATS is configured as the observer transport, FraiseQL automatically publishes structured JSON messages to NATS subjects whenever an observer-triggering event occurs.

The NATS subject follows the pattern:

{subject_prefix}.{entity_type}.{operation}
# e.g., fraiseql.mutation.Order.INSERT

The entity type keeps the casing of the schema type name, and the operation is always uppercase: one of INSERT, UPDATE, DELETE, or CUSTOM. Neither part is lowercased, so subscriptions must match the casing exactly.
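
Because subjects are case-sensitive, consumers may want to build and parse them in one place rather than concatenating strings by hand. A minimal sketch follows; build_subject and parse_subject are hypothetical helpers, and the sketch assumes the entity type itself may contain dots while the operation never does.

```python
OPERATIONS = ("INSERT", "UPDATE", "DELETE", "CUSTOM")


def build_subject(prefix: str, entity_type: str, operation: str) -> str:
    """Build {subject_prefix}.{entity_type}.{operation} without changing case."""
    if operation not in OPERATIONS:
        raise ValueError(f"unknown operation: {operation!r}")
    return f"{prefix}.{entity_type}.{operation}"


def parse_subject(subject: str, prefix: str = "fraiseql.mutation") -> tuple[str, str]:
    """Split a subject into (entity_type, operation)."""
    head = prefix + "."
    if not subject.startswith(head):
        raise ValueError(f"subject does not start with {head!r}")
    # The operation is the last token; everything before it is the entity type
    entity_type, _, operation = subject[len(head):].rpartition(".")
    if not entity_type or operation not in OPERATIONS:
        raise ValueError(f"malformed subject: {subject!r}")
    return entity_type, operation


print(build_subject("fraiseql.mutation", "Order", "INSERT"))
print(parse_subject("fraiseql.mutation.Order.INSERT"))
```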

Every NATS message follows this JSON structure:

{
  "event": "INSERT",
  "entity": "Order",
  "subject": "fraiseql.mutation.Order.INSERT",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "id": "ord_abc123",
    "total": 149.99,
    "status": "pending",
    "customer_id": "cus_xyz789"
  }
}

| Field | Description |
| --- | --- |
| event | The database operation: INSERT, UPDATE, or DELETE |
| entity | The schema type that triggered the change |
| subject | The full NATS subject the message was published to |
| timestamp | ISO 8601 UTC timestamp of the database event |
| data | The full row payload from the triggering operation |
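
Consumers can decode the envelope described above into a typed object before acting on it. The sketch below is one possible approach; MutationEvent and parse_event are hypothetical names, and the sample bytes simply repeat the documented example message.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class MutationEvent:
    """Hypothetical typed view of the documented message envelope."""

    event: str
    entity: str
    subject: str
    timestamp: datetime
    data: dict[str, Any]


def parse_event(raw: bytes) -> MutationEvent:
    body = json.loads(raw)
    # Replace the trailing "Z" so fromisoformat also works on Python < 3.11
    ts = datetime.fromisoformat(body["timestamp"].replace("Z", "+00:00"))
    return MutationEvent(
        event=body["event"],
        entity=body["entity"],
        subject=body["subject"],
        timestamp=ts,
        data=body["data"],
    )


SAMPLE = json.dumps({
    "event": "INSERT",
    "entity": "Order",
    "subject": "fraiseql.mutation.Order.INSERT",
    "timestamp": "2024-01-15T10:30:00Z",
    "data": {"id": "ord_abc123", "total": 149.99, "status": "pending"},
}).encode()

evt = parse_event(SAMPLE)
print(evt.entity, evt.event, evt.timestamp.isoformat())
```

Inside a nats-py callback, the same function can be applied to `msg.data`.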

Downstream services subscribe to NATS subjects independently of FraiseQL using any NATS client library.

import asyncio
import json

import nats


async def main():
    # Connect to the NATS server and obtain a JetStream context
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    async def order_handler(msg):
        # Decode the JSON envelope published by FraiseQL
        payload = json.loads(msg.data.decode())
        order_id = payload["data"]["id"]
        print(f"New order received: {order_id}")
        # Acknowledge so JetStream does not redeliver the message
        await msg.ack()

    # Durable consumer: survives service restarts
    await js.subscribe(
        "fraiseql.mutation.Order.INSERT",
        durable="order-processor",
        cb=order_handler,
    )

    # Keep the process alive so the callback keeps receiving messages
    await asyncio.Event().wait()


asyncio.run(main())

Core NATS (without JetStream) provides at-most-once delivery. JetStream adds persistence, acknowledgement, and replay. Enable it for any production workload where losing events is unacceptable.

The JetStream stream is automatically created on startup with these defaults:

  • Stream name: fraiseql_events (configurable via stream_name)
  • Subjects: Matches the configured subject_prefix with wildcard
  • Retention: Limits-based (discards oldest when max_msgs or max_bytes exceeded)
  • Acknowledgment: Required (messages redelivered if not acknowledged)

Messages that are still unacknowledged after max_deliver delivery attempts are dropped. Monitor consumer lag with the NATS CLI:

nats consumer info fraiseql_events order-processor

JetStream handles retries automatically based on ack_wait_secs and max_deliver:

| Attempt | Timing |
| --- | --- |
| 1 | Immediate delivery |
| 2 | After ack_wait_secs if not acknowledged |
| 3 | After another ack_wait_secs |
| Failure | Message dropped after max_deliver attempts |
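
The retry timing above can be worked out for any combination of settings. The sketch below assumes the simple model described in the table: every redelivery happens exactly ack_wait_secs after the previous attempt, and the consumer never acknowledges. Both function names are hypothetical, and real timings can drift with server scheduling.

```python
def delivery_offsets(ack_wait_secs: int, max_deliver: int) -> list[int]:
    """Seconds after the first delivery at which each attempt is made."""
    return [attempt * ack_wait_secs for attempt in range(max_deliver)]


def dropped_after_secs(ack_wait_secs: int, max_deliver: int) -> int:
    """Seconds after the first delivery at which the last ack window expires."""
    return ack_wait_secs * max_deliver


# Documented defaults: ack_wait_secs = 30, max_deliver = 3
print(delivery_offsets(30, 3))
print(dropped_after_secs(30, 3))
```

With the defaults, a handler that keeps failing has roughly a minute and a half before the message is dropped, which is a useful bound when sizing ack_wait_secs against the slowest expected handler run.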

If your NATS server enforces authorization, grant FraiseQL publish and subscribe access to its subjects. The following snippet is an excerpt from a NATS server configuration file:

accounts {
  fraiseql_account {
    users = [
      {
        user: fraiseql
        permissions: {
          publish: ["fraiseql.mutation.>"]
          subscribe: ["fraiseql.mutation.>", "_INBOX.>"]
        }
      }
    ]
  }
}

_INBOX.> is required for request-reply patterns used internally by the NATS client.
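
To reason about which subjects a permission entry covers, recall the NATS wildcard rules: `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below reimplements that matching locally so a permission list can be checked before deploying it; subject_matches and is_allowed are hypothetical helpers and do not call the NATS server.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a pattern with * and > wildcards."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # ">" must be the final pattern token and match at least one token
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def is_allowed(subject: str, patterns: list[str]) -> bool:
    """Check a subject against a list of permission patterns."""
    return any(subject_matches(p, subject) for p in patterns)


publish = ["fraiseql.mutation.>"]
print(is_allowed("fraiseql.mutation.Order.INSERT", publish))
print(is_allowed("billing.invoice.created", publish))
```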

For capturing existing PostgreSQL changes, you can run a bridge that forwards PostgreSQL LISTEN/NOTIFY events to NATS:

In fraiseql.toml:

[observers]
backend = "nats"
nats_url = "${NATS_URL}"

In fraiseql-observer.toml:

[transport]
transport = "nats"
run_bridge = true
run_executors = false # Bridge process only
[transport.bridge]
transport_name = "pg_to_nats"
batch_size = 100
poll_interval_secs = 1
notify_channel = "fraiseql_events"

The bridge polls the PostgreSQL change log and publishes to NATS, enabling gradual migration from LISTEN/NOTIFY to NATS without application changes.

FraiseQL exposes the following Prometheus metrics for the observer system, including the NATS transport:

| Metric | Type | Description |
| --- | --- | --- |
| fraiseql_observer_events_processed_total | Counter | Total events processed by the observer |
| fraiseql_observer_events_failed_total{error_type} | Counter | Total events that failed processing |
| fraiseql_observer_action_executed_total{action_type} | Counter | Total actions executed (use action_type="webhook" to filter) |
| fraiseql_observer_action_duration_seconds{action_type} | Histogram | Action execution duration in seconds |
| fraiseql_observer_backlog_size | Gauge | Current number of events in the processing queue |
| fraiseql_observer_dlq_items | Gauge | Current number of items in the dead letter queue |
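
For a quick check outside a full Prometheus setup, the unlabelled gauges can be read directly from the text exposition format. The following is a rough sketch: read_gauge is a hypothetical helper, the sample text is made up for illustration, and where FraiseQL serves its metrics is not specified here.

```python
from typing import Optional


def read_gauge(exposition: str, name: str) -> Optional[float]:
    """Return the value of an unlabelled metric from Prometheus text output."""
    for line in exposition.splitlines():
        line = line.strip()
        # Skip blank lines and # HELP / # TYPE comments
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        if len(parts) >= 2 and parts[0] == name:
            return float(parts[1])
    return None


# Illustrative sample only; real values come from your metrics endpoint
SAMPLE_METRICS = """\
# TYPE fraiseql_observer_backlog_size gauge
fraiseql_observer_backlog_size 42
# TYPE fraiseql_observer_dlq_items gauge
fraiseql_observer_dlq_items 0
"""

print(read_gauge(SAMPLE_METRICS, "fraiseql_observer_backlog_size"))
```

A steadily growing backlog_size or a non-zero dlq_items value is a reasonable trigger for an alert.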

Check NATS connection status via the health endpoint:

curl http://localhost:8080/health

Expected response:

{
  "status": "ok",
  "databases": { "primary": { "status": "ok" } },
  "observers": {
    "transport": "nats",
    "connected": true,
    "stream": "fraiseql_events"
  }
}
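
A readiness probe or deployment script can evaluate the same response programmatically. The sketch below assumes the response shape shown above; observers_healthy and fetch_health are hypothetical helpers, and the URL is the one from the curl example.

```python
import json
from urllib.request import urlopen


def observers_healthy(body: dict, expected_stream: str = "fraiseql_events") -> bool:
    """Return True if the health payload reports a connected NATS transport."""
    observers = body.get("observers", {})
    return (
        body.get("status") == "ok"
        and observers.get("transport") == "nats"
        and observers.get("connected") is True
        and observers.get("stream") == expected_stream
    )


def fetch_health(url: str = "http://localhost:8080/health", timeout: float = 5.0) -> dict:
    """Fetch and decode the health endpoint (requires a running FraiseQL)."""
    with urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


# Offline check against the documented example response
example = {
    "status": "ok",
    "databases": {"primary": {"status": "ok"}},
    "observers": {"transport": "nats", "connected": True, "stream": "fraiseql_events"},
}
print(observers_healthy(example))
```

Against a live server, `observers_healthy(fetch_health())` performs the same check on the real response.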

Keep payloads small. Include only the fields consumers need. Large payloads increase storage and network cost:

# Preferred: reference data, let consumers fetch details if needed
{"order_id": "ord_123", "customer_id": "cus_456"}
# Avoid: embedding full nested objects
{"order": full_order_with_all_fields, "customer": full_customer_object}

Design handlers to be idempotent. JetStream guarantees at-least-once delivery. A message may be redelivered after a crash or network partition. Guard against duplicate processing:

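# subscribe, is_already_processed, do_process_order, and mark_processed
# are placeholders for your own consumer framework and storage layer.
@subscribe("fraiseql.mutation.Order.INSERT")
async def process_order(event):
    order_id = event.data["id"]
    # Skip events that were already handled (for example, after a redelivery)
    if await is_already_processed(order_id):
        return
    await do_process_order(order_id)
    # Record completion only after the work has succeeded
    await mark_processed(order_id)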
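
The guard can be exercised without NATS to confirm that a redelivered message has no second effect. The following self-contained sketch uses an in-memory set as a stand-in for durable storage; IdempotentHandler and its attributes are hypothetical names, and a production version would persist the processed IDs, ideally in the same transaction as the side effect.

```python
import asyncio


class IdempotentHandler:
    """Process each order ID at most once (in-memory sketch)."""

    def __init__(self) -> None:
        self.processed: set[str] = set()   # stand-in for a durable store
        self.side_effects: list[str] = []  # stand-in for real work

    async def handle(self, payload: dict) -> bool:
        order_id = payload["data"]["id"]
        if order_id in self.processed:
            # Duplicate delivery: nothing to do, but still safe to ack
            return False
        self.side_effects.append(order_id)
        self.processed.add(order_id)
        return True


async def demo() -> list[bool]:
    handler = IdempotentHandler()
    event = {"data": {"id": "ord_abc123"}}
    # Simulate the same message being delivered twice
    return [await handler.handle(event), await handler.handle(event)]


print(asyncio.run(demo()))
```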

Use durable consumers in production. Ephemeral consumers lose their position when the subscriber disconnects. Durable consumers resume from where they left off.

Monitor consumer lag. If your consumer falls behind, increase the number of consumer instances or optimize processing time:

nats consumer info fraiseql_events order-processor

“NATS transport requires nats.url to be set”

Add the [transport.nats] section with a valid URL in fraiseql-observer.toml:

[transport.nats]
url = "nats://localhost:4222"

“run_bridge=true requires transport=nats”

The PostgreSQL to NATS bridge only works with NATS transport. Ensure fraiseql.toml sets NATS as the backend, and fraiseql-observer.toml sets run_bridge = true:

# fraiseql.toml
[observers]
backend = "nats"
nats_url = "${NATS_URL}"
# fraiseql-observer.toml
[transport]
transport = "nats"
run_bridge = true

Messages not being delivered

  1. Verify NATS is running and reachable: nats server check connection
  2. Check that JetStream is enabled and the stream exists: nats stream info fraiseql_events
  3. Confirm the consumer exists: nats consumer info fraiseql_events <consumer_name>
  4. Check FraiseQL logs for connection errors

Observers: Define the database change listeners that trigger NATS events

Subscriptions: Push real-time updates to browser clients over WebSocket

Deployment: Configure secrets and environment variables for production