Real-Time Features
Complete subscriptions reference.
A complete example of a Google Docs-like collaborative editor using FraiseQL and NATS event streaming.
Repository: github.com/fraiseql/examples/realtime-collaboration
-- Collaborative document. `content` holds the document's current full text.
CREATE TABLE tb_document (
    pk_document BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,  -- internal key, referenced by the share/edit tables
    id          UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,   -- public identifier
    identifier  TEXT NOT NULL UNIQUE,
    title       TEXT NOT NULL,
    content     TEXT NOT NULL DEFAULT '',
    fk_owner    BIGINT NOT NULL REFERENCES tb_user(pk_user),
    -- NOT NULL so an explicit NULL cannot bypass the default.
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- One row per (document, user) grant; the pair is unique so re-sharing can upsert.
CREATE TABLE tb_document_share (
    pk_share    BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id          UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier  TEXT NOT NULL UNIQUE GENERATED ALWAYS AS (id::text) STORED,
    fk_document BIGINT NOT NULL REFERENCES tb_document(pk_document),
    fk_user     BIGINT NOT NULL REFERENCES tb_user(pk_user),
    -- Allowed values are enforced by the constraint rather than only documented.
    permission  TEXT NOT NULL DEFAULT 'view'
        CONSTRAINT tb_document_share_permission_check
        CHECK (permission IN ('view', 'edit', 'admin')),
    shared_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (fk_document, fk_user)
);
-- Append-only log of edit operations applied to a document.
CREATE TABLE tb_document_edit (
    pk_edit     BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    id          UUID DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    identifier  TEXT NOT NULL UNIQUE GENERATED ALWAYS AS (id::text) STORED,
    fk_document BIGINT NOT NULL REFERENCES tb_document(pk_document),
    fk_user     BIGINT NOT NULL REFERENCES tb_user(pk_user),
    /* e.g. { type: "insert", position: 100, content: "text" } */
    operation   JSONB NOT NULL,
    timestamp   TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Views with JSONB data column

-- One row per document: the public UUID plus a JSONB projection in `data`.
-- `owner_id` is the owner's public UUID from tb_user, not the internal fk_owner.
CREATE VIEW v_document AS
SELECT
    d.id,
    jsonb_build_object(
        'id', d.id::text,
        'identifier', d.identifier,
        'title', d.title,
        'content', d.content,
        'owner_id', u.id::text,
        'created_at', d.created_at,
        'updated_at', d.updated_at
    ) AS data
FROM tb_document AS d
INNER JOIN tb_user AS u
    ON u.pk_user = d.fk_owner;
-- One row per recorded edit: the edit's public UUID plus a JSONB projection.
-- Document and user are exposed by their public UUIDs, not internal keys.
CREATE VIEW v_document_edit AS
SELECT
    e.id,
    jsonb_build_object(
        'id', e.id::text,
        'document_id', d.id::text,
        'user_id', u.id::text,
        'operation', e.operation,
        'timestamp', e.timestamp
    ) AS data
FROM tb_document_edit AS e
INNER JOIN tb_document AS d
    ON d.pk_document = e.fk_document
INNER JOIN tb_user AS u
    ON u.pk_user = e.fk_user;

import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
# GraphQL type for a collaborative document.
@fraiseql.type
class Document:
    id: ID
    title: str
    content: str
    owner_id: ID
    created_at: datetime
    updated_at: datetime
# GraphQL type for a document share (one user's permission on one document).
@fraiseql.type
class DocumentShare:
    id: ID
    document_id: ID
    user_id: ID
    permission: str  # "view", "edit", "admin"
    shared_at: datetime
# GraphQL type for one recorded edit operation.
@fraiseql.type
class DocumentEdit:
    id: ID
    document_id: ID
    user_id: ID
    operation: dict  # { type: "insert", position: 100, content: "text" }
    timestamp: datetime
@fraiseql.typeclass DocumentPresence: user_id: ID cursor_position: int selection_start: int selection_end: int# Subscribe to document changessubscription { documentChanged(documentId: "doc123") { id content updatedAt }}
# Subscribe to presence (who's editing)
# NOTE(review): this selects `user { name email }`, but the DocumentPresence
# type shown in this example declares no `user` field - confirm the schema.
subscription {
  presenceUpdated(documentId: "doc123") {
    userId
    cursorPosition
    user {
      name
      email
    }
  }
}
# Subscribe to comments
# NOTE(review): no comment table, type or subscription declaration appears in
# this example - presumably defined elsewhere in the repository; confirm.
subscription {
  commentAdded(documentId: "doc123") {
    id
    content
    author {
      name
    }
    createdAt
  }
}

The subscriptions are declared as compile-time projections in Python:
# Declaration only: the body is intentionally empty.
@fraiseql.subscription(
    entity_type="Document",
    operation="UPDATE"
)
def document_changed(document_id: ID | None = None) -> Document:
    """Subscribe to document content changes."""
    pass
@fraiseql.subscription( entity_type="DocumentPresence", topic="documents.presence")def presence_updated(document_id: ID | None = None) -> DocumentPresence: """Subscribe to presence updates (cursors, selections).""" pass# Apply an edit operationmutation { applyEdit( documentId: "doc123" operation: { type: "insert" position: 150 content: "new text" } ) { id operation timestamp }}
# Update cursor/selection position
mutation {
  updatePresence(
    documentId: "doc123"
    cursorPosition: 250
    selectionStart: 200
    selectionEnd: 300
  ) {
    userId
    cursorPosition
  }
}
# Share document with another user
mutation {
  shareDocument(
    documentId: "doc123"
    userId: "user456"
    permission: "edit"
  ) {
    id
    permission
    sharedAt
  }
}

The apply_edit mutation maps to fn_apply_edit, which performs OT conflict resolution and persists the result in a single atomic transaction:
@fraiseql.mutation(
    sql_source="fn_apply_edit",
    operation="CUSTOM",
    inject={"user_id": "jwt:sub"}
)
def apply_edit(document_id: ID, operation: dict) -> DocumentEdit:
    """Apply edit with automatic conflict resolution."""
    pass

-- Records an edit operation against a document, refreshes the document row,
-- and broadcasts the change on a per-document pg_notify channel.
--
-- Parameters:
--   p_user_id      public UUID of the editing user (injected from jwt:sub)
--   p_document_id  public UUID of the document being edited
--   p_operation    edit operation; an optional 'result_content' key carries
--                  the document text to store after the edit
--
-- Returns a mutation_response with status 'success' and the new edit's UUID,
-- or 'failed:not_found' when the document or the user does not exist.
--
-- NOTE(review): no check is made that the user owns the document or holds an
-- 'edit'/'admin' share on it - confirm whether authorization happens upstream.
CREATE FUNCTION fn_apply_edit(
    p_user_id     UUID,
    p_document_id UUID,
    p_operation   JSONB
) RETURNS mutation_response AS $$
DECLARE
    v_fk_doc  BIGINT;
    v_fk_user BIGINT;
    v_edit_id UUID;
BEGIN
    SELECT pk_document
    INTO v_fk_doc
    FROM tb_document
    WHERE id = p_document_id;

    IF v_fk_doc IS NULL THEN
        RETURN ROW('failed:not_found', 'Document not found', NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Resolve the user up front: an unknown UUID would otherwise insert a NULL
    -- fk_user and abort with a NOT NULL violation instead of a clean response.
    SELECT pk_user
    INTO v_fk_user
    FROM tb_user
    WHERE id = p_user_id;

    IF v_fk_user IS NULL THEN
        RETURN ROW('failed:not_found', 'User not found', NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- In a full OT implementation, transform p_operation against any
    -- operations recorded since p_operation->>'client_timestamp'.
    -- For simplicity this example appends the operation as-is.

    INSERT INTO tb_document_edit (fk_document, fk_user, operation)
    VALUES (v_fk_doc, v_fk_user, p_operation)
    RETURNING id INTO v_edit_id;

    -- Update document content (apply operation result).
    -- tb_document.content is NOT NULL, so when the operation carries no
    -- 'result_content' (as in the documented applyEdit example) the existing
    -- content is kept rather than overwritten with NULL.
    UPDATE tb_document
    SET content    = COALESCE(p_operation->>'result_content', content),
        updated_at = NOW()
    WHERE pk_document = v_fk_doc;

    -- pg_notify triggers the subscription broadcast automatically
    PERFORM pg_notify(
        'documents:changed:' || p_document_id::text,
        jsonb_build_object(
            'document_id', p_document_id,
            'edit_id', v_edit_id,
            'user_id', p_user_id
        )::text
    );

    RETURN ROW(
        'success', NULL, v_edit_id, 'DocumentEdit',
        NULL, NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;

Presence (cursor positions and selections) is stored in tb_document_edit as ephemeral update records, or alternatively handled by an external Redis service outside FraiseQL. The mutation declaration is a standard compile-time pattern:
@fraiseql.mutation(
    sql_source="fn_update_presence",
    operation="UPDATE",
    inject={"user_id": "jwt:sub"}
)
def update_presence(
    document_id: ID,
    cursor_position: int,
    selection_start: int = 0,
    selection_end: int = 0
) -> DocumentPresence:
    """Update user's cursor/selection and broadcast via pg_notify."""
    pass

-- Broadcasts a user's cursor/selection on the document's presence channel and
-- echoes the same payload back in the mutation_response. Nothing is written
-- to any table by this function.
--
-- NOTE(review): the Python arguments are cursor_position / selection_start /
-- selection_end while these parameters are p_cursor_pos / p_sel_start /
-- p_sel_end - confirm whether FraiseQL binds arguments by position or by name.
CREATE FUNCTION fn_update_presence(
    p_user_id     UUID,
    p_document_id UUID,
    p_cursor_pos  INT,
    p_sel_start   INT DEFAULT 0,
    p_sel_end     INT DEFAULT 0
) RETURNS mutation_response AS $$
DECLARE
    -- Built once so the notification and the response cannot drift apart.
    v_presence JSONB := jsonb_build_object(
        'user_id', p_user_id,
        'cursor_position', p_cursor_pos,
        'selection_start', p_sel_start,
        'selection_end', p_sel_end
    );
BEGIN
    -- Broadcast presence via pg_notify; FraiseQL delivers to subscribers.
    PERFORM pg_notify(
        'documents:presence:' || p_document_id::text,
        v_presence::text
    );

    RETURN ROW(
        'success', NULL, NULL, 'DocumentPresence',
        v_presence,
        NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;

@fraiseql.mutation(
    sql_source="fn_share_document",
    operation="CREATE",
    inject={"owner_id": "jwt:sub"}
)
def share_document(document_id: ID, user_id: ID, permission: str) -> DocumentShare:
    """Share document with another user. Ownership verified in SQL."""
    pass

-- Grants (or updates) a user's permission on a document owned by the caller
-- and announces it on the 'share:received' pg_notify channel.
--
-- Parameters:
--   p_owner_id     public UUID of the caller (injected from jwt:sub)
--   p_document_id  public UUID of the document to share
--   p_user_id      public UUID of the recipient
--   p_permission   'view', 'edit' or 'admin'
--
-- Returns a mutation_response with status 'success' and the share's UUID, or:
--   'failed:authorization'  the caller does not own the document (or it does not exist)
--   'failed:validation'     p_permission is not one of the allowed values
--   'failed:not_found'      the recipient does not exist
CREATE FUNCTION fn_share_document(
    p_owner_id    UUID,
    p_document_id UUID,
    p_user_id     UUID,
    p_permission  TEXT
) RETURNS mutation_response AS $$
DECLARE
    v_fk_doc       BIGINT;
    v_fk_recipient BIGINT;
    v_share_id     UUID;
BEGIN
    -- Verify ownership
    SELECT pk_document
    INTO v_fk_doc
    FROM tb_document
    WHERE id = p_document_id
      AND fk_owner = (SELECT pk_user FROM tb_user WHERE id = p_owner_id);

    IF v_fk_doc IS NULL THEN
        RETURN ROW('failed:authorization', 'You can only share documents you own', NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Reject unknown permission values instead of storing them; NULL is tested
    -- explicitly because NOT IN yields NULL (not TRUE) for a NULL input.
    IF p_permission IS NULL OR p_permission NOT IN ('view', 'edit', 'admin') THEN
        RETURN ROW('failed:validation', 'Permission must be one of: view, edit, admin', NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Resolve the recipient up front: an unknown UUID would otherwise insert a
    -- NULL fk_user and abort with a NOT NULL violation.
    SELECT pk_user
    INTO v_fk_recipient
    FROM tb_user
    WHERE id = p_user_id;

    IF v_fk_recipient IS NULL THEN
        RETURN ROW('failed:not_found', 'User not found', NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Upsert: re-sharing with the same user only changes the permission.
    INSERT INTO tb_document_share (fk_document, fk_user, permission)
    VALUES (v_fk_doc, v_fk_recipient, p_permission)
    ON CONFLICT (fk_document, fk_user) DO UPDATE
        SET permission = EXCLUDED.permission
    RETURNING id INTO v_share_id;

    -- Notify recipient via pg_notify
    PERFORM pg_notify(
        'share:received',
        jsonb_build_object(
            'document_id', p_document_id,
            'user_id', p_user_id,
            'permission', p_permission
        )::text
    );

    RETURN ROW(
        'success', NULL, v_share_id, 'DocumentShare',
        NULL, NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;

@fraiseql.query
def document_history(document_id: ID, first: int = 50) -> list[DocumentEdit]:
    """Get edit history for a document in chronological order."""
    return fraiseql.config(sql_source="v_document_edit")

Clone the example
git clone https://github.com/fraiseql/examples/realtime-collaboration
cd realtime-collaboration

Set up environment
cp .env.example .env
uv sync

Start services
docker-compose up -d postgres redis nats

Run migrations
fraiseql migrate

Start FraiseQL server
fraiseql run

Visit the editor
open http://localhost:3000

Real-Time Features
Complete subscriptions reference.
NATS Integration
Event streaming and pub/sub patterns.
Performance Optimization
Batching and caching strategies.
Deployment
Kubernetes with sticky WebSocket sessions.