Microservices Choreography
Event-driven order processing with FraiseQL, NATS, and standalone workers.
A complete microservices architecture using event-driven choreography with separate FraiseQL services and NATS for distributed order processing.
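The choreography passes small JSON payloads between services over NATS subjects. FraiseQL does not prescribe an event schema, so the shapes below are a sketch of what the workers in this example assume (the field names mirror what the handlers read):

```python
# Hypothetical payload builders for the choreography events.
# These are illustrative only; they mirror the fields the workers
# in this example read (order_id, items with sku/quantity).
import json

def order_created_event(order_id: str, items: list[dict]) -> bytes:
    """events.order.created payload: which order, which SKUs and quantities."""
    return json.dumps({"order_id": order_id, "items": items}).encode()

def inventory_reserved_event(order_id: str) -> bytes:
    """events.inventory.reserved payload: just the order reference."""
    return json.dumps({"order_id": order_id}).encode()

# Example: the inventory worker expects to read items[n]["sku"] and
# items[n]["quantity"] from the order-created payload.
evt = order_created_event("demo-order-id", [{"sku": "widget-1", "quantity": 2}])
```

Keeping payloads minimal (IDs rather than full rows) forces each worker to read current state from PostgreSQL, which avoids acting on stale data after redelivery.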
```mermaid
sequenceDiagram
    participant C as Client
    participant OS as Order Service (FraiseQL)
    participant N as NATS
    participant INV as Inventory Worker (nats.py)
    participant PAY as Payment Worker (nats.py)
    participant SHP as Shipping Worker (nats.py)

    C->>OS: createOrder(items, total)
    OS->>OS: fn_create_order() → status "pending"
    OS->>N: events.order.created (via pg_notify → observer)
    OS-->>C: Order{status: "pending"}

    N->>INV: events.order.created
    INV->>INV: reserve stock in tb_inventory
    INV->>N: events.inventory.reserved

    N->>PAY: events.inventory.reserved
    PAY->>PAY: charge card, insert tb_payment
    PAY->>N: events.payment.charged

    N->>OS: events.payment.charged
    OS->>OS: fn_update_order_status() → "paid"

    N->>SHP: events.payment.charged
    SHP->>SHP: create label, insert tb_shipping
    SHP->>N: events.order.shipped

    N->>OS: events.order.shipped
    OS->>OS: fn_update_order_status() → "shipped"
```

docker-compose.yml:

```yaml
version: '3.9'

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: orders
      POSTGRES_USER: fraiseql
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"

  nats:
    image: nats:latest
    command: ["-js"]
    ports:
      - "4222:4222"

  order-service:
    build: ./order-service
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222
    ports:
      - "8080:8080"   # GraphQL API (the curl examples and tests target localhost:8080)

  inventory-worker:
    build: ./inventory-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

  payment-worker:
    build: ./payment-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222

  shipping-worker:
    build: ./shipping-worker
    depends_on: [postgres, nats]
    environment:
      DATABASE_URL: postgresql://fraiseql:password@postgres:5432/orders
      NATS_URL: nats://nats:4222
```

Start all services with:
```bash
docker-compose up -d
```

Database schema (order service):

```sql
CREATE TABLE tb_order (
    pk_order BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    fk_user BIGINT NOT NULL REFERENCES tb_user(pk_user),
    total NUMERIC(10, 2) NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    shipping_address JSONB,  -- read by the shipping worker
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- The UNIQUE constraint on id already creates an index; no separate one needed.
CREATE INDEX idx_tb_order_fk_user ON tb_order(fk_user);
CREATE INDEX idx_tb_order_status ON tb_order(status);

CREATE TABLE tb_order_item (
    pk_order_item BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    fk_order BIGINT NOT NULL REFERENCES tb_order(pk_order),
    fk_product BIGINT NOT NULL REFERENCES tb_product(pk_product),
    quantity INTEGER NOT NULL,
    unit_price NUMERIC(10, 2) NOT NULL
);

CREATE INDEX idx_tb_order_item_fk_order ON tb_order_item(fk_order);

CREATE TABLE tb_event_log (
    pk_event_log BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    event_type TEXT NOT NULL,
    fk_order BIGINT REFERENCES tb_order(pk_order),
    data JSONB NOT NULL DEFAULT '{}',
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_tb_event_log_fk_order ON tb_event_log(fk_order);

-- Read view: order status
CREATE VIEW v_order AS
SELECT
    o.id,
    jsonb_build_object(
        'id', o.id::text,
        'identifier', o.identifier,
        'total', o.total,
        'status', o.status,
        'created_at', o.created_at
    ) AS data
FROM tb_order o;

-- Mutation function: create order
CREATE FUNCTION fn_create_order(
    p_identifier TEXT,
    p_fk_user BIGINT,
    p_total NUMERIC
) RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_id UUID;
    v_result mutation_response;
BEGIN
    INSERT INTO tb_order (identifier, fk_user, total, status)
    VALUES (p_identifier, p_fk_user, p_total, 'pending')
    RETURNING id INTO v_id;

    v_result.status := 'success';
    v_result.entity_id := v_id;
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;

-- Mutation function: update order status (called by NATS observer via SQL)
CREATE FUNCTION fn_update_order_status(
    p_order_id UUID,
    p_status TEXT
) RETURNS mutation_response
LANGUAGE plpgsql AS $$
DECLARE
    v_result mutation_response;
BEGIN
    UPDATE tb_order SET status = p_status WHERE id = p_order_id;

    v_result.status := 'success';
    v_result.entity_type := 'Order';
    RETURN v_result;
END;
$$;
```

GraphQL schema definition (Python):

```python
import fraiseql
from fraiseql.scalars import ID, DateTime
from decimal import Decimal
```
```python
@fraiseql.type
class Order:
    """A customer order."""
    id: ID
    identifier: str
    total: Decimal
    status: str
    created_at: DateTime

@fraiseql.input
class CreateOrderInput:
    identifier: str
    total: Decimal

@fraiseql.query
def order(id: ID) -> Order | None:
    """Get a single order by UUID."""
    return fraiseql.config(sql_source="v_order")

@fraiseql.query
def orders(limit: int = 20, offset: int = 0) -> list[Order]:
    """List orders. RLS ensures users see only their own."""
    return fraiseql.config(sql_source="v_order")

@fraiseql.mutation(sql_source="fn_create_order", operation="CREATE")
def create_order(input: CreateOrderInput) -> Order:
    """Create a new order. The Rust runtime calls fn_create_order(),
    pg_notify fires, and the observer publishes to NATS."""
    pass

@fraiseql.subscription(entity_type="Order", topic="events.order.status_updated")
def order_status_updated(order_id: ID | None = None) -> Order:
    """Subscribe to order status changes in real time."""
    pass

fraiseql.export_schema("schema.json")
```

fraiseql.toml:

```toml
[project]
name = "order-service"
version = "1.0.0"
```
```toml
[database]
url = "${DATABASE_URL}"

[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"

# JWT/OIDC config via environment variables (no [auth] TOML section):
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET

# NATS observer: when tb_order changes, publish to events.order.created
# The Rust runtime receives pg_notify from PostgreSQL and forwards to NATS.
[[observers]]
table = "tb_order"
event = "INSERT"
subject = "events.order.created"

[[observers]]
table = "tb_order"
event = "UPDATE"
subject = "events.order.status_updated"

[observers]
backend = "nats"
nats_url = "${NATS_URL}"

[security.enterprise]
enabled = true
log_level = "info"
```

inventory_worker.py:

```python
"""Inventory worker: subscribes to NATS events using nats.py.

This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json

import asyncpg
import nats
from nats.js import JetStreamContext
```
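If events never reach NATS, it helps to confirm that the pg_notify side of the observer pipeline fires at all, before suspecting the NATS connection. You can listen on a channel directly with asyncpg. Note the channel name `fraiseql_events` below is a guess for illustration; check your runtime's logs or documentation for the actual channel it uses.

```python
"""Debugging sketch: watch pg_notify payloads with asyncpg.

The channel name "fraiseql_events" is an assumption for illustration;
substitute whatever channel the FraiseQL runtime actually listens on.
"""
import asyncio
import os

def on_notify(conn, pid, channel, payload):
    # Called once per NOTIFY; payload is the JSON the runtime forwards to NATS.
    print(f"[pg_notify] channel={channel} payload={payload}")

async def main():
    import asyncpg  # lazy import so the module loads without asyncpg installed
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    await conn.add_listener("fraiseql_events", on_notify)
    print("listening; INSERT into tb_order to trigger a notification")
    await asyncio.sleep(float("inf"))

if __name__ == "__main__":
    asyncio.run(main())
```

Run it against the same DATABASE_URL as the order service, then create an order and watch for output.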
```python
async def on_order_created(msg):
    """React to order creation. Reserve inventory or emit a failure event."""
    nc = msg._client  # nats.py connection reference
    js: JetStreamContext = nc.jetstream()
    data = json.loads(msg.data)
    order_id = data["order_id"]
    items = data["items"]

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        for item in items:
            row = await conn.fetchrow(
                """
                SELECT pk_inventory, quantity, reserved
                FROM tb_inventory
                WHERE fk_product = (
                    SELECT pk_product FROM tb_product WHERE identifier = $1
                )
                """,
                item["sku"],
            )
            if row is None or (row["quantity"] - row["reserved"]) < item["quantity"]:
                await js.publish(
                    "events.order.failed",
                    json.dumps({
                        "order_id": order_id,
                        "reason": "inventory_unavailable",
                        "sku": item["sku"],
                    }).encode(),
                )
                await msg.nak()
                return

            await conn.execute(
                "UPDATE tb_inventory SET reserved = reserved + $1 WHERE pk_inventory = $2",
                item["quantity"],
                row["pk_inventory"],
            )

        await js.publish(
            "events.inventory.reserved",
            json.dumps({"order_id": order_id}).encode(),
        )
        await msg.ack()
    except Exception as exc:
        await js.publish(
            "events.order.failed",
            json.dumps({"order_id": order_id, "reason": str(exc)}).encode(),
        )
        await msg.nak(delay=5)
    finally:
        await conn.close()

async def main():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    await js.subscribe(
        "events.order.created",
        durable="inventory_processors",
        cb=on_order_created,
    )
    print("Inventory worker listening on events.order.created")
    await asyncio.sleep(float("inf"))

if __name__ == "__main__":
    import os

    NATS_URL = os.environ["NATS_URL"]
    POSTGRES_URL = os.environ["DATABASE_URL"]
    asyncio.run(main())
```

payment_worker.py:

```python
"""Payment worker: subscribes to NATS events using nats.py.

This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json

import asyncpg
import nats
```
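JetStream redelivers a message after a nak() or an ack timeout, so every worker should tolerate seeing the same event twice. One option is to record handled (subject, order_id) pairs and skip repeats. The in-memory set below is illustrative only; in production you would back this with a unique index (for example on tb_event_log) or JetStream's Nats-Msg-Id deduplication.

```python
# Idempotency sketch: skip events we have already handled.
# The in-memory set is for illustration; a real deployment needs
# durable storage so dedupe survives worker restarts.

_seen: set[tuple[str, str]] = set()

def already_processed(subject: str, order_id: str) -> bool:
    """Return True if this (subject, order_id) pair was handled before."""
    key = (subject, order_id)
    if key in _seen:
        return True
    _seen.add(key)
    return False

# A handler would bail out early on duplicates:
#     if already_processed(msg.subject, data["order_id"]):
#         await msg.ack()   # safe to ack: the work was already done
#         return
```

Acking the duplicate (rather than nak-ing it) is important: it tells JetStream the message is done, stopping the redelivery loop.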
```python
async def on_inventory_reserved(msg):
    """Process payment after inventory is reserved.

    Risk: inventory reserved but payment fails; we must compensate.
    """
    nc = msg._client
    js = nc.jetstream()
    data = json.loads(msg.data)
    order_id = data["order_id"]

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        order = await conn.fetchrow(
            "SELECT pk_order, total FROM tb_order WHERE id = $1::uuid",
            order_id,
        )
        if order is None:
            await msg.ack()
            return

        # Charge the card via the external payment provider
        payment = await charge_credit_card(
            order_pk=order["pk_order"],
            amount=order["total"],
        )

        await conn.execute(
            """
            INSERT INTO tb_payment (identifier, fk_order, amount, status, provider_id)
            VALUES (gen_random_uuid()::text, $1, $2, 'succeeded', $3)
            """,
            order["pk_order"],
            order["total"],
            payment.id,
        )

        await js.publish(
            "events.payment.charged",
            json.dumps({
                "order_id": order_id,
                "payment_id": payment.id,
                "amount": float(order["total"]),
            }).encode(),
        )
        await msg.ack()

    except Exception as exc:
        # Payment failed: emit a compensation event so inventory is released
        await js.publish(
            "events.payment.failed",
            json.dumps({
                "order_id": order_id,
                "reason": str(exc),
            }).encode(),
        )
        await msg.nak(delay=10)
    finally:
        await conn.close()

async def on_payment_failed(msg):
    """Release reserved inventory when payment fails."""
    data = json.loads(msg.data)
    order_id = data["order_id"]

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        items = await conn.fetch(
            """
            SELECT oi.fk_product, oi.quantity
            FROM tb_order_item oi
            JOIN tb_order o ON o.pk_order = oi.fk_order
            WHERE o.id = $1::uuid
            """,
            order_id,
        )
        for item in items:
            await conn.execute(
                "UPDATE tb_inventory SET reserved = reserved - $1 WHERE fk_product = $2",
                item["quantity"],
                item["fk_product"],
            )

        js = msg._client.jetstream()
        await js.publish(
            "events.order.failed",
            json.dumps({
                "order_id": order_id,
                "reason": "payment_failed",
                "compensated": True,
            }).encode(),
        )
        await msg.ack()
    finally:
        await conn.close()

async def main():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    await js.subscribe(
        "events.inventory.reserved",
        durable="payment_processors",
        cb=on_inventory_reserved,
    )
    await js.subscribe(
        "events.payment.failed",
        durable="compensation_handlers",
        cb=on_payment_failed,
    )
    print("Payment worker listening")
    await asyncio.sleep(float("inf"))

if __name__ == "__main__":
    import os

    NATS_URL = os.environ["NATS_URL"]
    POSTGRES_URL = os.environ["DATABASE_URL"]
    asyncio.run(main())
```

shipping_worker.py:

```python
"""Shipping worker: subscribes to NATS events using nats.py.

This is a standalone service, not a FraiseQL schema file.
"""
import asyncio
import json

import asyncpg
import nats
```
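The payment worker calls charge_credit_card(), which this example leaves undefined. A hypothetical stub against no particular provider, useful for running the choreography end to end locally; the ChargeResult shape, the PaymentDeclined exception, and the "decline large amounts" rule are all invented here for testing:

```python
"""Stub payment provider, for local runs only.

Everything here is an assumption: a real integration would call
Stripe/Adyen/etc. The decline rule exists so the compensation path
(events.payment.failed) can be exercised without a real card.
"""
import asyncio  # used when driving the coroutine in examples/tests
import uuid
from dataclasses import dataclass
from decimal import Decimal

@dataclass
class ChargeResult:
    id: str          # provider's payment id (payment.id in the worker)
    amount: Decimal

class PaymentDeclined(Exception):
    pass

async def charge_credit_card(order_pk: int, amount: Decimal) -> ChargeResult:
    """Fake charge: declines amounts of 1000 or more so large test
    orders (e.g. total 9999.99) exercise the failure path."""
    if amount >= Decimal("1000"):
        raise PaymentDeclined(f"amount {amount} over test limit")
    return ChargeResult(id=f"pay_{uuid.uuid4().hex[:12]}", amount=amount)

# Example: asyncio.run(charge_credit_card(order_pk=1, amount=Decimal("49.99")))
```

The declined exception propagates into the worker's except branch, which publishes events.payment.failed, exactly the compensation trigger the tests rely on.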
```python
async def on_payment_charged(msg):
    """Create the shipping label after payment succeeds.

    Last step in the choreography.
    """
    nc = msg._client
    js = nc.jetstream()
    data = json.loads(msg.data)
    order_id = data["order_id"]

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        order = await conn.fetchrow(
            "SELECT pk_order, shipping_address FROM tb_order WHERE id = $1::uuid",
            order_id,
        )
        if order is None:
            await msg.ack()
            return

        label = await create_shipping_label(
            order_pk=order["pk_order"],
            address=order["shipping_address"],
        )

        await conn.execute(
            """
            INSERT INTO tb_shipping (identifier, fk_order, provider_label_id, carrier, tracking_number)
            VALUES (gen_random_uuid()::text, $1, $2, $3, $4)
            """,
            order["pk_order"],
            label.id,
            label.carrier,
            label.tracking_number,
        )

        await js.publish(
            "events.order.shipped",
            json.dumps({
                "order_id": order_id,
                "tracking_number": label.tracking_number,
            }).encode(),
        )
        await msg.ack()

    except Exception as exc:
        # Log for manual intervention; shipping is the last step
        print(f"[shipping-worker] ERROR order={order_id}: {exc}")
        await msg.nak(delay=60)
    finally:
        await conn.close()

async def main():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    await js.subscribe(
        "events.payment.charged",
        durable="shipping_handlers",
        cb=on_payment_charged,
    )
    print("Shipping worker listening on events.payment.charged")
    await asyncio.sleep(float("inf"))

if __name__ == "__main__":
    import os

    NATS_URL = os.environ["NATS_URL"]
    POSTGRES_URL = os.environ["DATABASE_URL"]
    asyncio.run(main())
```

event_monitor.py:

```python
"""Event monitor: logs all events to tb_event_log.

Standalone nats.py service; not a FraiseQL schema file.
"""
import asyncio
import json

import asyncpg
import nats
```
```python
async def monitor_events(msg):
    """Record all events in tb_event_log for an audit trail."""
    event_type = msg.subject
    data = json.loads(msg.data)
    order_id = data.get("order_id")

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # Look up pk_order for the FK
        order_pk = None
        if order_id:
            row = await conn.fetchrow(
                "SELECT pk_order FROM tb_order WHERE id = $1::uuid",
                order_id,
            )
            if row:
                order_pk = row["pk_order"]

        await conn.execute(
            """
            INSERT INTO tb_event_log (identifier, event_type, fk_order, data)
            VALUES (gen_random_uuid()::text, $1, $2, $3::jsonb)
            """,
            event_type,
            order_pk,
            json.dumps(data),
        )

        if "failed" in event_type:
            await send_alert({"severity": "warning", "event": event_type, "details": data})

        await msg.ack()
    finally:
        await conn.close()

async def main():
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    await js.subscribe("events.>", durable="event_monitoring", cb=monitor_events)
    print("Monitoring worker listening on events.>")
    await asyncio.sleep(float("inf"))

if __name__ == "__main__":
    import os

    NATS_URL = os.environ["NATS_URL"]
    POSTGRES_URL = os.environ["DATABASE_URL"]
    asyncio.run(main())
```

Integration tests (tests/test_choreography.py):

```python
import asyncio

import httpx
import pytest

# TEST_TOKEN is assumed to hold a valid JWT for the test user.
```
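The monitor calls send_alert(), which is also left undefined in the example. A hedged sketch that posts the alert as JSON to a webhook; the environment variable name ALERT_WEBHOOK_URL and the fallback-to-stdout behavior are assumptions, not part of the original service:

```python
"""Sketch of the send_alert() helper the monitor calls on *.failed events.

Posts the alert to a webhook if ALERT_WEBHOOK_URL is set (an invented
variable name); otherwise logs to stdout so alerts are never dropped.
"""
import asyncio
import json
import os

async def send_alert(alert: dict) -> None:
    url = os.environ.get("ALERT_WEBHOOK_URL")
    if not url:
        # No webhook configured: fall back to stdout.
        print(f"[alert] {json.dumps(alert)}")
        return
    import httpx  # lazy import: module stays loadable without httpx installed
    async with httpx.AsyncClient() as client:
        await client.post(url, json=alert)
```

Because the monitor already subscribes to events.>, this single hook surfaces every failure subject (events.order.failed, events.payment.failed) without per-worker alerting code.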
```python
@pytest.mark.asyncio
async def test_full_order_choreography():
    """Test the complete happy path through all services via the GraphQL API."""
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        # Create an order via the FraiseQL GraphQL API
        response = await client.post(
            "/graphql",
            json={
                "query": """
                    mutation {
                        createOrder(input: { identifier: "ord-test-1", total: "49.99" }) {
                            id
                            status
                        }
                    }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        data = response.json()
        order_id = data["data"]["createOrder"]["id"]
        assert data["data"]["createOrder"]["status"] == "pending"

    # Give all workers time to process the choreography
    await asyncio.sleep(3.0)

    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={"query": f'query {{ order(id: "{order_id}") {{ id status }} }}'},
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        final = response.json()
        assert final["data"]["order"]["status"] == "shipped"

@pytest.mark.asyncio
async def test_payment_failure_compensation():
    """Test that inventory is released when payment fails."""
    # Use a known-bad order identifier to trigger a payment failure
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={
                "query": """
                    mutation {
                        createOrder(input: { identifier: "ord-bad-card-1", total: "49.99" }) {
                            id
                        }
                    }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        order_id = response.json()["data"]["createOrder"]["id"]

    await asyncio.sleep(3.0)

    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={"query": f'query {{ order(id: "{order_id}") {{ status }} }}'},
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        assert response.json()["data"]["order"]["status"] == "failed"
```

Key points:

- The [observers] TOML config routes pg_notify events to NATS; no Python code is required on the publishing side.
- A failed payment publishes an events.payment.failed event that the compensation handler processes to release inventory.
- Every event is recorded in tb_event_log for observability.

Start all services with Docker Compose:
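The fixed asyncio.sleep(3.0) in the tests makes them timing-dependent. A more robust pattern polls the API until the order reaches the expected status or a deadline passes. A sketch, reusing the endpoint and token conventions from the tests above (timeout and interval values are arbitrary choices):

```python
"""Polling helper for the integration tests, as an alternative to
fixed sleeps. Assumes the same localhost:8080 GraphQL endpoint and
bearer-token auth used elsewhere in this example.
"""
import asyncio

async def wait_for_status(order_id: str, expected: str, token: str,
                          timeout: float = 15.0, interval: float = 0.5) -> bool:
    """Poll until the order reaches `expected`, or return False on timeout."""
    import httpx  # lazy import: keeps the module loadable without httpx
    query = f'query {{ order(id: "{order_id}") {{ status }} }}'
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        while loop.time() < deadline:
            resp = await client.post(
                "/graphql",
                json={"query": query},
                headers={"Authorization": f"Bearer {token}"},
            )
            order = resp.json()["data"]["order"]
            if order and order["status"] == expected:
                return True
            await asyncio.sleep(interval)
    return False

# In a test, instead of sleep(3.0):
#     assert await wait_for_status(order_id, "shipped", TEST_TOKEN)
```

This keeps the happy-path test fast when the choreography is quick, while tolerating slow cold starts up to the timeout.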
```bash
docker-compose up -d
```

Verify services are running:

```bash
docker-compose ps
```

Expected output:

```
NAME                                            STATUS         PORTS
microservices-choreography-postgres-1           Up (healthy)   0.0.0.0:5432->5432/tcp
microservices-choreography-nats-1               Up (healthy)   0.0.0.0:4222->4222/tcp
microservices-choreography-order-service-1      Up             0.0.0.0:8080->8080/tcp
microservices-choreography-inventory-worker-1   Up
microservices-choreography-payment-worker-1     Up
microservices-choreography-shipping-worker-1    Up
```

Check NATS is ready:

```bash
docker-compose exec nats nats server info
```

Look for jetstream: enabled: true in the output.
Create a test order:
```bash
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "mutation { createOrder(input: { identifier: \"ord-001\", total: \"49.99\" }) { id status } }"
  }'
```

Expected response:

```json
{
  "data": {
    "createOrder": {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "status": "pending"
    }
  }
}
```

Wait for the choreography to complete (allow all workers time to process):
```bash
sleep 5
```

Verify the order reached its final state:

```bash
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "query { order(id: \"550e8400-e29b-41d4-a716-446655440000\") { id status } }"
  }'
```

Expected response:

```json
{
  "data": {
    "order": {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "status": "shipped"
    }
  }
}
```

Check the event log for the complete flow:
```bash
docker-compose exec postgres psql -U fraiseql -d orders -c \
  "SELECT event_type, recorded_at FROM tb_event_log ORDER BY recorded_at;"
```

Expected output:

```
           event_type           |        recorded_at
--------------------------------+----------------------------
 events.order.created           | 2024-01-15 10:30:00
 events.inventory.reserved      | 2024-01-15 10:30:01
 events.payment.charged         | 2024-01-15 10:30:02
 events.order.shipped           | 2024-01-15 10:30:05
```

Test compensation (failure scenario):
```bash
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "mutation { createOrder(input: { identifier: \"ord-fail-001\", total: \"9999.99\" }) { id } }"
  }'
```

Wait and verify the order failed:

```bash
sleep 3
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"query": "query { order(id: \"<id-from-above>\") { status } }"}'
```

Expected: {"status": "failed"}
Run integration tests:
```bash
pytest tests/test_choreography.py -v
```

Expected output:

```
tests/test_choreography.py::test_full_order_choreography PASSED
tests/test_choreography.py::test_payment_failure_compensation PASSED
```

If services fail to start, bring the stack up again and inspect the order service logs:

```bash
docker-compose up -d
docker-compose logs order-service
```

Common issues:

- Workers crash on first connect: depends_on only orders container startup, but PostgreSQL needs time to initialize before it accepts connections.
- NATS connection errors: inspect the broker with docker-compose logs nats.

If the order does not progress:
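Because depends_on does not wait for PostgreSQL readiness, workers should retry their initial database connection instead of crashing. A sketch of a retry loop a worker could call before subscribing to NATS (attempt count and delay are arbitrary):

```python
"""Startup sketch: retry the database connection until Postgres is ready.

docker-compose's depends_on orders container startup only; it does not
wait for the database to accept connections, so first attempts may fail.
"""
import asyncio

async def connect_with_retry(dsn: str, attempts: int = 30, delay: float = 1.0):
    """Return an asyncpg connection, retrying transient startup failures."""
    import asyncpg  # lazy import: module stays loadable without asyncpg
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return await asyncpg.connect(dsn)
        except (OSError, asyncpg.PostgresError) as exc:
            last_error = exc
            print(f"[startup] attempt {attempt}/{attempts} failed: {exc}")
            await asyncio.sleep(delay)
    raise RuntimeError(f"database never became ready: {last_error}")

# In a worker's main(), before subscribing:
#     conn = await connect_with_retry(os.environ["DATABASE_URL"])
```

An equivalent fix is a compose healthcheck on the postgres service with depends_on conditions, but the in-process retry also covers database restarts at runtime.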
Check NATS consumers:
```bash
docker-compose exec nats nats consumer report
```

Verify workers are connected:
```bash
docker-compose exec nats nats consumer info events inventory_processors
```

Check worker logs:
```bash
docker-compose logs inventory-worker | tail -20
docker-compose logs payment-worker | tail -20
```

Manual event inspection:
```bash
docker-compose exec nats nats sub events.order.created
```

If tb_event_log is empty:
Check the FraiseQL observer configuration:
```bash
docker-compose logs order-service | grep "observer"
```

Verify the [observers] section in fraiseql.toml is correct and nats_url is reachable.
Check NATS stream exists:
```bash
docker-compose exec nats nats stream report
```

If payment failure does not release inventory:
Check the payment worker compensation handler log:
```bash
docker-compose logs payment-worker | grep "compensation"
```

Verify the events.payment.failed subject is being published:
```bash
docker-compose exec nats nats sub events.payment.failed
```

If the choreography is slow:
Check consumer lag:
```bash
docker-compose exec nats nats consumer info events inventory_processors
```

Look for num_pending (it should be near 0).
Scale workers horizontally: Run multiple instances of each worker with the same durable consumer name — NATS JetStream distributes messages across them.
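With nats.py, a durable consumer plus a queue group lets several subscribers split one message stream, so each event is delivered to exactly one member. A sketch running two members in one process for illustration (in practice each member is a separate container); the subject and durable names follow this example:

```python
"""Scaling sketch: two subscribers sharing one JetStream consumer.

Same durable + same queue group on every member means NATS
load-balances events across them instead of duplicating delivery.
"""
import asyncio
import os

async def run_member(name: str):
    import nats  # lazy import: module stays loadable without nats-py
    nc = await nats.connect(os.environ["NATS_URL"])
    js = nc.jetstream()

    async def handler(msg):
        # Each message arrives at exactly one member of the group.
        print(f"[{name}] got {msg.subject}")
        await msg.ack()

    await js.subscribe(
        "events.order.created",
        durable="inventory_processors",
        queue="inventory_processors",
        cb=handler,
    )
    await asyncio.sleep(float("inf"))

async def main():
    # Two members of the same group; compose would instead run
    # `docker-compose up --scale inventory-worker=2`.
    await asyncio.gather(run_member("worker-a"), run_member("worker-b"))

if __name__ == "__main__":
    asyncio.run(main())
```

Because workers read and write shared rows (tb_inventory.reserved), scaled-out members should also be idempotent and rely on the database for coordination.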
Monitor database connections:
```bash
docker-compose exec postgres psql -U fraiseql -c \
  "SELECT count(*) FROM pg_stat_activity;"
```

Tune [observers] batch size and flush interval for throughput.

Advanced Federation
Cross-service queries and data composition.
Advanced NATS
Reliability patterns and JetStream.
Observers Reference
TOML observer configuration reference.