Skip to content

Real-Time Collaborative Editor

A complete example of a Google Docs-like collaborative editor using FraiseQL and NATS event streaming.

Repository: github.com/fraiseql/examples/realtime-collaboration

  • Real-Time Subscriptions: Live updates as other users edit
  • Conflict Resolution: Operational Transformation (OT) for concurrent edits
  • Presence: See who’s currently editing
  • Permissions: Share documents with specific permissions
  • History: Full edit history with timestamps
  • Cursor Tracking: See other users’ cursors in real-time
  • Comments: Thread-based discussions on document sections
-- One row per collaborative document.
CREATE TABLE tb_document (
    -- Internal surrogate key; other tables reference this column.
    pk_document BIGINT      GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Public UUID, generated automatically on insert.
    id          UUID        NOT NULL DEFAULT gen_random_uuid() UNIQUE,
    -- Unique text identifier; has no default, so it must be supplied on insert.
    identifier  TEXT        NOT NULL UNIQUE,
    title       TEXT        NOT NULL,
    -- Current full text of the document; starts out empty.
    content     TEXT        NOT NULL DEFAULT '',
    -- Owning user (internal key of tb_user).
    fk_owner    BIGINT      NOT NULL REFERENCES tb_user (pk_user),
    created_at  TIMESTAMPTZ DEFAULT now(),
    updated_at  TIMESTAMPTZ DEFAULT now()
);
-- Grants one user a permission level on one document.
CREATE TABLE tb_document_share (
    -- Internal surrogate key.
    pk_share    BIGINT      GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Public UUID, generated automatically on insert.
    id          UUID        DEFAULT gen_random_uuid() NOT NULL UNIQUE,
    -- Text form of id, maintained by the database.
    identifier  TEXT        NOT NULL UNIQUE
                            GENERATED ALWAYS AS (id::text) STORED,
    fk_document BIGINT      NOT NULL REFERENCES tb_document(pk_document),
    fk_user     BIGINT      NOT NULL REFERENCES tb_user(pk_user),
    -- Access level; the CHECK rejects anything outside the documented set so
    -- a typo such as 'write' cannot be stored silently.
    permission  TEXT        NOT NULL DEFAULT 'view'
                            CHECK (permission IN ('view', 'edit', 'admin')),
    shared_at   TIMESTAMPTZ DEFAULT now(),
    -- At most one share row per (document, user) pair; fn_share_document
    -- relies on this for its ON CONFLICT upsert.
    UNIQUE(fk_document, fk_user)
);
-- Append-only log of edit operations applied to documents.
CREATE TABLE tb_document_edit (
    -- Internal surrogate key.
    pk_edit     BIGINT      GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Public UUID, generated automatically on insert.
    id          UUID        NOT NULL DEFAULT gen_random_uuid() UNIQUE,
    -- Text form of id, maintained by the database.
    identifier  TEXT        GENERATED ALWAYS AS (id::text) STORED
                            NOT NULL UNIQUE,
    -- Document the operation was applied to.
    fk_document BIGINT      NOT NULL REFERENCES tb_document (pk_document),
    -- User who submitted the operation.
    fk_user     BIGINT      NOT NULL REFERENCES tb_user (pk_user),
    -- Operation payload, e.g. { type: "insert", position: 100, content: "text" }
    operation   JSONB       NOT NULL,
    -- When the edit row was recorded.
    timestamp   TIMESTAMPTZ DEFAULT now()
);
-- Views with JSONB data column
-- Read model for documents: the public id plus a JSONB `data` object.
-- Internal BIGINT keys are not exposed; the owner appears as its public UUID.
CREATE VIEW v_document AS
SELECT
    doc.id,
    jsonb_build_object(
        'id',         doc.id::text,
        'identifier', doc.identifier,
        'title',      doc.title,
        'content',    doc.content,
        'owner_id',   owner.id::text,
        'created_at', doc.created_at,
        'updated_at', doc.updated_at
    ) AS data
FROM tb_document AS doc
INNER JOIN tb_user AS owner
    ON owner.pk_user = doc.fk_owner;
-- Read model for the edit log: the edit's public id plus a JSONB `data`
-- object in which document and author are given by their public UUIDs.
CREATE VIEW v_document_edit AS
SELECT
    edit.id,
    jsonb_build_object(
        'id',          edit.id::text,
        'document_id', doc.id::text,
        'user_id',     author.id::text,
        'operation',   edit.operation,
        'timestamp',   edit.timestamp
    ) AS data
FROM tb_document_edit AS edit
INNER JOIN tb_document AS doc
    ON doc.pk_document = edit.fk_document
INNER JOIN tb_user AS author
    ON author.pk_user = edit.fk_user;
import fraiseql
from fraiseql.scalars import ID
from datetime import datetime
@fraiseql.type
class Document:
    # Field names correspond to keys of the `data` object built by v_document
    # (that view also emits `identifier`, which is not declared here).
    id: ID
    title: str
    content: str
    owner_id: ID  # public UUID of the owning user
    created_at: datetime
    updated_at: datetime
@fraiseql.type
class DocumentShare:
    # A grant of one permission level on one document to one user;
    # returned by the share_document mutation.
    id: ID
    document_id: ID
    user_id: ID
    permission: str  # "view", "edit", "admin"
    shared_at: datetime
@fraiseql.type
class DocumentEdit:
    # Field names correspond to keys of the `data` object built by
    # v_document_edit.
    id: ID
    document_id: ID
    user_id: ID
    operation: dict  # { type: "insert", position: 100, content: "text" }
    timestamp: datetime
@fraiseql.type
class DocumentPresence:
    # Field names correspond to keys of the payload that fn_update_presence
    # builds and broadcasts.
    user_id: ID
    cursor_position: int
    selection_start: int
    selection_end: int
# Subscribe to document changes
subscription {
  documentChanged(documentId: "doc123") {
    id
    content
    updatedAt
  }
}

# Subscribe to presence (who's editing)
subscription {
  presenceUpdated(documentId: "doc123") {
    userId
    cursorPosition
    user {
      name
      email
    }
  }
}

# Subscribe to comments
subscription {
  commentAdded(documentId: "doc123") {
    id
    content
    author {
      name
    }
    createdAt
  }
}

The subscriptions are declared as compile-time projections in Python:

# Declaration only: the decorator arguments carry the configuration and the
# function body is intentionally empty.
@fraiseql.subscription(entity_type="Document", operation="UPDATE")
def document_changed(document_id: ID | None = None) -> Document:
    """Subscribe to document content changes."""
    # document_id is optional and defaults to None.
    pass
# Declaration only: the subscription is bound to the "documents.presence"
# topic through the decorator and the function body is intentionally empty.
@fraiseql.subscription(entity_type="DocumentPresence", topic="documents.presence")
def presence_updated(document_id: ID | None = None) -> DocumentPresence:
    """Subscribe to presence updates (cursors, selections)."""
    # document_id is optional and defaults to None.
    pass
# Apply an edit operation
mutation {
  applyEdit(
    documentId: "doc123"
    operation: { type: "insert", position: 150, content: "new text" }
  ) {
    id
    operation
    timestamp
  }
}

# Update cursor/selection position
mutation {
  updatePresence(
    documentId: "doc123"
    cursorPosition: 250
    selectionStart: 200
    selectionEnd: 300
  ) {
    userId
    cursorPosition
  }
}

# Share document with another user
mutation {
  shareDocument(documentId: "doc123", userId: "user456", permission: "edit") {
    id
    permission
    sharedAt
  }
}

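The apply_edit mutation maps to fn_apply_edit, which records the operation in the edit log and persists the resulting document content in a single atomic transaction. The Operational Transformation step itself is left as a marked extension point inside the function; this example stores the operation as submitted: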

# Declaration only: the mutation is backed by the SQL function fn_apply_edit.
# `user_id` is not a client argument; it is injected from the JWT `sub` claim.
@fraiseql.mutation(
    sql_source="fn_apply_edit",
    operation="CUSTOM",
    inject={"user_id": "jwt:sub"},
)
def apply_edit(document_id: ID, operation: dict) -> DocumentEdit:
    """Apply edit with automatic conflict resolution."""
    pass
-- Record an edit operation against a document and store the resulting content.
--
-- Parameters:
--   p_user_id      public UUID of the editing user (injected from the JWT)
--   p_document_id  public UUID of the target document
--   p_operation    operation payload; must carry the post-edit text under
--                  the 'result_content' key
--
-- Returns a mutation_response:
--   'success'               with the new edit's id as entity id
--   'failed:not_found'      unknown document or unknown user
--   'failed:authorization'  caller is neither the owner nor shared with
--                           'edit' / 'admin' permission
--   'failed:validation'     p_operation has no 'result_content'
CREATE FUNCTION fn_apply_edit(
    p_user_id UUID,
    p_document_id UUID,
    p_operation JSONB
) RETURNS mutation_response AS $$
DECLARE
    v_fk_doc      BIGINT;
    v_fk_owner    BIGINT;
    v_fk_user     BIGINT;
    v_new_content TEXT;
    v_edit_id     UUID;
BEGIN
    -- Lock the document row for the rest of the transaction so that two
    -- concurrent edits cannot interleave their log insert and content update.
    SELECT pk_document, fk_owner
      INTO v_fk_doc, v_fk_owner
      FROM tb_document
     WHERE id = p_document_id
       FOR UPDATE;

    IF v_fk_doc IS NULL THEN
        RETURN ROW('failed:not_found', 'Document not found',
                   NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Resolve the caller up front; an unknown user would otherwise surface
    -- as a NOT NULL violation on tb_document_edit.fk_user.
    SELECT pk_user INTO v_fk_user
      FROM tb_user
     WHERE id = p_user_id;

    IF v_fk_user IS NULL THEN
        RETURN ROW('failed:not_found', 'User not found',
                   NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Only the owner, or a user the document is shared with at 'edit' or
    -- 'admin' level, may change the content.
    IF v_fk_user <> v_fk_owner AND NOT EXISTS (
        SELECT 1
          FROM tb_document_share
         WHERE fk_document = v_fk_doc
           AND fk_user = v_fk_user
           AND permission IN ('edit', 'admin')
    ) THEN
        RETURN ROW('failed:authorization',
                   'You do not have permission to edit this document',
                   NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- tb_document.content is NOT NULL, so a payload without result_content
    -- must be rejected here rather than failing inside the UPDATE.
    v_new_content := p_operation->>'result_content';

    IF v_new_content IS NULL THEN
        RETURN ROW('failed:validation', 'operation.result_content is required',
                   NULL, 'DocumentEdit', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- In a full OT implementation, transform p_operation against any
    -- operations recorded since p_operation->>'client_timestamp'.
    -- For simplicity this example appends the operation as-is.
    INSERT INTO tb_document_edit (fk_document, fk_user, operation)
    VALUES (v_fk_doc, v_fk_user, p_operation)
    RETURNING id INTO v_edit_id;

    -- Update document content (apply operation result)
    UPDATE tb_document
       SET content = v_new_content,
           updated_at = NOW()
     WHERE pk_document = v_fk_doc;

    -- pg_notify triggers the subscription broadcast automatically
    PERFORM pg_notify(
        'documents:changed:' || p_document_id::text,
        jsonb_build_object(
            'document_id', p_document_id,
            'edit_id', v_edit_id,
            'user_id', p_user_id
        )::text
    );

    RETURN ROW(
        'success', NULL, v_edit_id, 'DocumentEdit',
        NULL, NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;

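Presence (cursor positions and selections) is ephemeral: fn_update_presence below does not write it to any table, it only broadcasts each update through pg_notify. If presence needs to be retained (for example, to show existing cursors to a user who has just joined), it can alternatively be handled by an external Redis service outside FraiseQL. The mutation declaration is a standard compile-time pattern: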

# Declaration only: the mutation is backed by the SQL function
# fn_update_presence. `user_id` is not a client argument; it is injected from
# the JWT `sub` claim.
@fraiseql.mutation(
    sql_source="fn_update_presence",
    operation="UPDATE",
    inject={"user_id": "jwt:sub"},
)
def update_presence(
    document_id: ID,
    cursor_position: int,
    selection_start: int = 0,
    selection_end: int = 0,
) -> DocumentPresence:
    """Update user's cursor/selection and broadcast via pg_notify."""
    # Both selection bounds are optional and default to 0.
    pass
-- Broadcast a user's cursor and selection for one document.
-- Nothing is written to a table: the presence payload is sent on a
-- per-document notification channel and echoed back in the response.
CREATE FUNCTION fn_update_presence(
    p_user_id UUID,
    p_document_id UUID,
    p_cursor_pos INT,
    p_sel_start INT DEFAULT 0,
    p_sel_end INT DEFAULT 0
) RETURNS mutation_response AS $$
DECLARE
    -- Built once; used both as the notification body and as the returned entity.
    v_presence JSONB := jsonb_build_object(
        'user_id', p_user_id,
        'cursor_position', p_cursor_pos,
        'selection_start', p_sel_start,
        'selection_end', p_sel_end
    );
BEGIN
    -- Broadcast presence via pg_notify; FraiseQL delivers to subscribers.
    PERFORM pg_notify(
        'documents:presence:' || p_document_id::text,
        v_presence::text
    );

    RETURN ROW(
        'success', NULL, NULL, 'DocumentPresence',
        v_presence,
        NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;
# Declaration only: the mutation is backed by the SQL function
# fn_share_document. `owner_id` is not a client argument; it is injected from
# the JWT `sub` claim.
@fraiseql.mutation(
    sql_source="fn_share_document",
    operation="CREATE",
    inject={"owner_id": "jwt:sub"},
)
def share_document(document_id: ID, user_id: ID, permission: str) -> DocumentShare:
    """Share document with another user. Ownership verified in SQL."""
    pass
-- Share a document with another user, or change an existing share's level.
--
-- Parameters:
--   p_owner_id     public UUID of the caller (injected from the JWT)
--   p_document_id  public UUID of the document to share
--   p_user_id      public UUID of the recipient
--   p_permission   'view', 'edit' or 'admin'
--
-- Returns a mutation_response:
--   'success'               with the share's id as entity id
--   'failed:validation'     p_permission is not one of the allowed values
--   'failed:authorization'  document does not exist or caller is not its owner
--   'failed:not_found'      recipient user does not exist
CREATE FUNCTION fn_share_document(
    p_owner_id UUID,
    p_document_id UUID,
    p_user_id UUID,
    p_permission TEXT
) RETURNS mutation_response AS $$
DECLARE
    v_fk_doc       BIGINT;
    v_fk_recipient BIGINT;
    v_share_id     UUID;
BEGIN
    -- Reject unknown permission levels. The NULL test is explicit because
    -- `NULL NOT IN (...)` evaluates to NULL, which IF treats as false.
    IF p_permission IS NULL
       OR p_permission NOT IN ('view', 'edit', 'admin') THEN
        RETURN ROW('failed:validation',
                   'Permission must be one of: view, edit, admin',
                   NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Verify ownership
    SELECT pk_document INTO v_fk_doc
      FROM tb_document
     WHERE id = p_document_id
       AND fk_owner = (SELECT pk_user FROM tb_user WHERE id = p_owner_id);

    IF v_fk_doc IS NULL THEN
        RETURN ROW('failed:authorization', 'You can only share documents you own',
                   NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Resolve the recipient up front; an unknown user would otherwise surface
    -- as a NOT NULL violation on tb_document_share.fk_user.
    SELECT pk_user INTO v_fk_recipient
      FROM tb_user
     WHERE id = p_user_id;

    IF v_fk_recipient IS NULL THEN
        RETURN ROW('failed:not_found', 'User not found',
                   NULL, 'DocumentShare', NULL, NULL, NULL, NULL)::mutation_response;
    END IF;

    -- Upsert: sharing again with the same user only changes the permission.
    INSERT INTO tb_document_share (fk_document, fk_user, permission)
    VALUES (v_fk_doc, v_fk_recipient, p_permission)
    ON CONFLICT (fk_document, fk_user)
    DO UPDATE SET permission = EXCLUDED.permission
    RETURNING id INTO v_share_id;

    -- Notify recipient via pg_notify
    PERFORM pg_notify(
        'share:received',
        jsonb_build_object(
            'document_id', p_document_id,
            'user_id', p_user_id,
            'permission', p_permission
        )::text
    );

    RETURN ROW(
        'success', NULL, v_share_id, 'DocumentShare',
        NULL, NULL, NULL, NULL
    )::mutation_response;
END;
$$ LANGUAGE plpgsql;
# Query declaration: results are read from the v_document_edit view.
@fraiseql.query
def document_history(document_id: ID, first: int = 50) -> list[DocumentEdit]:
    """Get edit history for a document in chronological order."""
    # `first` defaults to 50.
    return fraiseql.config(
        sql_source="v_document_edit",
    )
  1. Clone the example

    Terminal window
    git clone https://github.com/fraiseql/examples/realtime-collaboration
    cd realtime-collaboration
  2. Set up environment

    Terminal window
    cp .env.example .env
    uv sync
  3. Start services

    Terminal window
    docker-compose up -d postgres redis nats
  4. Run migrations

    Terminal window
    fraiseql migrate
  5. Start FraiseQL server

    Terminal window
    fraiseql run
  6. Visit the editor

    Terminal window
    open http://localhost:3000
  1. Basic: Create/read/share documents
  2. Real-Time: Add subscriptions for live updates
  3. Collaboration: Implement OT for conflict resolution
  4. Presence: Track cursors and selections
  5. Advanced: Performance optimization with batching

Real-Time Features

Complete subscriptions reference.

NATS Integration

Event streaming and pub/sub patterns.

Performance Optimization

Batching and caching strategies.

Deployment

Kubernetes with sticky WebSocket sessions.