Skip to content

Observers

Observers are functions that run after a mutation completes successfully. The mutation commits to the database first, then your observers fire. This means observers cannot roll back the write — they are for side effects: sending emails, updating projection tables, publishing events, calling webhooks, writing audit logs.

This guide builds directly on the blog API from Your First API — users, posts, and comments.

When a createUser mutation runs:

  1. FraiseQL executes your SQL function inside a transaction
  2. The transaction commits
  3. The result (a User object) is passed to every registered observer for createUser
  4. The GraphQL response returns to the client — it does not wait for observers to finish

Observers receive the same object returned to the client. They have access to the request context, which carries your database connection, auth token, and any injected services.

Step 1: Your First Observer — Welcome Email on createUser

Section titled “Step 1: Your First Observer — Welcome Email on createUser”

Register an observer that sends a welcome email after a user account is created.

observers.py
import fraiseql
from fraiseql import Context
from schema import User
@fraiseql.observer("createUser")
async def send_welcome_email(result: User, context: Context) -> None:
await context.email_client.send(
to=result.email,
subject="Welcome to the blog",
body=f"Hi {result.username}, your account is ready.",
)

Register your observers at startup, after the app is configured:

main.py
import fraiseql
import observers # registers all observers on import
app = fraiseql.create_app(config="fraiseql.toml")

Step 2: Observer on createComment — Notify the Post Author

Section titled “Step 2: Observer on createComment — Notify the Post Author”

When someone comments on a post, the post author should receive a notification. The observer receives the new Comment — it can use the context to look up the post author.

observers.py
@fraiseql.observer("createComment")
async def notify_post_author(result: Comment, context: Context) -> None:
# The comment result embeds the post and its author
# (as shaped by v_comment joining v_post and v_user)
post_author = result.post.author
# Don't notify if the commenter is the post author
if post_author.id == result.author.id:
return
await context.email_client.send(
to=post_author.email,
subject=f"New comment on \"{result.post.title}\"",
body=(
f"{result.author.username} commented: "
f"\"{result.content[:100]}\""
),
)

The observer uses data already present on the result object — no extra database query needed, because v_comment joins through to v_post and v_user.

If you need data that is not on the result, the context provides a database connection:

observers.py
@fraiseql.observer("createComment")
async def notify_post_author(result: Comment, context: Context) -> None:
# Fetch additional data via the context db connection
row = await context.db.fetchrow(
"SELECT email FROM tb_user WHERE pk_user = $1",
result.post.fk_author,
)
await context.email_client.send(to=row["email"], ...)

Step 3: Observer on publishPost — Update a Projection Table

Section titled “Step 3: Observer on publishPost — Update a Projection Table”

Observers are the natural place to keep projection tables in sync. The Projection Tables guide introduces tv_post_stats. When a post is published, you can update its published_at timestamp from an observer.

observers.py
from datetime import datetime, timezone
@fraiseql.observer("publishPost")
async def update_post_stats_on_publish(result: Post, context: Context) -> None:
await context.db.execute(
"""
UPDATE tv_post_stats
SET published_at = $1
WHERE post_id = $2
""",
datetime.now(timezone.utc),
result.id,
)

This is the connection between observers and projection tables: mutations write to the canonical tables, observers keep your read models current.

An observer failure does not roll back the mutation. The write has already committed. Your observer must handle its own errors — failing silently is worse than failing loudly.

observers.py
import logging
logger = logging.getLogger(__name__)
@fraiseql.observer("createUser")
async def send_welcome_email(result: User, context: Context) -> None:
try:
await context.email_client.send(
to=result.email,
subject="Welcome to the blog",
body=f"Hi {result.username}, your account is ready.",
)
except Exception:
logger.exception(
"Failed to send welcome email to user %s", result.id
)
# Do not re-raise — observer failure must not affect the mutation response

For observers that must not lose events (billing, compliance, audit), push failures to a dead-letter queue rather than discarding them:

observers.py
@fraiseql.observer("createUser")
async def record_signup_event(result: User, context: Context) -> None:
try:
await context.event_bus.publish("user.created", {
"user_id": result.id,
"email": result.email,
"created_at": result.created_at.isoformat(),
})
except Exception:
logger.exception("Failed to publish user.created event for %s", result.id)
# Push to dead-letter queue for later replay
await context.dlq.enqueue("user.created", {
"user_id": result.id,
"email": result.email,
})

FraiseQL supports configuring retries on observers for transient failures:

observers.py
from fraiseql import RetryConfig
@fraiseql.observer(
"createUser",
retry=RetryConfig(
max_attempts=3,
backoff_strategy="exponential",
initial_delay_ms=200,
),
)
async def send_welcome_email(result: User, context: Context) -> None:
await context.email_client.send(
to=result.email,
subject="Welcome to the blog",
body=f"Hi {result.username}, your account is ready.",
)

Retries are appropriate for idempotent observers — sending the same welcome email twice is acceptable. For non-idempotent actions (e.g. charging a card), set max_attempts=1 or handle idempotency in the action itself.

Step 5: Multiple Observers on One Mutation

Section titled “Step 5: Multiple Observers on One Mutation”

You can register multiple observers on the same mutation. They run sequentially in registration order.

observers.py
# Observer 1 — runs first
@fraiseql.observer("createUser")
async def send_welcome_email(result: User, context: Context) -> None:
try:
await context.email_client.send(
to=result.email,
subject="Welcome to the blog",
body=f"Hi {result.username}, your account is ready.",
)
except Exception:
logger.exception("Failed to send welcome email to %s", result.id)
# Observer 2 — runs second
@fraiseql.observer("createUser")
async def record_signup_event(result: User, context: Context) -> None:
try:
await context.event_bus.publish("user.created", {
"user_id": result.id,
"username": result.username,
})
except Exception:
logger.exception("Failed to publish user.created for %s", result.id)

Each observer catches its own errors independently. A failure in observer 1 does not prevent observer 2 from running.

Both observers and database triggers react to data changes. They serve different purposes:

ObserverDB Trigger
RunsAfter commit, outside transactionInside transaction
Can call external servicesYesNo
Rolls back on failureNoYes
Access to context / authYesNo
Can read application stateYesNo
Visible in application codeYesOnly as SQL

Use a database trigger when the action must be atomic with the write — for example, enforcing a derived column or maintaining a denormalized counter that must never be out of sync.

Use an observer when the action involves the outside world — sending an email, calling a webhook, updating a projection table, publishing an event. These actions cannot participate in the database transaction anyway, so placing them in observers keeps responsibility clear.

When an observer calls context.webhook_client.send() or when FraiseQL dispatches a built-in webhook action, the payload it sends to the external endpoint looks like this:

{
"event": "createUser",
"timestamp": "2024-03-15T14:22:05.123Z",
"payload": {
"id": "usr_01HV3KQBN7MXSZQZQR4F5P0G2Y",
"username": "alice",
"email": "alice@example.com",
"created_at": "2024-03-15T14:22:04.987Z"
},
"meta": {
"fraiseql_version": "0.9.1",
"request_id": "req_01HV3KQBN7MXSZQZQR4F5P0G2Y"
}
}

The payload field contains the full object returned by the mutation — the same shape your GraphQL client receives. You can use any field in your webhook endpoint logic.

Run FraiseQL with debug logging to see observer dispatch in real time:

Terminal window
LOG_LEVEL=debug fraiseql run

With debug logging enabled, every observer invocation prints to stdout:

[observer] createUser fired → send_welcome_email
[observer] send_welcome_email: dispatching email to alice@example.com
[observer] send_welcome_email: completed in 142ms
[observer] createUser fired → record_signup_event
[observer] record_signup_event: publishing user.created to event bus
[observer] record_signup_event: completed in 18ms

If an observer fails, the log includes the full stack trace and the retry schedule:

[observer] send_welcome_email: attempt 1/3 failed — ConnectionRefusedError
[observer] send_welcome_email: retrying in 200ms (exponential backoff)
[observer] send_welcome_email: attempt 2/3 succeeded

To trace a specific mutation end-to-end, filter by the request ID that FraiseQL prints at query start:

Terminal window
LOG_LEVEL=debug fraiseql run 2>&1 | grep "req_01HV3KQBN7"