Subscriptions Reference
Full subscription configuration options.
A real-time analytics platform combining NATS-based event ingestion, pre-computed aggregates, Arrow Flight columnar analytics, and live GraphQL subscriptions for dashboards.
-- Raw events (immutable log).
-- Append-only: the ingestion worker only ever INSERTs here.
CREATE TABLE tb_pageview_event (
    pk_event BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- UNIQUE on the column already creates a backing unique index on id,
    -- so the former separate CREATE UNIQUE INDEX idx_tb_pageview_event_id
    -- was a duplicate index (extra write cost, no benefit) and is removed.
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT UNIQUE NOT NULL,
    fk_user BIGINT REFERENCES tb_user(pk_user),
    session_id UUID NOT NULL,
    page_url TEXT NOT NULL,
    referrer TEXT,
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    properties JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_tb_pageview_event_fk_user ON tb_pageview_event(fk_user);
CREATE INDEX idx_tb_pageview_event_page_url ON tb_pageview_event(page_url);
-- DESC matches the dominant access pattern (most recent events first).
CREATE INDEX idx_tb_pageview_event_recorded_at ON tb_pageview_event(recorded_at DESC);
-- Pre-computed aggregates (updated by pg_cron or external worker).
-- One row per (page_url, date); upserted by both the real-time worker
-- and the hourly batch function.
CREATE TABLE tb_pageview_aggregate (
    pk_aggregate BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- UNIQUE on the column already creates a backing unique index on id;
    -- the former duplicate CREATE UNIQUE INDEX idx_tb_pageview_aggregate_id
    -- is removed.
    id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT UNIQUE NOT NULL, -- e.g. '/products|2024-01-15'
    page_url TEXT NOT NULL,
    date DATE NOT NULL,
    view_count INTEGER NOT NULL DEFAULT 0,
    unique_visitors INTEGER NOT NULL DEFAULT 0,
    bounce_rate FLOAT NOT NULL DEFAULT 0.0,        -- percentage, 0..100
    avg_session_duration FLOAT NOT NULL DEFAULT 0.0,
    computed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Arbiter for ON CONFLICT upserts performed by worker and batch job.
    UNIQUE(page_url, date)
);

CREATE INDEX idx_tb_pageview_aggregate_page_url_date ON tb_pageview_aggregate(page_url, date DESC);
-- Read view: raw eventsCREATE VIEW v_pageview_event ASSELECT e.id, jsonb_build_object( 'id', e.id::text, 'identifier', e.identifier, 'session_id', e.session_id::text, 'page_url', e.page_url, 'referrer', e.referrer, 'recorded_at', e.recorded_at, 'properties', e.properties ) AS dataFROM tb_pageview_event e;
-- Read view: pre-computed aggregatesCREATE VIEW v_pageview_aggregate ASSELECT a.id, jsonb_build_object( 'id', a.id::text, 'identifier', a.identifier, 'page_url', a.page_url, 'date', a.date, 'view_count', a.view_count, 'unique_visitors', a.unique_visitors, 'bounce_rate', a.bounce_rate, 'avg_session_duration', a.avg_session_duration, 'computed_at', a.computed_at ) AS dataFROM tb_pageview_aggregate a;
-- Arrow Flight view for high-throughput columnar analyticsCREATE VIEW va_pageview_aggregate ASSELECT page_url, date, view_count, unique_visitors, bounce_rate, avg_session_durationFROM tb_pageview_aggregate;import fraiseqlfrom fraiseql.scalars import ID, DateTime, Date
@fraiseql.typeclass PageViewEvent: """Raw immutable page view event.""" id: ID identifier: str session_id: str page_url: str referrer: str | None recorded_at: DateTime properties: fraiseql.Json
@fraiseql.typeclass PageViewAggregate: """Pre-computed page view metrics for a page on a given date.""" id: ID identifier: str page_url: str date: Date view_count: int unique_visitors: int bounce_rate: float avg_session_duration: float computed_at: DateTime
@fraiseql.querydef pageview_events( page_url: str | None = None, limit: int = 100, offset: int = 0,) -> list[PageViewEvent]: """List raw page view events. Supports filtering by page_url.""" return fraiseql.config(sql_source="v_pageview_event")
@fraiseql.querydef page_analytics(page_url: str, date: str) -> PageViewAggregate | None: """Get pre-computed analytics for a specific page on a given date.""" return fraiseql.config(sql_source="v_pageview_aggregate", cache_ttl_seconds=60)
@fraiseql.querydef top_pages(limit: int = 50) -> list[PageViewAggregate]: """Get top pages by view count (all dates, ordered by view_count DESC).""" return fraiseql.config(sql_source="v_pageview_aggregate", cache_ttl_seconds=300)
@fraiseql.subscription( entity_type="PageViewAggregate", topic="analytics.aggregate.updated",)def live_page_metrics(page_url: str | None = None) -> PageViewAggregate: """Subscribe to real-time aggregate updates as new events are processed.""" pass
fraiseql.export_schema("schema.json")[project]name = "analytics-platform"version = "1.0.0"
[database]
# Resolved from the environment at startup.
url = "${DATABASE_URL}"

[fraiseql]
schema_file = "schema.json"
output_file = "schema.compiled.json"

# JWT/OIDC config via environment variables (no [auth] TOML section):
# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET
# When tb_pageview_aggregate is updated, push live subscription events via NATS.
# The Rust runtime receives pg_notify from PostgreSQL and forwards to subscribers.
[[observers]]
table = "tb_pageview_aggregate"
event = "UPDATE"
subject = "analytics.aggregate.updated"

# Same subject for INSERT so first-write and later updates both reach
# live_page_metrics subscribers.
[[observers]]
table = "tb_pageview_aggregate"
event = "INSERT"
subject = "analytics.aggregate.updated"
[observers]backend = "nats"nats_url = "${NATS_URL}"
# [caching] is aspirational — this TOML section is unverified and may cause parse errors.# Remove this block until [caching] support is confirmed stable.[caching]enabled = truebackend = "redis"
[security.enterprise]enabled = truelog_level = "info""""Analytics ingestion worker.Subscribes to NATS using nats.py and writes raw events to PostgreSQL.This is a standalone service — not a FraiseQL schema file."""import asyncioimport jsonimport uuidimport asyncpgimport nats
async def ingest_pageview(msg):
    """
    Store a page view event.

    Raw events are immutable — always INSERT, never UPDATE.
    Upserts the real-time aggregate for today's date.

    Expects msg.data to be JSON with keys session_id, page_url, date and
    timestamp, plus optional user_pk, referrer and properties (KeyError on
    missing required keys is caught below and the message is retried).
    On success the message is ack'd; on any failure it is nak'd with a
    5-second redelivery delay.
    """
    event = json.loads(msg.data)

    conn = await asyncpg.connect(POSTGRES_URL)
    try:
        # Store raw event
        identifier = str(uuid.uuid4())
        await conn.execute(
            """
            INSERT INTO tb_pageview_event
                (identifier, fk_user, session_id, page_url, referrer, recorded_at, properties)
            VALUES ($1, $2, $3::uuid, $4, $5, $6::timestamptz, $7::jsonb)
            """,
            identifier,
            event.get("user_pk"),  # INTEGER FK resolved by the sender
            event["session_id"],
            event["page_url"],
            event.get("referrer"),
            event["timestamp"],
            json.dumps(event.get("properties", {})),
        )

        # Upsert real-time aggregate for today.
        # NOTE(review): on conflict only view_count is incremented;
        # unique_visitors stays at its seeded value of 1 until the hourly
        # batch job (fn_batch_aggregate_pageviews) recomputes it exactly.
        agg_identifier = f"{event['page_url']}|{event['date']}"
        await conn.execute(
            """
            INSERT INTO tb_pageview_aggregate
                (identifier, page_url, date, view_count, unique_visitors, computed_at)
            VALUES ($1, $2, $3::date, 1, 1, NOW())
            ON CONFLICT (page_url, date) DO UPDATE
            SET view_count = tb_pageview_aggregate.view_count + 1,
                computed_at = NOW()
            """,
            agg_identifier,
            event["page_url"],
            event["date"],
        )
        # pg_notify fires here; FraiseQL observer forwards to
        # analytics.aggregate.updated NATS subject which drives
        # live_page_metrics subscription clients.

        # Ack only after both writes succeeded so failures are redelivered.
        await msg.ack()

    except Exception as exc:
        print(f"[ingestion-worker] ERROR: {exc}")
        await msg.nak(delay=5)
    finally:
        await conn.close()
async def main():
    """Connect to NATS JetStream and run the ingestion loop forever."""
    connection = await nats.connect(NATS_URL)
    jetstream = connection.jetstream()

    # Durable consumer: restarts resume from the last acked message.
    await jetstream.subscribe(
        "events.pageview",
        durable="analytics_ingestion",
        cb=ingest_pageview,
    )
    print("Ingestion worker listening on events.pageview")

    # Block forever; all work happens inside the subscription callback.
    await asyncio.sleep(float("inf"))
if __name__ == "__main__": import os NATS_URL = os.environ["NATS_URL"] POSTGRES_URL = os.environ["DATABASE_URL"] asyncio.run(main())-- Batch re-aggregate yesterday's data from raw events.-- Called by pg_cron every hour. More accurate than real-time upserts.CREATE OR REPLACE FUNCTION fn_batch_aggregate_pageviews(p_date DATE)RETURNS VOID LANGUAGE plpgsql AS $$BEGIN INSERT INTO tb_pageview_aggregate (identifier, page_url, date, view_count, unique_visitors, bounce_rate, computed_at) SELECT page_url || '|' || p_date::text AS identifier, page_url, p_date, COUNT(*) AS view_count, COUNT(DISTINCT session_id) AS unique_visitors, -- Bounce rate: sessions with exactly one pageview ROUND( COUNT(DISTINCT session_id) FILTER ( WHERE session_id IN ( SELECT session_id FROM tb_pageview_event WHERE DATE(recorded_at) = p_date GROUP BY session_id HAVING COUNT(*) = 1 ) )::numeric / NULLIF(COUNT(DISTINCT session_id), 0) * 100, 2 ) AS bounce_rate, NOW() AS computed_at FROM tb_pageview_event WHERE DATE(recorded_at) = p_date GROUP BY page_url ON CONFLICT (page_url, date) DO UPDATE SET view_count = EXCLUDED.view_count, unique_visitors = EXCLUDED.unique_visitors, bounce_rate = EXCLUDED.bounce_rate, computed_at = EXCLUDED.computed_at;END;$$;
-- Schedule with pg_cron (runs every hour, re-aggregates yesterday)-- Requires pg_cron extension: CREATE EXTENSION pg_cron;SELECT cron.schedule( 'batch-aggregate-pageviews', '0 * * * *', $$SELECT fn_batch_aggregate_pageviews(CURRENT_DATE - 1)$$);# In schema.py — already declared above, shown here for clarity:
@fraiseql.subscription(
    entity_type="PageViewAggregate",
    topic="analytics.aggregate.updated",
)
def live_page_metrics(page_url: str | None = None) -> PageViewAggregate:
    """
    Subscribe to real-time aggregate updates.

    Flow:
    1. Ingestion worker upserts tb_pageview_aggregate
    2. PostgreSQL pg_notify fires on UPDATE/INSERT
    3. FraiseQL Rust observer forwards to NATS analytics.aggregate.updated
    4. Subscription clients receive updated PageViewAggregate

    Pass page_url to filter events to a specific page.
    """
    # Empty body by design: delivery is driven by the observer pipeline.
    pass

GraphQL subscription query for dashboards:
subscription LiveDashboard {
  livePageMetrics(pageUrl: "/products") {
    pageUrl
    date
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
    computedAt
  }
}

# Real-time dashboard — pageAnalytics is cached 60s; topPages 300s (see schema).
query DashboardMetrics {
  topPages(limit: 10) {
    pageUrl
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
  }

  pageAnalytics(pageUrl: "/products", date: "2024-01-15") {
    pageUrl
    viewCount
    uniqueVisitors
    bounceRate
    avgSessionDuration
    computedAt
  }
}

For high-throughput columnar data export (BI tools, data warehouses), the va_pageview_aggregate view is exposed via Arrow Flight:
"""Query FraiseQL Arrow Flight endpoint for columnar analytics.Uses the va_ view convention for Arrow-native access."""import pyarrow.flight as flightimport pandas as pd
def query_top_pages(limit: int = 100) -> pd.DataFrame: client = flight.connect("grpc://localhost:8815")
descriptor = flight.FlightDescriptor.for_command( b'SELECT page_url, date, view_count, unique_visitors, bounce_rate ' b'FROM va_pageview_aggregate ' b'ORDER BY view_count DESC ' b'LIMIT 100' )
info = client.get_flight_info(descriptor) reader = client.do_get(info.endpoints[0].ticket) table = reader.read_all() return table.to_pandas()
if __name__ == "__main__": df = query_top_pages() print(df.head(20))import asyncioimport jsonimport pytestimport natsimport httpx
@pytest.mark.asyncio
async def test_event_to_aggregate_flow():
    """Publish events via NATS; verify aggregate is updated via GraphQL.

    End-to-end path: NATS publish -> ingestion worker -> Postgres upsert ->
    GraphQL pageAnalytics query.
    NOTE(review): relies on NATS_URL / TEST_TOKEN being defined elsewhere,
    a running worker and API, and a fixed 1s sleep -- inherently
    timing-sensitive; may flake under load.
    """
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()

    # Publish 10 test events (5 distinct sessions, 3 distinct users).
    for i in range(10):
        await js.publish(
            "events.pageview",
            json.dumps({
                "user_pk": i % 3 + 1,
                "session_id": f"session-{i % 5}",
                "page_url": "/test-page",
                "date": "2024-01-15",
                "timestamp": "2024-01-15T10:00:00Z",
            }).encode(),
        )
    # drain() flushes pending publishes before closing the connection.
    await nc.drain()

    # Wait for ingestion worker to process
    await asyncio.sleep(1.0)

    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        response = await client.post(
            "/graphql",
            json={
                "query": """
                    query {
                        pageAnalytics(pageUrl: "/test-page", date: "2024-01-15") {
                            viewCount
                            uniqueVisitors
                        }
                    }
                """
            },
            headers={"Authorization": f"Bearer {TEST_TOKEN}"},
        )
        data = response.json()
        # >= rather than == tolerates pre-existing rows in the aggregate.
        assert data["data"]["pageAnalytics"]["viewCount"] >= 10
        assert data["data"]["pageAnalytics"]["uniqueVisitors"] >= 3

cache_ttl_seconds tuned per query based on acceptable staleness
tb_pageview_event (e.g. 90-day TTL)
Subscriptions Reference
Full subscription configuration options.
Arrow Flight
Columnar analytics via va_ views.
Observers Reference
TOML observer configuration for NATS events.