Skip to content

Real-Time Analytics Platform

A real-time analytics platform combining NATS-based event ingestion, pre-computed aggregates, Arrow Flight columnar analytics, and live GraphQL subscriptions for dashboards.

Real-time analytics: event ingestion through PostgreSQL to FraiseQL API dashboards.
External workers ingest events; FraiseQL exposes read-only queries and subscriptions over the aggregated data.
-- Raw events (immutable log)
CREATE TABLE tb_pageview_event (
    pk_event BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Fix: inline UNIQUE removed — the named unique index below already
    -- enforces uniqueness; declaring both created two identical indexes on id.
    id UUID DEFAULT gen_random_uuid() NOT NULL,
    identifier TEXT UNIQUE NOT NULL,
    fk_user BIGINT REFERENCES tb_user(pk_user),  -- nullable: anonymous views allowed
    session_id UUID NOT NULL,
    page_url TEXT NOT NULL,
    referrer TEXT,
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    properties JSONB NOT NULL DEFAULT '{}'
);
CREATE UNIQUE INDEX idx_tb_pageview_event_id ON tb_pageview_event(id);
CREATE INDEX idx_tb_pageview_event_fk_user ON tb_pageview_event(fk_user);
CREATE INDEX idx_tb_pageview_event_page_url ON tb_pageview_event(page_url);
-- DESC to serve "most recent first" scans of the event log
CREATE INDEX idx_tb_pageview_event_recorded_at ON tb_pageview_event(recorded_at DESC);
-- Pre-computed aggregates (updated by pg_cron or external worker)
CREATE TABLE tb_pageview_aggregate (
    pk_aggregate BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Fix: inline UNIQUE removed — the named unique index below already
    -- enforces uniqueness; declaring both created two identical indexes on id.
    id UUID DEFAULT gen_random_uuid() NOT NULL,
    identifier TEXT UNIQUE NOT NULL, -- e.g. '/products|2024-01-15'
    page_url TEXT NOT NULL,
    date DATE NOT NULL,
    view_count INTEGER NOT NULL DEFAULT 0,
    unique_visitors INTEGER NOT NULL DEFAULT 0,
    bounce_rate FLOAT NOT NULL DEFAULT 0.0,
    avg_session_duration FLOAT NOT NULL DEFAULT 0.0,
    computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Conflict target for both the real-time worker upsert and the batch job
    UNIQUE(page_url, date)
);
CREATE UNIQUE INDEX idx_tb_pageview_aggregate_id ON tb_pageview_aggregate(id);
CREATE INDEX idx_tb_pageview_aggregate_page_url_date
    ON tb_pageview_aggregate(page_url, date DESC);
-- Read view: raw events
-- FraiseQL read model (v_ convention): id column plus the whole entity as one
-- jsonb "data" payload.  Key names must match the PageViewEvent fields in
-- schema.py.  NOTE(review): fk_user is not exposed here — presumably
-- intentional (no user data in the read API); confirm before adding it.
CREATE VIEW v_pageview_event AS
SELECT
e.id,
jsonb_build_object(
'id', e.id::text,
'identifier', e.identifier,
'session_id', e.session_id::text,  -- UUID rendered as text for the API
'page_url', e.page_url,
'referrer', e.referrer,
'recorded_at', e.recorded_at,
'properties', e.properties
) AS data
FROM tb_pageview_event e;
-- Read view: pre-computed aggregates
-- FraiseQL read model for pre-computed metrics; key names must match the
-- PageViewAggregate fields in schema.py.
CREATE VIEW v_pageview_aggregate AS
SELECT
a.id,
jsonb_build_object(
'id', a.id::text,
'identifier', a.identifier,  -- 'page_url|date' composite key
'page_url', a.page_url,
'date', a.date,
'view_count', a.view_count,
'unique_visitors', a.unique_visitors,
'bounce_rate', a.bounce_rate,
'avg_session_duration', a.avg_session_duration,
'computed_at', a.computed_at
) AS data
FROM tb_pageview_aggregate a;
-- Arrow Flight view for high-throughput columnar analytics
-- Arrow Flight read model (va_ convention): a plain columnar projection with
-- no jsonb wrapping, suited to high-throughput export to BI tools.
CREATE VIEW va_pageview_aggregate AS
SELECT agg.page_url,
       agg.date,
       agg.view_count,
       agg.unique_visitors,
       agg.bounce_rate,
       agg.avg_session_duration
FROM tb_pageview_aggregate AS agg;
schema.py
import fraiseql
from fraiseql.scalars import ID, DateTime, Date
@fraiseql.type
class PageViewEvent:
    """Raw immutable page view event.

    Mirrors the jsonb payload built by the v_pageview_event SQL view; field
    names must stay in sync with the keys of that view's jsonb_build_object.
    """

    id: ID
    identifier: str
    session_id: str  # UUID, rendered as text by the view
    page_url: str
    referrer: str | None  # nullable in tb_pageview_event
    recorded_at: DateTime
    properties: fraiseql.Json  # free-form event properties (jsonb column)
@fraiseql.type
class PageViewAggregate:
    """Pre-computed page view metrics for a page on a given date.

    Mirrors the jsonb payload built by the v_pageview_aggregate SQL view.
    Rows are upserted in near-real-time by the ingestion worker and then
    recomputed exactly by the hourly batch job.
    """

    id: ID
    identifier: str  # 'page_url|date' composite key, e.g. '/products|2024-01-15'
    page_url: str
    date: Date
    view_count: int
    unique_visitors: int
    bounce_rate: float  # percentage 0-100 (see fn_batch_aggregate_pageviews)
    avg_session_duration: float
    computed_at: DateTime  # last time this row was (re)computed
@fraiseql.query
def pageview_events(
    page_url: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[PageViewEvent]:
    """List raw page view events. Supports filtering by page_url.

    NOTE(review): filtering and pagination are presumably derived by the
    FraiseQL runtime from the declared parameters — the config only names the
    SQL source; confirm against the FraiseQL query reference.
    """
    return fraiseql.config(sql_source="v_pageview_event")
@fraiseql.query
def page_analytics(page_url: str, date: str) -> PageViewAggregate | None:
    """Get pre-computed analytics for a specific page on a given date.

    Served from v_pageview_aggregate with a 60-second cache.

    NOTE(review): `date` is typed str although the Date scalar is imported —
    consider Date for consistency with PageViewAggregate.date; confirm the
    GraphQL schema impact before changing.
    """
    return fraiseql.config(sql_source="v_pageview_aggregate", cache_ttl_seconds=60)
@fraiseql.query
def top_pages(limit: int = 50) -> list[PageViewAggregate]:
    """Get top pages by view count (all dates, ordered by view_count DESC).

    Cached for 300 seconds.  NOTE(review): the ordering promised here is not
    visible in this config call — verify the runtime actually orders by
    view_count DESC, or that an ordering option needs to be declared.
    """
    return fraiseql.config(sql_source="v_pageview_aggregate", cache_ttl_seconds=300)
@fraiseql.subscription(
    entity_type="PageViewAggregate",
    topic="analytics.aggregate.updated",
)
def live_page_metrics(page_url: str | None = None) -> PageViewAggregate:
    """Subscribe to real-time aggregate updates as new events are processed.

    The body is intentionally empty: delivery is handled by the FraiseQL
    runtime, which forwards NATS messages published on the configured topic
    (see the [[observers]] entries in fraiseql.toml).
    """
    pass


# Compile the declared types/queries/subscriptions to schema.json,
# which the FraiseQL runtime loads (see [fraiseql] in fraiseql.toml).
fraiseql.export_schema("schema.json")
fraiseql.toml
[project]
name = "analytics-platform"
version = "1.0.0"

[database]
url = "${DATABASE_URL}"

[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"

# JWT/OIDC config via environment variables (no [auth] TOML section):
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET

# When tb_pageview_aggregate is updated, push live subscription events via NATS.
# The Rust runtime receives pg_notify from PostgreSQL and forwards to subscribers.
[[observers]]
table = "tb_pageview_aggregate"
event = "UPDATE"
subject = "analytics.aggregate.updated"

[[observers]]
table = "tb_pageview_aggregate"
event = "INSERT"
subject = "analytics.aggregate.updated"

# Fix: the original declared a plain `[observers]` table after the
# `[[observers]]` array-of-tables above — invalid TOML that fails to parse.
# The transport settings now live under their own table.
# NOTE(review): confirm the exact table name FraiseQL expects for the
# observer backend configuration.
[observer_backend]
backend = "nats"
nats_url = "${NATS_URL}"

# [caching] block removed: the original file itself marked it as unverified /
# aspirational and said to remove it until [caching] support is confirmed stable.

[security.enterprise]
enabled = true
log_level = "info"

Real-Time Event Ingestion — External Worker

Section titled “Real-Time Event Ingestion — External Worker”
ingestion-worker/main.py
"""
Analytics ingestion worker.
Subscribes to NATS using nats.py and writes raw events to PostgreSQL.
This is a standalone service — not a FraiseQL schema file.
"""
import asyncio
import json
import uuid
import asyncpg
import nats
async def ingest_pageview(msg):
    """Store one page view event received from NATS JetStream.

    Raw events are immutable — always INSERT, never UPDATE.  Also upserts the
    real-time aggregate row for the event's date; the hourly batch job later
    recomputes the same row exactly from raw events, so the counters written
    here are a best-effort approximation (in particular, unique_visitors is
    never incremented on conflict).

    Acks the message on success; naks with a 5 s delay on any failure so
    JetStream redelivers it.
    """
    event = json.loads(msg.data)
    # NOTE(review): one connection per message is simple but expensive under
    # load — consider a shared asyncpg pool for this worker.
    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # Store the raw, immutable event.
        identifier = str(uuid.uuid4())
        await conn.execute(
            """
            INSERT INTO tb_pageview_event
                (identifier, fk_user, session_id, page_url, referrer, recorded_at, properties)
            VALUES ($1, $2, $3::uuid, $4, $5, $6::timestamptz, $7::jsonb)
            """,
            identifier,
            event.get("user_pk"),  # INTEGER FK resolved by the sender; None = anonymous
            event["session_id"],
            event["page_url"],
            event.get("referrer"),
            event["timestamp"],
            json.dumps(event.get("properties", {})),
        )
        # Fix: fall back to the timestamp's date when the sender omits "date".
        # Previously a missing "date" key raised KeyError and nak'd the message
        # forever even though "timestamp" was present.  Timestamps look like
        # "2024-01-15T10:00:00Z", so the first 10 chars are the date —
        # TODO confirm all senders use ISO-8601 timestamps.
        event_date = event.get("date") or event["timestamp"][:10]
        # Upsert the real-time aggregate for the event's date.
        agg_identifier = f"{event['page_url']}|{event_date}"
        await conn.execute(
            """
            INSERT INTO tb_pageview_aggregate
                (identifier, page_url, date, view_count, unique_visitors, computed_at)
            VALUES ($1, $2, $3::date, 1, 1, NOW())
            ON CONFLICT (page_url, date)
            DO UPDATE SET
                view_count = tb_pageview_aggregate.view_count + 1,
                computed_at = NOW()
            """,
            agg_identifier,
            event["page_url"],
            event_date,
        )
        # pg_notify fires on the upsert; the FraiseQL observer forwards it to
        # the analytics.aggregate.updated NATS subject, which drives the
        # live_page_metrics subscription clients.
        await msg.ack()
    except Exception as exc:
        # Broad catch is deliberate at this consumer boundary: any failure
        # should nak for redelivery rather than kill the subscription callback.
        print(f"[ingestion-worker] ERROR: {exc}")
        await msg.nak(delay=5)
    finally:
        await conn.close()
async def main():
    """Connect to NATS JetStream and run the ingestion consumer forever."""
    connection = await nats.connect(NATS_URL)
    jetstream = connection.jetstream()
    # Durable consumer: JetStream tracks progress across worker restarts.
    await jetstream.subscribe(
        "events.pageview",
        durable="analytics_ingestion",
        cb=ingest_pageview,
    )
    print("Ingestion worker listening on events.pageview")
    # Park this coroutine indefinitely; the subscription callback does all the work.
    await asyncio.sleep(float("inf"))
if __name__ == "__main__":
    import os

    # These assignments create the module-level globals read by
    # ingest_pageview() and main(); a missing environment variable fails
    # fast with KeyError before any connection is opened.
    NATS_URL = os.environ["NATS_URL"]
    POSTGRES_URL = os.environ["DATABASE_URL"]
    asyncio.run(main())
migrations/batch_aggregation.sql
-- Batch re-aggregate yesterday's data from raw events.
-- Called by pg_cron every hour. More accurate than real-time upserts.
CREATE OR REPLACE FUNCTION fn_batch_aggregate_pageviews(p_date DATE)
RETURNS VOID LANGUAGE plpgsql AS $$
BEGIN
    -- Overwrites the real-time (approximate) aggregates with exact values
    -- computed from the immutable event log for p_date.
    INSERT INTO tb_pageview_aggregate
        (identifier, page_url, date, view_count, unique_visitors, bounce_rate, computed_at)
    SELECT
        page_url || '|' || p_date::text AS identifier,  -- matches the worker's 'url|date' key
        page_url,
        p_date,
        COUNT(*) AS view_count,
        COUNT(DISTINCT session_id) AS unique_visitors,
        -- Bounce rate: % of this page's sessions that consist of exactly one
        -- pageview.  NOTE(review): the inner subquery identifies single-view
        -- sessions across ALL pages for the day, not per page_url — confirm
        -- this site-wide definition of "bounce" is intended.
        ROUND(
            COUNT(DISTINCT session_id) FILTER (
                WHERE session_id IN (
                    SELECT session_id
                    FROM tb_pageview_event
                    WHERE DATE(recorded_at) = p_date
                    GROUP BY session_id
                    HAVING COUNT(*) = 1
                )
            )::numeric
            / NULLIF(COUNT(DISTINCT session_id), 0) * 100,  -- NULLIF guards divide-by-zero
            2
        ) AS bounce_rate,
        NOW() AS computed_at
    FROM tb_pageview_event
    WHERE DATE(recorded_at) = p_date
    GROUP BY page_url
    -- Replace (not add to) the real-time counters: batch values are exact.
    ON CONFLICT (page_url, date)
    DO UPDATE SET
        view_count = EXCLUDED.view_count,
        unique_visitors = EXCLUDED.unique_visitors,
        bounce_rate = EXCLUDED.bounce_rate,
        computed_at = EXCLUDED.computed_at;
END;
$$;
-- Schedule with pg_cron (runs every hour, re-aggregates yesterday)
-- Requires pg_cron extension: CREATE EXTENSION pg_cron;
SELECT cron.schedule(
    'batch-aggregate-pageviews',  -- job name, unique per database
    '0 * * * *',                  -- top of every hour
    -- Re-aggregates YESTERDAY so late-arriving events are folded in;
    -- today's row keeps its real-time approximation until the day rolls over.
    $$SELECT fn_batch_aggregate_pageviews(CURRENT_DATE - 1)$$
);
# In schema.py — already declared above, shown here for clarity:
@fraiseql.subscription(
entity_type="PageViewAggregate",
topic="analytics.aggregate.updated",
)
def live_page_metrics(page_url: str | None = None) -> PageViewAggregate:
"""
Subscribe to real-time aggregate updates.
Flow:
1. Ingestion worker upserts tb_pageview_aggregate
2. PostgreSQL pg_notify fires on UPDATE/INSERT
3. FraiseQL Rust observer forwards to NATS analytics.aggregate.updated
4. Subscription clients receive updated PageViewAggregate
Pass page_url to filter events to a specific page.
"""
pass

GraphQL subscription query for dashboards:

# Live dashboard feed: one PageViewAggregate is pushed each time the observer
# publishes on analytics.aggregate.updated, filtered to the /products page.
subscription LiveDashboard {
  livePageMetrics(pageUrl: "/products") {
    pageUrl
    date
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
    computedAt
  }
}
# Dashboard snapshot — topPages is cached for 300 s and pageAnalytics for
# 60 s (cache_ttl_seconds in schema.py), so results may be slightly stale.
query DashboardMetrics {
  topPages(limit: 10) {
    pageUrl
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
  }
  pageAnalytics(pageUrl: "/products", date: "2024-01-15") {
    pageUrl
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
    computedAt
  }
}

For high-throughput columnar data export (BI tools, data warehouses), the va_pageview_aggregate view is exposed via Arrow Flight:

analytics-client/arrow_query.py
"""
Query FraiseQL Arrow Flight endpoint for columnar analytics.
Uses the va_ view convention for Arrow-native access.
"""
import pyarrow.flight as flight
import pandas as pd
def query_top_pages(limit: int = 100) -> pd.DataFrame:
    """Fetch the top pages by view count via the Arrow Flight endpoint.

    Fix: the original ignored `limit` and hard-coded LIMIT 100; the parameter
    is now validated and embedded in the query.

    Args:
        limit: maximum number of rows to return (positive integer).

    Returns:
        A pandas DataFrame with page_url, date, view_count, unique_visitors
        and bounce_rate columns, ordered by view_count descending.

    Raises:
        ValueError: if limit is not positive.
    """
    # int() + range check guard the only interpolated value against injection.
    limit = int(limit)
    if limit <= 0:
        raise ValueError(f"limit must be positive, got {limit}")
    client = flight.connect("grpc://localhost:8815")
    command = (
        "SELECT page_url, date, view_count, unique_visitors, bounce_rate "
        "FROM va_pageview_aggregate "
        "ORDER BY view_count DESC "
        f"LIMIT {limit}"
    )
    descriptor = flight.FlightDescriptor.for_command(command.encode("ascii"))
    info = client.get_flight_info(descriptor)
    reader = client.do_get(info.endpoints[0].ticket)
    return reader.read_all().to_pandas()
if __name__ == "__main__":
    # Ad-hoc smoke check: fetch the default 100 rows, show the first 20.
    df = query_top_pages()
    print(df.head(20))
tests/test_analytics.py
import asyncio
import json
import pytest
import nats
import httpx
@pytest.mark.asyncio
async def test_event_to_aggregate_flow():
    """Publish events via NATS; verify aggregate is updated via GraphQL.

    End-to-end integration test: requires the NATS server, the ingestion
    worker, and the FraiseQL API to all be running.
    NOTE(review): NATS_URL and TEST_TOKEN are not defined in this file —
    presumably injected by conftest/fixtures; confirm before running.
    """
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    # Publish 10 test events: 3 distinct users, 5 distinct sessions,
    # all for the same page and date, so the expected aggregates are known.
    for i in range(10):
        await js.publish(
            "events.pageview",
            json.dumps({
                "user_pk": i % 3 + 1,
                "session_id": f"session-{i % 5}",
                "page_url": "/test-page",
                "date": "2024-01-15",
                "timestamp": "2024-01-15T10:00:00Z",
            }).encode(),
        )
    # drain() flushes pending publishes and closes the connection.
    await nc.drain()
    # Wait for ingestion worker to process.
    # NOTE(review): fixed sleep is race-prone on slow CI; consider polling.
    await asyncio.sleep(1.0)
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={
                "query": """
                    query {
                        pageAnalytics(pageUrl: "/test-page", date: "2024-01-15") {
                            viewCount
                            uniqueVisitors
                        }
                    }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
    data = response.json()
    # >= rather than == : earlier runs may have already written to this row.
    assert data["data"]["pageAnalytics"]["viewCount"] >= 10
    assert data["data"]["pageAnalytics"]["uniqueVisitors"] >= 3
  • NATS cluster with JetStream persistence configured
  • Raw events archived to long-term storage (S3, GCS) via a separate consumer
  • pg_cron scheduled for hourly re-aggregation
  • cache_ttl_seconds tuned per query based on acceptable staleness
  • Monitoring on NATS consumer lag and PostgreSQL query latency
  • Backfill strategy defined for missing aggregation windows
  • Data retention policies set for tb_pageview_event (e.g. 90-day TTL)
  • Arrow Flight endpoint secured with mTLS for BI tool access

Subscriptions Reference

Full subscription configuration options.

Arrow Flight

Columnar analytics via va_ views.

Observers Reference

TOML observer configuration for NATS events.