Federation Reference
Complete reference documentation for FraiseQL’s federation capabilities. Federation Guide
This guide covers solutions for common issues when using FraiseQL’s federation and NATS capabilities.
Symptoms: DatabaseConnectionError: Failed to connect to database 'inventory'
Causes:
Solutions:
# 1. Test database connectivity
fraiseql health --database inventory

# 2. Check configuration
cat fraiseql.toml | grep -A 5 "\[databases.inventory\]"

# 3. Verify credentials
echo $INVENTORY_DATABASE_URL  # Check URL format
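Checking the URL format by eye is error-prone, so a small script can help. The following is a minimal sketch, assuming a PostgreSQL-style URL (the guide uses psql for the manual test). The helper name `check_database_url` and the example URLs are hypothetical and not part of FraiseQL.

```python
from urllib.parse import urlsplit


def check_database_url(url: str) -> list[str]:
    """Return a list of problems found in a PostgreSQL-style connection URL."""
    try:
        parts = urlsplit(url)
        _ = parts.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError as exc:
        return [f"unparseable URL: {exc}"]

    problems = []
    if parts.scheme not in ("postgres", "postgresql"):
        problems.append(f"unexpected scheme {parts.scheme!r}")
    if not parts.hostname:
        problems.append("missing host")
    if not parts.path.lstrip("/"):
        problems.append("missing database name")
    return problems


if __name__ == "__main__":
    import os

    # Prints the problems only, never the URL itself, to keep credentials out of logs.
    for problem in check_database_url(os.environ.get("INVENTORY_DATABASE_URL", "")):
        print(problem)
```

An empty list means the URL is structurally plausible; it does not prove that the host is reachable or that the credentials are valid, which is what the manual connection test is for.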
# 4. Test manual connection
psql $INVENTORY_DATABASE_URL -c "SELECT 1"

# 5. Enable connection debug logging by setting an environment variable:
# FRAISEQL_LOG=debug
#
# Or configure the database in fraiseql.toml:
[database]
url = "${INVENTORY_DATABASE_URL}"
pool_max = 10

Symptoms: PoolExhaustedError: No available connections in pool for database 'inventory'
Causes:
Solutions:
# 1. Increase pool size
[databases.inventory]
pool_size = 50        # Was 10, now 50
pool_timeout = 30000  # Timeout waiting for connection

[databases.inventory.idle]
max_age = 3600000  # Close idle connections after 1 hour

# 2. Monitor connection pool via the metrics endpoint:
# GET /metrics → fraiseql_database_pool_active, fraiseql_database_pool_idle

# 3. Identify slow queries by setting a slow query threshold in fraiseql.toml:
# [database]
# log_slow_queries_ms = 1000  # Log queries slower than 1 second
# Or use FRAISEQL_LOG=debug for full query logging.

Symptoms: FederationTimeoutError: Federated query to 'inventory' timed out after 5000ms
Causes:
Solutions:
# 1. Increase federation timeout
[federation]
default_timeout = 10000  # Increased from 5000ms
batch_size = 50          # Smaller batches = faster queries

# 2. Per-database timeout
[federation.database_timeouts]
inventory = 10000
payments = 15000  # Slower database needs more time

# 3. Configure federation batch size and timeout in fraiseql.toml:
[federation]
batch_size = 100
default_timeout = 10000

# The Python type declares the shape only; there is no database routing in the decorator:
@fraiseql.type
class Order:
    id: ID
    items: list[OrderItem]  # Federated from inventory DB (see fraiseql.toml)

-- 4. Add database indexes on the inventory database:
CREATE INDEX idx_order_item_order_id ON tb_order_item(order_id);

Symptoms: CircularReferenceError when loading schema or executing deeply nested queries.
Causes:
Solutions:
# BAD: Circular reference. Avoid federating back to the originating type.
@fraiseql.type
class Order:
    items: list[OrderItem]  # Federated from inventory DB

@fraiseql.type
class OrderItem:
    order: Order  # Circular! OrderItem federates back to Order
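A circular reference like this one can be caught before the schema is loaded by walking the type graph. The following is a minimal sketch using a depth-first search; the `find_cycle` helper and the plain-dict representation of type references are assumptions made for illustration, not how FraiseQL detects cycles internally.

```python
from __future__ import annotations


def find_cycle(references: dict[str, list[str]]) -> list[str] | None:
    """Return one cycle as a list of type names, or None if there is no cycle.

    `references` maps each type name to the type names its fields point at.
    """
    unvisited, in_progress, done = 0, 1, 2
    state = {name: unvisited for name in references}
    stack: list[str] = []

    def visit(name: str) -> list[str] | None:
        state[name] = in_progress
        stack.append(name)
        for target in references.get(name, []):
            target_state = state.get(target, unvisited)
            if target_state == in_progress:
                # Found a back edge: the cycle is the stack from `target` onwards.
                return stack[stack.index(target):] + [target]
            if target_state == unvisited:
                found = visit(target)
                if found:
                    return found
        stack.pop()
        state[name] = done
        return None

    for name in list(references):
        if state[name] == unvisited:
            found = visit(name)
            if found:
                return found
    return None


bad_schema = {"Order": ["OrderItem"], "OrderItem": ["Order"]}
good_schema = {"Order": ["OrderItem"], "OrderItem": []}
print(find_cycle(bad_schema))
print(find_cycle(good_schema))
```

Running a check like this in CI gives an earlier and more readable failure than a `CircularReferenceError` at schema load time.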
# GOOD: Only federate in one direction
@fraiseql.type
class Order:
    items: list[OrderItem]  # Federated from inventory DB (one way only)

@fraiseql.type
class OrderItem:
    order_id: ID  # Just store the ID; don't federate back to Order

Symptoms: ForeignKeyError: Order references Product ID that doesn't exist
Causes:
Solutions:
# 1. Add validation before federation
@fraiseql.mutation
async def add_item_to_order(order_id: ID, product_id: ID) -> OrderItem:
    """Validate product exists before adding."""

    # Check product exists in inventory database
    product = await fraiseql.query(
        f"""
        query {{
            product(id: "{product_id}") {{ id }}
        }}
        """
    )

    if not product:
        raise ValueError(f"Product {product_id} not found")

    # Now safe to create item
    return await create_order_item(order_id, product_id)
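# 2. Implement foreign key constraints
-- On primary database
-- Note: PostgreSQL rejects subqueries inside CHECK constraints, so the
-- statement below only illustrates the intent. On PostgreSQL, enforce the
-- rule with a trigger or with application-level validation instead.
ALTER TABLE tb_order_item
ADD CONSTRAINT fk_product_exists
CHECK (
    product_id IN (
        SELECT id FROM v_product_from_inventory
    )
);

Symptoms: SagaCompensationError: Compensation step 'reserve_inventory' failed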
Causes:
Solutions:
# 1. Add detailed compensation logging
@compensate("reserve_inventory")
async def compensate_reserve_inventory(ctx):
    """Release reserved inventory with logging."""

    try:
        logging.info(f"Compensating reservations: {ctx.reservations}")

        for reservation_id in ctx.reservations:
            logging.debug(f"Releasing reservation {reservation_id}")

            await execute_sql(
                "UPDATE tb_reservation SET status = 'released' WHERE id = $1",
                [reservation_id]
            )

            logging.debug(f"Released reservation {reservation_id}")

    except Exception as e:
        logging.error(f"Compensation failed: {e}", exc_info=True)

        # Re-raise so saga knows it failed
        raise SagaCompensationError(
            step="reserve_inventory",
            reason=str(e)
        ) from e
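To make it easier to reason about when a compensation step runs, here is a minimal sketch of a saga runner that executes steps in order and, when one fails, compensates the already completed steps in reverse order. The `run_saga` function and the tuple-based step format are hypothetical and only illustrate the control flow; FraiseQL's own saga machinery may behave differently.

```python
from typing import Callable

# (step name, action, compensation); both callables receive the shared context.
Step = tuple[str, Callable[[dict], None], Callable[[dict], None]]


def run_saga(steps: list[Step], ctx: dict) -> list[str]:
    """Run steps in order; on failure, compensate completed steps in reverse.

    Returns a log of what happened so the control flow is easy to inspect.
    """
    log: list[str] = []
    completed: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action(ctx)
        except Exception:
            log.append(f"failed:{name}")
            # Undo the most recent successful step first.
            for done_name, _, compensate in reversed(completed):
                compensate(ctx)
                log.append(f"compensated:{done_name}")
            return log
        completed.append(step)
        log.append(f"done:{name}")
    return log


def decline_payment(ctx: dict) -> None:
    raise RuntimeError("payment declined")


demo_steps: list[Step] = [
    ("create_order", lambda c: c.update(order="o1"), lambda c: c.pop("order")),
    ("reserve_inventory", lambda c: c.update(reservation="r1"), lambda c: c.pop("reservation")),
    ("process_payment", decline_payment, lambda c: None),
]
demo_ctx: dict = {}
demo_log = run_saga(demo_steps, demo_ctx)
print(demo_log)
```

In this sketch a compensation that raises aborts the remaining compensations, which is one reason each compensation step should log enough detail to resume the cleanup by hand.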
# 2. Make compensation idempotent
@compensate("process_payment")
async def compensate_payment(ctx):
    """Refund payment (idempotent)."""

    transaction = await execute_sql(
        "SELECT status FROM tb_transaction WHERE id = $1",
        [ctx.transaction_id]
    )

    # Only refund if not already refunded
    if transaction['status'] == 'refunded':
        logging.info(f"Payment already refunded: {ctx.transaction_id}")
        return

    # Process refund
    await refund_payment(ctx.transaction_id)

    # Mark as refunded
    await execute_sql(
        "UPDATE tb_transaction SET status = 'refunded' WHERE id = $1",
        [ctx.transaction_id]
    )
# 3. Increase timeout for compensation
@saga(
    steps=["create_order", "reserve_inventory"],
    compensation_timeout=30000  # 30 seconds for compensation
)
async def create_order(...):
    pass

Symptoms: Order created but saga never completes; stuck in pending status.
Causes:
Solutions:
# 1. Monitor saga/mutation progress via structured logs.
# Set FRAISEQL_LOG=debug to see each request with its requestId.
# Use pg_notify observers in fraiseql.toml to track step completions:
# [[observers]]
# channel = "saga_step_completed"
# nats_subject = "fraiseql.saga.step"

# 2. Implement saga timeout
@saga(
    steps=["create_order", "reserve_inventory"],
    timeout=60000  # Overall saga timeout: 60 seconds
)
async def create_order(...):
    pass

# 3. Check stuck sagas in database
SELECT *
FROM tb_saga_execution
WHERE status = 'pending'
  AND created_at < NOW() - INTERVAL '5 minutes'
ORDER BY created_at DESC;
# 4. Manual cleanup of stuck sagas
@fraiseql.mutation
async def cleanup_stuck_saga(saga_id: ID) -> bool:
    """Manually trigger compensation for stuck saga."""

    saga = await execute_sql(
        "SELECT * FROM tb_saga_execution WHERE id = $1",
        [saga_id]
    )

    if saga['status'] != 'pending':
        raise ValueError(f"Saga not pending: {saga['status']}")

    # Trigger compensations in reverse order
    for step in reversed(saga['completed_steps']):
        await trigger_compensation(step, saga)

    await execute_sql(
        "UPDATE tb_saga_execution SET status = 'compensated' WHERE id = $1",
        [saga_id]
    )

    return True

Symptoms: Query with federated field takes 10+ seconds
Causes:
Solutions:
# 1. Check if federation is batching correctly
@fraiseql.query
def orders_with_items(limit: int = 100) -> list[Order]:
    """
    With batching: Should be 2 queries total
    - 1 query: SELECT * FROM orders LIMIT 100
    - 1 query: SELECT * FROM order_items WHERE order_id IN (...)
    """
    pass

# Enable query logging to verify (one call per database, because a keyword
# argument cannot be repeated within a single Python call)
await fraiseql.enable_query_logging(database="primary")
await fraiseql.enable_query_logging(database="inventory")
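The two-query pattern from the docstring can be tried outside FraiseQL to see what correct batching looks like. The sketch below uses an in-memory SQLite database from the Python standard library. The table layout, the `CountingConnection` wrapper, and `load_orders_with_items` are all assumptions made for the demonstration.

```python
import sqlite3


class CountingConnection:
    """Thin wrapper that counts how many statements are executed."""

    def __init__(self, conn: sqlite3.Connection):
        self._conn = conn
        self.query_count = 0

    def execute(self, sql: str, params=()):
        self.query_count += 1
        return self._conn.execute(sql, params)


def load_orders_with_items(conn, limit: int = 100) -> dict:
    """Load orders and their items with two queries, regardless of order count."""
    order_rows = conn.execute(
        "SELECT id FROM orders ORDER BY id LIMIT ?", (limit,)
    ).fetchall()
    order_ids = [row[0] for row in order_rows]
    items_by_order = {order_id: [] for order_id in order_ids}
    if order_ids:
        # One IN (...) query for all orders instead of one query per order.
        placeholders = ",".join("?" for _ in order_ids)
        item_rows = conn.execute(
            "SELECT order_id, product_id FROM order_items "
            f"WHERE order_id IN ({placeholders}) ORDER BY id",
            order_ids,
        )
        for order_id, product_id in item_rows:
            items_by_order[order_id].append(product_id)
    return items_by_order


raw = sqlite3.connect(":memory:")
raw.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY);
    CREATE TABLE order_items (
        id INTEGER PRIMARY KEY, order_id INTEGER, product_id TEXT
    );
    INSERT INTO orders (id) VALUES (1), (2), (3);
    INSERT INTO order_items (order_id, product_id)
    VALUES (1, 'a'), (1, 'b'), (3, 'c');
""")
counting = CountingConnection(raw)
result = load_orders_with_items(counting)
print(result, counting.query_count)
```

If the query log for a federated field shows one statement per parent row rather than a single `IN (...)` statement, batching is not being applied.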
# 2. Denormalize to reduce federated queries
@fraiseql.type
class Order:
    item_count: int  # Denormalized count from SQL view

    # Federated from inventory DB, configured in fraiseql.toml
    items: list[OrderItem]

# 3. Use selective federation
@fraiseql.query
def order_summary(id: ID) -> dict:
    """
    Return only needed fields to avoid full federation.
    """
    # `order` is assumed to have been loaded for `id` before this point
    return {
        "id": order.id,
        "total": order.total,
        "item_count": order.item_count
        # Don't federate full items if not needed
    }

# 4. Add indexes
-- On inventory database
CREATE INDEX idx_order_items_order_id_product_id
ON tb_order_item(order_id, product_id);

Symptoms: NatsConnectionError: Failed to connect to NATS server
Causes:
Solutions:
# 1. Check NATS server status
nats server info

# 2. Test connection
nats ping

# 3. Check configuration
cat fraiseql.toml | grep -A 3 "\[nats\]"

# 4. Verify URL format
echo $NATS_URL  # Should be: nats://host:4222

# 5. Start NATS if not running
docker run -it --rm -p 4222:4222 nats

Symptoms: AuthorizationError: NATS authentication failed
Causes:
Solutions:
# 1. Update credentials
[nats.auth]
type = "token"
token = "${NATS_TOKEN}"  # Ensure env var is set

# 2. Verify token
echo $NATS_TOKEN

# 3. Use NKey authentication (more secure)
[nats.auth]
type = "nkey"
nkey = "${NATS_NKEY}"

# 4. Generate new credentials
nats user create fraiseql-user
nats nkey gen user -o fraiseql.nk  # NKey

Symptoms: StreamNotFoundError: Stream 'orders' not found
Causes:
Solutions:
# 1. List existing streams
nats stream list

# 2. Check stream configuration
nats stream info orders

# 3. Create missing stream
nats stream add orders \
  --subjects "fraiseql.order.>" \
  --max-msgs 1000000 \
  --max-bytes 10GB \
  --retention limits

# 4. Ensure stream is configured
[nats.jetstream.streams.orders]
subjects = ["fraiseql.order.>"]
replicas = 3
max_msgs = 1000000
max_bytes = 10737418240

Symptoms: Consumer far behind in processing; queue backs up
Causes:
Solutions:
# 1. Check consumer status
nats consumer info orders order-processor

# Output shows:
# Pending: 50000    # Many messages waiting
# Delivered: 1000
# Acked: 800

# 2. Increase processing capacity
# Scale up consumer service: 1 instance -> 3 instances

# 3. Check consumer queue group
nats consumer info orders order-processor

# 4. Increase ack wait if processing is slow
[nats.jetstream.consumers.order-processor]
ack_wait = "60s"  # Increased from 30s

# 5. Optimize event handler
@subscribe("fraiseql.order.created", queue_group="order-processors")
async def process_order(event: dict):
    """Process order efficiently."""

    order_id = event["data"]["order_id"]

    # Skip events that were already processed
    cache_key = f"processed:{order_id}"
    if await redis.exists(cache_key):
        return  # Already processed

    # Do work
    await process_order_work(event)

    # Mark as done
    await redis.set(cache_key, "true", ex=3600)

Symptoms: Event published but subscribers don’t receive it
Causes:
Solutions:
# 1. Check consumer status
nats consumer info orders order-processor

# Look for:
# - NumPending (messages waiting)
# - NumAckPending (unacked messages)
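A subscription pattern that does not match the published subject is a common reason for events that never arrive. As a rough aid, the sketch below reimplements NATS wildcard matching in plain Python: `*` matches exactly one token and `>` matches one or more trailing tokens. The `subject_matches` helper is hypothetical and is not part of any NATS client library.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS subscription `pattern`."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # `>` must be the last token and needs at least one token to consume.
            is_last = index == len(pattern_tokens) - 1
            return is_last and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[index]:
            return False
    # Without a trailing `>`, both sides must have the same number of tokens.
    return len(subject_tokens) == len(pattern_tokens)


published = "fraiseql.order.created"
for pattern in ("fraiseql.order.>", "fraiseql.*.created", "orders.created"):
    print(pattern, subject_matches(pattern, published))
```

Note that `fraiseql.order.>` does not match the bare subject `fraiseql.order`, because `>` requires at least one further token.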
# 2. Check subject matches
# Published to:  fraiseql.order.created
# Subscribed to: fraiseql.order.>  (match)
# Subscribed to: orders.created    (no match)

# 3. Verify subscriber is running
@subscribe("fraiseql.order.created")
async def handle_order_created(event: dict):
    """Handle order created."""
    logging.info(f"Received order event: {event['order_id']}")
    # If this log never appears, subscriber isn't running

# 4. Check max deliver limit
# If message is redelivered more than max_deliver times,
# it goes to dead letter queue
[nats.jetstream.consumers.order-processor]
max_deliver = 3  # Redelivered max 3 times

# Check dead letter queue
@subscribe("fraiseql.dlq")
async def handle_dead_letter(event: dict):
    logging.error(f"Dead letter: {event}")

Symptoms: Status changed events arrive before creation event
Causes:
Solutions:
# 1. Process events sequentially per order
@subscribe("fraiseql.order.>", queue_group="order-seq")
async def process_order_event_sequentially(event: dict):
    """Process orders sequentially (one at a time)."""

    order_id = event["data"]["order_id"]

    # Use distributed lock per order
    async with distributed_lock(f"order:{order_id}"):
        # Only one process can handle events for this order at a time
        await process_event(event)
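The handler above relies on a `distributed_lock` helper that the guide does not define. To show the locking idea in isolation, here is a minimal single-process sketch that keeps one `asyncio.Lock` per order. It is a stand-in only: the `KeyedLocks` class is hypothetical, and a real deployment with several consumer instances would need a lock shared across processes.

```python
import asyncio
from collections import defaultdict


class KeyedLocks:
    """One asyncio.Lock per key; a single-process stand-in for a distributed lock."""

    def __init__(self):
        # Locks are created lazily, on first use of a key.
        self._locks = defaultdict(asyncio.Lock)

    def for_key(self, key: str) -> asyncio.Lock:
        return self._locks[key]


async def process_events(events, handler, locks: KeyedLocks) -> None:
    """Run handlers concurrently, but never two at once for the same order."""

    async def handle(event: dict) -> None:
        async with locks.for_key(f"order:{event['order_id']}"):
            await handler(event)

    await asyncio.gather(*(handle(event) for event in events))


async def demo_handler(event: dict) -> None:
    await asyncio.sleep(0.01)  # simulate work
    print("handled", event["order_id"], event["type"])


if __name__ == "__main__":
    demo_events = [
        {"order_id": "A", "type": "created"},
        {"order_id": "B", "type": "created"},
        {"order_id": "A", "type": "status_changed"},
    ]
    asyncio.run(process_events(demo_events, demo_handler, KeyedLocks()))
```

A lock like this guarantees mutual exclusion per order, not delivery order, so it complements rather than replaces the partitioning and consumer settings in the following steps.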
# 2. OR partition by order ID to ensure ordering
[nats.partitions]
enabled = true
key = "order_id"  # Same order always goes to same partition
count = 8

# 3. Use durable consumer with explicit ack
[nats.jetstream.consumers.order-processor]
deliver_policy = "all"   # Start from beginning
ack_policy = "explicit"  # Must explicitly ACK
ack_wait = "30s"         # Timeout if not ACKed
max_deliver = 3          # Retry 3 times

Symptoms: Same event processed multiple times; duplicate orders created
Causes:
Solutions:
# 1. Implement idempotent handler
@subscribe("fraiseql.order.created")
async def handle_order_created_idempotent(event: dict):
    """Process event idempotently."""

    # Generate deterministic event ID
    event_id = f"{event['order_id']}-{event['timestamp']}"

    # Check if already processed
    if await is_processed(event_id):
        logging.debug(f"Event already processed: {event_id}")
        return

    # Process
    try:
        await create_order(event)
        await mark_processed(event_id)
    except Exception as e:
        logging.error(f"Error processing event: {e}")
        raise  # Let NATS retry

async def is_processed(event_id: str) -> bool:
    """Check if event ID was already processed."""
    return await redis.exists(f"event_processed:{event_id}")

async def mark_processed(event_id: str):
    """Mark event as processed (idempotency key)."""
    await redis.setex(
        f"event_processed:{event_id}",
        30 * 24 * 3600,  # 30-day TTL
        "true"
    )
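The same idempotency-key logic can be exercised without Redis, which is handy in unit tests. The sketch below replaces Redis with an in-memory store that honours a TTL. The `ProcessedEvents` class and `handle_once` function are hypothetical stand-ins, and the injectable clock exists only to make expiry testable.

```python
import time


class ProcessedEvents:
    """In-memory idempotency-key store with a TTL (stand-in for Redis)."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires_at: dict[str, float] = {}

    def seen(self, event_id: str) -> bool:
        expiry = self._expires_at.get(event_id)
        if expiry is None:
            return False
        if expiry <= self._clock():
            del self._expires_at[event_id]  # key has expired
            return False
        return True

    def mark(self, event_id: str) -> None:
        self._expires_at[event_id] = self._clock() + self._ttl


def handle_once(event: dict, store: ProcessedEvents, handler) -> bool:
    """Run `handler` unless this event was already processed; return True if it ran."""
    event_id = f"{event['order_id']}-{event['timestamp']}"
    if store.seen(event_id):
        return False
    handler(event)          # if this raises, the event is not marked and can be retried
    store.mark(event_id)
    return True


store = ProcessedEvents(ttl_seconds=30 * 24 * 3600)
created = []
event = {"order_id": "o1", "timestamp": "2024-01-01T00:00:00Z"}
print(handle_once(event, store, created.append))
print(handle_once(event, store, created.append))
```

As in the Redis version, a crash between the handler and the mark step still allows one duplicate, so the handler itself should tolerate being run twice where that matters.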
# 2. Use event version/correlation ID
event = {
    "id": "evt_550e8400",                # Unique event ID
    "correlation_id": "order_550e8400",  # Links to entity
    "version": 1,                        # Version for this event type
    "type": "order.created",
    "data": {...}
}

Symptoms: Order created successfully but notification service doesn’t receive event
Causes:
Solutions:
# 1. Wait for event confirmation
@saga.step("publish_confirmation")
async def step_publish_confirmation(ctx):
    """Final step: publish confirmation with retry."""

    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            await publish(
                subject="fraiseql.order.confirmed",
                data={...},
                timeout=5000
            )
            return

        except Exception as e:
            retry_count += 1
            if retry_count >= max_retries:
                raise

            wait_time = 2 ** retry_count  # Exponential backoff
            await asyncio.sleep(wait_time)
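The retry loop above can be factored into a reusable helper so that each saga step does not repeat it. The following is a minimal sketch; `retry_with_backoff` and its parameters are hypothetical names, and the delay schedule here (`base_delay * 2**attempt`) starts one step lower than the loop in the example above.

```python
import asyncio


async def retry_with_backoff(operation, max_retries: int = 3,
                             base_delay: float = 1.0, sleep=asyncio.sleep):
    """Await `operation()` until it succeeds or `max_retries` attempts are used.

    Waits base_delay * 2**attempt seconds between attempts and re-raises
    the last error when all attempts fail.
    """
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise
            await sleep(base_delay * 2 ** attempt)


async def demo() -> str:
    attempts = {"count": 0}

    async def flaky_publish() -> str:
        # Fails twice, then succeeds, to exercise the retry path.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("NATS unavailable")
        return "published"

    return await retry_with_backoff(flaky_publish, base_delay=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing `sleep` as a parameter keeps the helper testable without real waiting; production callers can leave it at the default.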
# 2. Store events in database for guaranteed delivery
@saga.step("store_events")
async def step_store_events(ctx):
    """Store events in DB for async publishing."""

    await execute_sql(
        """
        INSERT INTO tb_pending_events (order_id, event_type, payload)
        VALUES ($1, $2, $3)
        """,
        [ctx.order_id, "order.created", json.dumps({...})]
    )

    # Separate service publishes from DB to NATS
    # Guaranteed to eventually publish
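The "separate service" mentioned in the comments is often called an outbox relay. Below is a minimal sketch of one, using SQLite from the standard library so it runs on its own. The `published_at` column, the `open_outbox`, `store_event`, and `relay_pending_events` helpers, and the `publish` callback signature are all assumptions added for the demonstration.

```python
import json
import sqlite3


def open_outbox() -> sqlite3.Connection:
    """Create an in-memory outbox table (published_at is an assumed extra column)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tb_pending_events ("
        " id INTEGER PRIMARY KEY, order_id TEXT, event_type TEXT,"
        " payload TEXT, published_at TEXT)"
    )
    return conn


def store_event(conn, order_id: str, event_type: str, payload: dict) -> None:
    conn.execute(
        "INSERT INTO tb_pending_events (order_id, event_type, payload)"
        " VALUES (?, ?, ?)",
        (order_id, event_type, json.dumps(payload)),
    )
    conn.commit()


def relay_pending_events(conn, publish) -> int:
    """Publish unpublished rows oldest first; mark each row only after success."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM tb_pending_events"
        " WHERE published_at IS NULL ORDER BY id"
    ).fetchall()
    sent = 0
    for row_id, event_type, payload in rows:
        # If publish raises, this row and all later rows stay pending.
        publish(f"fraiseql.{event_type}", json.loads(payload))
        conn.execute(
            "UPDATE tb_pending_events SET published_at = CURRENT_TIMESTAMP"
            " WHERE id = ?",
            (row_id,),
        )
        conn.commit()
        sent += 1
    return sent


if __name__ == "__main__":
    outbox = open_outbox()
    store_event(outbox, "o1", "order.created", {"order_id": "o1"})
    print(relay_pending_events(outbox, lambda subject, data: print(subject, data)))
```

Because a row is marked only after the publish call returns, a crash in between leads to a second publish on the next run. Delivery is therefore at-least-once, and subscribers still need idempotent handlers.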
# 3. Add publishing monitoring
@fraiseql.after_mutation("create_order")
async def after_create_order_monitored(order):
    """Publish with monitoring."""

    try:
        await metrics.time("event.publish", async_fn=lambda: publish(...))
    except Exception as e:
        await metrics.increment("event.publish.error")
        await alert_ops(f"Failed to publish event for order {order.id}")

Symptoms: Notification service processes event but queries return empty
Causes:
Solutions:
# 1. Only publish after transaction commits
@saga.step("final_publish")
async def step_final_publish(ctx):
    """Publish only after all federation steps complete."""

    # Ensure all federation updates are committed
    # This is the final step, so publish here
    await publish(
        subject="fraiseql.order.confirmed",
        data={...}
    )

# 2. Event handler waits for data availability
@subscribe("fraiseql.order.created")
async def handle_order_created_with_retry(event: dict):
    """Handle event with retry for data availability."""

    order_id = event["data"]["order_id"]

    # Retry if order not yet available
    max_retries = 5
    for attempt in range(max_retries):
        order = await fraiseql.query(
            f'query {{ order(id: "{order_id}") {{ id }} }}'
        )

        if order:
            await process_order(order)
            return

        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Backoff

    logging.error(f"Order not found after retries: {order_id}")
# 3. Include data in event (denormalization)
# Don't require subscribers to query federation
await publish(
    subject="fraiseql.order.created",
    data={
        "order_id": order.id,
        "customer_id": order.customer_id,
        "total": str(order.total),
        "items": [...]  # Include items in event
        # No need for subscriber to federate
    }
)

-- Problem: Saga holds lock while federation waits
-- Solution: Use lower isolation level
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;

# Use streaming results
@fraiseql.query
async def large_order_list():
    # Use cursor-based pagination
    cursor = None
    while True:
        # Omit the `after` argument on the first page, when there is no cursor yet
        after = f'after: "{cursor}", ' if cursor else ""
        orders = await fraiseql.query(
            f'query {{ orders({after}limit: 100) {{ id }} }}'
        )
        if not orders:
            break
        for order in orders:
            yield order
        cursor = orders[-1]["id"]

# SQLite has single writer - serialize federated writes
class SQLiteSerializer:
    _lock = asyncio.Lock()

    @classmethod
    async def federated_write(cls, query):
        async with cls._lock:
            return await execute_federation_write(query)
NATS Reference
Reference documentation for NATS integration and JetStream configuration. NATS Guide
Error Handling
Patterns for handling errors in federated and event-driven applications. Error Handling Guide
General Troubleshooting
Diagnose connection, query, and infrastructure issues. Troubleshooting Index