Time-Series Patterns
Best practices for analytics data models.
A production-grade analytics backend handling millions of events from mobile apps.
Repository: github.com/fraiseql/examples/mobile-analytics-backend
-- Registered mobile applications. fk_owner references tb_user (defined elsewhere).
CREATE TABLE tb_app (
    pk_app     BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id         UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,  -- public identity exposed via the API
    identifier TEXT NOT NULL UNIQUE,
    name       TEXT NOT NULL,
    api_key    TEXT NOT NULL UNIQUE,  -- per-app ingestion credential
    fk_owner   BIGINT NOT NULL REFERENCES tb_user(pk_user),
    created_at TIMESTAMPTZ DEFAULT now()
);
-- Raw event stream from mobile clients. This is the highest-volume table in the
-- schema: every dashboard query and the 90-day retention job filter on app and time.
CREATE TABLE tb_event (
    pk_event   BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id         UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    -- Mirrors the UUID as text so every table exposes a uniform identifier column.
    identifier TEXT NOT NULL UNIQUE GENERATED ALWAYS AS (id::text) STORED,
    fk_app     BIGINT NOT NULL REFERENCES tb_app(pk_app),
    event_type TEXT NOT NULL,
    session_id TEXT NOT NULL,
    device     JSONB NOT NULL DEFAULT '{}',   -- client-reported device info (os, model, ...)
    metadata   JSONB NOT NULL DEFAULT '{}',   -- free-form event payload
    -- NOTE(review): "timestamp" shadows the SQL type name; an *_at column name
    -- would be safer, but renaming would break existing views/queries.
    timestamp  TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Time-series access paths: per-app range scans (dashboards, cohort retention)
-- hit (fk_app, timestamp); the pg_cron retention delete scans on timestamp alone.
-- Without these every read is a sequential scan over the full event history.
CREATE INDEX tb_event_fk_app_timestamp_idx ON tb_event (fk_app, timestamp);
CREATE INDEX tb_event_timestamp_idx ON tb_event (timestamp);
-- Pre-aggregated daily metrics, one row per (app, metric, dimension, day).
-- Rows are upserted by fn_track_events on every ingest.
CREATE TABLE tb_metric (
    pk_metric  BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id         UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    -- Composite business key flattened to text for the uniform identifier column.
    -- NOTE(review): generation expressions must be IMMUTABLE in PostgreSQL, and
    -- date::text output depends on the DateStyle setting — confirm this DDL is
    -- accepted on the target version; to_char or an immutable wrapper may be needed.
    identifier TEXT NOT NULL UNIQUE GENERATED ALWAYS AS (
        fk_app::text || ':' || metric_type || ':' || dimension || ':' || date::text
    ) STORED,
    fk_app      BIGINT NOT NULL REFERENCES tb_app(pk_app),
    metric_type TEXT NOT NULL,
    value       BIGINT NOT NULL DEFAULT 0,
    dimension   TEXT NOT NULL,  -- e.g. device OS; 'unknown' when absent
    date        DATE NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT now(),
    -- Natural key used as the ON CONFLICT target by fn_track_events.
    UNIQUE (fk_app, metric_type, dimension, date)
);
-- Saved user cohorts per app; "filter" holds the cohort's selection criteria as JSONB.
CREATE TABLE tb_cohort (
    pk_cohort  BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id         UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier TEXT NOT NULL UNIQUE,
    fk_app     BIGINT NOT NULL REFERENCES tb_app(pk_app),
    name       TEXT NOT NULL,
    filter     JSONB NOT NULL DEFAULT '{}',
    user_count INT NOT NULL DEFAULT 0,  -- denormalized count, maintained elsewhere
    created_at TIMESTAMPTZ DEFAULT now()
);
-- Views project each entity into a single JSONB "data" column (FraiseQL convention).
-- NOTE: in the original, a lost newline after this comment swallowed the whole
-- CREATE VIEW statement into the comment; restored here.
CREATE VIEW v_app AS
SELECT
    a.id,
    jsonb_build_object(
        'id', a.id::text,
        'identifier', a.identifier,
        'name', a.name,
        'created_at', a.created_at
    ) AS data
FROM tb_app a;
CREATE VIEW v_metric AS
SELECT
    m.id,
    jsonb_build_object(
        'id', m.id::text,
        'identifier', m.identifier,
        'metric_type', m.metric_type,
        'value', m.value,
        'dimension', m.dimension,
        'date', m.date
    ) AS data
FROM tb_metric m;

import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
@fraiseql.type
class App:
    """A registered mobile application, backed by v_app."""
    id: ID
    name: str
    created_at: datetime
@fraiseql.type
class Event:
    """A single ingested analytics event, backed by tb_event."""
    id: ID
    event_type: str
    session_id: str
    device: dict      # client-reported device info (os, model, ...)
    metadata: dict    # free-form event payload
    timestamp: datetime
@fraiseql.type
class Metric:
    """A daily aggregated metric row, backed by v_metric."""
    id: ID
    metric_type: str
    value: int
    dimension: str
    # NOTE(review): the SQL column is DATE; datetime is used here — confirm
    # FraiseQL coerces DATE to datetime, otherwise a date type would be truer.
    date: datetime
    created_at: datetime
@fraiseql.type
class Cohort:
    """A saved user cohort, backed by tb_cohort."""
    id: ID
    name: str
    filter: dict      # cohort selection criteria (JSONB in SQL)
    user_count: int   # denormalized count maintained in the database
    created_at: datetime
@fraiseql.query
def metrics(app_id: ID, date_from: str, date_to: str) -> list[Metric]:
    """Get metrics for date range.

    Compile-time declaration: FraiseQL maps this query to the v_metric view;
    no runtime Python body executes.
    """
    return fraiseql.config(sql_source="v_metric")
@fraiseql.querydef cohort_retention(app_id: ID, cohort_id: ID) -> list[Metric]: """Get retention data for a cohort.""" return fraiseql.config(sql_source="v_cohort_retention")# Batch event ingestion (hundreds of events at once)mutation { trackEvents( appId: "app123" events: [ { type: "page_view" path: "/home" metadata: { referrer: "/search" } }, { type: "button_click" label: "Sign Up" metadata: { screen: "onboarding" } }, { type: "screen_time" duration: 45000 } ] ) { success eventCount }}FraiseQL mutations are compile-time declarations backed by PostgreSQL functions. Batch ingestion, aggregation updates, and any event fan-out are all handled inside the SQL function — no runtime Python needed.
@fraiseql.input
class EventInput:
    """A single event from a mobile client."""
    event_type: str
    session_id: str
    device: dict      # client device info; fn_track_events reads device->>'os'
    metadata: dict
@fraiseql.type
class TrackEventsResult:
    """Result payload for the trackEvents mutation."""
    success: bool
    event_count: int  # number of events inserted in the batch
@fraiseql.mutation( sql_source="fn_track_events", operation="CREATE", inject={"caller_id": "jwt:sub"})def track_events(app_id: ID, events: list[EventInput]) -> TrackEventsResult: """Ingest a batch of events from a mobile app.""" passThe SQL function performs the batch insert and rolls up aggregations atomically:
-- Batch event ingestion: inserts each event and rolls the daily metric up
-- in the same transaction, so events and aggregates can never diverge.
CREATE FUNCTION fn_track_events(
    p_caller_id UUID,   -- injected from JWT; NOTE(review): currently unused here
    p_app_id    UUID,   -- public app UUID
    p_events    JSONB   -- array of EventInput objects
) RETURNS mutation_response AS $$
DECLARE
    v_fk_app BIGINT;
    v_event  JSONB;
    v_count  INT := 0;
BEGIN
    -- Resolve the public UUID to the internal surrogate key.
    SELECT pk_app INTO v_fk_app FROM tb_app WHERE id = p_app_id;
    IF v_fk_app IS NULL THEN
        RETURN ROW('failed:not_found', 'App not found', NULL, 'TrackEventsResult',
                   NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Batch insert events.
    -- NOTE(review): row-by-row loop; a set-based INSERT ... SELECT over
    -- jsonb_array_elements would cut per-row overhead for large batches —
    -- profile before changing, the upsert below shares the loop.
    FOR v_event IN SELECT * FROM jsonb_array_elements(p_events) LOOP
        INSERT INTO tb_event (fk_app, event_type, session_id, device, metadata)
        VALUES (
            v_fk_app,
            v_event->>'event_type',
            v_event->>'session_id',
            COALESCE(v_event->'device', '{}'),
            COALESCE(v_event->'metadata', '{}')
        );
        v_count := v_count + 1;

        -- Upsert the rolling daily metric keyed by (app, metric, OS, day).
        INSERT INTO tb_metric (fk_app, metric_type, dimension, value, date)
        VALUES (
            v_fk_app,
            (v_event->>'event_type') || '_count',
            COALESCE(v_event->'device'->>'os', 'unknown'),
            1,
            CURRENT_DATE
        )
        ON CONFLICT (fk_app, metric_type, dimension, date)
        DO UPDATE SET value = tb_metric.value + 1;
    END LOOP;

    RETURN ROW(
        'success', NULL, NULL, 'TrackEventsResult',
        jsonb_build_object('success', true, 'event_count', v_count),
        NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;

# Get live metrics for dashboard
query {
  metrics(
    appId: "app123"
    dateRange: { from: "2024-01-01", to: "2024-01-31" }
  ) {
    metric_type
    value
    dimension
    date
  }
}
# Get user cohorts
query {
  cohorts(appId: "app123", limit: 20) {
    id
    name
    userCount
    retentionRate
  }
}
# Get funnels (multi-step user journeys)
query {
  funnel(
    appId: "app123"
    steps: ["onboarding", "payment_screen", "purchase_complete"]
  ) {
    step
    users
    dropoffPercent
  }
}

# Subscribe to live dashboard metrics
subscription {
  liveMetrics(appId: "app123", interval: 60) {
    timestamp
    pageviews
    uniqueUsers
    crashCount
    avgSessionDuration
  }
}

The subscription is declared in Python as a compile-time projection:
@fraiseql.subscription( entity_type="Metric", topic="metrics.live")def live_metrics(app_id: ID | None = None) -> Metric: """Subscribe to live metric updates for a dashboard.""" passCohort retention is computed in a PostgreSQL view. The query declaration maps directly to that view — no runtime Python needed.
@fraiseql.querydef cohort_retention(app_id: ID, cohort_id: ID) -> list[Metric]: """Get weekly retention data for a cohort (cached 5 minutes).""" return fraiseql.config(sql_source="v_cohort_retention", cache_ttl_seconds=300)CREATE VIEW v_cohort_retention ASSELECT c.id, jsonb_build_object( 'id', c.id::text, 'cohort_name', c.name, 'day_offset', gs.day_offset, 'returning_users', COUNT(DISTINCT e.id) ) AS dataFROM tb_cohort cCROSS JOIN generate_series(0, 28, 7) AS gs(day_offset)LEFT JOIN tb_event e ON e.fk_app = c.fk_app AND DATE(e.timestamp) = CURRENT_DATE - gs.day_offsetGROUP BY c.id, c.name, c.fk_app, gs.day_offset;Data retention is managed by a scheduled PostgreSQL job. Use pg_cron (available in most managed PostgreSQL services) to run cleanup on a schedule — no application-layer scheduled tasks needed.
-- Install pg_cron extension (once, requires superuser)
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Schedule daily cleanup at 2 AM: enforce the 90-day event retention policy.
SELECT cron.schedule(
    'cleanup-old-events',
    '0 2 * * *',
    $$ DELETE FROM tb_event WHERE timestamp < NOW() - INTERVAL '90 days'; $$
);

Clone the example
git clone https://github.com/fraiseql/examples/mobile-analytics-backend
cd mobile-analytics-backend
Set up environment
cp .env.example .env
uv sync
Start services
docker-compose up -d postgres redis nats
Run migrations
fraiseql migrate
Start FraiseQL server
fraiseql run
Generate test events
python scripts/generate_test_events.py
Time-Series Patterns
Best practices for analytics data models.
Performance Tuning
Optimize high-throughput ingestion pipelines.
Scaling Guide
Horizontal scaling for millions of events.
Caching Strategy
Redis caching for aggregated metrics.