
Real-Time Analytics Platform

A real-time analytics platform combining data federation, NATS event streaming, and custom resolvers for live dashboards.

Event Sources (Web, Mobile, Server)
                |
          NATS Event Bus
                |
         +------+------+
         |             |
    Raw Events     Aggregates
        DB             DB
         |             |
         +------+------+
                |
          FraiseQL API
 (Federation + Custom Resolvers)
                |
           Dashboards
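
The event sources at the top of the diagram only need to publish JSON payloads to the events.pageview subject; everything downstream is handled by the ingestion consumer defined below. As a minimal sketch, a server-side source might publish like this (assumptions: the fraiseql.nats.publish helper used later in the pipeline is also available to producer code, and track_pageview is a hypothetical wrapper, not part of the platform):

from datetime import datetime
from typing import Optional

import fraiseql

# Hypothetical producer-side helper; field names match what ingest_pageview
# (defined below) reads from message.data.
async def track_pageview(user_id: str, session_id: str, page_url: str,
                         referrer: Optional[str] = None,
                         properties: Optional[dict] = None) -> None:
    await fraiseql.nats.publish(
        subject="events.pageview",
        data={
            "user_id": user_id,
            "session_id": session_id,
            "page_url": page_url,
            "referrer": referrer,
            "timestamp": datetime.now().isoformat(),
            "properties": properties or {},
        },
    )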
-- Raw events (immutable log)
CREATE TABLE tb_pageview_event (
    pk_event SERIAL PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    user_id UUID NOT NULL,
    session_id UUID NOT NULL,
    page_url TEXT NOT NULL,
    referrer TEXT,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    properties JSONB NOT NULL DEFAULT '{}'
);

-- Pre-computed aggregates
CREATE TABLE tb_pageview_aggregate (
    pk_aggregate SERIAL PRIMARY KEY,
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    page_url TEXT NOT NULL,
    date DATE NOT NULL,
    view_count INT NOT NULL DEFAULT 0,
    unique_visitors INT NOT NULL DEFAULT 0,
    bounce_rate FLOAT NOT NULL DEFAULT 0.0,
    avg_session_duration FLOAT NOT NULL DEFAULT 0.0,
    computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(page_url, date)
);
-- Views with JSONB data column
CREATE VIEW v_pageview_event AS
SELECT
    e.id,
    jsonb_build_object(
        'id', e.id::text,
        'user_id', e.user_id::text,
        'session_id', e.session_id::text,
        'page_url', e.page_url,
        'referrer', e.referrer,
        'timestamp', e.timestamp,
        'properties', e.properties
    ) AS data
FROM tb_pageview_event e;

CREATE VIEW v_pageview_aggregate AS
SELECT
    a.id,
    jsonb_build_object(
        'id', a.id::text,
        'page_url', a.page_url,
        'date', a.date,
        'view_count', a.view_count,
        'unique_visitors', a.unique_visitors,
        'bounce_rate', a.bounce_rate,
        'avg_session_duration', a.avg_session_duration,
        'computed_at', a.computed_at
    ) AS data
FROM tb_pageview_aggregate a;
import json
from datetime import datetime, timedelta
from typing import Optional

import fraiseql
@fraiseql.type(database="raw_events")
class PageViewEvent:
    """Raw immutable page view event."""
    id: fraiseql.ID
    user_id: fraiseql.ID
    session_id: fraiseql.ID
    page_url: str
    referrer: Optional[str]
    timestamp: datetime
    properties: dict


@fraiseql.type(database="aggregates")
class PageViewAggregate:
    """Pre-computed page view metrics."""
    id: fraiseql.ID
    page_url: str
    date: str  # YYYY-MM-DD
    view_count: int
    unique_visitors: int
    bounce_rate: float
    avg_session_duration: float
    computed_at: datetime
@fraiseql.nats.subscribe(
    subject="events.pageview",
    consumer_group="analytics_ingestion",
    max_concurrent=100  # High concurrency for real-time ingestion
)
async def ingest_pageview(message, ctx):
    """
    Ingest page view events.

    Store raw events for long-term analysis and update
    aggregates for dashboards.
    """
    event = message.data
    try:
        # Store raw event (immutable)
        await ctx.db.execute(
            """
            INSERT INTO tb_pageview_event
                (user_id, session_id, page_url, referrer, timestamp, properties)
            VALUES ($1, $2, $3, $4, $5, $6::jsonb)
            """,
            [
                event["user_id"],
                event["session_id"],
                event["page_url"],
                event.get("referrer"),
                event["timestamp"],
                json.dumps(event.get("properties", {}))
            ]
        )

        # Upsert real-time aggregate
        today = datetime.now().strftime("%Y-%m-%d")
        await ctx.db.execute(
            """
            INSERT INTO tb_pageview_aggregate
                (page_url, date, view_count, unique_visitors, computed_at)
            VALUES ($1, $2, 1, 1, NOW())
            ON CONFLICT (page_url, date)
            DO UPDATE SET
                view_count = tb_pageview_aggregate.view_count + 1,
                computed_at = NOW()
            """,
            [event["page_url"], today]
        )

        # Publish enriched event for other consumers
        await fraiseql.nats.publish(
            subject="analytics.pageview.processed",
            data=event
        )

        await message.ack()
    except Exception as e:
        await log_error({"event": event, "error": str(e)})
        await message.nak(timeout=5000)
@fraiseql.type(database="aggregates")
class PageAnalytics:
    """Analytics for a specific page."""
    page_url: str
    date: str

    # Federated data from raw events
    raw_events: list[PageViewEvent] = fraiseql.federated(
        database="raw_events",
        lookup="page_url"
    )

    # Custom computed metrics
    @fraiseql.field_resolver
    async def bounce_rate(self, ctx) -> float:
        """
        Bounce rate: % of sessions with single pageview.
        Computed from raw events.
        """
        sessions = await ctx.db.query(
            """SELECT DISTINCT session_id FROM tb_pageview_event
               WHERE page_url = $1 AND DATE(timestamp) = $2""",
            [self.page_url, self.date]
        )
        if not sessions:
            return 0.0

        single_view_sessions = await ctx.db.query_one(
            """SELECT COUNT(DISTINCT session_id) AS count
               FROM (
                   SELECT session_id
                   FROM tb_pageview_event
                   WHERE page_url = $1 AND DATE(timestamp) = $2
                   GROUP BY session_id
                   HAVING COUNT(*) = 1
               ) sub""",
            [self.page_url, self.date]
        )
        return (single_view_sessions["count"] / len(sessions)) * 100
    @fraiseql.field_resolver
    async def avg_session_duration(self, ctx) -> float:
        """Average session duration in seconds."""
        result = await ctx.db.query_one(
            """SELECT AVG(duration) AS avg_duration
               FROM (
                   SELECT EXTRACT(EPOCH FROM (MAX(timestamp) - MIN(timestamp))) AS duration
                   FROM tb_pageview_event
                   WHERE page_url = $1 AND DATE(timestamp) = $2
                   GROUP BY session_id
               ) sessions""",
            [self.page_url, self.date]
        )
        return result["avg_duration"] or 0.0
    @fraiseql.field_resolver
    async def conversion_funnel(self, ctx) -> dict:
        """Analyze the conversion funnel for this page."""
        funnel_steps = await ctx.db.query(
            """SELECT step_number, COUNT(DISTINCT user_id) AS users
               FROM tb_conversion_funnel
               WHERE landing_page = $1 AND DATE(timestamp) = $2
               GROUP BY step_number
               ORDER BY step_number""",
            [self.page_url, self.date]
        )

        result = {}
        prev_count = None
        for step in funnel_steps:
            result[f"step_{step['step_number']}"] = {
                "users": step["users"],
                "conversion_rate": (
                    (step["users"] / prev_count * 100)
                    if prev_count else 100
                )
            }
            prev_count = step["users"]
        return result
# Root queries for dashboards
@fraiseql.query(sql_source="v_pageview_aggregate", cache_ttl=60)
async def page_analytics(
    page_url: str,
    date: str
) -> PageAnalytics:
    """Get live analytics for a specific page."""
    pass


@fraiseql.query(sql_source="v_pageview_aggregate", cache_ttl=300)
async def top_pages(limit: int = 50) -> list[PageAnalytics]:
    """Get top pages by view count (today's stats)."""
    pass
@fraiseql.subscription
async def live_metrics(ctx) -> dict:
    """
    Real-time metrics subscription.
    Pushes the latest stats as processed events arrive.
    """
    @fraiseql.nats.subscribe(
        subject="analytics.pageview.processed",
        deliver_policy="deliver_new"  # Only new events
    )
    async def stream_metrics(message):
        today = datetime.now().strftime("%Y-%m-%d")
        latest = await ctx.db.query_one(
            """SELECT COUNT(DISTINCT user_id) AS active_users,
                      COUNT(DISTINCT session_id) AS active_sessions,
                      COUNT(*) AS events_per_second
               FROM tb_pageview_event
               WHERE DATE(timestamp) = $1
                 AND timestamp > NOW() - INTERVAL '1 second'""",
            [today]
        )
        yield {
            "timestamp": datetime.now().isoformat(),
            "active_users": latest["active_users"],
            "active_sessions": latest["active_sessions"],
            "events_per_second": latest["events_per_second"]
        }
        await message.ack()
# Batch aggregation for older data
@fraiseql.job(interval=3600000)  # Run every hour
async def batch_aggregate_historical(ctx):
    """
    Batch re-compute aggregates for yesterday.
    More efficient than real-time updates.
    """
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    # Drop old aggregates for the day
    await ctx.db.execute(
        "DELETE FROM tb_pageview_aggregate WHERE date = $1",
        [yesterday]
    )

    # Batch recompute from raw events
    aggregates = await ctx.db.query(
        """SELECT
               page_url,
               $1::date AS date,
               COUNT(*) AS view_count,
               COUNT(DISTINCT user_id) AS unique_visitors
           FROM tb_pageview_event
           WHERE DATE(timestamp) = $1
           GROUP BY page_url""",
        [yesterday]
    )

    for agg in aggregates:
        await ctx.db.execute(
            """
            INSERT INTO tb_pageview_aggregate
                (page_url, date, view_count, unique_visitors, computed_at)
            VALUES ($1, $2, $3, $4, NOW())
            ON CONFLICT (page_url, date) DO UPDATE
            SET view_count = EXCLUDED.view_count,
                unique_visitors = EXCLUDED.unique_visitors,
                computed_at = NOW()
            """,
            [agg["page_url"], agg["date"], agg["view_count"], agg["unique_visitors"]]
        )
# Real-time dashboard
query DashboardMetrics {
  topPages(limit: 10) {
    pageUrl
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
  }
  pageAnalytics(pageUrl: "/products", date: "2024-01-15") {
    pageUrl
    viewCount
    uniqueVisitors
    conversionFunnel {
      step_1 { users, conversionRate }
      step_2 { users, conversionRate }
      step_3 { users, conversionRate }
    }
  }
}
# Historical report
query HistoricalReport($startDate: String!, $endDate: String!) {
  cohortAnalysis(startDate: $startDate, endDate: $endDate) {
    cohorts {
      date
      newUsers
      sessions
    }
    totalUsers
    totalSessions
  }
}
import asyncio
from datetime import datetime

import pytest


@pytest.mark.asyncio
async def test_event_to_aggregate_flow():
    """Test the complete flow from event ingestion to aggregation."""
    # Generate test events
    for i in range(100):
        event = {
            "user_id": f"user-{i % 10}",
            "session_id": f"session-{i % 5}",
            "page_url": "/products",
            "timestamp": datetime.now().isoformat()
        }
        await nats.publish("events.pageview", event)

    # Wait for processing
    await asyncio.sleep(0.5)

    # Verify aggregates updated
    agg = await get_aggregate("/products")
    assert agg.view_count == 100
    assert agg.unique_visitors == 10


@pytest.mark.asyncio
async def test_bounce_rate_calculation():
    """Test bounce rate resolver."""
    analytics = await get_page_analytics("/products")
    bounce_rate = await analytics.bounce_rate()
    assert 0 <= bounce_rate <= 100

Production Checklist

  • NATS cluster with JetStream persistence
  • Raw events archived to long-term storage
  • Aggregates automatically computed per time period
  • Cache eviction policy tuned for working set
  • Monitoring on event lag and query latency
  • Backfill strategy for missing data
  • Data retention policies defined (see the pruning sketch below)
  • Dashboard queries validated against sub-second latency targets
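
As a minimal sketch of the retention item above, a scheduled pruning job could reuse the @fraiseql.job pattern from the batch aggregation section. The daily interval and the 90-day window are assumed placeholder values, not requirements of the platform:

# Hedged sketch: daily retention job; the 90-day window is an assumed policy.
@fraiseql.job(interval=86400000)  # Run once a day
async def prune_raw_events(ctx):
    """Delete raw page view events older than the retention window."""
    await ctx.db.execute(
        "DELETE FROM tb_pageview_event WHERE timestamp < NOW() - INTERVAL '90 days'"
    )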

Advanced Federation

Multi-DB patterns for analytics pipelines.

Advanced NATS

JetStream streaming and replay patterns.

Custom Resolvers

Complex calculations in field resolvers.