Observers
Observers provide event-driven logic for your FraiseQL API. They react to database changes and trigger actions like webhooks, emails, or Slack notifications.
Why Observers?
Traditional approaches to post-mutation logic:
- Application code: Business logic scattered across services
- Database triggers: Limited to SQL, hard to debug
- Message queues: Infrastructure complexity
Observers centralize event-driven logic in your schema:
Python:

    @observer(
        entity="Order",
        event="INSERT",
        condition="total > 1000",
        actions=[
            slack("#sales", "High-value order: ${total}"),
            email(to="sales@example.com", subject="New order")
        ]
    )
    def on_high_value_order():
        pass

TypeScript:

    import { observer, slack, email } from 'fraiseql';

    observer({
      entity: 'Order',
      event: 'INSERT',
      condition: 'total > 1000',
      actions: [
        slack('#sales', 'High-value order: ${total}'),
        email({ to: 'sales@example.com', subject: 'New order' }),
      ],
      handler: function onHighValueOrder() {},
    });

Observer lifecycle
Observer Anatomy
An observer consists of:
- Entity: The table/type being watched
- Event: INSERT, UPDATE, or DELETE
- Condition: When to trigger (optional)
- Actions: What to do when triggered
Python:

    from fraiseql import observer, webhook, email, slack

    @observer(
        entity="Order",             # Watch tb_order
        event="INSERT",             # On new records
        condition="total > 100",    # Only if total > 100
        actions=[                   # Actions to execute
            webhook("https://api.example.com/orders"),
            slack("#orders", "New order: {id}")
        ]
    )
    def on_new_order():
        """Triggered when a new order over $100 is created."""
        pass

TypeScript:

    import { observer, webhook, slack } from 'fraiseql';

    observer({
      entity: 'Order',            // Watch tb_order
      event: 'INSERT',            // On new records
      condition: 'total > 100',   // Only if total > 100
      actions: [                  // Actions to execute
        webhook('https://api.example.com/orders'),
        slack('#orders', 'New order: {id}'),
      ],
      handler: function onNewOrder() {
        // Triggered when a new order over $100 is created.
      },
    });

Events
INSERT
Triggered when a new record is created:
Python:

    @observer(
        entity="User",
        event="INSERT",
        actions=[
            email(
                to="{email}",
                subject="Welcome to our platform!",
                body="Hello {name}, thanks for signing up."
            )
        ]
    )
    def on_user_signup():
        pass

TypeScript:

    observer({
      entity: 'User',
      event: 'INSERT',
      actions: [
        email({
          to: '{email}',
          subject: 'Welcome to our platform!',
          body: 'Hello {name}, thanks for signing up.',
        }),
      ],
      handler: function onUserSignup() {},
    });

UPDATE
Triggered when a record is modified:
Python:

    @observer(
        entity="Order",
        event="UPDATE",
        condition="status.changed() and status == 'shipped'",
        actions=[
            email(
                to="{customer_email}",
                subject="Your order {id} has shipped!",
                body="Your order is on its way."
            )
        ]
    )
    def on_order_shipped():
        pass

TypeScript:

    observer({
      entity: 'Order',
      event: 'UPDATE',
      condition: "status.changed() and status == 'shipped'",
      actions: [
        email({
          to: '{customer_email}',
          subject: 'Your order {id} has shipped!',
          body: 'Your order is on its way.',
        }),
      ],
      handler: function onOrderShipped() {},
    });

Change detection:
- field.changed(): Field value changed
- field.old: Previous value
- field.new: New value
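One way to read these accessors is as a comparison between the row before and after the write. The sketch below models that reading in plain Python. `FieldChange` and `order_shipped` are hypothetical names used only for illustration and are not part of the FraiseQL API; the sketch also assumes that a bare field name in a condition refers to the new value.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FieldChange:
    """Hypothetical view of one field in an UPDATE event (not a FraiseQL class)."""

    old: Any  # value before the write
    new: Any  # value after the write

    def changed(self) -> bool:
        # True only when the write actually altered the value.
        return self.old != self.new


def order_shipped(status: FieldChange) -> bool:
    # Mirrors: condition="status.changed() and status == 'shipped'"
    # Assumption: the bare name `status` means the new value.
    return status.changed() and status.new == "shipped"


print(order_shipped(FieldChange(old="pending", new="shipped")))  # True
print(order_shipped(FieldChange(old="shipped", new="shipped")))  # False
```

Under this reading, an UPDATE that rewrites a row without altering `status` does not re-send the shipping email, which is why the example condition pairs `changed()` with the equality check.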
DELETE
Triggered when a record is removed:

Python:

    @observer(
        entity="Order",
        event="DELETE",
        actions=[
            webhook(
                "https://api.example.com/archive",
                body_template='{"type": "order", "id": "{{id}}", "data": {{_json}}}'
            )
        ]
    )
    def on_order_deleted():
        pass

TypeScript:

    observer({
      entity: 'Order',
      event: 'DELETE',
      actions: [
        webhook('https://api.example.com/archive', {
          bodyTemplate: '{"type": "order", "id": "{{id}}", "data": {{_json}}}',
        }),
      ],
      handler: function onOrderDeleted() {},
    });

Conditions
Filter which events trigger the observer:
Simple Comparisons
    condition="total > 1000"
    condition="status == 'active'"
    condition="is_premium == true"

Change Detection

    # Field changed to specific value
    condition="status.changed() and status == 'shipped'"

    # Field changed from specific value
    condition="status.old == 'pending' and status == 'approved'"

    # Any change to field
    condition="email.changed()"

Complex Logic

    # Multiple conditions
    condition="total > 1000 and is_premium == true"

    # OR conditions
    condition="status == 'failed' or retry_count > 3"

    # Field comparisons
    condition="quantity > min_quantity"

Actions
Webhook
Send HTTP requests to external services:
    from fraiseql import webhook

    # Simple webhook
    webhook("https://api.example.com/orders")

    # With custom headers
    webhook(
        "https://api.example.com/orders",
        headers={"Authorization": "Bearer {API_TOKEN}"}
    )

    # With custom body
    webhook(
        "https://api.example.com/orders",
        body_template='{"order_id": "{{id}}", "total": {{total}}}'
    )

    # URL from environment variable
    webhook(url_env="SHIPPING_WEBHOOK_URL")

Template variables:
- {field_name}: Field value
- {ENV_VAR}: Environment variable
- {{field}}: Mustache-style for JSON templates
- {{_json}}: Complete record as JSON
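To make the Mustache-style placeholders concrete, here is a minimal sketch of how a `body_template` could be expanded against a record. It is an illustration only: `render_body` is a hypothetical helper, not FraiseQL's renderer, and it covers just `{{field}}` and `{{_json}}` (not `{field_name}` or `{ENV_VAR}`).

```python
import json
import re


def render_body(template: str, record: dict) -> str:
    """Expand {{field}} and {{_json}} placeholders (illustrative sketch only)."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name == "_json":
            # {{_json}} stands for the complete record serialized as JSON.
            return json.dumps(record)
        # Any other name is looked up as a field on the record.
        return str(record[name])

    return re.sub(r"\{\{(\w+)\}\}", substitute, template)


record = {"id": "order-123", "total": 99.99}
body = render_body('{"type": "order", "id": "{{id}}", "data": {{_json}}}', record)
print(json.loads(body))
```

Note that the template quotes `"{{id}}"` itself but leaves `{{_json}}` unquoted, since the latter expands to a JSON object rather than a string.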
Webhook payload format
When no body_template is specified, FraiseQL sends a standard JSON payload to the webhook URL:

    // Webhook payload received by your endpoint
    {
      "event": "INSERT",
      "table": "tb_order",
      "timestamp": "2026-02-25T10:00:00Z",
      "data": {
        "new": {
          "id": "order-123",
          "status": "confirmed",
          "total": 99.99,
          "user_id": "user-456"
        },
        "old": null
      }
    }

For UPDATE events, "old" contains the previous field values and "new" contains the updated values. For DELETE events, "new" is null and "old" contains the deleted record. The "timestamp" field is a stable event ID you can use for deduplication.
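On the receiving side, the "old" and "new" objects can be compared to see what an event changed. The following sketch assumes the payload shape shown above; `changed_fields` is a hypothetical helper name, and the sample UPDATE payload is made up for illustration.

```python
import json
from typing import Any


def changed_fields(payload: dict[str, Any]) -> dict[str, tuple[Any, Any]]:
    """Map each differing field to its (old, new) pair.

    INSERT payloads have "old": null and DELETE payloads have "new": null,
    so a missing side is treated as an empty record.
    """
    data = payload["data"]
    old = data.get("old") or {}
    new = data.get("new") or {}
    return {
        key: (old.get(key), new.get(key))
        for key in sorted(old.keys() | new.keys())
        if old.get(key) != new.get(key)
    }


# Made-up UPDATE payload following the documented structure.
update_event = json.loads("""
{
  "event": "UPDATE",
  "table": "tb_order",
  "timestamp": "2026-02-25T10:05:00Z",
  "data": {
    "old": {"id": "order-123", "status": "pending", "total": 99.99},
    "new": {"id": "order-123", "status": "confirmed", "total": 99.99}
  }
}
""")

print(changed_fields(update_event))  # {'status': ('pending', 'confirmed')}
```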
Email
Send email notifications:

    from fraiseql import email

    email(
        to="{customer_email}",
        subject="Order {id} confirmed",
        body="Thank you for your order of ${total}.",
        from_email="orders@example.com"
    )

    # Multiple recipients
    email(
        to=["admin@example.com", "{customer_email}"],
        subject="New order",
        body="Order {id} was placed."
    )

    # HTML body
    email(
        to="{email}",
        subject="Welcome!",
        body_html="<h1>Welcome {name}!</h1><p>Thanks for joining.</p>"
    )

Slack
Send Slack messages:

    from fraiseql import slack

    # Simple message
    slack("#orders", "New order: {id} for ${total}")

    # With formatting
    slack(
        "#sales",
        ":moneybag: High-value order {id}: ${total} from {customer_email}"
    )

    # Direct message
    slack(
        "@sales-lead",
        "Urgent: Order {id} requires approval"
    )

Retry Configuration
Configure retry behavior for failed actions:
    from fraiseql import observer, webhook, RetryConfig

    @observer(
        entity="Payment",
        event="UPDATE",
        condition="status == 'failed'",
        actions=[
            webhook("https://api.example.com/payment-failures")
        ],
        retry=RetryConfig(
            max_attempts=5,
            backoff_strategy="exponential",
            initial_delay_ms=100,
            max_delay_ms=60000
        )
    )
    def on_payment_failure():
        pass

Retry options:
- max_attempts: Maximum retry count (default: 3)
- backoff_strategy: "fixed", "linear", or "exponential"
- initial_delay_ms: First retry delay in milliseconds
- max_delay_ms: Maximum delay between retries
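To get a feel for how these options interact, the sketch below computes a delay schedule for each strategy. The exact formulas FraiseQL uses are not stated here, so this is an assumption: `max_attempts` is taken to count total attempts, linear grows by `initial_delay_ms` per retry, and exponential doubles each time. `retry_delays` is a hypothetical helper, not a FraiseQL function.

```python
def retry_delays(
    max_attempts: int,
    backoff_strategy: str,
    initial_delay_ms: int,
    max_delay_ms: int,
) -> list[int]:
    """Return the wait (in ms) before each retry, capped at max_delay_ms.

    Assumed formulas, for illustration only:
      fixed:       initial
      linear:      initial * n
      exponential: initial * 2 ** (n - 1)
    where n is the retry number starting at 1.
    """
    delays = []
    # With max_attempts total attempts there are max_attempts - 1 retries.
    for n in range(1, max_attempts):
        if backoff_strategy == "fixed":
            delay = initial_delay_ms
        elif backoff_strategy == "linear":
            delay = initial_delay_ms * n
        elif backoff_strategy == "exponential":
            delay = initial_delay_ms * 2 ** (n - 1)
        else:
            raise ValueError(f"unknown backoff strategy: {backoff_strategy!r}")
        delays.append(min(delay, max_delay_ms))
    return delays


print(retry_delays(5, "exponential", 100, 60000))  # [100, 200, 400, 800]
print(retry_delays(4, "linear", 100, 250))         # [100, 200, 250]
```

Under these assumptions the configuration above waits well under two seconds in total, so `max_delay_ms=60000` only matters for much larger `max_attempts` values.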
Complete Example
Here’s a full e-commerce observer setup:
Python:

    import fraiseql
    from fraiseql import (
        ID, DateTime, email, observer, slack, type, webhook, RetryConfig
    )

    @fraiseql.type
    class Order:
        id: ID
        customer_email: str
        status: str
        total: float
        created_at: DateTime

    @fraiseql.type
    class Payment:
        id: ID
        order_id: ID
        amount: float
        status: str
        processed_at: DateTime | None

    # High-value order notifications
    @observer(
        entity="Order",
        event="INSERT",
        condition="total > 1000",
        actions=[
            webhook("https://api.example.com/high-value-orders"),
            slack("#sales", ":moneybag: High-value order {id}: ${total}"),
            email(
                to="sales@example.com",
                subject="High-value order {id}",
                body="Order {id} for ${total} was created by {customer_email}"
            )
        ]
    )
    def on_high_value_order():
        """Triggered when a high-value order is created."""
        pass

    # Order shipped notifications
    @observer(
        entity="Order",
        event="UPDATE",
        condition="status.changed() and status == 'shipped'",
        actions=[
            webhook(url_env="SHIPPING_WEBHOOK_URL"),
            email(
                to="{customer_email}",
                subject="Your order {id} has shipped!",
                body="Your order is on its way.",
                from_email="noreply@example.com"
            )
        ]
    )
    def on_order_shipped():
        """Triggered when an order status changes to 'shipped'."""
        pass

    # Payment failure handling
    @observer(
        entity="Payment",
        event="UPDATE",
        condition="status == 'failed'",
        actions=[
            slack("#payments", ":warning: Payment failed for order {order_id}"),
            webhook(
                "https://api.example.com/payment-failures",
                headers={"Authorization": "Bearer {PAYMENT_API_TOKEN}"}
            )
        ],
        retry=RetryConfig(
            max_attempts=5,
            backoff_strategy="exponential",
            initial_delay_ms=100,
            max_delay_ms=60000
        )
    )
    def on_payment_failure():
        """Triggered when a payment fails."""
        pass

    # Archive deleted orders
    @observer(
        entity="Order",
        event="DELETE",
        actions=[
            webhook(
                "https://api.example.com/archive",
                body_template='{"type": "order", "id": "{{id}}", "data": {{_json}}}'
            )
        ]
    )
    def on_order_deleted():
        """Triggered when an order is deleted."""
        pass

TypeScript:

    import { type, observer, webhook, email, slack, RetryConfig } from 'fraiseql';

    @type()
    class Order {
      id: string;
      customerEmail: string;
      status: string;
      total: number;
      createdAt: Date;
    }

    @type()
    class Payment {
      id: string;
      orderId: string;
      amount: number;
      status: string;
      processedAt: Date | null;
    }

    // High-value order notifications
    observer({
      entity: 'Order',
      event: 'INSERT',
      condition: 'total > 1000',
      actions: [
        webhook('https://api.example.com/high-value-orders'),
        slack('#sales', ':moneybag: High-value order {id}: ${total}'),
        email({
          to: 'sales@example.com',
          subject: 'High-value order {id}',
          body: 'Order {id} for ${total} was created by {customer_email}',
        }),
      ],
      handler: function onHighValueOrder() {},
    });

    // Order shipped notifications
    observer({
      entity: 'Order',
      event: 'UPDATE',
      condition: "status.changed() and status == 'shipped'",
      actions: [
        webhook({ urlEnv: 'SHIPPING_WEBHOOK_URL' }),
        email({
          to: '{customer_email}',
          subject: 'Your order {id} has shipped!',
          body: 'Your order is on its way.',
          fromEmail: 'noreply@example.com',
        }),
      ],
      handler: function onOrderShipped() {},
    });

    // Payment failure handling
    observer({
      entity: 'Payment',
      event: 'UPDATE',
      condition: "status == 'failed'",
      actions: [
        slack('#payments', ':warning: Payment failed for order {order_id}'),
        webhook('https://api.example.com/payment-failures', {
          headers: { Authorization: 'Bearer {PAYMENT_API_TOKEN}' },
        }),
      ],
      retry: new RetryConfig({
        maxAttempts: 5,
        backoffStrategy: 'exponential',
        initialDelayMs: 100,
        maxDelayMs: 60000,
      }),
      handler: function onPaymentFailure() {},
    });

    // Archive deleted orders
    observer({
      entity: 'Order',
      event: 'DELETE',
      actions: [
        webhook('https://api.example.com/archive', {
          bodyTemplate: '{"type": "order", "id": "{{id}}", "data": {{_json}}}',
        }),
      ],
      handler: function onOrderDeleted() {},
    });

Observer Log Output
When LOG_LEVEL=debug is set, FraiseQL prints each observer invocation to stdout. Here is what firing the high-value order observer looks like:
    [observer] on_high_value_order fired: Order{id=ord_123, total=1250.00}
    [observer] Sending Slack notification to #sales
    [observer] Slack notification delivered (channel=#sales, ts=1710509048.123456)
    [observer] Sending email to sales@example.com
    [observer] Email delivered (message_id=<msg_01HV3K@mail.fraiseql.io>)
    [observer] on_high_value_order completed in 287ms

If an action fails, the log includes the error and retry schedule:

    [observer] on_high_value_order fired: Order{id=ord_124, total=2400.00}
    [observer] Sending Slack notification to #sales
    [observer] Slack notification failed: connection timeout (attempt 1/3)
    [observer] Retrying in 200ms (exponential backoff)
    [observer] Slack notification delivered on attempt 2

How Observers Work
- Compile time: Observers are compiled into database triggers and action handlers
- Write operation: Mutation modifies tb_table
- Trigger fires: Database trigger evaluates condition
- Action dispatch: Matching observers queue their actions
- Action execution: Actions execute asynchronously with retry logic
When multiple observers match the same event (same entity and event type), they execute in definition order — top-to-bottom as they appear in your schema file. This is deterministic and stable across deployments.
The pipeline from trigger to execution:
Observer execution pipeline

    graph LR
        A[GraphQL Mutation] --> B[Write to tb_* Table]
        B --> C[Database Trigger]
        C --> D{Condition Met?}
        D -->|Yes| E[Queue Actions]
        D -->|No| F[Skip]
        E --> G[Async Execution<br/>with Retry]

Each observer can dispatch one or more action types:
Available action types

    graph LR
        O[Observer] --> W[Webhook<br/>HTTP POST]
        O --> E[Email<br/>SMTP / SES]
        O --> S[Slack<br/>Channel / DM]
        O --> C[Custom<br/>Handler]

Best Practices
Keep Conditions Simple
Complex conditions are harder to debug. Prefer simple, readable conditions:
    # Good
    condition="status == 'shipped'"

    # Avoid
    condition="status == 'shipped' and total > 100 and customer_type == 'premium' and region in ('US', 'CA')"

For complex logic, create separate observers or use webhook endpoints that handle the logic.
Use Environment Variables
Never hardcode secrets or URLs:

    # Good
    webhook(url_env="WEBHOOK_URL")
    webhook(headers={"Authorization": "Bearer {API_TOKEN}"})

    # Bad
    webhook("https://api.example.com/secret-endpoint")
    webhook(headers={"Authorization": "Bearer sk-12345"})

Handle Failures Gracefully
Configure appropriate retry strategies:
    # Idempotent actions (safe to retry)
    retry=RetryConfig(max_attempts=5, backoff_strategy="exponential")

    # Non-idempotent actions (risky to retry)
    retry=RetryConfig(max_attempts=1)  # or omit retry

Document Observer Purpose
Use docstrings to explain business logic:
    @observer(
        entity="Order",
        event="UPDATE",
        condition="status.changed() and status == 'cancelled'",
        actions=[...]
    )
    def on_order_cancelled():
        """
        Triggered when an order is cancelled.

        Actions:
        - Notify warehouse to stop processing
        - Send cancellation email to customer
        - Update analytics dashboard

        Business rule: Cancellation is only allowed within 24 hours.
        """
        pass

Verify It Works
- Define an observer in your schema:

      @observer(
          entity="User",
          event="INSERT",
          actions=[webhook("http://localhost:3001/webhook")]
      )
      def on_user_signup():
          """Triggered when a new user signs up."""
          pass

- Start a test webhook server (in another terminal):

      # Using Python for quick testing
      python3 -m http.server 3001 &

- Create a user to trigger the observer:

      curl -X POST http://localhost:8080/graphql \
        -H "Content-Type: application/json" \
        -H "Authorization: Bearer $TOKEN" \
        -d '{"query": "mutation { createUser(input: { name: \"Test User\", email: \"test@example.com\" }) { id } }"}'

- Check that the webhook was received. Note that python3 -m http.server handles only GET and HEAD requests: it logs the POST request line and replies with 501 without printing the body. To inspect the payload itself, use a receiver that reads POST bodies. The payload has this shape:

      {"event": "INSERT", "table": "tb_user", "timestamp": "2024-01-15T10:30:00Z", "data": {"new": {...}}}

- Check observer logs (if LOG_LEVEL=debug is set):

      [observer] on_user_signup fired: User{id=usr_abc123, name="Test User"}
      [observer] Sending webhook to http://localhost:3001/webhook
      [observer] Webhook delivered (status=200)

- Test with conditions:

      @observer(
          entity="Order",
          event="INSERT",
          condition="total > 100",
          actions=[slack("#sales", "High-value order: {id}")]
      )
      def on_high_value_order():
          pass

  Create a small order (should NOT trigger):

      curl -X POST http://localhost:8080/graphql \
        -H "Content-Type: application/json" \
        -d '{"query": "mutation { createOrder(input: { total: 50 }) { id } }"}'

  Create a large order (SHOULD trigger):

      curl -X POST http://localhost:8080/graphql \
        -H "Content-Type: application/json" \
        -d '{"query": "mutation { createOrder(input: { total: 150 }) { id } }"}'
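Python's built-in `python3 -m http.server` only serves GET and HEAD, so it will not display a webhook body. If you want to see the payload during local testing, a small receiver such as the sketch below may help. It uses only the standard library; `WebhookHandler`, `format_body`, and `serve` are illustrative names, and the file name `receiver.py` in the usage note is an assumption.

```python
import json
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer


def format_body(body: bytes) -> str:
    """Pretty-print a JSON body, falling back to raw text if it is not JSON."""
    try:
        return json.dumps(json.loads(body), indent=2)
    except ValueError:
        return body.decode("utf-8", errors="replace")


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        # Read exactly the number of bytes the client announced.
        length = int(self.headers.get("Content-Length", 0))
        print(format_body(self.rfile.read(length)))
        # Acknowledge with 200 so the sender treats the delivery as successful.
        self.send_response(200)
        self.end_headers()


def serve(port: int = 3001) -> None:
    """Listen on localhost and print every POST body until interrupted."""
    HTTPServer(("localhost", port), WebhookHandler).serve_forever()


if __name__ == "__main__" and len(sys.argv) > 1:
    serve(int(sys.argv[1]))
```

Saved as `receiver.py`, it can be started with `python3 receiver.py 3001` in place of the `http.server` command, after which each delivered payload is printed to the terminal.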
Troubleshooting
Observer Not Firing
- Check that the entity name matches the table name (case-sensitive):

      entity="Order"  # Matches tb_order

- Verify the mutation actually modifies the table:
  - Observers only fire on INSERT/UPDATE/DELETE
  - Read queries (SELECT) do not trigger observers

- Check condition syntax:

      # Valid
      condition="total > 100"
      # Invalid (no spaces around operator)
      condition="total>100"

- Enable debug logging:

      LOG_LEVEL=debug fraiseql run
Webhook Not Received
- Test the webhook endpoint directly:

      curl -X POST http://your-webhook-url \
        -H "Content-Type: application/json" \
        -d '{"test": true}'

- Check network connectivity from the FraiseQL server:

      # On the FraiseQL server
      curl -v http://your-webhook-url

- Review retry logs:

      [observer] Webhook failed: connection timeout (attempt 1/3)
      [observer] Retrying in 200ms
Performance Issues
If observers slow down mutations:
- Use async execution (default):

      @observer(
          entity="Order",
          event="INSERT",
          sync=False  # Fire-and-forget
      )

- Reduce action count per observer
- Move heavy processing to background jobs via NATS
Next Steps
NATS Integration
Security