Federation Configuration
Multi-Database Setup — Synchronous federation reference
This guide shows how to combine FraiseQL’s federation (synchronous cross-database access) with NATS (asynchronous event-driven communication) to build robust, scalable distributed systems.
Use Case: You need ACID guarantees but want event notifications.
from fraiseql import saga, publish, compensate
@saga(steps=["create_order", "reserve_inventory", "process_payment"])@fraiseql.mutationasync def create_order_with_events( customer_id: ID, items: list[dict], payment_method_id: ID) -> Order: """ Synchronous saga that: 1. Uses federation to coordinate atomically 2. Publishes events for other services 3. Returns immediately with guaranteed state """ pass
# Step 1: Create order (federation to primary DB)@saga.step("create_order", database="primary")async def step_create_order(ctx, customer_id, items): order = await execute_order_creation(customer_id, items) ctx.order_id = order['id']
# Publish event for async subscribers await publish( subject="fraiseql.order.created", data={ "order_id": order['id'], "customer_id": customer_id, "total": order['total'], "items": items } )
return order
# Step 2: Reserve inventory (federation to inventory DB)@saga.step("reserve_inventory", database="inventory")async def step_reserve_inventory(ctx, items): reservations = await reserve_items_atomically(ctx.order_id, items) ctx.reservations = reservations
# Publish event await publish( subject="fraiseql.inventory.reserved", data={ "order_id": ctx.order_id, "reservations": reservations } )
return reservations
# Step 3: Process payment (federation to payments DB)@saga.step("process_payment", database="payments")async def step_process_payment(ctx, customer_id, total, payment_method_id): transaction = await process_payment_atomically( ctx.order_id, customer_id, total, payment_method_id )
# Publish event (final step - no compensation possible) await publish( subject="fraiseql.payment.processed", data={ "order_id": ctx.order_id, "transaction_id": transaction['id'], "amount": transaction['amount'], "status": "completed" } )
ctx.transaction_id = transaction['id'] return transaction
# Compensation: Use federation to clean up atomically@compensate("create_order")async def compensate_order_creation(ctx): await cancel_order_atomically(ctx.order_id)
# Notify others about cancellation await publish( subject="fraiseql.order.cancelled", data={ "order_id": ctx.order_id, "reason": "saga_compensated" } )
@compensate("reserve_inventory")async def compensate_inventory_reservation(ctx): await release_reservations_atomically(ctx.reservations)
await publish( subject="fraiseql.inventory.released", data={ "order_id": ctx.order_id, "reservations": ctx.reservations } )Use Case: You want immutable event log + live queries on latest state.
from fraiseql import subscribe, publishfrom datetime import datetimefrom decimal import Decimal
# Events are published to NATS (immutable log)# Federation queries read the current projected state
@subscribe("fraiseql.order.>")async def project_order_state(event: dict): """ Subscribe to all order events and update the projection (current state that federation queries read from). """
event_type = event.get("type") order_id = event["data"]["order_id"]
if event_type == "order.created": await handle_order_created_event(event)
elif event_type == "order.confirmed": await handle_order_confirmed_event(event)
elif event_type == "order.shipped": await handle_order_shipped_event(event)
elif event_type == "order.cancelled": await handle_order_cancelled_event(event)
async def handle_order_created_event(event: dict): """Project order.created event into current state."""
# Insert into projection table (what federation queries read) await execute_sql( """ INSERT INTO tb_order_projection ( order_id, customer_id, total, status, created_at ) VALUES ($1, $2, $3, 'created', NOW()) """, [ event["data"]["order_id"], event["data"]["customer_id"], event["data"]["total"] ] )
async def handle_order_confirmed_event(event: dict): """Project order.confirmed event."""
await execute_sql( """ UPDATE tb_order_projection SET status = 'confirmed', updated_at = NOW() WHERE order_id = $1 """, [event["data"]["order_id"]] )
async def handle_order_shipped_event(event: dict): """Project order.shipped event."""
await execute_sql( """ UPDATE tb_order_projection SET status = 'shipped', updated_at = NOW() WHERE order_id = $1 """, [event["data"]["order_id"]] )
# Now federation queries always read the projected state@fraiseql.query(sql_source="v_order_projection", database="primary")def order(id: ID) -> Order: """Query reads current projected state.""" pass
# You can also rebuild projection from eventsasync def rebuild_projection_from_events(start_time: datetime): """Rebuild projection by replaying events from NATS."""
# Subscribe to all past events subscription = await subscribe_to_history( subject="fraiseql.order.>", start_time=start_time )
async for event in subscription: await project_order_state(event)
print("Projection rebuilt successfully")Use Case: Atomically update across databases, then notify external systems.
from fraiseql import saga, publish, requestfrom typing import Optionalimport asyncio
@saga(steps=["update_customer", "update_balance", "record_transaction"])@fraiseql.mutationasync def transfer_funds_between_accounts( from_account_id: ID, to_account_id: ID, amount: Decimal) -> dict: """ Transfer funds with: 1. Federation: Atomic updates to accounts 2. NATS: Notification to compliance and notifications services """ pass
@saga.step("update_customer", database="primary")async def step_update_customer(ctx, from_account_id, to_account_id): # Ensure both accounts exist from_account = await execute_sql( "SELECT id FROM tb_account WHERE id = $1 FOR UPDATE", [from_account_id] ) to_account = await execute_sql( "SELECT id FROM tb_account WHERE id = $1 FOR UPDATE", [to_account_id] )
if not from_account or not to_account: raise ValueError("One or both accounts not found")
ctx.from_account_id = from_account_id ctx.to_account_id = to_account_id
@saga.step("update_balance", database="ledger")async def step_update_balance(ctx, amount): # Debit from account await execute_sql( "UPDATE tb_balance SET balance = balance - $1 WHERE account_id = $2", [amount, ctx.from_account_id] )
# Credit to account await execute_sql( "UPDATE tb_balance SET balance = balance + $1 WHERE account_id = $2", [amount, ctx.to_account_id] )
@saga.step("record_transaction", database="ledger")async def step_record_transaction(ctx, amount): transaction = await execute_sql( """ INSERT INTO tb_transaction ( from_account_id, to_account_id, amount, status, created_at ) VALUES ($1, $2, $3, 'completed', NOW()) RETURNING id """, [ctx.from_account_id, ctx.to_account_id, amount] )
ctx.transaction_id = transaction['id']
# Publish events after atomic completion await publish( subject="fraiseql.transaction.completed", data={ "type": "transaction.completed", "transaction_id": transaction['id'], "from_account_id": ctx.from_account_id, "to_account_id": ctx.to_account_id, "amount": str(amount) } )
# Notify compliance service (async, doesn't block response) await publish( subject="compliance.transaction.created", data={ "transaction_id": transaction['id'], "amount": str(amount), "timestamp": datetime.utcnow().isoformat() } )
return transaction
# External compliance service processes event@subscribe("compliance.transaction.created")async def compliance_check_transaction(event: dict): """Compliance service checks for suspicious patterns."""
transaction_id = event["data"]["transaction_id"] amount = Decimal(event["data"]["amount"])
# Check against risk rules is_suspicious = await check_against_risk_rules(amount)
if is_suspicious: # Publish alert await publish( subject="compliance.alert.suspicious_transaction", data={ "transaction_id": transaction_id, "amount": str(amount), "reason": "exceeds_daily_limit" } )Use Case: Some data from federation (consistency), some from events (completeness).
@fraiseql.typeclass OrderWithHistory: # Current state from federation (consistent) id: ID customer_id: ID status: str total: Decimal
# Historical state from event stream status_history: list[StatusChange] all_events: list[dict]
@fraiseql.query(sql_source="v_order", database="primary")def order_with_history(id: ID) -> OrderWithHistory: """ Hybrid query: 1. Get current state from federation 2. Get event history from NATS """ pass
async def resolve_order_with_history(order_id: ID) -> OrderWithHistory: # Parallel fetch: federation query + NATS event replay current_order, events = await asyncio.gather( # Fetch current state from federation fraiseql.query(f""" query {{ order(id: "{order_id}") {{ id status total customer_id }} }} """),
# Fetch event history from NATS query_event_history(order_id) )
# Build status history from events status_history = [ StatusChange( status=event["data"]["new_status"], timestamp=event["timestamp"] ) for event in events if event["type"] == "order.status_changed" ]
return OrderWithHistory( **current_order, status_history=status_history, all_events=events )When using both federation and NATS, you may have partial failures:
@saga.step("create_and_notify", database="primary")async def step_create_order_and_notify(ctx, customer_id, items): # Step 1: Create order (federation - can fail/compensate) order = await execute_order_creation(customer_id, items) ctx.order_id = order['id']
# Step 2: Publish event (NATS - best-effort) try: await publish( subject="fraiseql.order.created", data={...} ) except Exception as e: # NATS publish failed, but order was created successfully # Log it for retry, but don't fail the whole saga logging.warning(f"Failed to publish event for order {order['id']}: {e}")
# Optionally: Store in dead letter table for later retry await store_failed_event( subject="fraiseql.order.created", data={...}, error=str(e) )
# Continue without failing - NATS is best-effort
return orderUse federation alone - synchronous, atomic, consistent.
@fraiseql.query(sql_source="v_order", database="primary")def order(id: ID) -> Order: """Immediate, consistent read.""" passUse NATS alone - asynchronous, will eventually be consistent.
@subscribe("fraiseql.order.created")async def handle_order_created(event: dict): """Asynchronous, eventually consistent update.""" await update_cache(event["data"]["order_id"])Use both together:
# Write with federationawait create_order_via_federation(...)
# Read current + eventscurrent_order = await federation_query(...)history = await nats_event_query(...)Federation Configuration
Multi-Database Setup — Synchronous federation reference
Advanced NATS
Event-Driven Patterns — Async messaging reference
Advanced Federation Patterns
Enterprise Patterns — Saga orchestration