Quick Start Tutorial¶
This tutorial will walk you through creating your first surql project, defining schemas, creating migrations, and performing CRUD operations.
Table of Contents¶
- Prerequisites
- Project Setup
- Define Your First Schema
- Create Your First Migration
- Connect to the Database
- Run Migrations
- Perform CRUD Operations
- Working with Relationships
- Complete Example
- Multiple Connections
- Next Steps
Prerequisites¶
Before starting, ensure you have:
- Python 3.12+ installed
- SurrealDB 1.0+ installed and running
- surql installed (
pip install oneiriq-surqloruv add oneiriq-surql)
If you haven't completed these steps, see the Installation Guide.
Project Setup¶
1. Create a New Project¶
2. Create Virtual Environment (Optional but Recommended)¶
3. Install surql¶
4. Create Project Structure¶
Your project structure should look like:
5. Configure Environment¶
Edit .env:
SURREAL_URL=ws://localhost:8000/rpc
SURREAL_NAMESPACE=blog
SURREAL_DATABASE=blog
SURREAL_USERNAME=root
SURREAL_PASSWORD=root
6. Start SurrealDB¶
In a separate terminal:
Define Your First Schema¶
Let's create a blog with users and posts.
Create User Schema¶
Create schemas/user.py:
from surql.schema.fields import string_field, datetime_field
from surql.schema.table import table_schema, unique_index, TableMode
user_schema = table_schema(
'user',
mode=TableMode.SCHEMAFULL,
fields=[
string_field('username', assertion='string::len($value) >= 3'),
string_field('email', assertion='string::is::email($value)'),
string_field('full_name'),
datetime_field('created_at', default='time::now()', readonly=True),
datetime_field('updated_at', default='time::now()'),
],
indexes=[
unique_index('username_idx', ['username']),
unique_index('email_idx', ['email']),
],
)
Create Post Schema¶
Create schemas/post.py:
from surql.schema.fields import string_field, record_field, datetime_field, bool_field
from surql.schema.table import table_schema, search_index, TableMode
post_schema = table_schema(
'post',
mode=TableMode.SCHEMAFULL,
fields=[
string_field('title', assertion='string::len($value) > 0'),
string_field('content'),
string_field('slug', assertion='string::len($value) > 0'),
record_field('author', table='user'),
bool_field('published', default='false'),
datetime_field('created_at', default='time::now()', readonly=True),
datetime_field('updated_at', default='time::now()'),
],
indexes=[
unique_index('slug_idx', ['slug']),
search_index('content_search', ['title', 'content']),
],
)
Create Your First Migration¶
1. Create Migration File¶
This creates a file like migrations/20260102_120000_create_user_and_post_tables.py.
2. Edit the Migration¶
Open the migration file and add your schema definitions:
"""Create user and post tables."""
def up() -> list[str]:
"""Apply migration."""
return [
# Create user table
'DEFINE TABLE user SCHEMAFULL;',
'DEFINE FIELD username ON TABLE user TYPE string ASSERT string::len($value) >= 3;',
'DEFINE FIELD email ON TABLE user TYPE string ASSERT string::is::email($value);',
'DEFINE FIELD full_name ON TABLE user TYPE string;',
'DEFINE FIELD created_at ON TABLE user TYPE datetime DEFAULT time::now() READONLY;',
'DEFINE FIELD updated_at ON TABLE user TYPE datetime DEFAULT time::now();',
'DEFINE INDEX username_idx ON TABLE user COLUMNS username UNIQUE;',
'DEFINE INDEX email_idx ON TABLE user COLUMNS email UNIQUE;',
# Create post table
'DEFINE TABLE post SCHEMAFULL;',
'DEFINE FIELD title ON TABLE post TYPE string ASSERT string::len($value) > 0;',
'DEFINE FIELD content ON TABLE post TYPE string;',
'DEFINE FIELD slug ON TABLE post TYPE string ASSERT string::len($value) > 0;',
'DEFINE FIELD author ON TABLE post TYPE record<user>;',
'DEFINE FIELD published ON TABLE post TYPE bool DEFAULT false;',
'DEFINE FIELD created_at ON TABLE post TYPE datetime DEFAULT time::now() READONLY;',
'DEFINE FIELD updated_at ON TABLE post TYPE datetime DEFAULT time::now();',
'DEFINE INDEX slug_idx ON TABLE post COLUMNS slug UNIQUE;',
'DEFINE INDEX content_search ON TABLE post COLUMNS title, content SEARCH;',
]
def down() -> list[str]:
"""Rollback migration."""
return [
'REMOVE INDEX content_search ON TABLE post;',
'REMOVE INDEX slug_idx ON TABLE post;',
'REMOVE TABLE post;',
'REMOVE INDEX email_idx ON TABLE user;',
'REMOVE INDEX username_idx ON TABLE user;',
'REMOVE TABLE user;',
]
metadata = {
'version': '20260102_120000',
'description': 'Create user and post tables',
'author': 'surql',
'depends_on': [],
}
Connect to the Database¶
Create Connection Helper¶
Create main.py:
import asyncio
from surql.connection.client import get_client
from surql.connection.config import ConnectionConfig
from surql.settings import get_db_config
async def get_db_client():
"""Get database client from configuration."""
config = get_db_config() # Loads from .env
return get_client(config)
Run Migrations¶
1. Check Migration Status¶
Output:
Migration Status
┌─────────────────────────────────────────┬────────┐
│ Version │ Status │
├─────────────────────────────────────────┼────────┤
│ 20260102_120000_create_user_and_post... │ PENDING│
└─────────────────────────────────────────┴────────┘
Total: 1 | Applied: 0 | Pending: 1
2. Apply Migrations¶
Output:
Discovering migrations in migrations
Found 1 pending migration(s):
• 20260102_120000: Create user and post tables
Successfully applied 1 migration(s)
3. Verify¶
Perform CRUD Operations¶
Define Pydantic Models¶
Add to main.py:
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
class User(BaseModel):
username: str = Field(min_length=3)
email: EmailStr
full_name: str
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Post(BaseModel):
title: str
content: str
slug: str
author: str # Will be record ID like "user:alice"
published: bool = False
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
Create Records¶
Add to main.py:
from surql.query.crud import create_record, query_records, get_record
async def create_user_example():
"""Create a new user."""
async with await get_db_client() as client:
user = await create_record(
'user',
User(
username='alice',
email='alice@example.com',
full_name='Alice Johnson',
),
client=client,
)
print(f"Created user: {user['id']}")
return user
async def create_post_example(author_id: str):
"""Create a new post."""
async with await get_db_client() as client:
post = await create_record(
'post',
Post(
title='My First Blog Post',
content='This is my first post using surql!',
slug='my-first-post',
author=author_id,
published=True,
),
client=client,
)
print(f"Created post: {post['id']}")
return post
Query Records¶
async def query_users_example():
"""Query all users."""
async with await get_db_client() as client:
users = await query_records(
'user',
User,
client=client,
)
for user in users:
print(f"{user.username} - {user.email}")
async def query_published_posts():
"""Query published posts."""
async with await get_db_client() as client:
posts = await query_records(
'post',
Post,
conditions=['published = true'],
order_by=('created_at', 'DESC'),
limit=10,
client=client,
)
for post in posts:
print(f"{post.title} by {post.author}")
Update Records¶
from surql.query.crud import update_record, merge_record
async def update_user_example(user_id: str):
"""Update a user."""
async with await get_db_client() as client:
updated = await update_record(
'user',
user_id,
User(
username='alice',
email='alice.new@example.com',
full_name='Alice Smith',
),
client=client,
)
print(f"Updated user: {updated}")
async def merge_post_example(post_id: str):
"""Merge data into a post."""
async with await get_db_client() as client:
updated = await merge_record(
'post',
post_id,
{'published': True},
client=client,
)
print(f"Published post: {updated}")
Delete Records¶
from surql.query.crud import delete_record
async def delete_post_example(post_id: str):
"""Delete a post."""
async with await get_db_client() as client:
await delete_record('post', post_id, client=client)
print("Post deleted")
Working with Relationships¶
Create Edge Schema¶
Create schemas/likes.py:
from surql.schema.edge import edge_schema
from surql.schema.fields import datetime_field
likes_edge = edge_schema(
'likes',
from_table='user',
to_table='post',
fields=[
datetime_field('liked_at', default='time::now()', readonly=True),
],
)
Create Edge Migration¶
Edit the migration file:
def up() -> list[str]:
return [
'DEFINE TABLE likes SCHEMAFULL;',
'DEFINE FIELD in ON TABLE likes TYPE record<user>;',
'DEFINE FIELD out ON TABLE likes TYPE record<post>;',
'DEFINE FIELD liked_at ON TABLE likes TYPE datetime DEFAULT time::now() READONLY;',
]
def down() -> list[str]:
return [
'REMOVE TABLE likes;',
]
Apply the migration:
Create Relationships¶
async def like_post_example(user_id: str, post_id: str):
"""Create a like relationship."""
async with await get_db_client() as client:
result = await client.execute(
f'RELATE {user_id}->likes->{post_id}'
)
print(f"User liked post: {result}")
async def query_user_likes(user_id: str):
"""Query posts liked by a user."""
async with await get_db_client() as client:
result = await client.execute(
f'SELECT * FROM {user_id}->likes->post'
)
print(f"User's liked posts: {result}")
Complete Example¶
Here's a complete working example in main.py:
import asyncio
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
from surql.connection.client import get_client
from surql.settings import get_db_config
from surql.query.crud import (
create_record,
query_records,
get_record,
merge_record,
)
class User(BaseModel):
username: str = Field(min_length=3)
email: EmailStr
full_name: str
class Post(BaseModel):
title: str
content: str
slug: str
author: str
published: bool = False
async def main():
"""Main application entry point."""
config = get_db_config()
async with get_client(config) as client:
# Create a user
print("Creating user...")
user = await create_record(
'user',
User(
username='alice',
email='alice@example.com',
full_name='Alice Johnson',
),
client=client,
)
print(f"Created user: {user['id']}")
# Create a post
print("\nCreating post...")
post = await create_record(
'post',
Post(
title='Getting Started with surql',
content='surql makes working with SurrealDB easy!',
slug='getting-started',
author=user['id'],
),
client=client,
)
print(f"Created post: {post['id']}")
# Publish the post
print("\nPublishing post...")
await merge_record(
'post',
post['id'].split(':')[1],
{'published': True},
client=client,
)
# Query published posts
print("\nQuerying published posts...")
posts = await query_records(
'post',
Post,
conditions=['published = true'],
client=client,
)
for p in posts:
print(f" - {p.title} (by {p.author})")
# Create a like relationship
print("\nCreating like relationship...")
await client.execute(
f"RELATE {user['id']}->likes->{post['id']}"
)
# Query liked posts
print("\nQuerying user's liked posts...")
result = await client.execute(
f"SELECT * FROM {user['id']}->likes->post"
)
print(f"Liked posts: {result}")
if __name__ == '__main__':
asyncio.run(main())
Run the Example¶
Expected output:
Creating user...
Created user: user:alice
Creating post...
Created post: post:abc123
Publishing post...
Querying published posts...
- Getting Started with surql (by user:alice)
Creating like relationship...
Querying user's liked posts...
Liked posts: [...]
Next Steps¶
Now that you've completed the quick start, explore these topics:
- Schema Definition Guide - Learn about all schema features:
- Field types and assertions
- Events and triggers
- Permissions
-
Computed fields
-
Migration System - Advanced migration topics:
- Auto-generation from schema changes
- Migration dependencies
-
Rollback strategies
-
Query Builder & ORM - Advanced querying:
- Complex filters and operators
- Graph traversal
- Transactions
-
Aggregations
-
CLI Reference - Complete CLI documentation
-
Examples - More working examples:
- Advanced schema patterns
- Graph queries
- Complex relationships
Common Patterns¶
Multiple Environments¶
Use different .env files for each environment:
Load the appropriate config:
import os
from dotenv import load_dotenv
# Load environment-specific config
env = os.getenv('ENV', 'development')
load_dotenv(f'.env.{env}')
Testing¶
Create a test configuration:
# conftest.py
import pytest
from surql.connection.config import ConnectionConfig
@pytest.fixture
async def test_db():
config = ConnectionConfig(
url='ws://localhost:8000/rpc',
namespace='test',
database='test_db',
username='root',
password='root',
)
async with get_client(config) as client:
yield client
# Cleanup after tests
await client.execute('REMOVE DATABASE test_db')
Context Manager Pattern¶
Use context managers for automatic cleanup:
from surql.connection.context import db_context
async def with_context_example():
async with db_context(get_db_config()) as client:
# Client is automatically connected
user = await create_record('user', user_data, client=client)
# Client is automatically disconnected
Multiple Connections¶
For applications requiring connections to multiple databases (read replicas, analytics, different environments), surql provides a connection registry.
When to Use Multiple Connections¶
- Read replicas - Route read queries to replicas for performance
- Analytics databases - Separate OLAP workloads from OLTP
- Multi-tenant - Connect to different tenant databases
- Blue-green deployments - Maintain connections to multiple environments
Register Named Connections¶
import asyncio
from surql.connection.registry import ConnectionRegistry, get_registry
from surql.connection.config import ConnectionConfig
async def setup_connections():
"""Register multiple database connections."""
registry = get_registry()
# Primary database (writes)
await registry.register(
'primary',
ConnectionConfig(
url='ws://localhost:8000/rpc',
namespace='app',
database='production',
username='root',
password='root',
),
set_default=True, # Use as default connection
)
# Read replica (reads)
await registry.register(
'replica',
ConnectionConfig(
url='ws://replica.internal:8000/rpc',
namespace='app',
database='production',
username='reader',
password='reader_pass',
),
)
# Analytics database
await registry.register(
'analytics',
ConnectionConfig(
url='ws://analytics.internal:8000/rpc',
namespace='app',
database='analytics',
username='analyst',
password='analyst_pass',
),
)
print(f"Registered connections: {registry.list_connections()}")
# ['primary', 'replica', 'analytics']
Use Named Connections¶
from surql.connection.registry import get_registry
from surql.query.crud import create_record, query_records
async def use_connections():
"""Use different connections for different operations."""
registry = get_registry()
# Get default (primary) connection for writes
primary = registry.get() # or registry.get('primary')
# Create record on primary
user = await create_record(
'user',
User(username='alice', email='alice@example.com', full_name='Alice'),
client=primary,
)
# Query from read replica
replica = registry.get('replica')
users = await query_records('user', User, client=replica)
# Run analytics queries on analytics db
analytics = registry.get('analytics')
result = await analytics.execute('''
SELECT category, count() as total
FROM pageviews
GROUP BY category
''')
async def cleanup():
"""Disconnect all connections on shutdown."""
registry = get_registry()
await registry.disconnect_all()
Environment-Based Configuration¶
Configure named connections via environment variables:
# Primary database
SURQL_PRIMARY_DB_URL=ws://localhost:8000/rpc
SURQL_PRIMARY_DB_NS=app
SURQL_PRIMARY_DB=production
SURQL_PRIMARY_DB_USER=root
SURQL_PRIMARY_DB_PASS=root
# Read replica
SURQL_REPLICA_DB_URL=ws://replica:8000/rpc
SURQL_REPLICA_DB_NS=app
SURQL_REPLICA_DB=production
SURQL_REPLICA_DB_USER=reader
SURQL_REPLICA_DB_PASS=reader_pass
Load from environment:
from surql.connection.config import NamedConnectionConfig
from surql.connection.registry import get_registry
async def setup_from_env():
"""Load named connections from environment variables."""
registry = get_registry()
# Load configurations using SURQL_{NAME}_ prefix
primary_config = NamedConnectionConfig.from_env('PRIMARY')
replica_config = NamedConnectionConfig.from_env('REPLICA')
await registry.register(primary_config.name, primary_config.config, set_default=True)
await registry.register(replica_config.name, replica_config.config)
Troubleshooting¶
Migration Not Found¶
If migrations aren't found:
# Verify migrations directory
ls migrations/
# Check migration file naming
# Should be: YYYYMMDD_HHMMSS_description.py
Connection Issues¶
If connection fails:
Schema Validation Errors¶
If you get validation errors:
# Enable verbose logging
import structlog
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(10))
# Run again to see detailed errors