FraiseQL supports GraphQL subscriptions for real-time updates, with multiple transport options optimized for different use cases.
Subscriptions push data to clients when events occur:
    subscription {
      orderUpdated(customerId: "123") {
        id
        status
        updatedAt
      }
    }

When an order changes, connected clients receive the following WebSocket message:

    {
      "id": "1",
      "type": "next",
      "payload": {
        "data": {
          "orderUpdated": {
            "id": "order-456",
            "status": "shipped",
            "updatedAt": "2024-01-15T10:30:00Z"
          }
        }
      }
    }

The "type": "next" field is part of the graphql-ws protocol. Each subscription event arrives as a separate next message on the WebSocket connection, and its "id" identifies the subscription the event belongs to.
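Because every event carries the "id" of its subscription, a client that runs several subscriptions over one connection can route each frame to the right callback. The following is a minimal sketch of that routing, not part of FraiseQL or of any client library; `route_message` and the `handlers` registry are hypothetical names used only for illustration.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry: subscription id -> callback that receives the "data" object.
Handlers = Dict[str, Callable[[Dict[str, Any]], None]]


def route_message(raw: str, handlers: Handlers) -> bool:
    """Dispatch one protocol frame; return True if a handler was called."""
    message = json.loads(raw)
    if message.get("type") != "next":
        return False  # connection_ack, ping, complete, ... carry no event data
    handler = handlers.get(message.get("id"))
    if handler is None:
        return False  # event for a subscription this client is not tracking
    handler(message["payload"]["data"])
    return True


received = []
handlers: Handlers = {"1": received.append}
frame = (
    '{"id": "1", "type": "next", "payload": {"data": {"orderUpdated": '
    '{"id": "order-456", "status": "shipped", "updatedAt": "2024-01-15T10:30:00Z"}}}}'
)
route_message(frame, handlers)
print(received[0]["orderUpdated"]["status"])  # shipped
```

In practice a client library such as graphql-ws performs this dispatch for you; the sketch only shows what the "id" field is for.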
Enable subscriptions in fraiseql.toml:
    [subscriptions]
    max_active_per_connection = 10
    max_fields_per_subscription = 50

    [subscriptions.hooks]
    on_connect = "http://localhost:8001/hooks/ws-connect"
    on_disconnect = "http://localhost:8001/hooks/ws-disconnect"
    on_subscribe = "http://localhost:8001/hooks/ws-subscribe"
    timeout_ms = 500

Define a subscription in your Python schema:
    import fraiseql

    @fraiseql.subscription(
        sql_source="tb_order",
        event="UPDATE"
    )
    def order_updated(customer_id: fraiseql.ID | None = None) -> Order:
        """Subscribe to order updates, optionally filtered by customer."""
        pass

Compile and run the server:

    fraiseql compile
    fraiseql run

Connect a client using graphql-ws:

    npm install graphql-ws

Subscribe from your frontend (see client examples below).
WebSocket is the default transport for browser clients. It uses the modern graphql-ws protocol.

Configuration:

    [subscriptions]
    max_active_per_connection = 10

Client examples:
TypeScript:

    import { createClient } from 'graphql-ws';

    const client = createClient({
      url: 'wss://api.example.com/ws',
      connectionParams: { authToken: 'Bearer eyJ...' }
    });

    // Subscribe
    const unsubscribe = client.subscribe<{ orderUpdated: { id: string; status: string } }>(
      {
        query: `subscription OrderUpdates($customerId: ID!) {
          orderUpdated(customerId: $customerId) { id status updatedAt }
        }`,
        variables: { customerId: '123' }
      },
      {
        next: ({ data }) => {
          console.log('Order updated:', data?.orderUpdated);
          // Output: Order updated: { id: 'order-456', status: 'shipped', updatedAt: '...' }
        },
        error: (err) => console.error('Subscription error:', err),
        complete: () => console.log('Subscription complete')
      }
    );

    // Clean up when done
    unsubscribe();

Python:

    import asyncio
    from gql import Client, gql
    from gql.transport.websockets import WebsocketsTransport

    async def watch_orders(customer_id: str) -> None:
        transport = WebsocketsTransport(
            url="wss://api.example.com/ws",
            headers={"Authorization": "Bearer eyJ..."}
        )

        async with Client(transport=transport) as session:
            subscription = gql("""
                subscription OrderUpdates($customerId: ID!) {
                    orderUpdated(customerId: $customerId) { id status updatedAt }
                }
            """)

            async for result in session.subscribe(
                subscription,
                variable_values={"customerId": customer_id}
            ):
                order = result["orderUpdated"]
                print(f"Order {order['id']} is now: {order['status']}")
                # Output: Order order-456 is now: shipped

    asyncio.run(watch_orders("123"))

Go:

    package main
    import (
        "encoding/json"
        "fmt"
        "log"
        "time"

        graphql "github.com/hasura/go-graphql-client"
    )

    func watchOrders(customerID string) {
        client := graphql.NewSubscriptionClient("wss://api.example.com/ws").
            WithConnectionParams(map[string]interface{}{
                "headers": map[string]string{
                    "Authorization": "Bearer eyJ...",
                },
            })
        defer client.Close()

        subscription := `
            subscription OrderUpdates($customerId: ID!) {
                orderUpdated(customerId: $customerId) { id status updatedAt }
            }
        `

        variables := map[string]interface{}{
            "customerId": graphql.ID(customerID),
        }

        // Exec accepts a raw query string; Subscribe expects a query struct instead.
        subscriptionID, err := client.Exec(subscription, variables, func(data []byte, err error) error {
            if err != nil {
                log.Printf("Subscription error: %v", err)
                return nil
            }

            // Parse result
            var result struct {
                OrderUpdated struct {
                    ID        string
                    Status    string
                    UpdatedAt string
                }
            }
            if err := json.Unmarshal(data, &result); err != nil {
                return err
            }

            fmt.Printf("Order %s is now: %s\n", result.OrderUpdated.ID, result.OrderUpdated.Status)
            return nil
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Subscribed with ID: %s", subscriptionID)

        // Run opens the connection and blocks, so start it in a goroutine.
        go func() {
            if err := client.Run(); err != nil {
                log.Printf("Subscription client stopped: %v", err)
            }
        }()

        // Run for 60 seconds
        time.Sleep(60 * time.Second)
    }

    func main() {
        watchOrders("123")
    }

For server-to-server communication without WebSocket overhead.
Database trigger:
    -- Automatic trigger generated by FraiseQL
    CREATE FUNCTION notify_order_update() RETURNS trigger AS $$
    BEGIN
        PERFORM pg_notify(
            'fraiseql_order_updated',
            json_build_object(
                'id', NEW.id,
                'status', NEW.status,
                'customer_id', NEW.fk_customer
            )::text
        );
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    import fraiseql

    @fraiseql.type
    class Order:
        id: fraiseql.ID
        status: str
        customer_id: fraiseql.ID
        updated_at: fraiseql.DateTime

    @fraiseql.subscription(
        sql_source="tb_order",
        event="UPDATE"
    )
    def order_updated(customer_id: fraiseql.ID | None = None) -> Order:
        """Subscribe to order updates, optionally filtered by customer."""
        pass

    @fraiseql.subscription(
        sql_source="tb_order",
        event="UPDATE",
        filter="status.changed()"  # Only when status changes
    )
    def order_status_changed() -> Order:
        pass

    @fraiseql.subscription(
        sql_source="tb_order",
        event=["INSERT", "UPDATE", "DELETE"]
    )
    def order_changed() -> Order:
        """Subscribe to any order change."""
        pass

FraiseQL supports both the modern graphql-transport-ws protocol and the legacy graphql-ws (Apollo subscriptions-transport-ws) protocol. The protocol is negotiated automatically from the Sec-WebSocket-Protocol WebSocket header, so no configuration is required. Clients without the header default to graphql-transport-ws.
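The naming here is easy to trip over: the graphql-ws library speaks the graphql-transport-ws subprotocol, while the older subscriptions-transport-ws library speaks a subprotocol that is itself called graphql-ws. The sketch below illustrates header-based negotiation with the default described above. It is not FraiseQL's implementation; the `negotiate` function, the preference for the modern protocol when both are offered, and the `None` result for unsupported offers are all assumptions made for the example.

```python
from typing import Optional

MODERN = "graphql-transport-ws"  # subprotocol spoken by the graphql-ws library
LEGACY = "graphql-ws"            # subprotocol spoken by subscriptions-transport-ws


def negotiate(header: Optional[str]) -> Optional[str]:
    """Pick a subprotocol from a Sec-WebSocket-Protocol header value."""
    if header is None or not header.strip():
        return MODERN  # no header: the default stated in the text above
    # The header is a comma-separated list of subprotocols offered by the client.
    offered = [token.strip() for token in header.split(",")]
    for candidate in (MODERN, LEGACY):  # assumption: prefer modern if both are offered
        if candidate in offered:
            return candidate
    return None  # assumption: nothing supported was offered


print(negotiate(None))                                # graphql-transport-ws
print(negotiate("graphql-ws"))                        # graphql-ws
print(negotiate("graphql-ws, graphql-transport-ws"))  # graphql-transport-ws
```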
    [subscriptions]
    max_subscriptions_per_connection = 100  # optional, default: unlimited

    [subscriptions.hooks]
    on_connect = "https://your-app/ws/on-connect"        # fail-closed
    on_disconnect = "https://your-app/ws/on-disconnect"  # fire-and-forget
    on_subscribe = "https://your-app/ws/on-subscribe"    # fail-closed
    timeout_ms = 500                                     # hook timeout (default: 500ms)

    Client                          Server
      |                               |
      |------- connection_init ------>|  { type: "connection_init", payload: { authToken: "..." } }
      |<------ connection_ack --------|  { type: "connection_ack" }
      |                               |
      |------- subscribe ------------>|  { id: "1", type: "subscribe", payload: { query: "..." } }
      |<------ next (data) -----------|  { id: "1", type: "next", payload: { data: { ... } } }
      |<------ next (data) -----------|  { id: "1", type: "next", payload: { data: { ... } } }
      |                               |
      |------- complete ------------->|  { id: "1", type: "complete" }
      |                               |
      |<------ ping ------------------|  { type: "ping" }
      |------- pong ----------------->|  { type: "pong" }

Pass authentication via connectionParams:
    const client = createClient({
      url: 'wss://api.example.com/ws',
      connectionParams: {
        authToken: 'Bearer eyJ...',
        tenantId: 'tenant-123'
      }
    });

Network interruptions are common in browser environments. Configure automatic reconnection with exponential backoff:
    import { createClient } from 'graphql-ws';

    const client = createClient({
      url: 'wss://api.example.com/ws',
      connectionParams: { authToken: 'Bearer eyJ...' },

      // Retry indefinitely with exponential backoff (cap at 30s)
      retryAttempts: Infinity,
      shouldRetry: () => true,
      retryWait: async (retries) => {
        const delay = Math.min(1000 * Math.pow(2, retries), 30000);
        await new Promise(r => setTimeout(r, delay));
      },

      // Log connection lifecycle events. graphql-ws re-subscribes active
      // subscriptions on its own after a successful reconnect.
      on: {
        connected: () => console.log('WebSocket connected'),
        closed: (event) => console.warn('WebSocket closed:', event),
        error: (err) => console.error('WebSocket error:', err)
      }
    });

Proxy keepalive: If your deployment sits behind a load balancer or reverse proxy, configure its idle-connection timeout to be longer than ping_interval. For example, with nginx:
    proxy_read_timeout 90s;  # Must exceed ping_interval (default 30s)
    proxy_send_timeout 90s;

    subscription OrderUpdates($customerId: ID!) {
      orderUpdated(customerId: $customerId) {
        id
        status
      }
    }

    client.subscribe(
      {
        query: `subscription OrderUpdates($customerId: ID!) {
          orderUpdated(customerId: $customerId) { id status }
        }`,
        variables: { customerId: '123' }
      },
      handlers
    );

FraiseQL filters events before sending to clients:
    @fraiseql.subscription(
        sql_source="tb_order",
        event="UPDATE",
        auth_filter="customer_id == context.user_id"  # Only user's orders
    )
    def my_order_updated() -> Order:
        pass

    const client = createClient({
      url: 'wss://api.example.com/ws',
      retryAttempts: 5,
      retryWait: async (retries) => {
        // Exponential backoff
        await new Promise(r => setTimeout(r, 1000 * Math.pow(2, retries)));
      },
      on: {
        error: (err) => console.error('Connection error:', err),
        closed: () => console.log('Connection closed')
      }
    });

    client.subscribe(
      { query: '...' },
      {
        // graphql-ws expects next, error, and complete on every sink
        next: (result) => console.log('Event:', result.data),
        error: (errors) => {
          // GraphQL errors during subscription
          console.error('Subscription errors:', errors);
        },
        complete: () => console.log('Subscription complete')
      }
    );

| Metric | Description |
|---|---|
| fraiseql_subscriptions_active | Active subscriptions |
| fraiseql_subscriptions_connections | WebSocket connections |
| fraiseql_subscriptions_events_total | Events delivered |
| fraiseql_subscriptions_latency_ms | Event delivery latency |
WebSocket connections are authenticated via the same JWT/API key middleware as HTTP requests. The token is passed in the connectionParams of the connection_init message:
    const client = createClient({
      url: 'wss://api.example.com/ws',
      connectionParams: { authToken: 'Bearer eyJ...' }
    });

The server validates the token before accepting the connection. Rate limiting uses the same [security.rate_limiting] configuration as the GraphQL HTTP endpoint.
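To make the handshake concrete, here is a rough sketch of how a server-side hook might pull the token out of a connection_init frame before validating it. This is illustrative only and does not reflect FraiseQL internals: `token_from_connection_init` is a hypothetical helper, the `authToken` key is taken from the client examples in this document, and treating the value as a "Bearer" credential is an assumption. Validating the extracted JWT is out of scope here.

```python
import json
from typing import Optional


def token_from_connection_init(raw: str) -> Optional[str]:
    """Return the bearer token carried by a connection_init frame, if any."""
    message = json.loads(raw)
    if message.get("type") != "connection_init":
        return None  # only the first frame of the handshake carries credentials
    payload = message.get("payload") or {}
    value = payload.get("authToken")  # key name assumed from the client examples
    if not isinstance(value, str):
        return None
    scheme, _, token = value.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None  # assumption: only "Bearer <token>" values are accepted
    return token.strip()


frame = '{"type": "connection_init", "payload": {"authToken": "Bearer abc.def.ghi"}}'
print(token_from_connection_init(frame))  # abc.def.ghi
```

A hook that receives no usable token would typically reject the connection, which matches the fail-closed behaviour described for on_connect.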
Prevent expensive subscriptions:
    [subscriptions]
    max_selection_depth = 5
    max_selected_fields = 50

    # Good - filtered
    subscription {
      orderUpdated(customerId: "123") { id status }
    }

    # Avoid - unfiltered (receives all orders)
    subscription {
      orderUpdated { id status }
    }

    // Store unsubscribe function
    const unsubscribe = client.subscribe(...);

    // Clean up on unmount
    useEffect(() => {
      return () => unsubscribe();
    }, []);

ping_interval matches client expectations

Send a subscription over WebSocket:
    # Using wscat (npm install -g wscat)
    # The subscribe/next messages below belong to the graphql-transport-ws subprotocol
    wscat -c ws://localhost:8080/graphql -s graphql-transport-ws

Then send:

    {"type":"connection_init","payload":{}}
    {"id":"1","type":"subscribe","payload":{"query":"subscription { postCreated { id title author { name } } }"}}

In another terminal, create a post to trigger the subscription:

    curl -s http://localhost:8080/graphql \
      -H "Content-Type: application/json" \
      -d '{"query":"mutation { createPost(input: { title: \"Test\", content: \"Hello\" }) { id } }"}'

Your WebSocket connection should receive:

    {
      "id": "1",
      "type": "next",
      "payload": {
        "data": {
          "postCreated": {
            "id": "post-abc123",
            "title": "Test",
            "author": { "name": "Alice" }
          }
        }
      }
    }
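When scripting this check rather than typing into wscat, the two client frames can be generated instead of written by hand. The helper below is a small sketch of that; `build_frames` is a hypothetical name, and the frames follow the connection_init and subscribe shapes shown in this document. Sending them over a socket is left to whichever WebSocket client you use.

```python
import json
from typing import List


def build_frames(query: str, subscription_id: str = "1") -> List[str]:
    """Return the connection_init and subscribe frames as compact JSON strings."""
    init = {"type": "connection_init", "payload": {}}
    subscribe = {
        "id": subscription_id,
        "type": "subscribe",
        "payload": {"query": query},
    }
    # Compact separators give one-line frames that are easy to paste or send.
    return [json.dumps(frame, separators=(",", ":")) for frame in (init, subscribe)]


frames = build_frames("subscription { postCreated { id title author { name } } }")
for frame in frames:
    print(frame)
```

Building the frames with `json.dumps` avoids the quoting mistakes that are easy to make when nesting a GraphQL query inside a JSON string by hand.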