
Arrow Flight Dataplane

FraiseQL includes an Apache Arrow Flight dataplane for high-throughput columnar data access, enabling efficient analytics, OLAP queries, and data science workloads.

Arrow Flight provides:

  • Columnar data transfer: Efficient for analytical queries
  • Zero-copy reads: Minimal memory overhead
  • Streaming: Handle datasets larger than memory
  • Language interop: Python, TypeScript/Node.js, R, Julia, Rust clients

Use this table to decide when to reach for Arrow Flight versus the standard GraphQL endpoint:

| Dimension | GraphQL Endpoint | Arrow Flight |
| --- | --- | --- |
| Format | JSON (row-oriented) | Apache Arrow IPC (columnar) |
| Typical throughput | ~50 MB/s | ~500 MB/s |
| Best for | CRUD, real-time queries, web clients | Analytics, ML, ETL, data science |
| Client libraries | Any HTTP client | Arrow Flight SDK required |
| Filtering | GraphQL `where` arguments | Pushed down to PostgreSQL |
| Streaming | NDJSON via Wire protocol | Native chunked record batches |
| Authentication | JWT / API key | Bearer token (same as GraphQL) |
| Port | 3000 (default) | 50051 (default) |

Arrow Flight is a compile-time feature — there is no [arrow] TOML configuration section. Enable it by building FraiseQL with the arrow feature flag:

```shell
cargo build --release --features arrow
```

When the arrow feature is compiled in, the Flight gRPC server starts automatically on port 50051 alongside the HTTP server (port 3000). Authentication uses the same JWT/API key tokens as the GraphQL endpoint — pass the token as a gRPC authorization header (Bearer eyJ...). TLS follows the server-level TLS configuration.
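As a sketch of the header shape from a client (the token value is a placeholder; with pyarrow, per-call headers go into `FlightCallOptions`):

```python
# Build the gRPC metadata pair carrying the same JWT the GraphQL endpoint
# accepts. gRPC metadata keys are lowercase; pyarrow expects byte strings.
token = "eyJhbGciOiJIUzI1NiJ9.payload.signature"  # placeholder, not a real JWT

auth_header = (b"authorization", f"Bearer {token}".encode())

# With pyarrow.flight the pair is attached per call:
#   options = flight.FlightCallOptions(headers=[auth_header])
#   reader = client.do_get(ticket, options)
print(auth_header[0].decode())  # authorization
```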

Create views optimized for columnar access:

```sql
-- Analytics view for Arrow
CREATE VIEW va_orders_daily AS
SELECT
    date_trunc('day', o.created_at) AS order_date,
    c.customer_region,
    p.product_category,
    COUNT(*) AS order_count,
    SUM(o.total) AS revenue,
    AVG(o.total) AS avg_order_value
FROM tb_order o
JOIN tb_customer c ON c.pk_customer = o.fk_customer
JOIN tb_product p ON p.pk_product = o.fk_product
GROUP BY 1, 2, 3;
```

Create tables for columnar storage:

```sql
-- Fact table for Arrow analytics
CREATE TABLE ta_events (
    event_id UUID,
    event_type TEXT,
    user_id UUID,
    session_id UUID,
    page_url TEXT,
    referrer TEXT,
    device_type TEXT,
    country TEXT,
    properties JSONB,
    created_at TIMESTAMPTZ
);

-- Columnar storage (if using TimescaleDB or similar)
SELECT create_hypertable('ta_events', 'created_at');
```
Define the corresponding Arrow types and queries in Python:

```python
import fraiseql
from fraiseql.scalars import DateTime, Decimal


@fraiseql.type(dataplane="arrow")
class OrdersDaily:
    """Daily order aggregates for analytics."""

    order_date: DateTime
    customer_region: str
    product_category: str
    order_count: int
    revenue: Decimal
    avg_order_value: Decimal


@fraiseql.arrow_query(sql_source="va_orders_daily")
def orders_daily(
    start_date: DateTime,
    end_date: DateTime,
    regions: list[str] | None = None,
) -> list[OrdersDaily]:
    """Query daily order aggregates."""
    pass
```
Query it from Python with the Arrow Flight client:

```python
import json

import pyarrow.flight as flight

# Connect to the FraiseQL Arrow Flight server
client = flight.connect("grpc://localhost:50051")

# Authenticate: authenticate_basic_token returns a (header, value) pair
token_pair = client.authenticate_basic_token("user", "password")
options = flight.FlightCallOptions(headers=[token_pair])

# Execute query
ticket = flight.Ticket(json.dumps({
    "query": "orders_daily",
    "variables": {"start_date": "2024-01-01", "end_date": "2024-12-31"},
}).encode())
reader = client.do_get(ticket, options)

# Read as an Arrow Table
table = reader.read_all()
print(table.schema)
# order_date: timestamp[us, tz=UTC]
# customer_region: string
# product_category: string
# order_count: int64
# revenue: decimal128(18, 2)
# avg_order_value: decimal128(18, 2)

print(table.to_pandas())
#   order_date customer_region product_category  order_count   revenue  avg_order_value
# 0 2024-01-01              US      Electronics         1234  98765.00            80.04
# 1 2024-01-01              EU      Electronics          876  71234.00            81.32
# ...
```
Stream large result sets in chunks:

```python
import json
from collections.abc import Iterator

import pandas as pd
import pyarrow.flight as flight


def stream_large_dataset(query: str) -> Iterator[pd.DataFrame]:
    """Stream large datasets one record batch at a time."""
    client = flight.connect("grpc://localhost:50051")
    ticket = flight.Ticket(json.dumps({"query": query}).encode())
    reader = client.do_get(ticket)
    for chunk in reader:
        yield chunk.data.to_pandas()


# Usage: process 50M rows without loading all into memory
for df_chunk in stream_large_dataset("large_events_export"):
    process(df_chunk)
    print(f"Processed {len(df_chunk)} rows")
```
Aggregate incrementally without materializing the full result:

```python
import json

import pyarrow.compute as pc
import pyarrow.flight as flight


def aggregate_without_loading_all(query: str) -> dict:
    """Sum revenue without loading the entire dataset into memory."""
    client = flight.connect("grpc://localhost:50051")
    ticket = flight.Ticket(json.dumps({"query": query}).encode())
    reader = client.do_get(ticket)
    total_revenue = 0.0
    row_count = 0
    for chunk in reader:
        batch = chunk.data
        total_revenue += pc.sum(batch.column("revenue")).as_py()
        row_count += len(batch)
    return {"total_revenue": total_revenue, "rows": row_count}


result = aggregate_without_loading_all("orders_daily")
print(result)
# {'total_revenue': 4521890.0, 'rows': 42850}
```
Hourly metrics with percentiles:

```sql
CREATE VIEW va_metrics_hourly AS
SELECT
    date_trunc('hour', created_at) AS hour,
    metric_name,
    COUNT(*) AS sample_count,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS p50,
    percentile_cont(0.95) WITHIN GROUP (ORDER BY value) AS p95,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY value) AS p99
FROM ta_metrics
GROUP BY 1, 2;
```
Monthly acquisition cohorts and their retained revenue:

```sql
CREATE VIEW va_user_cohorts AS
WITH first_purchase AS (
    SELECT
        user_id,
        date_trunc('month', MIN(created_at)) AS cohort_month
    FROM tb_order
    GROUP BY 1
)
SELECT
    fp.cohort_month,
    date_trunc('month', o.created_at) AS order_month,
    COUNT(DISTINCT o.user_id) AS users,
    SUM(o.total) AS revenue
FROM first_purchase fp
JOIN tb_order o ON o.user_id = fp.user_id
GROUP BY 1, 2;
```
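Once cohort rows are fetched over Flight (for example via `table.to_pylist()` on a query backed by `va_user_cohorts`), retention rates are simple arithmetic. The rows below are made-up illustrative values:

```python
from collections import defaultdict

# Hypothetical rows as returned by table.to_pylist() for va_user_cohorts.
rows = [
    {"cohort_month": "2024-01", "order_month": "2024-01", "users": 100, "revenue": 5000.0},
    {"cohort_month": "2024-01", "order_month": "2024-02", "users": 40, "revenue": 2100.0},
    {"cohort_month": "2024-02", "order_month": "2024-02", "users": 80, "revenue": 4300.0},
]

# A cohort's size is its user count in its first month.
cohort_size = {}
for r in rows:
    if r["order_month"] == r["cohort_month"]:
        cohort_size[r["cohort_month"]] = r["users"]

# Retention = users still active in a later month / cohort size.
retention = defaultdict(dict)
for r in rows:
    retention[r["cohort_month"]][r["order_month"]] = r["users"] / cohort_size[r["cohort_month"]]

print(retention["2024-01"]["2024-02"])  # 0.4
```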
Session-level conversion funnel:

```sql
CREATE VIEW va_funnel AS
WITH events AS (
    SELECT
        session_id,
        event_type,
        created_at,
        ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY created_at) AS event_order
    FROM ta_events
    WHERE event_type IN ('page_view', 'add_to_cart', 'checkout', 'purchase')
)
SELECT
    date_trunc('day', e1.created_at) AS day,
    COUNT(DISTINCT e1.session_id) AS page_views,
    COUNT(DISTINCT e2.session_id) AS add_to_cart,
    COUNT(DISTINCT e3.session_id) AS checkout,
    COUNT(DISTINCT e4.session_id) AS purchase
FROM events e1
LEFT JOIN events e2 ON e2.session_id = e1.session_id AND e2.event_type = 'add_to_cart'
LEFT JOIN events e3 ON e3.session_id = e1.session_id AND e3.event_type = 'checkout'
LEFT JOIN events e4 ON e4.session_id = e1.session_id AND e4.event_type = 'purchase'
WHERE e1.event_type = 'page_view'
GROUP BY 1;
```
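With a day's funnel counts in hand (one row of `va_funnel`, here filled with made-up numbers), step-to-step conversion rates fall out directly:

```python
# Hypothetical single-day row from va_funnel (illustrative values).
day = {"page_views": 10000, "add_to_cart": 1200, "checkout": 400, "purchase": 300}

steps = ["page_views", "add_to_cart", "checkout", "purchase"]

# Conversion from each stage to the next.
step_conversion = {
    f"{prev} -> {cur}": day[cur] / day[prev]
    for prev, cur in zip(steps, steps[1:])
}

overall = day["purchase"] / day["page_views"]  # end-to-end conversion

print(f"{overall:.1%}")  # 3.0%
```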
Tune the record batch size to the workload:

| Batch Size | Use Case |
| --- | --- |
| 1,024 | Low latency, small queries |
| 65,536 | General purpose (default) |
| 1,000,000 | Large analytics, high throughput |
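If the batch size is tunable per request, a natural place for it is the ticket payload. The `max_batch_size` field below is an assumption for illustration; check your FraiseQL version's ticket schema before relying on it:

```python
import json

# Assumed ticket field "max_batch_size" (hypothetical, not a documented contract).
payload = {
    "query": "orders_daily",
    "max_batch_size": 1_000_000,  # large batches for bulk analytics
}
ticket_bytes = json.dumps(payload).encode()

# flight.Ticket(ticket_bytes) would then be passed to client.do_get(...)
print(json.loads(ticket_bytes)["max_batch_size"])  # 1000000
```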

FraiseQL pushes filters to the database:

```python
# This query
query_fraiseql(
    "orders_daily",
    {"start_date": "2024-01-01", "regions": ["US", "CA"]},
)

# Generates SQL with a WHERE clause:
# SELECT ... FROM va_orders_daily
# WHERE order_date >= '2024-01-01'
#   AND customer_region IN ('US', 'CA')
```

Only requested columns are transferred:

```python
# Request specific columns
ticket = flight.Ticket(json.dumps({
    "query": "orders_daily",
    "columns": ["order_date", "revenue"],  # only these columns are transferred
}).encode())
```
The Flight server exposes Prometheus metrics:

| Metric | Description |
| --- | --- |
| fraiseql_arrow_requests_total | Total Arrow Flight requests |
| fraiseql_arrow_bytes_sent | Bytes transferred |
| fraiseql_arrow_batches_sent | Number of record batches |
| fraiseql_arrow_query_duration | Query execution time |
Check server status from the CLI:

```shell
fraiseql status --arrow
# Arrow Flight Server
# Status: Running
# Port: 50051
# Active connections: 12
# Bytes sent (24h): 1.2 TB
# Queries (24h): 45,230
```
Restrict sensitive Arrow queries with scopes:

```python
@fraiseql.arrow_query(
    sql_source="va_sensitive_metrics",
    requires_scope="analytics:read",
)
def sensitive_metrics() -> list[Metric]:
    """Requires analytics:read scope."""
    pass
```
Follow the table-prefix conventions when naming analytics relations:

| Data Type | Table Prefix | Use Case |
| --- | --- | --- |
| Aggregates | va_ | Pre-computed summaries |
| Raw events | ta_ | Event streams, logs |
| Facts | tf_ | Star schema analytics |
```sql
-- Index on commonly filtered columns
CREATE INDEX idx_ta_events_created ON ta_events(created_at);
CREATE INDEX idx_ta_events_type ON ta_events(event_type);
CREATE INDEX idx_ta_events_user ON ta_events(user_id);
```
  1. Start FraiseQL with Arrow enabled:

    ```shell
    fraiseql run
    ```

    Check logs for Arrow Flight startup:

    ```
    INFO: Arrow Flight dataplane listening on 0.0.0.0:50051
    ```
  2. Test with a simple query:

    ```python
    import pyarrow.flight as flight

    client = flight.connect("grpc://localhost:50051")
    ticket = flight.Ticket(b'{"query": "orders_daily"}')
    reader = client.do_get(ticket)
    table = reader.read_all()
    print(f"Rows returned: {table.num_rows}")
    print(f"Columns: {table.column_names}")
    ```

    Expected output:

    ```
    Rows returned: 365
    Columns: ['order_date', 'customer_region', 'product_category', 'order_count', 'revenue', 'avg_order_value']
    ```
  3. Verify streaming for large datasets:

    ```python
    # Count rows without loading all into memory
    client = flight.connect("grpc://localhost:50051")
    ticket = flight.Ticket(b'{"query": "large_export"}')
    reader = client.do_get(ticket)
    row_count = 0
    for chunk in reader:
        row_count += chunk.data.num_rows
        print(f"Processed batch: {chunk.data.num_rows} rows")
    print(f"Total rows: {row_count}")
    ```

    Expected output:

    ```
    Processed batch: 65536 rows
    Processed batch: 65536 rows
    Processed batch: 18928 rows
    Total rows: 150000
    ```
  4. Check the metrics endpoint:

    ```shell
    curl http://localhost:9090/metrics | grep fraiseql_arrow
    ```

    Expected output:

    ```
    fraiseql_arrow_requests_total 42
    fraiseql_arrow_rows_streamed_total 2845000
    fraiseql_arrow_bytes_sent 184320000
    ```

If you get Connection refused when connecting:

  1. Verify FraiseQL was compiled with the arrow feature:

    ```shell
    fraiseql --version  # should mention Arrow Flight
    ```

  2. Check the port is correct:

    ```shell
    netstat -tlnp | grep 50051
    ```

  3. Check FraiseQL logs for Arrow Flight startup errors.

Arrow Flight is only as fast as the underlying SQL query:

```sql
-- Check query execution time in PostgreSQL
EXPLAIN ANALYZE SELECT * FROM va_orders_daily;
```

If the query is slow:

  • Add indexes on filtered columns
  • Consider materialized views for complex aggregations
  • Increase max_batch_size for better throughput

If clients run out of memory:

```python
# Use chunked reading instead of read_all()
for chunk in reader:
    process(chunk.data.to_pandas())  # Process one chunk at a time
```

Related:

  • Analytics — Analytics patterns and aggregations
  • NATS — Real-time event streaming