Observers — Event-driven webhooks
FraiseQL provides both outgoing webhooks (via Observers) and incoming webhook verification for secure integrations.
Send webhooks when data changes using the Observer system.

    from fraiseql import observer, webhook

    @observer(
        entity="Order",
        event="INSERT",
        actions=[
            webhook("https://api.example.com/orders/new")
        ]
    )
    def on_new_order():
        pass

Add custom headers to a webhook:

    webhook(
        "https://api.example.com/orders",
        headers={
            "Authorization": "Bearer ${WEBHOOK_API_KEY}",
            "X-Source": "fraiseql"
        }
    )

Customize the request body with a template:

    webhook(
        "https://api.example.com/orders",
        body_template='''
        {
            "event": "order.created",
            "order_id": "{{id}}",
            "total": {{total}},
            "customer": "{{customer_email}}",
            "timestamp": "{{_timestamp}}"
        }
        '''
    )

Read the webhook URL from an environment variable:

    webhook(url_env="ORDER_WEBHOOK_URL")

FraiseQL signs all outgoing webhooks with HMAC-SHA256.
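To exercise a receiver locally, it can help to produce a signed request by hand. The sketch below assumes the scheme this page describes: an HMAC-SHA256 over `timestamp + "." + body`, hex-encoded, prefixed with `sha256=`, and carried in the `X-FraiseQL-Signature` and `X-FraiseQL-Timestamp` headers. `sign_payload` is a hypothetical helper for illustration, not part of the FraiseQL API.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(body: bytes, secret: str, timestamp: Optional[int] = None) -> dict:
    """Build signature headers for a request body (hypothetical helper)."""
    ts = str(int(time.time()) if timestamp is None else timestamp)
    # Signed message is "<timestamp>.<body>", matching the documented formula
    message = ts.encode() + b"." + body
    digest = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return {
        "X-FraiseQL-Signature": "sha256=" + digest,
        "X-FraiseQL-Timestamp": ts,
    }


headers = sign_payload(b'{"id": 1}', "test-secret", timestamp=1704067200)
print(headers["X-FraiseQL-Timestamp"])  # 1704067200
```

Passing an explicit `timestamp` keeps the output deterministic, which is convenient in tests; leaving it out uses the current time.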
Signing configuration:

    [webhooks]
    signing_enabled = true
    signing_secret = "${WEBHOOK_SIGNING_SECRET}"
    signing_algorithm = "sha256"  # or "sha512"

    # Headers for signature
    signature_header = "X-FraiseQL-Signature"
    timestamp_header = "X-FraiseQL-Timestamp"

Each signed request carries these headers:

    X-FraiseQL-Signature: sha256=abc123def456...
    X-FraiseQL-Timestamp: 1704067200

The signature is computed as:

    HMAC-SHA256(secret, timestamp + "." + body)

To verify a signature in Python:

    import hashlib
    import hmac
    import os
    import time
    from typing import Optional

    from flask import Flask, request

    app = Flask(__name__)
    WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

    def verify_webhook(
        payload: bytes,
        signature: Optional[str],
        timestamp: Optional[str],
        secret: str,
        max_age_seconds: int = 300,
    ) -> bool:
        # Reject requests with missing headers
        if not signature or not timestamp:
            return False

        # Check timestamp freshness (replay attack protection)
        try:
            ts = int(timestamp)
        except ValueError:
            return False
        if abs(time.time() - ts) > max_age_seconds:
            return False

        # Compute expected signature
        message = f"{timestamp}.".encode() + payload
        expected = "sha256=" + hmac.new(
            secret.encode(), message, hashlib.sha256
        ).hexdigest()

        # Constant-time comparison (bytes, so non-ASCII input cannot raise)
        return hmac.compare_digest(signature.encode(), expected.encode())

    # Usage in Flask
    @app.route("/webhook", methods=["POST"])
    def handle_webhook():
        if not verify_webhook(
            request.data,
            request.headers.get("X-FraiseQL-Signature"),
            request.headers.get("X-FraiseQL-Timestamp"),
            WEBHOOK_SECRET,
        ):
            return "Invalid signature", 401

        data = request.json
        # Process webhook...
        return "OK", 200

The same check in Node.js:

    const crypto = require('crypto');

    const express = require('express');
    const app = express();

    function verifyWebhook(payload, signature, timestamp, secret, maxAge = 300) {
      // Reject requests with missing headers
      if (!signature || !timestamp) {
        return false;
      }

      // Check timestamp freshness
      const ts = parseInt(timestamp, 10);
      if (Number.isNaN(ts) || Math.abs(Date.now() / 1000 - ts) > maxAge) {
        return false;
      }

      // Compute expected signature
      const message = `${timestamp}.${payload}`;
      const expected = 'sha256=' + crypto
        .createHmac('sha256', secret)
        .update(message)
        .digest('hex');

      // Constant-time comparison (timingSafeEqual throws on length mismatch)
      const received = Buffer.from(signature);
      const computed = Buffer.from(expected);
      return received.length === computed.length &&
        crypto.timingSafeEqual(received, computed);
    }

    // Express middleware
    app.post('/webhook', express.raw({ type: 'application/json' }), (req, res) => {
      const valid = verifyWebhook(
        req.body.toString(),
        req.headers['x-fraiseql-signature'],
        req.headers['x-fraiseql-timestamp'],
        process.env.WEBHOOK_SECRET
      );

      if (!valid) {
        return res.status(401).send('Invalid signature');
      }

      const data = JSON.parse(req.body.toString());
      // Process webhook...
      res.send('OK');
    });

And in Go:

    package main

    import (
        "crypto/hmac"
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "math"
        "strconv"
        "time"
    )

    func verifyWebhook(payload []byte, signature, timestamp, secret string, maxAge int64) bool {
        // Check timestamp freshness
        ts, err := strconv.ParseInt(timestamp, 10, 64)
        if err != nil {
            return false
        }
        if math.Abs(float64(time.Now().Unix()-ts)) > float64(maxAge) {
            return false
        }

        // Compute expected signature
        message := fmt.Sprintf("%s.%s", timestamp, string(payload))
        mac := hmac.New(sha256.New, []byte(secret))
        mac.Write([]byte(message))
        expected := "sha256=" + hex.EncodeToString(mac.Sum(nil))

        // Constant-time comparison
        return hmac.Equal([]byte(signature), []byte(expected))
    }

Receive webhooks from external services with verification.
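Provider signature schemes differ in detail. As one illustration, GitHub sends an `X-Hub-Signature-256` header of the form `sha256=<hex>`, where the hex digest is an HMAC-SHA256 of the raw request body keyed with the webhook secret. The sketch below shows that check in isolation; it is not FraiseQL's internal implementation, and `verify_github_signature` is a hypothetical name.

```python
import hashlib
import hmac


def verify_github_signature(body: bytes, header_value, secret: str) -> bool:
    """Check an X-Hub-Signature-256 value against the raw body (hypothetical helper)."""
    if not header_value or not header_value.startswith("sha256="):
        return False
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Compare as bytes in constant time
    return hmac.compare_digest(header_value.encode(), expected.encode())


body = b'{"action": "opened"}'
good = "sha256=" + hmac.new(b"topsecret", body, hashlib.sha256).hexdigest()
print(verify_github_signature(body, good, "topsecret"))         # True
print(verify_github_signature(body + b" ", good, "topsecret"))  # False
```

The check must run over the exact bytes received; re-serializing parsed JSON before hashing will usually change the digest.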
GitHub:

    [webhooks.incoming.github]
    enabled = true
    path = "/webhooks/github"
    secret = "${GITHUB_WEBHOOK_SECRET}"
    signature_header = "X-Hub-Signature-256"
    events = ["push", "pull_request", "issues"]

Stripe:

    [webhooks.incoming.stripe]
    enabled = true
    path = "/webhooks/stripe"
    secret = "${STRIPE_WEBHOOK_SECRET}"
    signature_header = "Stripe-Signature"
    tolerance_seconds = 300

Custom provider:

    [webhooks.incoming.custom]
    enabled = true
    path = "/webhooks/custom"
    secret = "${CUSTOM_WEBHOOK_SECRET}"
    signature_header = "X-Signature"
    signature_format = "sha256={signature}"
    timestamp_header = "X-Timestamp"

Retry configuration:

    [webhooks.retry]
    enabled = true
    max_attempts = 5
    initial_delay_ms = 1000
    max_delay_ms = 60000
    backoff_multiplier = 2.0

    # Retry on these status codes
    retry_status_codes = [408, 429, 500, 502, 503, 504]

Dead letter queue (DLQ) configuration:

    [webhooks.dlq]
    enabled = true
    storage = "postgres"  # or "redis"
    retention_days = 7

    [webhooks.dlq.postgres]
    table_name = "tb_webhook_dlq"

Inspect failed webhooks:

    # List failed webhooks
    fraiseql-cli webhook dlq list

    # Retry a specific webhook
    fraiseql-cli webhook dlq retry --id "webhook-123"

    # Retry all failed
    fraiseql-cli webhook dlq retry-all --older-than 1h

Timeouts and limits:

    [webhooks]
    # Connection timeout
    connect_timeout_ms = 5000

    # Request timeout
    request_timeout_ms = 30000

    # Max payload size
    max_payload_bytes = 1048576  # 1 MiB

    # Max concurrent requests
    max_concurrent = 100

The following variables are available in webhook body templates:
| Variable | Description |
|---|---|
| `{{field_name}}` | Entity field value |
| `{{_id}}` | Entity ID |
| `{{_timestamp}}` | Event timestamp (ISO 8601) |
| `{{_event}}` | Event type (INSERT, UPDATE, DELETE) |
| `{{_entity}}` | Entity type name |
| `{{_json}}` | Entire entity as JSON |
| `{{_old}}` | Previous values (UPDATE only) |
| `{{_changed}}` | Changed fields (UPDATE only) |
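To make the substitution concrete, here is a minimal sketch of how `{{name}}` placeholders could be filled from an event. It is an illustration only: `render_template`, its parameters, and the mapping of `_id` to an `id` field are assumptions, it performs no JSON escaping, and FraiseQL's actual renderer (including how it handles `{{_old}}` and `{{_changed}}`) may differ.

```python
import json
import re

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template: str, entity: dict, event: str,
                    entity_type: str, timestamp: str) -> str:
    """Fill {{name}} placeholders from entity fields and built-in variables."""
    values = dict(entity)
    values.update({
        "_id": entity.get("id"),  # assumption: the entity ID lives in "id"
        "_timestamp": timestamp,
        "_event": event,
        "_entity": entity_type,
        "_json": json.dumps(entity),
    })

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"unknown template variable: {name}")
        # Values are inserted as plain text; quoting is up to the template
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


body = render_template(
    '{"event_type": "{{_entity}}.{{_event}}", "entity_id": "{{_id}}", "total": {{total}}}',
    entity={"id": 42, "total": 19.99},
    event="INSERT",
    entity_type="Order",
    timestamp="2024-01-01T00:00:00Z",
)
print(body)
```

Note how the template decides quoting: `"{{_id}}"` yields a JSON string while the bare `{{total}}` yields a JSON number.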
Example combining these variables:

    webhook(
        "https://api.example.com/events",
        body_template='''
        {
            "event_type": "{{_entity}}.{{_event}}",
            "entity_id": "{{_id}}",
            "timestamp": "{{_timestamp}}",
            "data": {{_json}},
            "changes": {{_changed}}
        }
        '''
    )

Webhook metrics:

| Metric | Description |
|---|---|
| `fraiseql_webhook_requests_total` | Total webhook requests |
| `fraiseql_webhook_success_total` | Successful deliveries |
| `fraiseql_webhook_failure_total` | Failed deliveries |
| `fraiseql_webhook_retry_total` | Retry attempts |
| `fraiseql_webhook_latency_ms` | Delivery latency |
| `fraiseql_webhook_dlq_size` | Dead letter queue size |
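The retry settings shown earlier (`max_attempts`, `initial_delay_ms`, `max_delay_ms`, `backoff_multiplier`) suggest a capped exponential backoff. The sketch below works out the resulting delay schedule under two assumptions that this page does not confirm: the first delivery counts as attempt 1, and no jitter is added. `retry_delays_ms` and `should_retry` are hypothetical helpers.

```python
def retry_delays_ms(max_attempts=5, initial_delay_ms=1000,
                    max_delay_ms=60000, backoff_multiplier=2.0):
    """Return the wait before each retry, assuming capped exponential backoff.

    Assumes the first delivery is attempt 1, leaving max_attempts - 1 retries.
    """
    delays = []
    delay = float(initial_delay_ms)
    for _ in range(max_attempts - 1):
        delays.append(int(min(delay, max_delay_ms)))
        delay *= backoff_multiplier
    return delays


# Status codes from the retry_status_codes setting
RETRY_STATUS_CODES = {408, 429, 500, 502, 503, 504}


def should_retry(status_code: int) -> bool:
    """Retry only on the status codes listed in the configuration."""
    return status_code in RETRY_STATUS_CODES


print(retry_delays_ms())  # [1000, 2000, 4000, 8000]
```

With the example values, the cap of 60000 ms only comes into play from the seventh retry onward, so it matters mainly when `max_attempts` is raised.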
Include idempotency key for safe retries:

    webhook(
        "https://api.example.com/orders",
        headers={
            "Idempotency-Key": "{{_entity}}-{{_id}}-{{_timestamp}}"
        }
    )

Always verify incoming webhook signatures:

    if not verify_webhook(payload, signature, timestamp, secret):
        raise HTTPException(401, "Invalid signature")

Expect duplicate deliveries:

    # FastAPI shown here; already_processed, process_webhook, and
    # mark_processed are application-defined helpers
    from fastapi import FastAPI, Request

    app = FastAPI()

    @app.post("/webhook")
    def handle_webhook(data: dict, request: Request):
        # Use idempotency key to prevent duplicate processing
        key = request.headers.get("Idempotency-Key")
        if already_processed(key):
            return {"status": "already_processed"}

        process_webhook(data)
        mark_processed(key)
        return {"status": "ok"}

Monitor delivery health with queries such as:

    # Webhook success rate
    sum(rate(fraiseql_webhook_success_total[5m])) /
    sum(rate(fraiseql_webhook_requests_total[5m]))

    # Alert on DLQ growth
    fraiseql_webhook_dlq_size > 100
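The duplicate-handling example leaves `already_processed` and `mark_processed` undefined. One way to back them is sketched here with an in-memory store and a time-to-live. The class name and TTL are assumptions for illustration; a multi-worker deployment would more likely keep the keys in a shared store such as a database table or Redis so that every worker sees them.

```python
import time


class ProcessedKeys:
    """In-memory record of handled idempotency keys with expiry."""

    def __init__(self, ttl_seconds: float = 86400.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen = {}  # key -> expiry time on the given clock

    def already_processed(self, key) -> bool:
        expiry = self._seen.get(key)
        if expiry is None:
            return False
        if expiry <= self._clock():
            # Entry has expired; forget it
            del self._seen[key]
            return False
        return True

    def mark_processed(self, key) -> None:
        self._seen[key] = self._clock() + self._ttl


store = ProcessedKeys(ttl_seconds=60)
store.mark_processed("Order-42-2024-01-01T00:00:00Z")
print(store.already_processed("Order-42-2024-01-01T00:00:00Z"))  # True
print(store.already_processed("Order-43-2024-01-01T00:00:00Z"))  # False
```

The TTL should comfortably exceed the longest window in which a sender might retry, otherwise a late retry would be processed a second time.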
Observers — Event-driven webhooks
Security — Webhook authentication
Rate Limiting — Protect webhook endpoints