Advanced Federation
Multi-DB patterns for analytics pipelines.
A real-time analytics platform combining data federation, NATS event streaming, and custom resolvers for live dashboards.
```
Event Sources (Web, Mobile, Server)
                 |
          NATS Event Bus
                 |
        +--------+--------+
        |                 |
  Raw Events DB     Aggregates DB
        |                 |
        +--------+--------+
                 |
  FraiseQL API (Federation + Custom Resolvers)
                 |
            Dashboards
```

```sql
-- Raw events (immutable log)
CREATE TABLE tb_pageview_event (
    pk_event SERIAL PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    user_id UUID NOT NULL,
    session_id UUID NOT NULL,
    page_url TEXT NOT NULL,
    referrer TEXT,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    properties JSONB NOT NULL DEFAULT '{}'
);

-- Pre-computed aggregates
CREATE TABLE tb_pageview_aggregate (
    pk_aggregate SERIAL PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    page_url TEXT NOT NULL,
    date DATE NOT NULL,
    view_count INT NOT NULL DEFAULT 0,
    unique_visitors INT NOT NULL DEFAULT 0,
    bounce_rate FLOAT NOT NULL DEFAULT 0.0,
    avg_session_duration FLOAT NOT NULL DEFAULT 0.0,
    computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(page_url, date)
);

-- Views with JSONB data column
CREATE VIEW v_pageview_event AS
SELECT
    e.id,
    jsonb_build_object(
        'id', e.id::text,
        'user_id', e.user_id::text,
        'session_id', e.session_id::text,
        'page_url', e.page_url,
        'referrer', e.referrer,
        'timestamp', e.timestamp,
        'properties', e.properties
    ) AS data
FROM tb_pageview_event e;

CREATE VIEW v_pageview_aggregate AS
SELECT
    a.id,
    jsonb_build_object(
        'id', a.id::text,
        'page_url', a.page_url,
        'date', a.date,
        'view_count', a.view_count,
        'unique_visitors', a.unique_visitors,
        'bounce_rate', a.bounce_rate,
        'avg_session_duration', a.avg_session_duration,
        'computed_at', a.computed_at
    ) AS data
FROM tb_pageview_aggregate a;
```
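Each view exposes exactly one JSONB `data` column, which is the shape FraiseQL reads. A minimal sketch of inspecting one row outside FraiseQL, using asyncpg directly (the DSN and connection handling here are assumptions for illustration only):

```python
import asyncio
import json

import asyncpg  # assumed driver for this illustration; FraiseQL normally handles DB access


async def inspect_view_row() -> None:
    # Connect to the raw-events database (DSN is a placeholder).
    conn = await asyncpg.connect("postgresql://localhost/raw_events")
    try:
        row = await conn.fetchrow("SELECT id, data FROM v_pageview_event LIMIT 1")
        if row is None:
            print("no events ingested yet")
            return
        # asyncpg returns JSONB as text unless a codec is registered, so decode it here.
        data = json.loads(row["data"])
        # Every field the PageViewEvent GraphQL type exposes lives in this one column.
        print(data["page_url"], data["session_id"], data["properties"])
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(inspect_view_row())
```

The GraphQL types below map these two views across the two databases.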
@fraiseql.type(database="raw_events")class PageViewEvent: """Raw immutable page view event.""" id: fraiseql.ID user_id: fraiseql.ID session_id: fraiseql.ID page_url: str referrer: Optional[str] timestamp: datetime properties: dict
@fraiseql.type(database="aggregates")class PageViewAggregate: """Pre-computed page view metrics.""" id: fraiseql.ID page_url: str date: str # YYYY-MM-DD view_count: int unique_visitors: int bounce_rate: float avg_session_duration: float computed_at: datetime@fraiseql.nats.subscribe( subject="events.pageview", consumer_group="analytics_ingestion", max_concurrent=100 # High concurrency for real-time)async def ingest_pageview(message): """ Ingest page view events. Store raw for long-term analysis. Update aggregates for dashboards. """ event = message.data
try: # Store raw event (immutable) await ctx.db.execute( """ INSERT INTO tb_pageview_event (user_id, session_id, page_url, referrer, timestamp, properties) VALUES ($1, $2, $3, $4, $5, $6::jsonb) """, [ event["user_id"], event["session_id"], event["page_url"], event.get("referrer"), event["timestamp"], json.dumps(event.get("properties", {})) ] )
# Upsert real-time aggregate today = datetime.now().strftime("%Y-%m-%d") await ctx.db.execute( """ INSERT INTO tb_pageview_aggregate (page_url, date, view_count, unique_visitors, computed_at) VALUES ($1, $2, 1, 1, NOW()) ON CONFLICT (page_url, date) DO UPDATE SET view_count = tb_pageview_aggregate.view_count + 1, computed_at = NOW() """, [event["page_url"], today] )
# Publish enriched event for other consumers await fraiseql.nats.publish( subject="analytics.pageview.processed", data=event )
await message.ack()
except Exception as e: await log_error({"event": event, "error": str(e)}) await message.nak(timeout=5000)@fraiseql.type(database="aggregates")class PageAnalytics: """Analytics for a specific page.""" page_url: str date: str
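For reference, a payload published to `events.pageview` only needs the fields the consumer reads above; anything extra belongs in `properties`. A minimal sketch (the field values are invented; the publish call mirrors the `fraiseql.nats.publish` usage shown in the consumer):

```python
import uuid
from datetime import datetime, timezone

import fraiseql


async def emit_example_pageview() -> None:
    # Shape expected by ingest_pageview: ids, page_url, optional referrer,
    # an ISO timestamp, and a free-form properties object.
    event = {
        "user_id": str(uuid.uuid4()),
        "session_id": str(uuid.uuid4()),
        "page_url": "/products",
        "referrer": "https://example.com/search",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "properties": {"utm_source": "newsletter", "viewport": "mobile"},
    }
    await fraiseql.nats.publish(subject="events.pageview", data=event)
```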
```python
@fraiseql.type(database="aggregates")
class PageAnalytics:
    """Analytics for a specific page."""
    page_url: str
    date: str

    # Federated data from raw events
    raw_events: list[PageViewEvent] = fraiseql.federated(
        database="raw_events",
        lookup="page_url"
    )

    # Custom computed metrics
    @fraiseql.field_resolver
    async def bounce_rate(self, ctx) -> float:
        """
        Bounce rate: % of sessions with a single pageview.
        Computed from raw events.
        """
        sessions = await ctx.db.query(
            """SELECT DISTINCT session_id
               FROM tb_pageview_event
               WHERE page_url = $1 AND DATE(timestamp) = $2""",
            [self.page_url, self.date]
        )

        if not sessions:
            return 0.0

        single_view_sessions = await ctx.db.query_one(
            """SELECT COUNT(DISTINCT session_id) AS count
               FROM (
                   SELECT session_id
                   FROM tb_pageview_event
                   WHERE page_url = $1 AND DATE(timestamp) = $2
                   GROUP BY session_id
                   HAVING COUNT(*) = 1
               ) sub""",
            [self.page_url, self.date]
        )

        return (single_view_sessions["count"] / len(sessions)) * 100

    @fraiseql.field_resolver
    async def avg_session_duration(self, ctx) -> float:
        """Average session duration in seconds."""
        # Per-session durations are computed in a subquery: PostgreSQL does not
        # allow nesting AVG() directly around MAX()/MIN().
        result = await ctx.db.query_one(
            """SELECT AVG(duration) AS avg_duration
               FROM (
                   SELECT EXTRACT(EPOCH FROM (MAX(timestamp) - MIN(timestamp))) AS duration
                   FROM tb_pageview_event
                   WHERE page_url = $1 AND DATE(timestamp) = $2
                   GROUP BY session_id
               ) sub""",
            [self.page_url, self.date]
        )
        return result["avg_duration"] or 0.0

    @fraiseql.field_resolver
    async def conversion_funnel(self, ctx) -> dict:
        """Analyze conversion funnel for this page."""
        funnel_steps = await ctx.db.query(
            """SELECT step_number, COUNT(DISTINCT user_id) AS users
               FROM tb_conversion_funnel
               WHERE landing_page = $1 AND DATE(timestamp) = $2
               GROUP BY step_number
               ORDER BY step_number""",
            [self.page_url, self.date]
        )

        result = {}
        prev_count = None
        for step in funnel_steps:
            result[f"step_{step['step_number']}"] = {
                "users": step["users"],
                "conversion_rate": (
                    (step["users"] / prev_count * 100) if prev_count else 100
                )
            }
            prev_count = step["users"]

        return result
```
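To make the bounce-rate formula concrete, a tiny worked example (the numbers are invented): if /products had 40 distinct sessions on a given day and 25 of them viewed only that one page, the resolver returns 62.5.

```python
# Worked example of the bounce-rate calculation above (numbers are invented).
sessions_for_page = 40        # distinct sessions that hit the page that day
single_view_sessions = 25     # of those, sessions with exactly one pageview

bounce_rate = (single_view_sessions / sessions_for_page) * 100
assert bounce_rate == 62.5
```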
```python
# Root queries for dashboards
@fraiseql.query(sql_source="v_pageview_aggregate", cache_ttl=60)
async def page_analytics(
    page_url: str,
    date: str
) -> PageAnalytics:
    """Get live analytics for a specific page."""
    pass


@fraiseql.query(sql_source="v_pageview_aggregate", cache_ttl=300)
async def top_pages(limit: int = 50) -> list[PageAnalytics]:
    """Get top pages by view count (today's stats)."""
    pass


@fraiseql.subscription
async def live_metrics(ctx) -> dict:
    """
    Real-time metrics subscription.
    Updates every second with latest stats.
    """
    @fraiseql.nats.subscribe(
        subject="analytics.pageview.processed",
        deliver_policy="deliver_new"  # Only new events
    )
    async def stream_metrics(message):
        today = datetime.now().strftime("%Y-%m-%d")
        latest = await ctx.db.query_one(
            """SELECT
                   COUNT(DISTINCT user_id) AS active_users,
                   COUNT(DISTINCT session_id) AS active_sessions,
                   COUNT(*) AS events_per_second
               FROM tb_pageview_event
               WHERE DATE(timestamp) = $1
                 AND timestamp > NOW() - INTERVAL '1 second'""",
            [today]
        )

        yield {
            "timestamp": datetime.now().isoformat(),
            "active_users": latest["active_users"],
            "active_sessions": latest["active_sessions"],
            "events_per_second": latest["events_per_second"]
        }

        await message.ack()


# Batch aggregation for older data
@fraiseql.job(interval=3600000)  # Run every hour
async def batch_aggregate_historical():
    """
    Batch re-compute aggregates for yesterday.
    More efficient than real-time updates.
    """
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    # Drop old aggregate
    await ctx.db.execute(
        "DELETE FROM tb_pageview_aggregate WHERE date = $1",
        [yesterday]
    )

    # Batch recompute from raw events
    aggregates = await ctx.db.query(
        """SELECT
               page_url,
               $1::date AS date,
               COUNT(*) AS view_count,
               COUNT(DISTINCT user_id) AS unique_visitors
           FROM tb_pageview_event
           WHERE DATE(timestamp) = $1
           GROUP BY page_url""",
        [yesterday]
    )

    for agg in aggregates:
        await ctx.db.execute(
            """
            INSERT INTO tb_pageview_aggregate
                (page_url, date, view_count, unique_visitors, computed_at)
            VALUES ($1, $2, $3, $4, NOW())
            ON CONFLICT (page_url, date)
            DO UPDATE SET
                view_count = EXCLUDED.view_count,
                unique_visitors = EXCLUDED.unique_visitors,
                computed_at = NOW()
            """,
            [agg['page_url'], agg['date'], agg['view_count'], agg['unique_visitors']]
        )
```
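Note what the fast path does and does not maintain: the upsert in `ingest_pageview` only increments `view_count`, so `unique_visitors` keeps its initial value of 1 until `batch_aggregate_historical` recomputes the row from the raw log (and, as written, the job targets yesterday, so a given date is corrected after it rolls over). A small illustration, with values that follow directly from the SQL above:

```python
# tb_pageview_aggregate row for ("/products", <date>) after three pageviews
# from two distinct users, given the statements above:
after_realtime_upserts = {"view_count": 3, "unique_visitors": 1}  # fast path only bumps view_count
after_batch_recompute = {"view_count": 3, "unique_visitors": 2}   # rebuilt from tb_pageview_event
```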
pageAnalytics(pageUrl: "/products", date: "2024-01-15") { pageUrl viewCount uniqueVisitors conversionFunnel { step_1 { users, conversionRate } step_2 { users, conversionRate } step_3 { users, conversionRate } } }}
```graphql
# Historical report
query HistoricalReport($startDate: String!, $endDate: String!) {
  cohortAnalysis(startDate: $startDate, endDate: $endDate) {
    cohorts {
      date
      newUsers
      sessions
    }
    totalUsers
    totalSessions
  }
}
```

```python
import asyncio
from datetime import datetime

import pytest

# `nats`, `get_aggregate`, and `get_page_analytics` are test helpers/fixtures
# assumed to be provided elsewhere in the test suite.


@pytest.mark.asyncio
async def test_event_to_aggregate_flow():
    """Test complete event ingestion to aggregation."""
    # Generate test events
    for i in range(100):
        event = {
            "user_id": f"user-{i % 10}",
            "session_id": f"session-{i % 5}",
            "page_url": "/products",
            "timestamp": datetime.now().isoformat()
        }
        await nats.publish("events.pageview", event)

    # Wait for processing
    await asyncio.sleep(0.5)

    # Verify aggregates updated
    agg = await get_aggregate("/products")
    assert agg.view_count == 100
    assert agg.unique_visitors == 10


@pytest.mark.asyncio
async def test_bounce_rate_calculation():
    """Test bounce rate resolver."""
    analytics = await get_page_analytics("/products")
    bounce_rate = await analytics.bounce_rate()
    assert 0 <= bounce_rate <= 100
```
See also:
Advanced NATS: JetStream streaming and replay patterns.
Custom Resolvers: Complex calculations in field resolvers.