Skip to content

Live Queries & Streaming Guide

This guide covers surql's live query and real-time streaming system for building reactive applications with SurrealDB.

Table of Contents

Overview

surql provides a streaming system for subscribing to real-time data changes in SurrealDB. Live queries allow your application to receive instant notifications when records are created, updated, or deleted.

Key Features

  • Real-time notifications - Receive immediate updates on data changes
  • Async iterator support - Consume events with async for loops
  • Callback-based subscriptions - Register functions to handle events
  • Diff mode - Efficient JSON Patch format for change tracking
  • Multiple subscriptions - Monitor multiple tables simultaneously
  • Automatic cleanup - Resource management with context managers

Use Cases

  • Real-time dashboards and analytics
  • Live chat and messaging applications
  • Collaborative editing tools
  • Notification systems
  • Activity feeds and timelines
  • IoT data monitoring
  • Live inventory tracking

Prerequisites

WebSocket Connection Required

Live queries require a WebSocket connection to SurrealDB. HTTP connections do not support real-time streaming.

from surql.connection.config import ConnectionConfig

# WebSocket connection (required for live queries)
config = ConnectionConfig(
  db_url='ws://localhost:8000/rpc',  # WebSocket protocol
  db_ns='development',
  db='main',
  db_user='root',
  db_pass='root',
  enable_live_queries=True,  # Enable streaming support
)

Configuration Validation

The ConnectionConfig automatically validates that live queries are only enabled with WebSocket connections:

# This will raise a validation error
config = ConnectionConfig(
  db_url='http://localhost:8000',  # HTTP protocol
  enable_live_queries=True,  # Cannot use with HTTP
)
# ValueError: Live queries require WebSocket connection (ws:// or wss://)

Quick Start

Basic Live Query Subscription

import asyncio
from surql.connection.client import DatabaseClient
from surql.connection.config import ConnectionConfig
from surql.connection.streaming import StreamingManager

async def watch_users():
  config = ConnectionConfig(
    db_url='ws://localhost:8000/rpc',
    db_ns='development',
    db='main',
    enable_live_queries=True,
  )

  async with DatabaseClient(config) as client:
    # Access the streaming manager
    streaming = client._streaming

    # Start a live query on the 'user' table
    query = await streaming.live('user')

    print(f"Watching for changes on 'user' table...")

    # Subscribe and process events
    async for notification in streaming.subscribe(query):
      action = notification.get('action')
      result = notification.get('result')
      print(f"{action}: {result}")

if __name__ == '__main__':
  asyncio.run(watch_users())

Triggering Events

In another terminal or process, make changes to see real-time notifications:

# Create a user - triggers CREATE event
await client.create('user', {'name': 'Alice', 'email': 'alice@example.com'})

# Update a user - triggers UPDATE event
await client.update('user:alice', {'email': 'alice.new@example.com'})

# Delete a user - triggers DELETE event
await client.delete('user:alice')

Live Query API

StreamingManager

The StreamingManager class handles all live query operations. It is automatically initialized when you connect with enable_live_queries=True.

from surql.connection.streaming import StreamingManager

# Access via DatabaseClient
async with DatabaseClient(config) as client:
  streaming: StreamingManager = client._streaming

Starting a Live Query

Use the live() method to start watching a table:

# Watch all changes on a table
query = await streaming.live('user')

# Watch with diff mode for efficient change tracking
query = await streaming.live('user', diff=True)
Parameter Type Default Description
table str required Table name to watch
diff bool False Return JSON Patch diffs instead of full records

LiveQuery Object

The LiveQuery object represents an active subscription:

query = await streaming.live('user')

# Properties
print(query.query_uuid)  # UUID identifying this subscription
print(query.table)       # Table being watched
print(query.diff)        # Whether diff mode is enabled
print(query.is_active)   # Whether query is still active

Subscription Methods

Async Iterator Subscription

Use subscribe() for async iteration:

query = await streaming.live('user')

async for notification in streaming.subscribe(query):
  handle_notification(notification)

Callback-Based Subscription

Use subscribe_with_callback() for event-driven handling:

def on_change(notification):
  print(f"Change detected: {notification}")

query = await streaming.live('user')
await streaming.subscribe_with_callback(query, on_change)

# Subscription runs in background task
# Do other work while receiving events...

Stopping Live Queries

Kill Single Query

Use kill() to stop a specific subscription:

await streaming.kill(query)

Kill All Queries

Use kill_all() to stop all active subscriptions:

await streaming.kill_all()

Managing Active Queries

Get a list of all active subscriptions:

active = streaming.get_active_queries()
print(f"Active subscriptions: {len(active)}")

for query in active:
  print(f"  - {query.table} (UUID: {query.query_uuid})")

Event Handling Patterns

Notification Structure

Each notification contains:

{
  'action': 'CREATE' | 'UPDATE' | 'DELETE' | 'CLOSE',
  'result': {...}  # The affected record data
}

Handling Different Event Types

async for notification in streaming.subscribe(query):
  action = notification.get('action')
  result = notification.get('result')

  if action == 'CREATE':
    handle_create(result)
  elif action == 'UPDATE':
    handle_update(result)
  elif action == 'DELETE':
    handle_delete(result)
  elif action == 'CLOSE':
    # Query was closed by the server
    break

Using Async Callbacks

The streaming system supports both synchronous and asynchronous callbacks:

# Synchronous callback
def sync_handler(notification):
  print(f"Sync: {notification}")

# Asynchronous callback
async def async_handler(notification):
  await process_notification(notification)
  await update_cache(notification)

query = await streaming.live('user')

# Add multiple callbacks
query.add_callback(sync_handler)
query.add_callback(async_handler)

# Start subscription (callbacks are invoked automatically)
async for _ in streaming.subscribe(query):
  pass

Multiple Table Subscriptions

Subscribe to multiple tables simultaneously:

async def multi_table_watch():
  async with DatabaseClient(config) as client:
    streaming = client._streaming

    # Start watching multiple tables
    user_query = await streaming.live('user')
    post_query = await streaming.live('post')
    comment_query = await streaming.live('comment')

    # Process each stream in separate tasks
    async def watch_table(query, name):
      async for notification in streaming.subscribe(query):
        print(f"[{name}] {notification}")

    await asyncio.gather(
      watch_table(user_query, 'user'),
      watch_table(post_query, 'post'),
      watch_table(comment_query, 'comment'),
    )

Diff Mode

Diff mode returns JSON Patch operations instead of full records, making it more efficient for large documents with small changes.

Enabling Diff Mode

# Enable diff mode
query = await streaming.live('user', diff=True)

JSON Patch Format

With diff mode enabled, notifications contain patch operations:

# Without diff mode (full record)
{
  'action': 'UPDATE',
  'result': {
    'id': 'user:123',
    'name': 'Alice',
    'email': 'alice.new@example.com',
    'age': 30,
    # ... all fields
  }
}

# With diff mode (patch operations)
{
  'action': 'UPDATE',
  'result': [
    {'op': 'replace', 'path': '/email', 'value': 'alice.new@example.com'}
  ]
}

Patch Operations

JSON Patch supports these operations:

Operation Description Example
add Add a new field {'op': 'add', 'path': '/status', 'value': 'active'}
replace Replace existing value {'op': 'replace', 'path': '/email', 'value': 'new@example.com'}
remove Remove a field {'op': 'remove', 'path': '/temporary_field'}

Applying Patches

Use a JSON Patch library to apply diffs to local state:

import jsonpatch

# Local cache of records
cache = {}

async for notification in streaming.subscribe(query):
  action = notification.get('action')
  result = notification.get('result')

  if action == 'CREATE':
    # For creates, result is the full record
    cache[result['id']] = result

  elif action == 'UPDATE':
    # For updates in diff mode, result is patch operations
    record_id = notification.get('id')  # Get record ID from notification
    if record_id in cache:
      patch = jsonpatch.JsonPatch(result)
      cache[record_id] = patch.apply(cache[record_id])

  elif action == 'DELETE':
    record_id = result.get('id')
    cache.pop(record_id, None)

When to Use Diff Mode

Use diff mode when:

  • Records are large with many fields
  • Changes typically affect few fields
  • Bandwidth is limited
  • You maintain local state that needs incremental updates

Avoid diff mode when:

  • Records are small
  • You always need full record state
  • Patch application overhead exceeds bandwidth savings

Connection Requirements

WebSocket Protocol

Live queries require WebSocket connections:

# Supported protocols for live queries
'ws://localhost:8000/rpc'   # WebSocket (development)
'wss://db.example.com/rpc'  # Secure WebSocket (production)

# NOT supported for live queries
'http://localhost:8000'     # HTTP
'https://db.example.com'    # HTTPS

Connection Configuration

from surql.connection.config import ConnectionConfig

config = ConnectionConfig(
  # WebSocket URL (required)
  db_url='ws://localhost:8000/rpc',

  # Database selection
  db_ns='development',
  db='main',

  # Authentication (optional)
  db_user='root',
  db_pass='root',

  # Enable streaming (default: True)
  enable_live_queries=True,

  # Connection pool settings
  db_max_connections=10,    # Max concurrent connections
  db_timeout=30.0,          # Connection timeout in seconds

  # Retry settings for reconnection
  db_retry_max_attempts=3,  # Retry attempts on connection loss
  db_retry_min_wait=1.0,    # Minimum wait between retries
  db_retry_max_wait=10.0,   # Maximum wait between retries
  db_retry_multiplier=2.0,  # Exponential backoff multiplier
)

TLS/SSL Connections

For production, use secure WebSocket connections:

config = ConnectionConfig(
  db_url='wss://db.example.com/rpc',  # Secure WebSocket
  db_ns='production',
  db='main',
  db_user='app_user',
  db_pass='secure_password',
  enable_live_queries=True,
)

Error Handling for Streams

StreamingError Exception

The StreamingError exception is raised for streaming-related failures:

from surql.connection.streaming import StreamingError

try:
  query = await streaming.live('user')
  async for notification in streaming.subscribe(query):
    process(notification)
except StreamingError as e:
  print(f"Streaming error: {e}")
  # Handle connection loss, timeout, etc.

Common Error Scenarios

Live Query Start Failure

try:
  query = await streaming.live('nonexistent_table')
except StreamingError as e:
  print(f"Failed to start live query: {e}")

Subscription Failure

query = await streaming.live('user')

try:
  async for notification in streaming.subscribe(query):
    process(notification)
except StreamingError as e:
  print(f"Subscription failed: {e}")
  print(f"Query active: {query.is_active}")  # Will be False

Kill Query Failure

try:
  await streaming.kill(query)
except StreamingError as e:
  print(f"Failed to kill query: {e}")

Reconnection Handling

Implement reconnection logic for long-running subscriptions:

async def resilient_subscription(config, table):
  max_retries = 5
  retry_count = 0

  while retry_count < max_retries:
    try:
      async with DatabaseClient(config) as client:
        streaming = client._streaming
        query = await streaming.live(table)

        print(f"Connected, watching {table}...")
        retry_count = 0  # Reset on successful connection

        async for notification in streaming.subscribe(query):
          await process_notification(notification)

    except StreamingError as e:
      retry_count += 1
      wait_time = min(2 ** retry_count, 30)  # Exponential backoff
      print(f"Connection lost: {e}. Retrying in {wait_time}s... ({retry_count}/{max_retries})")
      await asyncio.sleep(wait_time)

  print("Max retries reached, giving up.")

Graceful Degradation

Handle streaming errors without crashing the application:

async def watch_with_fallback(config):
  async with DatabaseClient(config) as client:
    streaming = client._streaming

    try:
      query = await streaming.live('user')

      async for notification in streaming.subscribe(query):
        await handle_realtime_update(notification)

    except StreamingError:
      # Fall back to polling
      print("Live queries unavailable, falling back to polling...")
      while True:
        users = await client.select('user')
        await handle_poll_results(users)
        await asyncio.sleep(5)  # Poll every 5 seconds

Cleanup and Resource Management

Automatic Cleanup with Context Managers

Use context managers for automatic resource cleanup:

async with DatabaseClient(config) as client:
  streaming = client._streaming
  query = await streaming.live('user')

  try:
    async for notification in streaming.subscribe(query):
      if should_stop():
        break
      process(notification)
  finally:
    # Always clean up the query
    if query.is_active:
      await streaming.kill(query)
# Client disconnects automatically on context exit

Manual Cleanup

For manual lifecycle management:

client = DatabaseClient(config)
await client.connect()

streaming = client._streaming
query = await streaming.live('user')

try:
  # Process notifications...
  async for notification in streaming.subscribe(query):
    process(notification)
finally:
  # Clean up in reverse order
  await streaming.kill(query)
  await client.disconnect()

Cleanup All Active Queries

Before application shutdown:

async def shutdown(client):
  streaming = client._streaming

  # Kill all active subscriptions
  active_count = len(streaming.get_active_queries())
  await streaming.kill_all()
  print(f"Cleaned up {active_count} live queries")

  await client.disconnect()

Handling Callback Tasks

When using callback subscriptions, ensure tasks are properly cancelled:

async def run_with_cleanup():
  async with DatabaseClient(config) as client:
    streaming = client._streaming
    query = await streaming.live('user')

    await streaming.subscribe_with_callback(query, on_change)

    try:
      # Application runs...
      await run_main_application()
    finally:
      # kill() automatically cancels the subscription task
      await streaming.kill(query)

Best Practices

1. Always Clean Up Subscriptions

# Good - Explicit cleanup
query = await streaming.live('user')
try:
  async for notification in streaming.subscribe(query):
    process(notification)
finally:
  await streaming.kill(query)

# Avoid - Orphaned subscription
query = await streaming.live('user')
async for notification in streaming.subscribe(query):
  if error:
    return  # Query left active!

2. Use Diff Mode for Large Records

# Good - Efficient for large records
query = await streaming.live('document', diff=True)

# Avoid - Full records transferred on every change
query = await streaming.live('document', diff=False)

3. Handle All Event Types

# Good - Handle all cases
async for notification in streaming.subscribe(query):
  action = notification.get('action')
  if action == 'CREATE':
    handle_create(notification)
  elif action == 'UPDATE':
    handle_update(notification)
  elif action == 'DELETE':
    handle_delete(notification)
  elif action == 'CLOSE':
    break

# Avoid - Missing event types
async for notification in streaming.subscribe(query):
  if notification['action'] == 'CREATE':
    handle_create(notification)
  # Updates and deletes silently ignored!

4. Implement Reconnection Logic

# Good - Handles connection issues
async def resilient_watch():
  while True:
    try:
      async with DatabaseClient(config) as client:
        await watch_table(client)
    except (StreamingError, ConnectionError):
      await asyncio.sleep(5)
      continue

# Avoid - Crashes on connection loss
async def fragile_watch():
  async with DatabaseClient(config) as client:
    await watch_table(client)  # Crash if connection drops

5. Limit Concurrent Subscriptions

# Good - Controlled number of subscriptions
MAX_SUBSCRIPTIONS = 10
queries = []

for table in important_tables[:MAX_SUBSCRIPTIONS]:
  queries.append(await streaming.live(table))

# Avoid - Unlimited subscriptions
for table in all_tables:  # Could be thousands!
  await streaming.live(table)

6. Use Async Callbacks for I/O Operations

# Good - Async callback for database operations
async def on_user_created(notification):
  await send_welcome_email(notification['result'])
  await update_analytics(notification)

# Avoid - Blocking sync callback
def on_user_created(notification):
  send_welcome_email_sync(notification['result'])  # Blocks event loop!

7. Log Streaming Events for Debugging

import structlog

logger = structlog.get_logger()

async for notification in streaming.subscribe(query):
  logger.debug(
    'live_query_notification',
    action=notification.get('action'),
    table=query.table,
    query_uuid=str(query.query_uuid),
  )
  await process(notification)

Complete Examples

Real-Time Dashboard

import asyncio
from surql.connection.client import DatabaseClient
from surql.connection.config import ConnectionConfig
from surql.connection.streaming import StreamingError

class RealtimeDashboard:
  def __init__(self, config: ConnectionConfig):
    self.config = config
    self.stats = {
      'users': 0,
      'active_sessions': 0,
      'orders_today': 0,
    }

  async def run(self):
    async with DatabaseClient(self.config) as client:
      streaming = client._streaming

      # Initialize stats
      await self._load_initial_stats(client)

      # Start live queries
      user_query = await streaming.live('user')
      session_query = await streaming.live('session')
      order_query = await streaming.live('order')

      # Process all streams concurrently
      try:
        await asyncio.gather(
          self._watch_users(streaming, user_query),
          self._watch_sessions(streaming, session_query),
          self._watch_orders(streaming, order_query),
          self._display_loop(),
        )
      finally:
        await streaming.kill_all()

  async def _load_initial_stats(self, client):
    self.stats['users'] = len(await client.select('user'))
    self.stats['active_sessions'] = len(await client.select('session'))

  async def _watch_users(self, streaming, query):
    async for notification in streaming.subscribe(query):
      action = notification.get('action')
      if action == 'CREATE':
        self.stats['users'] += 1
      elif action == 'DELETE':
        self.stats['users'] -= 1

  async def _watch_sessions(self, streaming, query):
    async for notification in streaming.subscribe(query):
      action = notification.get('action')
      if action == 'CREATE':
        self.stats['active_sessions'] += 1
      elif action == 'DELETE':
        self.stats['active_sessions'] -= 1

  async def _watch_orders(self, streaming, query):
    async for notification in streaming.subscribe(query):
      if notification.get('action') == 'CREATE':
        self.stats['orders_today'] += 1

  async def _display_loop(self):
    while True:
      print(f"\rUsers: {self.stats['users']} | "
            f"Sessions: {self.stats['active_sessions']} | "
            f"Orders: {self.stats['orders_today']}", end='')
      await asyncio.sleep(1)

# Usage
if __name__ == '__main__':
  config = ConnectionConfig(
    db_url='ws://localhost:8000/rpc',
    db_ns='production',
    db='analytics',
    enable_live_queries=True,
  )
  dashboard = RealtimeDashboard(config)
  asyncio.run(dashboard.run())

Chat Application

import asyncio
from datetime import datetime
from surql.connection.client import DatabaseClient
from surql.connection.config import ConnectionConfig

class ChatRoom:
  def __init__(self, config: ConnectionConfig, room_id: str, user_id: str):
    self.config = config
    self.room_id = room_id
    self.user_id = user_id

  async def join(self):
    async with DatabaseClient(self.config) as client:
      streaming = client._streaming

      # Watch the messages table for this room
      query = await streaming.live('message')

      print(f"Joined room {self.room_id}. Type 'quit' to exit.")

      # Run message receiver and sender concurrently
      try:
        await asyncio.gather(
          self._receive_messages(streaming, query),
          self._send_messages(client),
        )
      finally:
        await streaming.kill(query)

  async def _receive_messages(self, streaming, query):
    async for notification in streaming.subscribe(query):
      if notification.get('action') != 'CREATE':
        continue

      message = notification.get('result', {})

      # Filter to this room
      if message.get('room_id') != self.room_id:
        continue

      # Don't show own messages (already displayed when sent)
      if message.get('sender_id') == self.user_id:
        continue

      timestamp = message.get('created_at', '')
      sender = message.get('sender_id', 'Unknown')
      content = message.get('content', '')

      print(f"\n[{timestamp}] {sender}: {content}")

  async def _send_messages(self, client):
    while True:
      # Get user input (run in executor to not block)
      content = await asyncio.get_event_loop().run_in_executor(
        None, input, ''
      )

      if content.lower() == 'quit':
        break

      if not content.strip():
        continue

      # Send message
      await client.create('message', {
        'room_id': self.room_id,
        'sender_id': self.user_id,
        'content': content,
        'created_at': datetime.now().isoformat(),
      })

      print(f"[You]: {content}")

# Usage
if __name__ == '__main__':
  config = ConnectionConfig(
    db_url='ws://localhost:8000/rpc',
    db_ns='chat',
    db='main',
    enable_live_queries=True,
  )
  chat = ChatRoom(config, room_id='general', user_id='alice')
  asyncio.run(chat.join())

Inventory Sync with Diff Mode

import asyncio
import jsonpatch
from surql.connection.client import DatabaseClient
from surql.connection.config import ConnectionConfig

class InventorySync:
  def __init__(self, config: ConnectionConfig):
    self.config = config
    self.inventory: dict[str, dict] = {}

  async def run(self):
    async with DatabaseClient(self.config) as client:
      streaming = client._streaming

      # Load initial inventory
      products = await client.select('product')
      for product in products:
        self.inventory[product['id']] = product

      print(f"Loaded {len(self.inventory)} products")

      # Watch with diff mode for efficient updates
      query = await streaming.live('product', diff=True)

      try:
        async for notification in streaming.subscribe(query):
          await self._handle_change(notification)
      finally:
        await streaming.kill(query)

  async def _handle_change(self, notification):
    action = notification.get('action')
    result = notification.get('result')

    if action == 'CREATE':
      # Full record on create
      product_id = result['id']
      self.inventory[product_id] = result
      print(f"+ Added: {result.get('name')} (Stock: {result.get('stock', 0)})")

    elif action == 'UPDATE':
      # Apply JSON patch for updates
      product_id = notification.get('id')

      if product_id in self.inventory:
        try:
          patch = jsonpatch.JsonPatch(result)
          self.inventory[product_id] = patch.apply(self.inventory[product_id])

          # Check what changed
          for op in result:
            path = op.get('path', '')
            if '/stock' in path:
              new_stock = self.inventory[product_id].get('stock')
              print(f"~ Stock updated: {product_id} -> {new_stock}")
            elif '/price' in path:
              new_price = self.inventory[product_id].get('price')
              print(f"~ Price updated: {product_id} -> ${new_price}")

        except jsonpatch.JsonPatchException as e:
          print(f"Patch error for {product_id}: {e}")

    elif action == 'DELETE':
      product_id = result.get('id')
      if product_id in self.inventory:
        name = self.inventory[product_id].get('name', 'Unknown')
        del self.inventory[product_id]
        print(f"- Removed: {name}")

  def get_low_stock(self, threshold: int = 10) -> list[dict]:
    return [
      p for p in self.inventory.values()
      if p.get('stock', 0) < threshold
    ]

# Usage
if __name__ == '__main__':
  config = ConnectionConfig(
    db_url='ws://localhost:8000/rpc',
    db_ns='ecommerce',
    db='main',
    enable_live_queries=True,
  )
  sync = InventorySync(config)
  asyncio.run(sync.run())

API Reference

StreamingManager

Method Description
live(table, diff=False) Start a live query on a table
subscribe(query) Subscribe to query as async iterator
subscribe_with_callback(query, callback) Subscribe with callback function
kill(query) Stop a live query
kill_all() Stop all active queries
get_active_queries() Get list of active queries

LiveQuery

Property Type Description
query_uuid UUID Unique identifier for the subscription
table str Table being watched
diff bool Whether diff mode is enabled
is_active bool Whether query is still active
Method Description
add_callback(callback) Add notification callback
remove_callback(callback) Remove notification callback
notify(notification) Notify all callbacks (internal)
deactivate() Mark query as inactive

StreamingError

from surql.connection.streaming import StreamingError

# Raised when:
# - Live query fails to start
# - Subscription encounters an error
# - Kill operation fails

Notification Format

{
  'action': str,  # 'CREATE', 'UPDATE', 'DELETE', or 'CLOSE'
  'result': dict, # Record data or patch operations (diff mode)
}

Next Steps