Documentation

Live Queries & Streaming

Push database events to applications in real time. LISTEN/NOTIFY, WebSocket live queries at 29.2M+ notifications/sec, CDC streaming, continuous streaming views, and Natural Language SQL.

LISTEN / NOTIFY

LISTEN/NOTIFY is the standard PostgreSQL-wire pub/sub mechanism. Sessions subscribe to named channels; any session can publish a message to a channel. All subscribers receive the message asynchronously. Absolute DB is wire-compatible with the PostgreSQL NotificationResponse message format, so any library that handles PG notifications works without modification.

PropertyValue
Max payload size8,000 bytes
Per-session notification queue256 entries
Channel name max length63 characters
Wire formatPostgreSQL NotificationResponse ('A' message)
Fan-outAll sessions subscribed to a channel receive the message
sql — LISTEN and NOTIFY
-- Session A: subscribe to a channel
LISTEN order_updates;

-- Session B: publish a notification to all listeners
NOTIFY order_updates, '{"order_id": 12345, "status": "shipped"}';

-- Session A receives asynchronously:
-- Notification: channel "order_updates"
--              payload: {"order_id": 12345, "status": "shipped"}

-- Trigger-based notifications (auto-notify on table change)
CREATE OR REPLACE FUNCTION notify_order_change()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('order_updates',
        json_build_object(
            'op',       TG_OP,
            'order_id', NEW.id,
            'status',   NEW.status
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_order_notify
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_order_change();

-- Unsubscribe
UNLISTEN order_updates;
UNLISTEN *;  -- unsubscribe from all channels

Python (psycopg2) Integration

python — LISTEN/NOTIFY
import psycopg2, select

conn = psycopg2.connect("postgresql://absdb:pw@localhost:5433/mydb")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute("LISTEN order_updates;")

print("Waiting for notifications...")
while True:
    select.select([conn], [], [])
    conn.poll()
    while conn.notifies:
        n = conn.notifies.pop(0)
        print(f"Got notification: channel={n.channel} payload={n.payload}")

Node.js (node-postgres) Integration

javascript — LISTEN/NOTIFY
const { Client } = require('pg');
const client = new Client({ host: 'localhost', port: 5433, database: 'mydb' });
await client.connect();

client.on('notification', (msg) => {
  console.log('Channel:', msg.channel);
  console.log('Payload:', JSON.parse(msg.payload));
});

await client.query('LISTEN order_updates');

WebSocket Live Queries

Absolute DB exposes a WebSocket endpoint at ws://host:8080/ws (or wss:// with TLS). Clients send SQL queries and receive result-set change events in real time as the underlying data changes. Throughput is 29.2 million+ notifications/sec on a single server via a lock-free write ring.

javascript — WebSocket live query (browser)
const ws = new WebSocket('wss://mydb.example.com:8080/ws');

ws.onopen = () => {
  // Subscribe to live results of a query
  ws.send(JSON.stringify({
    type: 'subscribe',
    query: "SELECT id, status, total FROM orders WHERE status = 'pending'",
    id: 'pending-orders'
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'data') {
    console.log('Initial results:', msg.rows);
  } else if (msg.type === 'change') {
    // op: 'insert' | 'update' | 'delete'
    console.log(`Row ${msg.op}:`, msg.row);
    updateUI(msg.op, msg.row);
  }
};

// Unsubscribe
ws.send(JSON.stringify({ type: 'unsubscribe', id: 'pending-orders' }));
WebSocket live queries are powered by the CDC WAL tap — the same change stream that drives Debezium output. There is no polling; events arrive within milliseconds of the underlying write being committed.

CDC / Streaming SQL

Change Data Capture (CDC) taps the Write-Ahead Log and delivers row-level change events (INSERT, UPDATE, DELETE) in Debezium-compatible JSON or Protobuf binary format. Delivery is via WebSocket or gRPC bidirectional streaming. A 100 MB ring buffer decouples producer and consumer speeds.

sql — CDC subscriptions
-- Subscribe to all changes on a table
SUBSCRIBE TO TABLE orders;

-- Subscribe with a predicate filter
SUBSCRIBE TO TABLE orders
WHERE status IN ('pending', 'processing');

-- Subscribe to all changes in a database from a specific LSN
SUBSCRIBE TO DATABASE mydb
STARTING AT LSN 12345;

-- CDC cursor management
SELECT * FROM _cdc_cursors;

-- Acknowledge processed events (advance cursor)
SELECT absdb_cdc_ack('cursor_id', lsn => 99999);

Debezium JSON Event Format

json — CDC event payload
{
  "op": "u",
  "ts_ms": 1743897600000,
  "source": {
    "db": "mydb",
    "table": "orders",
    "lsn": 98765
  },
  "before": { "id": 42, "status": "pending", "total": 199.99 },
  "after":  { "id": 42, "status": "shipped", "total": 199.99 }
}
PropertyValue
Ring buffer size100 MB per subscription
Delivery semanticsAt-least-once with ACK cursor
Output formatsDebezium JSON, Protobuf binary
Delivery channelsWebSocket (ws://host:8080/cdc), gRPC bidirectional

Real-Time Streaming Views

Streaming views continuously evaluate a SELECT query against an event stream. Each view maintains a 65,536-event ring buffer and a background processor thread. Results are delivered to subscribers via NOTIFY, a result table, or WebSocket — within milliseconds of new events arriving.

Streaming views support sliding windows, tumbling windows, and watermark-based out-of-order event handling. Late-arriving events within the watermark tolerance are included in the correct window.

sql — streaming views
-- Create a streaming view: 1-minute sliding window on order totals
CREATE STREAMING VIEW order_totals_1m AS
SELECT
    date_trunc('minute', created_at) AS minute,
    count(*)                          AS order_count,
    sum(total)                        AS total_revenue
FROM orders
WHERE created_at >= now() - INTERVAL '1 minute'
GROUP BY 1;

-- Create a streaming view with out-of-order watermark (5 seconds)
CREATE STREAMING VIEW sensor_avg AS
SELECT
    device_id,
    avg(value) AS avg_value,
    window_start,
    window_end
FROM sensor_readings
TUMBLING WINDOW '30 seconds'
WATERMARK '5 seconds'
GROUP BY device_id, window_start, window_end;

-- List all active streaming views
SHOW STREAMING VIEWS;

-- Drop a streaming view
DROP STREAMING VIEW order_totals_1m;

-- Subscribe to a streaming view via NOTIFY
LISTEN streaming_view_order_totals_1m;
PropertyValue
Ring buffer per view65,536 events
ProcessorBackground thread, one per view
Window typesSliding, tumbling (configurable interval)
Out-of-order handlingWatermark (configurable tolerance)
Delivery modesNOTIFY channel, result table, WebSocket
Max concurrent views65,536 per server

Natural Language SQL (NL2SQL)

The NL2SQL() SQL function translates a plain English question into a SQL query using a schema-aware rule engine. It runs entirely in-database — no external LLM API call required. The rule engine recognises common patterns: aggregation queries, top-N ranking, JOIN conditions, date phrases, comparison operators, ORDER BY, and more.

sql — NL2SQL
-- Convert a natural language question to SQL
SELECT NL2SQL(
    question       => 'What are the top 10 customers by total order value this month?',
    schema_context => 'orders(id, customer_id, total, created_at), customers(id, name, email)'
);

-- Result: generated SQL query as text
-- SELECT c.name, sum(o.total) AS total_value
-- FROM orders o JOIN customers c ON c.id = o.customer_id
-- WHERE o.created_at >= date_trunc('month', now())
-- GROUP BY c.id, c.name
-- ORDER BY total_value DESC
-- LIMIT 10

-- Execute the NL2SQL result directly
EXECUTE IMMEDIATE NL2SQL(
    question       => 'How many orders were placed yesterday?',
    schema_context => 'orders(id, created_at, status)'
);

For production AI applications, NL2SQL can be combined with an ONNX model backend via the ML serving layer for more sophisticated natural language understanding.

Framework Integrations

Rails ActionCable

Rails ActionCable uses PostgreSQL LISTEN/NOTIFY as its pub/sub adapter. Point ActionCable at Absolute DB's PostgreSQL wire port and it works without modification.

yaml — Rails ActionCable config (config/cable.yml)
production:
  adapter: postgresql
  url: postgresql://absdb:password@localhost:5433/myapp_production

Django Channels

Django Channels supports PostgreSQL as a channel layer backend. Use the channels_postgres package with Absolute DB's PostgreSQL wire port.

python — Django Channels config (settings.py)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_postgres.db.DatabaseChannelLayer",
        "CONFIG": {
            "ENGINE":   "django.db.backends.postgresql",
            "NAME":     "myapp",
            "USER":     "absdb",
            "PASSWORD": "password",
            "HOST":     "localhost",
            "PORT":     "5433",
        },
    },
}

SQL Reference

Statement / FunctionDescription
LISTEN channelSubscribe to a notification channel
UNLISTEN channelUnsubscribe from a channel (UNLISTEN * = all)
NOTIFY channel [, payload]Send a notification to all channel subscribers
pg_notify(channel, payload)Function form of NOTIFY (usable in triggers)
SUBSCRIBE TO TABLE t [WHERE ...]Start a CDC subscription on a table
SUBSCRIBE TO DATABASE d [STARTING AT LSN n]Start a CDC subscription on a whole database
CREATE STREAMING VIEW name AS SELECT ...Create a continuously evaluated streaming view
SHOW STREAMING VIEWSList all active streaming views with status
DROP STREAMING VIEW nameStop and remove a streaming view
NL2SQL(question, schema_context)Convert a natural language question to SQL
EXECUTE IMMEDIATE sql_textParse and execute a dynamically generated SQL string

Continue Reading

PostgreSQL Wire Protocol CDC & Streaming WebSocket API

Ready to run Absolute DB?

~154 KB binary  ·  zero external dependencies  ·  2,737 tests passing  ·  SQL:2023 100%

Download Free → View Pricing All Docs