NATS Integration
Complete NATS reference and configuration.
A complete event-driven order processing system using FraiseQL’s NATS integration with JetStream persistence, CDC (Change Data Capture), and multiple microservices.
GraphQL Client | FraiseQL API | +------+------+ | |Orders DB NATS JetStream | | +------+------+ | +-----+-----+ | | | Inventory Payment Shipping Service Service Service | | | Inv DB Pay DB Ship DBIn fraiseql.toml, enable NATS as the observer backend:
[observers]enabled = truebackend = "nats"nats_url = "${NATS_URL}" # e.g. nats://nats-1:4222,nats://nats-2:4222
[[observers.handlers]]name = "order-pipeline"event = "order.created"action = "webhook"webhook_url = "${ORDER_PROCESSOR_URL}"Advanced JetStream settings (stream retention, consumers, DLQ) are configured in the observer runtime config file (fraiseql-observer.toml):
[transport]transport = "nats"
[transport.nats]url = "${NATS_URL}"stream_name = "fraiseql_events"subject_prefix = "fraiseql.mutation"consumer_name = "fraiseql_observer_worker"
[transport.nats.jetstream]max_age_days = 30max_bytes = 10_737_418_240 # 10 GBmax_msgs = 1_000_000max_deliver = 3ack_wait_secs = 30dedup_window_minutes = 5CREATE TABLE tb_order ( pk_order SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, customer_id UUID NOT NULL, total DECIMAL(10, 2) NOT NULL, status TEXT NOT NULL DEFAULT 'pending', created_at TIMESTAMPTZ DEFAULT now(), updated_at TIMESTAMPTZ DEFAULT now());
CREATE TABLE tb_order_item ( pk_order_item SERIAL PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, fk_order INTEGER NOT NULL REFERENCES tb_order(pk_order), product_id UUID NOT NULL, quantity INT NOT NULL, unit_price DECIMAL(10, 2) NOT NULL);
-- Views with JSONB data columnCREATE VIEW v_order ASSELECT o.id, o.identifier, jsonb_build_object( 'id', o.id::text, 'identifier', o.identifier, 'customer_id', o.customer_id::text, 'total', o.total, 'status', o.status, 'created_at', o.created_at, 'updated_at', o.updated_at ) AS dataFROM tb_order o;import fraiseqlfrom fraiseql.scalars import IDfrom fraiseql.nats import publishfrom typing import Optional, Anyfrom decimal import Decimalfrom datetime import datetime
# ==================== TYPES ====================
@fraiseql.typeclass Order: id: ID customer_id: ID total: Decimal status: str # 'pending', 'confirmed', 'shipped', 'delivered', 'cancelled' created_at: datetime updated_at: datetime
@fraiseql.typeclass OrderItem: id: ID order_id: ID product_id: ID quantity: int unit_price: Decimal
@fraiseql.typeclass Reservation: id: ID order_id: ID product_id: ID quantity: int status: str created_at: datetime expires_at: Optional[datetime]
@fraiseql.typeclass Shipping: id: ID order_id: ID address: str status: str tracking_number: Optional[str] estimated_delivery: Optional[datetime]
@fraiseql.typeclass OrderEvent: """Base event structure for all order events.""" type: str timestamp: datetime order_id: ID data: Any
# ==================== MUTATIONS ====================
@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")async def create_order( customer_id: ID, items: list[dict], shipping_address: dict) -> Order: """Create order and publish event to NATS.""" pass
# Publish custom event after order creation@fraiseql.after_mutation("create_order")async def after_create_order(order: Order, context): """Publish order created event for other services.""" await publish( subject="fraiseql.order.created", data={ "type": "order.created", "timestamp": datetime.utcnow().isoformat(), "order_id": order.id, "customer_id": order.customer_id, "total": str(order.total), "data": { "customer_id": order.customer_id, "items": context.variables.get("items"), "shipping_address": context.variables.get("shipping_address") } } )
@fraiseql.mutation(sql_source="fn_update_order_status", operation="UPDATE")async def update_order_status(id: ID, status: str) -> Order: """Update order status and publish event.""" pass
@fraiseql.after_mutation("update_order_status")async def after_update_order_status(order: Order): """Publish status changed event.""" await publish( subject="fraiseql.order.status_changed", data={ "type": "order.status_changed", "timestamp": datetime.utcnow().isoformat(), "order_id": order.id, "new_status": order.status } )
# ==================== SUBSCRIPTIONS ====================
@fraiseql.subscription( entity_type="Order", topic="order_created", jetstream=True, replay=False # Only new orders)def order_created() -> Order: """Subscribe to newly created orders.""" pass
@fraiseql.subscription( entity_type="Order", topic="order_status_changed", jetstream=True, filter="new_status == 'shipped'")def order_shipped() -> Order: """Subscribe only to shipped orders.""" pass
@fraiseql.subscription( entity_type="Order", topic="order_status_changed", jetstream=True, replay=True, replay_from="2024-01-01T00:00:00Z")def order_status_history() -> Order: """Replay all status changes from a point in time.""" pass
# ==================== QUERIES ====================
@fraiseql.query(sql_source="v_order")def order(id: ID) -> Order: """Get order by ID.""" pass
@fraiseql.query(sql_source="v_order")def orders_by_status(status: str, limit: int = 100) -> list[Order]: """Get orders by status.""" passfrom fraiseql.nats import subscribe, publish, requestimport asyncio
@subscribe("fraiseql.order.created")async def handle_order_created(event: dict): """ Orchestrate order processing: 1. Request inventory reservation 2. Request shipping quote 3. Publish order.confirmed when ready """ order_id = event["data"]["order_id"] items = event["data"]["items"]
try: # Step 1: Request inventory service to reserve items reservation = await request( subject="inventory.reserve", data={"order_id": order_id, "items": items}, timeout=5000 )
if not reservation.get("success"): await publish( subject="fraiseql.order.failed", data={ "type": "order.reservation_failed", "timestamp": datetime.utcnow().isoformat(), "order_id": order_id, "reason": reservation.get("reason") } ) return
# Step 2: Request shipping service shipping = await request( subject="shipping.quote", data={ "order_id": order_id, "address": event["data"]["shipping_address"], "items": items }, timeout=5000 )
# Step 3: Publish confirmation await publish( subject="fraiseql.order.confirmed", data={ "type": "order.confirmed", "timestamp": datetime.utcnow().isoformat(), "order_id": order_id, "reservation_id": reservation.get("reservation_id"), "shipping_id": shipping.get("shipping_id"), "estimated_delivery": shipping.get("estimated_delivery") } )
except Exception as e: await publish( subject="fraiseql.order.error", data={ "type": "order.error", "timestamp": datetime.utcnow().isoformat(), "order_id": order_id, "error": str(e) } )from fraiseql.nats import service, subscribe, publish
@service("inventory.reserve")async def handle_inventory_reserve(data: dict) -> dict: """Handle inventory reservation request."""
order_id = data["order_id"] items = data["items"]
try: reservations = []
for item in items: product_id = item["product_id"] quantity = item["quantity"]
available = await ctx.db.query_one( """ SELECT pk_inventory, available FROM tb_inventory WHERE fk_product = (SELECT pk_product FROM tb_product WHERE id = $1) AND available >= $2 FOR UPDATE """, [product_id, quantity] )
if not available: return { "success": False, "reason": f"Insufficient inventory for {product_id}" }
reservations.append({ "product_id": product_id, "quantity": quantity })
reservation_id = await create_reservation_record(order_id, reservations)
await publish( subject="fraiseql.inventory.reserved", data={ "type": "inventory.reserved", "timestamp": datetime.utcnow().isoformat(), "order_id": order_id, "reservation_id": reservation_id } )
return { "success": True, "reservation_id": reservation_id, "items": reservations }
except Exception as e: return {"success": False, "reason": str(e)}from fraiseql.nats import subscribe
@subscribe("fraiseql.order.>", queue_group="analytics")async def process_order_event(event: dict): """ Process all order events for analytics. Queue group ensures exactly-once processing. """
event_type = event.get("type") order_id = event["data"].get("order_id")
if event_type == "order.created": await increment_metric("orders.created", 1) await increment_metric("orders.total_revenue", float(event["data"].get("total", 0)))
elif event_type == "order.confirmed": await increment_metric("orders.confirmed", 1)
elif event_type == "order.error": error = event["data"].get("error", "unknown") await increment_metric(f"orders.errors.{error}", 1) await increment_metric("orders.failed", 1){ "type": "order.created", "timestamp": "2024-01-15T10:30:00Z", "order_id": "550e8400-e29b-41d4-a716-446655440000", "customer_id": "client-123", "total": "1234.56", "data": { "customer_id": "client-123", "items": [ { "product_id": "prod-001", "quantity": 2, "unit_price": "99.99", "weight": 0.5 } ], "shipping_address": { "street": "123 Main St", "city": "San Francisco", "postal": "94102", "country": "USA" } }}{ "type": "order.status_changed", "timestamp": "2024-01-15T10:31:15Z", "order_id": "550e8400-e29b-41d4-a716-446655440000", "new_status": "confirmed", "previous_status": "pending", "data": { "reservation_id": "res-456", "shipping_id": "ship-789" }}from fraiseql.nats import subscribeimport hashlib
@subscribe("fraiseql.order.created")async def handle_order_created_idempotent(event: dict): """Process event idempotently using event ID."""
# Create deterministic event ID event_id = hashlib.sha256( f"{event['order_id']}-{event['timestamp']}".encode() ).hexdigest()
# Check if already processed if await redis.exists(f"event_processed:{event_id}"): return
try: await process_order(event) # Store with TTL of 30 days await redis.setex(f"event_processed:{event_id}", 30 * 24 * 3600, "true")
except Exception: raise # Let NATS retry (max_deliver = 3)from fraiseql.nats import subscribe
@subscribe("fraiseql.dlq")async def handle_dead_letter(event: dict): """Handle messages that failed 3 times.""" await log_dead_letter({ "timestamp": datetime.utcnow(), "event": event, "reason": "Max retries exceeded" })
await notify_ops(f"Dead letter: {event.get('order_id')}")
if should_manual_retry(event): await publish( subject="fraiseql.order.manual_review", data=event )# Check JetStream statusnats account info
# Monitor consumersnats consumer list orders
# Check consumer statenats consumer info orders order-processor
# View pending messagesnats consumer info orders order-processor --raw | jq '.state.pending'
# Check for slow consumersnats consumer report orders order-processorNATS Integration
Complete NATS reference and configuration.
Federation
Synchronous cross-database access.
Subscriptions
GraphQL subscriptions over WebSocket.
Error Handling
Handle failures and retries gracefully.