Advanced Federation
Deep dive on Apollo Federation v2 patterns.
A complete multi-tenant SaaS application combining per-tenant database isolation, Apollo Federation v2, NATS messaging for cross-service events, and Row Level Security for data isolation.
[project]name = "auth-service"version = "1.0.0"
[database]url = "${AUTH_DATABASE_URL}"
[fraiseql]schema_file = "schema.json"output_file = "schema.compiled.json"
# OIDC config via environment variables — not in [security.pkce] TOML:# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_REDIRECT_URI[security.pkce]enabled = true
[federation]enabled = trueservice_name = "auth"subgraph_url = "http://auth-service:8081/graphql"
[observers]backend = "nats"nats_url = "${NATS_URL}"
[[observers]]table = "tb_organization"event = "INSERT"subject = "analytics.organization_created"
[security.enterprise]enabled = truelog_level = "info"[project]name = "content-service"version = "1.0.0"
[database]url = "${CONTENT_DATABASE_URL}"
[fraiseql]schema_file = "schema.json"output_file = "schema.compiled.json"
# OIDC config via environment variables — not in [security.pkce] TOML:# OIDC_ISSUER_URL, OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, OIDC_REDIRECT_URI[security.pkce]enabled = true
[federation]enabled = trueservice_name = "content"subgraph_url = "http://content-service:8082/graphql"
[observers]backend = "nats"nats_url = "${NATS_URL}"
[[observers]]table = "tb_project"event = "INSERT"subject = "analytics.project_created"
[[observers]]table = "tb_task"event = "INSERT"subject = "analytics.task_created"
[security.enterprise]enabled = truelog_level = "info"-- ============================================================-- Auth service database: organizations and members-- ============================================================
CREATE TABLE tb_organization ( pk_org BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, name TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT now(), subscription_tier TEXT NOT NULL DEFAULT 'free');
CREATE UNIQUE INDEX idx_tb_organization_id ON tb_organization(id);
CREATE TABLE tb_member ( pk_member BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, fk_org BIGINT NOT NULL REFERENCES tb_organization(pk_org), name TEXT NOT NULL, email TEXT NOT NULL UNIQUE, role TEXT NOT NULL DEFAULT 'member', created_at TIMESTAMPTZ DEFAULT now());
CREATE UNIQUE INDEX idx_tb_member_id ON tb_member(id);CREATE INDEX idx_tb_member_fk_org ON tb_member(fk_org);
-- ============================================================-- Content service database: projects and tasks-- ============================================================
CREATE TABLE tb_project ( pk_project BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, -- org_id from JWT injected as RLS context; no FK across service boundaries org_id UUID NOT NULL, name TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT now());
CREATE UNIQUE INDEX idx_tb_project_id ON tb_project(id);CREATE INDEX idx_tb_project_org_id ON tb_project(org_id);
CREATE TABLE tb_task ( pk_task BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE, identifier TEXT NOT NULL UNIQUE, fk_project BIGINT NOT NULL REFERENCES tb_project(pk_project), title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'todo', created_at TIMESTAMPTZ DEFAULT now());
CREATE UNIQUE INDEX idx_tb_task_id ON tb_task(id);CREATE INDEX idx_tb_task_fk_project ON tb_task(fk_project);
-- ============================================================-- Read views (JSONB data column — pk_ never exposed)-- ============================================================
CREATE VIEW v_organization ASSELECT o.id, jsonb_build_object( 'id', o.id::text, 'identifier', o.identifier, 'name', o.name, 'subscription_tier', o.subscription_tier, 'created_at', o.created_at ) AS dataFROM tb_organization o;
CREATE VIEW v_member ASSELECT m.id, jsonb_build_object( 'id', m.id::text, 'identifier', m.identifier, 'name', m.name, 'email', m.email, 'role', m.role, 'created_at', m.created_at ) AS dataFROM tb_member m;
CREATE VIEW v_project ASSELECT p.id, jsonb_build_object( 'id', p.id::text, 'identifier', p.identifier, 'name', p.name, 'org_id', p.org_id::text, 'created_at', p.created_at ) AS dataFROM tb_project p;
CREATE VIEW v_task ASSELECT t.id, jsonb_build_object( 'id', t.id::text, 'identifier', t.identifier, 'title', t.title, 'status', t.status, 'created_at', t.created_at ) AS dataFROM tb_task t;
-- ============================================================-- Mutation functions-- ============================================================
CREATE FUNCTION fn_create_organization( p_identifier TEXT, p_name TEXT) RETURNS mutation_response LANGUAGE plpgsql AS $$DECLARE v_id UUID; v_result mutation_response;BEGIN INSERT INTO tb_organization (identifier, name) VALUES (p_identifier, p_name) RETURNING id INTO v_id;
v_result.status := 'success'; v_result.entity_id := v_id; v_result.entity_type := 'Organization'; RETURN v_result;END;$$;
CREATE FUNCTION fn_create_project( p_identifier TEXT, p_org_id UUID, p_name TEXT) RETURNS mutation_response LANGUAGE plpgsql AS $$DECLARE v_id UUID; v_result mutation_response;BEGIN INSERT INTO tb_project (identifier, org_id, name) VALUES (p_identifier, p_org_id, p_name) RETURNING id INTO v_id;
v_result.status := 'success'; v_result.entity_id := v_id; v_result.entity_type := 'Project'; RETURN v_result;END;$$;
CREATE FUNCTION fn_create_task( p_identifier TEXT, p_fk_project BIGINT, p_title TEXT) RETURNS mutation_response LANGUAGE plpgsql AS $$DECLARE v_id UUID; v_result mutation_response;BEGIN INSERT INTO tb_task (identifier, fk_project, title) VALUES (p_identifier, p_fk_project, p_title) RETURNING id INTO v_id;
v_result.status := 'success'; v_result.entity_id := v_id; v_result.entity_type := 'Task'; RETURN v_result;END;$$;-- Enable RLS for tenant isolation in the content serviceALTER TABLE tb_project ENABLE ROW LEVEL SECURITY;ALTER TABLE tb_task ENABLE ROW LEVEL SECURITY;
-- Projects visible only to the org extracted from JWT (set by FraiseQL Rust runtime)CREATE POLICY project_org_isolation ON tb_project USING (org_id = current_setting('app.org_id')::uuid);
-- Tasks inherit project isolation via JOIN — no separate policy needed if accessed via v_task-- filtered by fk_project.
-- For organizations in auth serviceALTER TABLE tb_organization ENABLE ROW LEVEL SECURITY;
CREATE POLICY org_owner_only ON tb_organization USING (id = current_setting('app.org_id')::uuid);import fraiseqlfrom fraiseql.scalars import ID, DateTimefrom typing import Annotated
@fraiseql.typeclass Organization: """A tenant organization.""" id: ID identifier: str name: str subscription_tier: str created_at: DateTime
@fraiseql.typeclass Member: """A member of an organization.""" id: ID identifier: str name: str email: Annotated[str, fraiseql.field(requires_scope="read:Member.email", on_deny="mask")] role: str created_at: DateTime
@fraiseql.inputclass CreateOrganizationInput: identifier: str name: str
@fraiseql.inputclass CreateMemberInput: identifier: str name: str email: str role: str = "member"
@fraiseql.query( sql_source="v_organization", inject={"org_id": "jwt:org_id"},)def organizations(limit: int = 50) -> list[Organization]: """ Get organizations for the authenticated tenant. org_id is injected from the JWT claim and applied as RLS context. """ pass
@fraiseql.query( sql_source="v_member", inject={"org_id": "jwt:org_id"},)def members(limit: int = 100) -> list[Member]: """Get members of the authenticated tenant's organization.""" pass
@fraiseql.mutation( sql_source="fn_create_organization", operation="CREATE", inject={"org_id": "jwt:org_id"},)def create_organization(input: CreateOrganizationInput) -> Organization: """ Create a new organization. Rust runtime calls fn_create_organization(); pg_notify fires; observer publishes analytics.organization_created to NATS. """ pass
@fraiseql.mutation( sql_source="fn_create_member", operation="CREATE", inject={"org_id": "jwt:org_id"},)def create_member(input: CreateMemberInput) -> Member: """Add a member to the authenticated organization.""" pass
fraiseql.export_schema("schema.json")import fraiseqlfrom fraiseql.scalars import ID, DateTime
@fraiseql.typeclass Project: """A project belonging to an organization.""" id: ID identifier: str name: str org_id: ID created_at: DateTime
@fraiseql.typeclass Task: """A task within a project.""" id: ID identifier: str title: str status: str created_at: DateTime
@fraiseql.inputclass CreateProjectInput: identifier: str name: str
@fraiseql.inputclass CreateTaskInput: identifier: str fk_project: int title: str
@fraiseql.query( sql_source="v_project", inject={"org_id": "jwt:org_id"}, cache_ttl_seconds=30,)def projects(limit: int = 50) -> list[Project]: """ Get projects for the authenticated organization. org_id injected from JWT; RLS policy enforces tenant isolation. """ pass
@fraiseql.query( sql_source="v_task", cache_ttl_seconds=30,)def tasks(fk_project: int | None = None, limit: int = 100) -> list[Task]: """Get tasks, optionally filtered by project.""" pass
@fraiseql.mutation( sql_source="fn_create_project", operation="CREATE", inject={"org_id": "jwt:org_id"}, invalidates_views=["v_project"],)def create_project(input: CreateProjectInput) -> Project: """ Create a project for the authenticated organization. Rust runtime calls fn_create_project(); pg_notify fires; observer publishes analytics.project_created to NATS. """ pass
@fraiseql.mutation( sql_source="fn_create_task", operation="CREATE", invalidates_views=["v_task"],)def create_task(input: CreateTaskInput) -> Task: """Create a task within a project.""" pass
@fraiseql.subscription( entity_type="Task", topic="analytics.task_created",)def task_created(fk_project: int | None = None) -> Task: """Subscribe to new task creation events in real time.""" pass
fraiseql.export_schema("schema.json")"""Analytics sync worker.Subscribes to NATS analytics.* events and records them in the shared analytics DB.Standalone service — not a FraiseQL schema file."""import asyncioimport jsonimport asyncpgimport nats
async def sync_analytics_event(msg): """Record analytics events from any service into the shared analytics DB.""" event_type = msg.subject data = json.loads(msg.data)
conn = await asyncpg.connect(ANALYTICS_POSTGRES_URL) try: await conn.execute( """ INSERT INTO tb_analytics_event (identifier, event_type, tenant_id, data, recorded_at) VALUES (gen_random_uuid()::text, $1, $2, $3::jsonb, NOW()) """, event_type, data.get("tenant_id"), json.dumps(data), ) await msg.ack() except Exception as exc: print(f"[analytics-worker] ERROR {event_type}: {exc}") await msg.nak(delay=5) finally: await conn.close()
async def main(): nc = await nats.connect(NATS_URL) js = nc.jetstream()
await js.subscribe( "analytics.>", durable="analytics_sync", cb=sync_analytics_event, ) print("Analytics worker listening on analytics.>") await asyncio.sleep(float("inf"))
if __name__ == "__main__": import os NATS_URL = os.environ["NATS_URL"] ANALYTICS_POSTGRES_URL = os.environ["ANALYTICS_DATABASE_URL"] asyncio.run(main())-- For each tenant, create an isolated database in the content service-- (database-per-tenant strategy)CREATE DATABASE tenant_acme_corp;CREATE USER tenant_acme_corp WITH PASSWORD '...';GRANT CONNECT ON DATABASE tenant_acme_corp TO tenant_acme_corp;
-- Then run migrations for each tenant database:-- fraiseql migrate --database $TENANT_DATABASE_URLfederation_version: =2.0.0
subgraphs: auth: routing_url: http://auth-service:8081/graphql schema: subgraph_url: http://auth-service:8081/graphql
content: routing_url: http://content-service:8082/graphql schema: subgraph_url: http://content-service:8082/graphqlCompose the supergraph:
rover supergraph compose --config gateway/supergraph.yaml > supergraph.graphqlSet up environment:
cp .env.example .env# Edit .env with your database credentials and OIDC configStart infrastructure:
docker-compose up -d postgres natsCreate tenant databases (for database-per-tenant strategy):
docker-compose exec postgres psql -U postgres -c \ "CREATE DATABASE tenant_acme_corp;"docker-compose exec postgres psql -U postgres -c \ "CREATE DATABASE tenant_global_corp;"Run migrations for each tenant:
export TENANT_DATABASE_URL=postgresql://postgres:password@localhost:5432/tenant_acme_corpfraiseql migrate
export TENANT_DATABASE_URL=postgresql://postgres:password@localhost:5432/tenant_global_corpfraiseql migrateStart all FraiseQL services:
docker-compose up -d auth-service content-service billing-service analytics-workerExpected output from auth-service:
FraiseQL v2.0.1 — auth-serviceGraphQL endpoint: http://localhost:8081/graphqlFederation: enabled (subgraph: auth)NATS observers: connected (nats://nats:4222)Create organization as tenant A:
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $TOKEN_A" \ -d '{ "query": "mutation { createOrganization(input: { identifier: \"acme-corp\", name: \"Acme Corp\" }) { id name subscriptionTier } }" }'Expected response:
{ "data": { "createOrganization": { "id": "550e8400-e29b-41d4-a716-446655440000", "name": "Acme Corp", "subscriptionTier": "free" } }}Verify tenant isolation (tenant B cannot see tenant A’s data):
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $TOKEN_B" \ -d '{"query": "query { organizations { id name } }"}'Expected response (empty list due to RLS):
{ "data": { "organizations": [] }}Create project and verify NATS event is published:
curl -X POST http://localhost:8080/graphql \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $TOKEN_A" \ -d '{ "query": "mutation { createProject(input: { identifier: \"alpha\", name: \"Project Alpha\" }) { id name } }" }'Wait for the observer to publish to NATS and the analytics worker to consume:
sleep 1Verify event in analytics:
docker-compose exec postgres psql -U postgres -d shared_analytics -c \ "SELECT event_type, tenant_id, recorded_at FROM tb_analytics_event ORDER BY recorded_at DESC LIMIT 5;"Expected output:
event_type | tenant_id | recorded_at-------------------------+------------+----------------------------analytics.project_created | acme_corp | 2024-01-15 10:30:01+00analytics.org_created | acme_corp | 2024-01-15 10:30:00+00Run tests:
pytest tests/test_tenant_isolation.py -vpytest tests/test_nats_events.py -vExpected output:
tests/test_tenant_isolation.py::test_tenant_isolation PASSEDtests/test_tenant_isolation.py::test_cross_tenant_access_blocked PASSEDtests/test_nats_events.py::test_project_creation_event PASSEDtests/test_nats_events.py::test_analytics_sync PASSED# Prometheus metrics exported by each FraiseQL service (labels by tenant from JWT claims)fraiseql_graphql_requests_total{service="auth", operation="mutation"}fraiseql_graphql_latency_seconds{service="content", operation="query"}fraiseql_observer_events_published_total{service="content", subject="analytics.project_created"}
# Analytics worker metrics (custom, in the nats.py worker)analytics_worker_events_processed_total{event_type="analytics.project_created"}analytics_worker_consumer_lag{consumer="analytics_sync"}Error: Database tenant_acme_corp does not existSolution:
docker-compose exec postgres psql -U postgres -c "CREATE DATABASE tenant_acme_corp;"./scripts/create-tenant.sh acme_corpError: Cannot connect to NATS at nats://localhost:4222Check:
Verify NATS is running:
docker-compose ps natsCheck [observers] section in fraiseql.toml:
[observers]backend = "nats"nats_url = "nats://nats:4222" # Use service name, not localhost, in DockerTest NATS connection:
docker-compose exec nats nats server infoIf the analytics table remains empty:
Check FraiseQL observer log:
docker-compose logs content-service | grep "observer"Verify the NATS stream exists:
docker-compose exec nats nats stream reportCheck the analytics worker consumer:
docker-compose exec nats nats consumer reportdocker-compose logs analytics-worker | tail -20If tenant B can see tenant A’s data:
Verify RLS is enabled:
\d tb_project-- Look for "Policies:" sectionCheck the policy is active:
SELECT * FROM pg_policies WHERE tablename = 'tb_project';Verify org_id JWT claim is being injected:
# Decode your JWT and verify it contains org_id claimecho $TOKEN_A | cut -d. -f2 | base64 -d | python3 -m json.toolIf cross-service queries fail at the gateway:
Validate each subgraph is reachable:
curl http://localhost:8081/graphql -d '{"query": "{ __typename }"}'curl http://localhost:8082/graphql -d '{"query": "{ __typename }"}'Re-compose the supergraph if schema changed:
rover supergraph compose --config gateway/supergraph.yaml > supergraph.graphqlCheck gateway logs for composition errors:
docker-compose logs gateway | grep "error"If queries are slow with multiple tenants:
Check database connection pool sizes in each service’s fraiseql.toml.
Enable connection pooling at the PostgreSQL level:
ALTER SYSTEM SET max_connections = 200;SELECT pg_reload_conf();Monitor per-service metrics:
curl http://localhost:8081/metrics | grep fraiseql_graphql_latency[observers] configured and tested per serviceAdvanced Federation
Deep dive on Apollo Federation v2 patterns.
Observers Reference
TOML observer configuration for NATS events.
JWT Injection
Using inject= to pass JWT claims to SQL functions.
Multi-Tenancy Guide
RLS and tenant isolation strategies.