Skip to content

Query Builder & ORM Guide

This guide covers querying and manipulating data using surql's type-safe query builder and ORM functions.

Table of Contents

Overview

surql provides two ways to interact with your database:

  1. High-level CRUD functions - Simple async functions for common operations
  2. Query Builder - Composable query construction with type safety

Both approaches integrate with Pydantic for data validation.

Key Features

  • Type-safe queries - Use Pydantic models for validation
  • Async-first - All operations are asynchronous
  • Composable - Build complex queries through composition
  • Graph-aware - Native support for SurrealDB's graph features

CRUD Operations

Setup

from pydantic import BaseModel, EmailStr
from surql.connection.client import get_client
from surql.settings import get_db_config

# Pydantic model for the `user` records used by the CRUD examples in this guide.
class User(BaseModel):
  username: str
  email: EmailStr  # Pydantic's EmailStr type, so the value is checked as an email address
  age: int

# Get database client
config = get_db_config()

Create Records

Create Single Record

from surql.query.crud import create_record

async def create_user():
  """Create one `user` record from a `User` model and return what `create_record` returns."""
  async with get_client(config) as db:
    alice = User(username='alice', email='alice@example.com', age=30)
    created = await create_record('user', alice, client=db)

    # The created record is indexed by key here; its 'id' entry is printed.
    print(f"Created user: {created['id']}")
    return created

Create Multiple Records

from surql.query.crud import create_records

async def create_multiple_users():
  """Create three `user` records with a single `create_records` call and return the result."""
  async with get_client(config) as db:
    seed_rows = (
      ('alice', 'alice@example.com', 30),
      ('bob', 'bob@example.com', 25),
      ('charlie', 'charlie@example.com', 35),
    )
    batch = [User(username=name, email=email, age=age) for name, email, age in seed_rows]

    created = await create_records('user', batch, client=db)
    print(f"Created {len(created)} users")
    return created

Read Records

Get Single Record by ID

from surql.query.crud import get_record

async def get_user(user_id: str):
  """Look up a `user` record by ID with `get_record`, print the outcome, and return the result."""
  async with get_client(config) as db:
    found = await get_record('user', user_id, User, client=db)

    # A falsy result is reported as "not found".
    print(f"Found user: {found.username}" if found else "User not found")

    return found

Query Multiple Records

from surql.query.crud import query_records

async def get_adult_users():
  """Query up to 10 users with `age >= 18`, ordered by age descending, printing each one."""
  async with get_client(config) as db:
    adults = await query_records(
      'user',
      User,
      conditions=['age >= 18'],
      order_by=('age', 'DESC'),
      limit=10,
      client=db,
    )

    for adult in adults:
      print(f"{adult.username}: {adult.age} years old")

    return adults

Get First/Last Record

from surql.query.crud import first, last

async def get_newest_user():
  """Return the result of `first` on `user` ordered by `created_at` descending."""
  async with get_client(config) as db:
    newest_first = ('created_at', 'DESC')
    return await first('user', User, order_by=newest_first, client=db)

async def get_oldest_user():
  """Return the result of `first` on `user` ordered by `created_at` ascending."""
  async with get_client(config) as db:
    oldest_first = ('created_at', 'ASC')
    return await first('user', User, order_by=oldest_first, client=db)

Count Records

from surql.query.crud import count_records

async def count_active_users():
  """Print the total and active `user` counts and return the active count."""
  async with get_client(config) as db:
    # First call counts every record; second call passes a condition string.
    everyone = await count_records('user', client=db)
    active_only = await count_records('user', 'is_active = true', client=db)

    print(f"Total users: {everyone}")
    print(f"Active users: {active_only}")

    return active_only

Check if Record Exists

from surql.query.crud import exists

async def user_exists(user_id: str):
  """Print whether `exists` reports a `user` record for the given ID."""
  async with get_client(config) as db:
    present = await exists('user', user_id, client=db)
    print("User exists" if present else "User not found")

Update Records

Update Entire Record

from surql.query.crud import update_record

async def update_user(user_id: str):
  """Pass a complete `User` model to `update_record` for the given ID and return the result."""
  async with get_client(config) as db:
    replacement = User(
      username='alice',
      email='alice.new@example.com',
      age=31,
    )
    saved = await update_record('user', user_id, replacement, client=db)

    print(f"Updated user: {saved}")
    return saved

Merge Partial Data

from surql.query.crud import merge_record

async def update_user_email(user_id: str, new_email: str):
  """Send only the `email` field to `merge_record` for the given user and return the result."""
  async with get_client(config) as db:
    patch = {'email': new_email}
    merged = await merge_record('user', user_id, patch, client=db)

    print(f"Updated email: {merged}")
    return merged

Delete Records

Delete Single Record

from surql.query.crud import delete_record

async def delete_user(user_id: str):
  """Call `delete_record` for the given `user` ID, then print a confirmation."""
  async with get_client(config) as db:
    await delete_record('user', user_id, client=db)
    print("User deleted")

Delete Multiple Records

from surql.query.crud import delete_records

async def delete_inactive_users():
  """Call `delete_records` on `user` with an inactivity condition, then print a confirmation."""
  async with get_client(config) as db:
    stale_condition = 'is_active = false AND last_login < time::now() - 1y'
    await delete_records('user', stale_condition, client=db)
    print("Inactive users deleted")

Query Builder

The query builder provides a functional approach to building complex queries.

Basic Query Construction

from surql.query.builder import Query

# Build a query
query = (
  Query()
    .select(['username', 'email', 'age'])
    .from_table('user')
    .where('age >= 18')
    .order_by('age', 'DESC')
    .limit(10)
)

# Convert to SurrealQL
sql = query.to_surql()
print(sql)
# SELECT username, email, age FROM user WHERE age >= 18 ORDER BY age DESC LIMIT 10

Query Execution

from surql.query.executor import fetch_all, fetch_one

async def execute_query():
  """Build a `Query` for users aged 18+ and print the username of every row `fetch_all` returns."""
  async with get_client(config) as db:
    adults_query = Query().select().from_table('user').where('age >= 18')

    # fetch_all receives the query, the model class, and the client.
    for row in await fetch_all(adults_query, User, db):
      print(row.username)

Select Queries

# Select all fields
Query().select().from_table('user')

# Select specific fields
Query().select(['username', 'email']).from_table('user')

# Select with alias
Query().select(['username AS name', 'email']).from_table('user')

# Select with function
Query().select(['count()']).from_table('user')

# Select with aggregation
Query().select([
  'category',
  'count() AS total',
  'avg(price) AS avg_price'
]).from_table('product').group_by(['category'])

Insert Queries

# Insert single record
Query().insert('user', {
  'username': 'alice',
  'email': 'alice@example.com',
  'age': 30,
})

# Insert multiple records
Query().insert('user', [
  {'username': 'alice', 'email': 'alice@example.com'},
  {'username': 'bob', 'email': 'bob@example.com'},
])

Update Queries

# Update all records
Query().update('user').set({'is_active': True})

# Update with condition
(
  Query()
    .update('user')
    .set({'is_active': False})
    .where('last_login < time::now() - 30d')
)

# Update specific record
Query().update('user:alice').set({'email': 'new@example.com'})

Delete Queries

# Delete all records
Query().delete('user')

# Delete with condition
Query().delete('user').where('is_active = false')

# Delete specific record
Query().delete('user:alice')

Filtering and Conditions

Simple Conditions

# String comparison
Query().select().from_table('user').where('status = "active"')

# Numeric comparison
Query().select().from_table('user').where('age >= 18')

# Boolean
Query().select().from_table('user').where('is_verified = true')

# NULL check
Query().select().from_table('user').where('deleted_at IS NULL')

Multiple Conditions

# AND conditions
query = (
  Query()
    .select()
    .from_table('user')
    .where('age >= 18')
    .where('is_active = true')
    .where('is_verified = true')
)

# OR conditions (use raw SQL)
query = Query().select().from_table('user').where(
  'age < 18 OR (age >= 18 AND is_verified = true)'
)

Using Operators

from surql.types.operators import eq, gt, lt, gte, lte, contains, in_list

# Equality
Query().select().from_table('user').where(eq('status', 'active'))

# Comparison
Query().select().from_table('user').where(gte('age', 18))

# String contains
Query().select().from_table('user').where(contains('email', '@example.com'))

# IN list
Query().select().from_table('user').where(
  in_list('status', ['active', 'pending'])
)

Range Queries

# Between dates
Query().select().from_table('post').where(
  'created_at >= "2024-01-01" AND created_at <= "2024-12-31"'
)

# Price range
Query().select().from_table('product').where(
  'price >= 10 AND price <= 100'
)

Pattern Matching

# Contains
Query().select().from_table('user').where('email CONTAINS "@example.com"')

# Starts with
Query().select().from_table('user').where('username ~ "^admin"')

# Case-insensitive (SurrealQL spells the function string::lowercase; there is no bare LOWERCASE())
Query().select().from_table('user').where('string::lowercase(email) CONTAINS "gmail"')

Graph Traversal

SurrealDB's graph features allow traversing relationships.

Basic Graph Queries

# Get posts liked by a user
async def get_user_likes(user_id: str):
  """Execute a `->likes->post` traversal starting at `user_id` and return the raw result."""
  async with get_client(config) as db:
    # user_id is interpolated directly into the statement text.
    return await db.execute(f"SELECT * FROM {user_id}->likes->post")

# Get users who liked a post
async def get_post_likes(post_id: str):
  """Execute a `<-likes<-user` traversal starting at `post_id` and return the raw result."""
  async with get_client(config) as db:
    # post_id is interpolated directly into the statement text.
    return await db.execute(f"SELECT * FROM {post_id}<-likes<-user")

Multi-Hop Traversal

# Get followers of followers
async def get_followers_of_followers(user_id: str):
  """Execute a two-hop `<-follows<-user` traversal from `user_id` and return the raw result."""
  async with get_client(config) as db:
    two_hops = f"SELECT * FROM {user_id}<-follows<-user<-follows<-user"
    return await db.execute(two_hops)

# Get friend suggestions (friends of friends)
async def get_friend_suggestions(user_id: str):
  """Execute a two-hop `->follows->user` traversal from `user_id`, filtering on `id != user_id`."""
  async with get_client(config) as db:
    surql = f"""
      SELECT * FROM {user_id}->follows->user->follows->user
      WHERE id != {user_id}
    """
    return await db.execute(surql)

Graph with Filters

# Get active followers
async def get_active_followers(user_id: str):
  """Execute a `<-follows<-user` traversal filtered on `is_active`, ordered by `followed_at` descending."""
  async with get_client(config) as db:
    surql = f"""
      SELECT * FROM {user_id}<-follows<-user
      WHERE is_active = true
      ORDER BY followed_at DESC
    """
    return await db.execute(surql)

# Get recent likes
async def get_recent_likes(user_id: str):
  """Execute a `->likes->post` traversal limited to the last 7 days, ordered by `liked_at` descending."""
  async with get_client(config) as db:
    surql = f"""
      SELECT * FROM {user_id}->likes->post
      WHERE liked_at > time::now() - 7d
      ORDER BY liked_at DESC
    """
    return await db.execute(surql)

Creating Relationships

async def create_follow(follower_id: str, followed_id: str):
  """Execute a RELATE statement creating a `follows` edge between the two IDs and return the result."""
  async with get_client(config) as db:
    relate = f"RELATE {follower_id}->follows->{followed_id}"
    return await db.execute(relate)

async def create_like(user_id: str, post_id: str, reaction: str = 'like'):
  """Execute a RELATE statement creating a `likes` edge from `user_id` to `post_id`.

  Args:
    user_id: Source of the edge; interpolated into the statement as-is.
    post_id: Target of the edge; interpolated into the statement as-is.
    reaction: Value stored in the edge's `reaction` field (default 'like').

  Returns:
    Whatever `client.execute` returns for the statement.
  """
  async with get_client(config) as client:
    # `reaction` is free-form text, so it is bound as a query parameter rather
    # than spliced into a quoted literal: a quote character in the value would
    # otherwise break the statement or change its meaning.
    return await client.execute(
      f"RELATE {user_id}->likes->{post_id} SET reaction = $reaction",
      {'reaction': reaction},
    )

Deleting Relationships

async def delete_follow(follower_id: str, followed_id: str):
  """Execute a DELETE on `follower_id->follows` restricted to `out = followed_id` and return the result."""
  async with get_client(config) as db:
    unfollow = f"DELETE {follower_id}->follows WHERE out = {followed_id}"
    return await db.execute(unfollow)

Result Handling

Understanding SurrealDB Result Formats

SurrealDB returns results in two different formats depending on the method used:

  1. Flat format (from db.select()): [{"id": "...", ...}]
  2. Nested format (from db.query()): [{"result": [{"id": "...", ...}]}]

surql automatically handles both formats in its high-level functions (fetch_one(), fetch_all(), etc.), but provides utilities for manual extraction when needed.

Automatic Result Extraction

High-level query functions handle result extraction automatically:

from surql.query.executor import fetch_all, fetch_one

async def query_users():
  """Select everything from `user` through `fetch_all`, passing `User` as the model, and return it."""
  async with get_client(config) as db:
    all_users = Query().select().from_table('user')

    # No manual unwrapping of the response is done here; fetch_all's result is returned directly.
    return await fetch_all(all_users, User, db)

Manual Result Extraction Utilities

For raw queries or when you need manual control:

Extract All Results

from surql.query.results import extract_result

async def raw_query_example():
  """Run a raw SELECT with `client.execute` and return `extract_result` applied to the response."""
  async with get_client(config) as db:
    raw = await db.execute('SELECT * FROM user WHERE age > 18')

    # extract_result is applied to the raw response before it is returned.
    return extract_result(raw)

Extract Single Record

from surql.query.results import extract_one

async def get_user_raw(user_id: str):
  """Run a raw `SELECT * FROM <user_id>` and return `extract_one` applied to the response."""
  async with get_client(config) as db:
    raw = await db.execute(f'SELECT * FROM {user_id}')

    # extract_one is applied to the raw response before it is returned.
    return extract_one(raw)

Extract Scalar Values

For aggregate queries (COUNT, SUM, AVG, etc.):

from surql.query.results import extract_scalar

async def count_users():
  """Run a `count()` aggregate over `user` and return its `total` via `extract_scalar` (default 0)."""
  async with get_client(config) as db:
    raw = await db.execute('SELECT count() as total FROM user GROUP ALL')

    # 'total' matches the alias in the SELECT; 0 is passed as the default.
    return extract_scalar(raw, 'total', default=0)

async def get_stats():
  """Run one aggregate query over `user` and return its four aliased values as a dict."""
  async with get_client(config) as db:
    raw = await db.execute('''
      SELECT
        count() as total,
        avg(age) as avg_age,
        min(age) as min_age,
        max(age) as max_age
      FROM user
      GROUP ALL
    ''')

    # Alias -> default passed to extract_scalar for that alias.
    defaults = {'total': 0, 'avg_age': 0.0, 'min_age': 0, 'max_age': 0}
    return {alias: extract_scalar(raw, alias, fallback) for alias, fallback in defaults.items()}

Check if Results Exist

from surql.query.results import has_results

async def user_exists(email: str):
  """Return `has_results` for a one-row SELECT on `user` matching the given email."""
  async with get_client(config) as db:
    # The email is passed as the bound parameter $email, not interpolated.
    bindings = {'email': email}
    raw = await db.execute('SELECT * FROM user WHERE email = $email LIMIT 1', bindings)

    return has_results(raw)

Complete Example

from surql.query.results import extract_result, extract_one, extract_scalar, has_results

async def comprehensive_query_example():
  """Run four raw queries in sequence, using one extraction utility on each response.

  Stops early (after printing a message) when the first query has no results.
  """
  async with get_client(config) as db:
    # 1. has_results: bail out when the table yields nothing.
    probe = await db.execute('SELECT * FROM user LIMIT 1')
    if not has_results(probe):
      print("No users found")
      return

    # 2. extract_result: all active users.
    active_raw = await db.execute('''
      SELECT * FROM user WHERE is_active = true
    ''')
    active_users = extract_result(active_raw)
    print(f"Found {len(active_users)} active users")

    # 3. extract_one: a single record, printed only when truthy.
    alice = extract_one(await db.execute('SELECT * FROM user:alice'))
    if alice:
      print(f"User: {alice['name']}")

    # 4. extract_scalar: the aliased count, with 0 as the default.
    count_raw = await db.execute('''
      SELECT count() as total FROM user GROUP ALL
    ''')
    print(f"Total users: {extract_scalar(count_raw, 'total', 0)}")

When to Use Each Utility

Utility Use Case Returns
extract_result() Get all records from any query list[dict]
extract_one() Get single record or None dict \| None
extract_scalar() Get single value from aggregate Any
has_results() Check if results exist bool

Migration from Direct surrealdb-py

If you're migrating from direct surrealdb-py usage:

# Before (with manual format handling)
result = await db.query('SELECT * FROM user')
if result and isinstance(result[0], dict) and 'result' in result[0]:
  users = result[0]['result']
else:
  users = result

# After (with surql utilities)
from surql.query.results import extract_result

result = await db.query('SELECT * FROM user')
users = extract_result(result)  # Handles both formats

Transactions

Execute multiple operations atomically.

Using Context Manager

from surql.connection.transaction import transaction

async def transfer_credits(from_user: str, to_user: str, amount: int):
  """Execute a debit, a credit, and a log-record statement inside one `transaction` block."""
  async with get_client(config) as client:
    async with transaction(client):
      # Executed in order: deduct from sender, add to receiver, record the transfer.
      statements = (
        f"UPDATE {from_user} SET credits -= {amount}",
        f"UPDATE {to_user} SET credits += {amount}",
        f"""
        CREATE transaction SET
          from = {from_user},
          to = {to_user},
          amount = {amount},
          timestamp = time::now()
        """,
      )
      for statement in statements:
        await client.execute(statement)

Manual Transaction Control

async def manual_transaction():
  """Execute two CREATE statements between explicit BEGIN and COMMIT statements.

  Raises:
    Exception: Any error raised by a statement is re-raised after a
      CANCEL TRANSACTION statement has been sent.
  """
  async with get_client(config) as client:
    try:
      # Begin transaction
      await client.execute('BEGIN TRANSACTION')

      # Perform operations
      await client.execute('CREATE user SET name = "Alice"')
      await client.execute('CREATE post SET title = "Hello"')

      # Commit
      await client.execute('COMMIT TRANSACTION')

    except Exception:
      # Roll back, then re-raise with a bare `raise` so the active exception
      # propagates unchanged instead of being raised again by name.
      await client.execute('CANCEL TRANSACTION')
      raise

Named Connections

The connection registry enables running queries against different database connections for scenarios like read replicas, analytics databases, or multi-tenant architectures.

Query Builder with Named Connections

from surql.connection.registry import get_registry
from surql.query.builder import Query
from surql.query.executor import fetch_all

async def query_with_registry():
  """Run an active-users query through the connection registered under the name 'replica'."""
  active_users = Query().select().from_table('user').where('is_active = true')

  # The registry hands back the connection stored under the given name.
  replica_client = get_registry().get('replica')

  return await fetch_all(active_users, User, replica_client)

Read/Write Splitting Pattern

from surql.connection.registry import get_registry
from surql.query.crud import create_record, query_records

async def read_write_split():
  """Create a user on the 'primary' connection, then list users from the 'replica' connection.

  Returns:
    A `(created_user, all_users)` tuple.
  """
  connections = get_registry()
  writer = connections.get('primary')
  reader = connections.get('replica')

  # The write goes through the primary connection.
  new_user = User(username='alice', email='alice@example.com', age=30)
  created = await create_record('user', new_user, client=writer)

  # The read goes through the replica connection.
  listing = await query_records('user', User, client=reader)

  return created, listing

Analytics Queries

async def run_analytics():
  """Run a daily sign-up aggregation on the connection registered as 'analytics'.

  Returns:
    Whatever the connection's `execute` returns for the statement.
  """
  registry = get_registry()
  analytics = registry.get('analytics')

  # time::floor truncates the timestamp to the day (SurrealQL has no
  # date::floor), and count(<expr>) counts rows where the expression is
  # truthy (a WHERE clause is not valid inside a projection).
  result = await analytics.execute('''
    SELECT
      time::floor(created_at, 1d) as day,
      count() as signups,
      count(is_active = true) as active_signups
    FROM user
    GROUP BY day
    ORDER BY day DESC
    LIMIT 30
  ''')

  return result

Dynamic Connection Selection

async def query_tenant_database(tenant_id: str):
  """Query `user` records through the connection named `tenant_<tenant_id>`.

  When the registry lookup raises `RegistryError`, the connection is registered
  under that name using `get_tenant_config(tenant_id)` and the registered
  client is used instead.
  """
  registry = get_registry()
  name = f'tenant_{tenant_id}'

  try:
    tenant_client = registry.get(name)
  except RegistryError:
    # Not registered yet: register it now and use the client that returns.
    tenant_client = await registry.register(name, get_tenant_config(tenant_id))

  return await query_records('user', User, client=tenant_client)

Context Override for Testing

from surql.connection.context import connection_override

async def test_with_mock_connection(mock_client):
  """Query `user` inside `connection_override(mock_client)` and assert that nothing comes back."""
  async with connection_override(mock_client):
    # No client= argument is passed to query_records inside the override.
    fetched = await query_records('user', User)
    assert len(fetched) == 0  # Uses mock data

Type Safety

Using Pydantic Models

from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime

# User model with field constraints and a custom email validator.
class User(BaseModel):
  username: str = Field(min_length=3, max_length=20)  # 3 to 20 characters
  email: str
  age: int = Field(ge=0, le=150)  # inclusive range 0..150
  is_active: bool = True
  created_at: Optional[datetime] = None

  # Rejects values without an '@' and returns the address lower-cased.
  @field_validator('email')
  @classmethod
  def validate_email(cls, v: str) -> str:
    if '@' not in v:
      raise ValueError('Invalid email')
    return v.lower()

# Type-safe CRUD
async def type_safe_create():
  """Create a record from a `User` model, read one back with `get_record`, and print its username."""
  async with get_client(config) as db:
    # The User model is constructed (and validated by Pydantic) before the create call.
    alice = User(username='alice', email='alice@example.com', age=30)

    # Write the model instance.
    result = await create_record('user', alice, client=db)

    # Read back, passing User as the model class.
    loaded = await get_record('user', 'alice', User, client=db)

    # `loaded` is used as a model instance, so attribute access is what IDEs can complete.
    if loaded:
      print(loaded.username)

Generic Queries

from typing import TypeVar, Generic
from pydantic import BaseModel

T = TypeVar('T', bound=BaseModel)

async def get_all_records(table: str, model: type[T]) -> list[T]:
  """Return `query_records(table, model)` run on a freshly opened client, for any model type."""
  async with get_client(config) as db:
    rows = await query_records(table, model, client=db)
    return rows

# Use with different models
users = await get_all_records('user', User)
posts = await get_all_records('post', Post)

Advanced Queries

Aggregation

# Count by category
async def count_by_category():
  """Execute a per-category aggregate over `product` and return the raw result."""
  async with get_client(config) as db:
    surql = """
      SELECT
        category,
        count() AS total,
        avg(price) AS avg_price,
        min(price) AS min_price,
        max(price) AS max_price
      FROM product
      GROUP BY category
      ORDER BY total DESC
    """
    return await db.execute(surql)

Subqueries

# Get users with above-average age
async def users_above_avg_age():
  """Execute a SELECT on `user` whose WHERE clause compares `age` to a subquery and return the raw result."""
  async with get_client(config) as db:
    surql = """
      SELECT * FROM user
      WHERE age > (SELECT math::mean(age) FROM user)
    """
    return await db.execute(surql)

Joins (Graph Relations)

# Get posts with author information
async def posts_with_authors():
  """Execute a SELECT on `post` with `FETCH author` and return the raw result."""
  async with get_client(config) as db:
    surql = """
      SELECT
        *,
        author.* AS author_info
      FROM post
      FETCH author
    """
    return await db.execute(surql)

Full-Text Search

# Search across indexed fields
async def search_posts(search_term: str):
  """Execute a `@@` match on `post.title` and `post.content`, newest first.

  Args:
    search_term: Text to match; sent as the bound parameter `$search_term`.

  Returns:
    Whatever `client.execute` returns for the statement.
  """
  async with get_client(config) as client:
    # The term is caller-supplied text, so it is bound as a parameter instead
    # of being interpolated into quoted literals, where a quote character
    # would break or alter the statement.
    query = """
      SELECT * FROM post
      WHERE title @@ $search_term OR content @@ $search_term
      ORDER BY created_at DESC
    """
    result = await client.execute(query, {'search_term': search_term})
    return result

Pagination

async def paginate_users(page: int = 1, page_size: int = 10):
  """Fetch one page of users (newest first) plus paging metadata.

  Returns:
    A dict with keys `users`, `page`, `page_size`, `total`, and `total_pages`.
  """
  async with get_client(config) as db:
    # Pages are 1-based: page 1 starts at offset 0.
    skip = (page - 1) * page_size

    page_rows = await query_records(
      'user',
      User,
      order_by=('created_at', 'DESC'),
      limit=page_size,
      offset=skip,
      client=db,
    )
    row_count = await count_records('user', client=db)

    # Integer ceiling division of row_count by page_size.
    page_count = (row_count + page_size - 1) // page_size

    return {
      'users': page_rows,
      'page': page,
      'page_size': page_size,
      'total': row_count,
      'total_pages': page_count,
    }

Batch Operations

async def batch_update_status(user_ids: list[str], status: str):
  """Call `merge_record` once per ID, in order, setting `status` on each `user` record."""
  async with get_client(config) as db:
    patch = {'status': status}
    for uid in user_ids:
      # A new dict with the same content is passed on every call.
      await merge_record('user', uid, dict(patch), client=db)

# Or use SurrealQL for efficiency
async def batch_update_efficient(status: str, user_ids: list[str]):
  """Set `status` on every listed `user` record with a single UPDATE statement.

  Args:
    status: Value to store in the `status` field.
    user_ids: IDs of the records to update. The original snippet referenced
      this name without defining it, which raised NameError.
  """
  async with get_client(config) as client:
    # Both values are bound as parameters rather than interpolated into the
    # statement text.
    # NOTE(review): `id` is compared with the bound values as given; confirm
    # whether user_ids must hold record IDs rather than plain strings.
    query = """
      UPDATE user SET status = $status
      WHERE id IN $user_ids
    """
    await client.execute(query, {'status': status, 'user_ids': user_ids})

Best Practices

1. Use Type-Safe Models

# Good - Type-safe with Pydantic
class User(BaseModel):
  username: str
  email: str

user = await get_record('user', 'alice', User, client)

# Avoid - Untyped dictionaries
user = await client.select('user:alice')

2. Reuse Database Connections

# Good - Reuse connection
async with get_client(config) as client:
  user1 = await create_record('user', user_data1, client=client)
  user2 = await create_record('user', user_data2, client=client)

# Avoid - Multiple connections
user1 = await create_record('user', user_data1)
user2 = await create_record('user', user_data2)

3. Handle Errors Gracefully

from surql.connection.client import QueryError

async def safe_query():
  """Fetch user `alice`, handling query failures.

  Returns:
    The result of `get_record` on success, or None when a `QueryError` is
    raised (the error is printed). The original snippet assigned the result
    but never returned it, so success and failure both yielded None.
  """
  try:
    async with get_client(config) as client:
      return await get_record('user', 'alice', User, client=client)
  except QueryError as e:
    print(f"Query failed: {e}")
    return None

4. Use Indexes for Performance

# Good - Uses index on email
users = await query_records(
  'user',
  User,
  conditions=['email = "alice@example.com"'],
  client=client,
)

# Avoid - Full table scan
users = await query_records(
  'user',
  User,
  conditions=['LOWERCASE(email) = "alice@example.com"'],
  client=client,
)

5. Limit Result Sets

# Good - Limited results
users = await query_records('user', User, limit=100, client=client)

# Avoid - Unbounded queries
users = await query_records('user', User, client=client)  # Could return millions

6. Use Transactions for Related Operations

# Good - Atomic operation
async with transaction(client):
  await create_record('user', user, client=client)
  await create_record('profile', profile, client=client)

# Avoid - Separate operations (can fail partially)
await create_record('user', user, client=client)
await create_record('profile', profile, client=client)

7. Validate Input Data

# Pydantic validates automatically
try:
  user = User(username='ab', email='invalid')  # Raises ValidationError
except ValidationError as e:
  print(f"Invalid data: {e}")

8. Use Context Managers

# Good - Automatic cleanup
async with get_client(config) as client:
  result = await client.execute(query)

# Avoid - Manual cleanup
client = DatabaseClient(config)
await client.connect()
result = await client.execute(query)
await client.disconnect()

Next Steps