Observers
Observers are functions that run after a mutation completes successfully. The mutation commits to the database first, then your observers fire. This means observers cannot roll back the write — they are for side effects: sending emails, updating projection tables, publishing events, calling webhooks, writing audit logs.
This guide builds directly on the blog API from Your First API — users, posts, and comments.
How Observers Fit In
Section titled “How Observers Fit In”When a createUser mutation runs:
- FraiseQL executes your SQL function inside a transaction
- The transaction commits
- The result (a
Userobject) is passed to every registered observer forcreateUser - The GraphQL response returns to the client — it does not wait for observers to finish
Observers receive the same object returned to the client. They have access to the request context, which carries your database connection, auth token, and any injected services.
Step 1: Your First Observer — Welcome Email on createUser
Section titled “Step 1: Your First Observer — Welcome Email on createUser”Register an observer that sends a welcome email after a user account is created.
import fraiseqlfrom fraiseql import Contextfrom schema import User
@fraiseql.observer("createUser")async def send_welcome_email(result: User, context: Context) -> None: await context.email_client.send( to=result.email, subject="Welcome to the blog", body=f"Hi {result.username}, your account is ready.", )import fraiseql, { type Context } from 'fraiseql';import { type User } from './schema';
fraiseql.observe('createUser', async (result: User, context: Context) => { await context.emailClient.send({ to: result.email, subject: 'Welcome to the blog', body: `Hi ${result.username}, your account is ready.`, });});package observers
import ( "context" "fmt"
"github.com/fraiseql/fraiseql-go" "your-module/schema" "your-module/email")
func Register(app *fraiseql.App) { fraiseql.Observe(app, "createUser", func(ctx context.Context, result schema.User) error { return email.Send(ctx, email.Message{ To: result.Email, Subject: "Welcome to the blog", Body: fmt.Sprintf("Hi %s, your account is ready.", result.Username), }) })}Register your observers at startup, after the app is configured:
import fraiseqlimport observers # registers all observers on import
app = fraiseql.create_app(config="fraiseql.toml")import fraiseql from 'fraiseql';import './observers'; // registers all observers on import
const app = fraiseql.createApp({ config: 'fraiseql.toml' });package main
import ( "github.com/fraiseql/fraiseql-go" "your-module/observers")
func main() { app := fraiseql.NewApp("fraiseql.toml") observers.Register(app) app.Serve()}Step 2: Observer on createComment — Notify the Post Author
Section titled “Step 2: Observer on createComment — Notify the Post Author”When someone comments on a post, the post author should receive a notification. The observer receives the new Comment — it can use the context to look up the post author.
@fraiseql.observer("createComment")async def notify_post_author(result: Comment, context: Context) -> None: # The comment result embeds the post and its author # (as shaped by v_comment joining v_post and v_user) post_author = result.post.author
# Don't notify if the commenter is the post author if post_author.id == result.author.id: return
await context.email_client.send( to=post_author.email, subject=f"New comment on \"{result.post.title}\"", body=( f"{result.author.username} commented: " f"\"{result.content[:100]}\"" ), )fraiseql.observe('createComment', async (result: Comment, context: Context) => { // The comment result embeds the post and its author const postAuthor = result.post.author;
// Don't notify if the commenter is the post author if (postAuthor.id === result.author.id) return;
await context.emailClient.send({ to: postAuthor.email, subject: `New comment on "${result.post.title}"`, body: `${result.author.username} commented: "${result.content.slice(0, 100)}"`, });});fraiseql.Observe(app, "createComment", func(ctx context.Context, result schema.Comment) error { // The comment result embeds the post and its author postAuthor := result.Post.Author
// Don't notify if the commenter is the post author if postAuthor.ID == result.Author.ID { return nil }
return emailClient.Send(ctx, email.Message{ To: postAuthor.Email, Subject: fmt.Sprintf("New comment on %q", result.Post.Title), Body: fmt.Sprintf( "%s commented: %q", result.Author.Username, truncate(result.Content, 100), ), })})The observer uses data already present on the result object — no extra database query needed, because v_comment joins through to v_post and v_user.
If you need data that is not on the result, the context provides a database connection:
@fraiseql.observer("createComment")async def notify_post_author(result: Comment, context: Context) -> None: # Fetch additional data via the context db connection row = await context.db.fetchrow( "SELECT email FROM tb_user WHERE pk_user = $1", result.post.fk_author, ) await context.email_client.send(to=row["email"], ...)fraiseql.observe('createComment', async (result: Comment, context: Context) => { const row = await context.db.queryOne( 'SELECT email FROM tb_user WHERE pk_user = $1', [result.post.fkAuthor], ); await context.emailClient.send({ to: row.email, ... });});fraiseql.Observe(app, "createComment", func(ctx context.Context, result schema.Comment) error { var authorEmail string err := db.QueryRowContext(ctx, "SELECT email FROM tb_user WHERE pk_user = $1", result.Post.FkAuthor, ).Scan(&authorEmail) if err != nil { return err } return emailClient.Send(ctx, email.Message{To: authorEmail})})Step 3: Observer on publishPost — Update a Projection Table
Section titled “Step 3: Observer on publishPost — Update a Projection Table”Observers are the natural place to keep projection tables in sync. The Projection Tables guide introduces tv_post_stats. When a post is published, you can update its published_at timestamp from an observer.
from datetime import datetime, timezone
@fraiseql.observer("publishPost")async def update_post_stats_on_publish(result: Post, context: Context) -> None: await context.db.execute( """ UPDATE tv_post_stats SET published_at = $1 WHERE post_id = $2 """, datetime.now(timezone.utc), result.id, )fraiseql.observe('publishPost', async (result: Post, context: Context) => { await context.db.query( `UPDATE tv_post_stats SET published_at = $1 WHERE post_id = $2`, [new Date(), result.id], );});fraiseql.Observe(app, "publishPost", func(ctx context.Context, result schema.Post) error { _, err := db.ExecContext(ctx, `UPDATE tv_post_stats SET published_at = $1 WHERE post_id = $2`, time.Now().UTC(), result.ID, ) return err})This is the connection between observers and projection tables: mutations write to the canonical tables, observers keep your read models current.
Step 4: Error Handling in Observers
Section titled “Step 4: Error Handling in Observers”An observer failure does not roll back the mutation. The write has already committed. Your observer must handle its own errors — failing silently is worse than failing loudly.
Log errors, do not re-raise
Section titled “Log errors, do not re-raise”import logging
logger = logging.getLogger(__name__)
@fraiseql.observer("createUser")async def send_welcome_email(result: User, context: Context) -> None: try: await context.email_client.send( to=result.email, subject="Welcome to the blog", body=f"Hi {result.username}, your account is ready.", ) except Exception: logger.exception( "Failed to send welcome email to user %s", result.id ) # Do not re-raise — observer failure must not affect the mutation responsefraiseql.observe('createUser', async (result: User, context: Context) => { try { await context.emailClient.send({ to: result.email, subject: 'Welcome to the blog', body: `Hi ${result.username}, your account is ready.`, }); } catch (err) { context.logger.error('Failed to send welcome email', { userId: result.id, error: err, }); // Do not re-throw — observer failure must not affect the mutation response }});fraiseql.Observe(app, "createUser", func(ctx context.Context, result schema.User) error { err := emailClient.Send(ctx, email.Message{ To: result.Email, Subject: "Welcome to the blog", Body: fmt.Sprintf("Hi %s, your account is ready.", result.Username), }) if err != nil { // Log and return nil — observer error must not surface to the client slog.ErrorContext(ctx, "failed to send welcome email", "user_id", result.ID, "error", err, ) return nil } return nil})Dead-letter queue for critical observers
Section titled “Dead-letter queue for critical observers”For observers that must not lose events (billing, compliance, audit), push failures to a dead-letter queue rather than discarding them:
@fraiseql.observer("createUser")async def record_signup_event(result: User, context: Context) -> None: try: await context.event_bus.publish("user.created", { "user_id": result.id, "email": result.email, "created_at": result.created_at.isoformat(), }) except Exception: logger.exception("Failed to publish user.created event for %s", result.id) # Push to dead-letter queue for later replay await context.dlq.enqueue("user.created", { "user_id": result.id, "email": result.email, })fraiseql.observe('createUser', async (result: User, context: Context) => { try { await context.eventBus.publish('user.created', { userId: result.id, email: result.email, createdAt: result.createdAt, }); } catch (err) { context.logger.error('Failed to publish user.created', { userId: result.id, err }); // Push to dead-letter queue for later replay await context.dlq.enqueue('user.created', { userId: result.id, email: result.email, }); }});fraiseql.Observe(app, "createUser", func(ctx context.Context, result schema.User) error { err := eventBus.Publish(ctx, "user.created", map[string]any{ "user_id": result.ID, "email": result.Email, "created_at": result.CreatedAt, }) if err != nil { slog.ErrorContext(ctx, "failed to publish user.created", "user_id", result.ID, "error", err) // Push to dead-letter queue for later replay _ = dlq.Enqueue(ctx, "user.created", result.ID) } return nil})Retry configuration
Section titled “Retry configuration”FraiseQL supports configuring retries on observers for transient failures:
from fraiseql import RetryConfig
@fraiseql.observer( "createUser", retry=RetryConfig( max_attempts=3, backoff_strategy="exponential", initial_delay_ms=200, ),)async def send_welcome_email(result: User, context: Context) -> None: await context.email_client.send( to=result.email, subject="Welcome to the blog", body=f"Hi {result.username}, your account is ready.", )fraiseql.observe( 'createUser', async (result: User, context: Context) => { await context.emailClient.send({ to: result.email, subject: 'Welcome to the blog', body: `Hi ${result.username}, your account is ready.`, }); }, { retry: { maxAttempts: 3, backoffStrategy: 'exponential', initialDelayMs: 200, }, },);fraiseql.Observe(app, "createUser", sendWelcomeEmail, fraiseql.WithRetry(fraiseql.RetryConfig{ MaxAttempts: 3, BackoffStrategy: fraiseql.BackoffExponential, InitialDelayMs: 200, }),)Retries are appropriate for idempotent observers — sending the same welcome email twice is acceptable. For non-idempotent actions (e.g. charging a card), set max_attempts=1 or handle idempotency in the action itself.
Step 5: Multiple Observers on One Mutation
Section titled “Step 5: Multiple Observers on One Mutation”You can register multiple observers on the same mutation. They run sequentially in registration order.
# Observer 1 — runs first@fraiseql.observer("createUser")async def send_welcome_email(result: User, context: Context) -> None: try: await context.email_client.send( to=result.email, subject="Welcome to the blog", body=f"Hi {result.username}, your account is ready.", ) except Exception: logger.exception("Failed to send welcome email to %s", result.id)
# Observer 2 — runs second@fraiseql.observer("createUser")async def record_signup_event(result: User, context: Context) -> None: try: await context.event_bus.publish("user.created", { "user_id": result.id, "username": result.username, }) except Exception: logger.exception("Failed to publish user.created for %s", result.id)// Observer 1 — runs firstfraiseql.observe('createUser', async (result: User, context: Context) => { try { await context.emailClient.send({ to: result.email, subject: 'Welcome to the blog', body: `Hi ${result.username}, your account is ready.`, }); } catch (err) { context.logger.error('Failed to send welcome email', { userId: result.id, err }); }});
// Observer 2 — runs secondfraiseql.observe('createUser', async (result: User, context: Context) => { try { await context.eventBus.publish('user.created', { userId: result.id, username: result.username, }); } catch (err) { context.logger.error('Failed to publish user.created', { userId: result.id, err }); }});func Register(app *fraiseql.App) { // Observer 1 — runs first fraiseql.Observe(app, "createUser", func(ctx context.Context, result schema.User) error { if err := sendWelcomeEmail(ctx, result); err != nil { slog.ErrorContext(ctx, "welcome email failed", "user_id", result.ID, "error", err) } return nil })
// Observer 2 — runs second fraiseql.Observe(app, "createUser", func(ctx context.Context, result schema.User) error { if err := publishSignupEvent(ctx, result); err != nil { slog.ErrorContext(ctx, "signup event failed", "user_id", result.ID, "error", err) } return nil })}Each observer catches its own errors independently. A failure in observer 1 does not prevent observer 2 from running.
Observers vs Database Triggers
Section titled “Observers vs Database Triggers”Both observers and database triggers react to data changes. They serve different purposes:
| Observer | DB Trigger | |
|---|---|---|
| Runs | After commit, outside transaction | Inside transaction |
| Can call external services | Yes | No |
| Rolls back on failure | No | Yes |
| Access to context / auth | Yes | No |
| Can read application state | Yes | No |
| Visible in application code | Yes | Only as SQL |
Use a database trigger when the action must be atomic with the write — for example, enforcing a derived column or maintaining a denormalized counter that must never be out of sync.
Use an observer when the action involves the outside world — sending an email, calling a webhook, updating a projection table, publishing an event. These actions cannot participate in the database transaction anyway, so placing them in observers keeps responsibility clear.
Webhook Payload Format
Section titled “Webhook Payload Format”When an observer calls context.webhook_client.send() or when FraiseQL dispatches a built-in webhook action, the payload it sends to the external endpoint looks like this:
{ "event": "createUser", "timestamp": "2024-03-15T14:22:05.123Z", "payload": { "id": "usr_01HV3KQBN7MXSZQZQR4F5P0G2Y", "username": "alice", "email": "alice@example.com", "created_at": "2024-03-15T14:22:04.987Z" }, "meta": { "fraiseql_version": "0.9.1", "request_id": "req_01HV3KQBN7MXSZQZQR4F5P0G2Y" }}The payload field contains the full object returned by the mutation — the same shape your GraphQL client receives. You can use any field in your webhook endpoint logic.
Debugging Observers
Section titled “Debugging Observers”Run FraiseQL with debug logging to see observer dispatch in real time:
LOG_LEVEL=debug fraiseql runWith debug logging enabled, every observer invocation prints to stdout:
[observer] createUser fired → send_welcome_email[observer] send_welcome_email: dispatching email to alice@example.com[observer] send_welcome_email: completed in 142ms[observer] createUser fired → record_signup_event[observer] record_signup_event: publishing user.created to event bus[observer] record_signup_event: completed in 18msIf an observer fails, the log includes the full stack trace and the retry schedule:
[observer] send_welcome_email: attempt 1/3 failed — ConnectionRefusedError[observer] send_welcome_email: retrying in 200ms (exponential backoff)[observer] send_welcome_email: attempt 2/3 succeededTo trace a specific mutation end-to-end, filter by the request ID that FraiseQL prints at query start:
LOG_LEVEL=debug fraiseql run 2>&1 | grep "req_01HV3KQBN7"What’s Next
Section titled “What’s Next”- Projection Tables — Keep read models up to date
- Advanced Patterns — Audit trails, versioning, soft deletes
- Webhooks — Push events to external systems