v0.10.0 Release Notes

Release Date: February 2026

This release introduces Advanced Patterns support, bringing first-class Event Sourcing, CQRS, and Saga Pattern capabilities to the Cello framework. These features enable Cello applications to implement sophisticated domain-driven designs and distributed transaction coordination while maintaining Rust-powered performance on the hot path.

Highlights

  • Event Sourcing - Aggregate roots, event replay, snapshots, and event store integration for event-driven persistence
  • CQRS - Command/query separation with dedicated buses, separate read/write models, and event-driven synchronization
  • Saga Pattern - Distributed transaction coordination with step-by-step execution, compensation logic, and automatic rollback

New Features

Event Sourcing

Full-featured event sourcing with aggregate root pattern and event replay:

from cello import App
from cello.eventsourcing import (
    Aggregate, Event, event_handler, EventStore,
    EventSourcingConfig, Snapshot
)

app = App()

# Configure event store
app.enable_event_sourcing(EventSourcingConfig(
    storage="postgresql://localhost/events",
    snapshot_interval=100,  # Snapshot every 100 events
    enable_replay=True,
))

# Define domain events
class OrderCreated(Event):
    order_id: str
    customer_id: str
    items: list

class OrderShipped(Event):
    order_id: str
    tracking_number: str

class OrderCancelled(Event):
    order_id: str
    reason: str

# Define aggregate root
class Order(Aggregate):
    def __init__(self):
        super().__init__()
        self.status = None
        self.items = []
        self.tracking_number = None

    @event_handler(OrderCreated)
    def on_created(self, event):
        self.id = event.order_id
        self.customer_id = event.customer_id
        self.items = event.items
        self.status = "created"

    @event_handler(OrderShipped)
    def on_shipped(self, event):
        self.tracking_number = event.tracking_number
        self.status = "shipped"

    @event_handler(OrderCancelled)
    def on_cancelled(self, event):
        self.status = "cancelled"
        self.cancel_reason = event.reason

# Use in route handlers
# (generate_id() is an application-supplied helper, e.g. one that returns str(uuid.uuid4()))
@app.post("/orders")
async def create_order(request):
    data = request.json()
    store = request.app.event_store
    event = OrderCreated(
        order_id=generate_id(),
        customer_id=data["customer_id"],
        items=data["items"],
    )
    await store.append(event)
    return {"order_id": event.order_id, "status": "created"}

# Replay events to rebuild state
@app.get("/orders/{id}")
async def get_order(request):
    store = request.app.event_store
    order = await store.load(Order, request.params["id"])
    return {"id": order.id, "status": order.status, "items": order.items}

# Snapshot support for performance
@app.post("/orders/{id}/snapshot")
async def snapshot_order(request):
    store = request.app.event_store
    await store.create_snapshot(Order, request.params["id"])
    return {"snapshot": "created"}
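
To make the relationship between replay and snapshots concrete, here is a minimal pure-Python sketch. It does not use Cello's EventStore and is not how the Rust engine is implemented; `InMemoryLog`, `TotalState`, and `apply` are hypothetical names chosen for illustration. The idea shown is that loading starts from the latest snapshot and replays only the events recorded after it.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class TotalState:
    """Toy aggregate state: a running total and the number of events applied."""
    total: int = 0
    version: int = 0


def apply(state: TotalState, event: dict) -> TotalState:
    """Pure state transition: fold one event into a new state object."""
    return TotalState(total=state.total + event["amount"], version=state.version + 1)


@dataclass
class InMemoryLog:
    """Hypothetical append-only log with periodic snapshots."""
    snapshot_interval: int = 100
    events: list = field(default_factory=list)
    snapshot: Optional[TotalState] = None
    last_replayed: int = 0  # how many events the most recent load() replayed

    def append(self, event: dict) -> None:
        self.events.append(event)
        # Take a snapshot every `snapshot_interval` events.
        if len(self.events) % self.snapshot_interval == 0:
            self.snapshot = self.load()

    def load(self) -> TotalState:
        # Start from the snapshot if there is one, otherwise from empty state.
        state = self.snapshot if self.snapshot is not None else TotalState()
        tail = self.events[state.version:]  # events newer than the snapshot
        self.last_replayed = len(tail)
        for event in tail:
            state = apply(state, event)
        return state
```

With `snapshot_interval=100` and 250 appended events, the last snapshot covers the first 200 events, so `load()` replays only the remaining 50.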

Features:

  • Aggregate base class for defining domain aggregates with event handlers
  • Event base class for typed domain events with automatic serialization
  • @event_handler decorator for mapping events to state transitions
  • EventStore for persisting and retrieving events with configurable storage backends
  • EventSourcingConfig for storage URL, snapshot interval, and replay settings
  • Snapshot support for optimized aggregate loading at scale
  • Event replay to rebuild aggregate state from the event log
  • Automatic snapshot creation at configurable intervals
  • PostgreSQL, MySQL, and in-memory storage backends
  • Event versioning and upcasting for schema evolution
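
Upcasting, mentioned in the last item above, means converting events stored under an older schema to the current one at read time, so handlers only ever see the latest shape. The sketch below illustrates the general technique in plain Python; the record layout (`type`, `version`, `payload`), the `carrier` field, and the `UPCASTERS` registry are assumptions for illustration, not Cello's actual storage format or API.

```python
# Hypothetical schema change: version 2 of OrderShipped adds a "carrier" field.
def order_shipped_v1_to_v2(payload: dict) -> dict:
    # Old events never recorded a carrier, so fill in a neutral default.
    return {**payload, "carrier": "unknown"}


# (event type, stored version) -> function producing the next version's payload
UPCASTERS = {("OrderShipped", 1): order_shipped_v1_to_v2}
CURRENT_VERSION = {"OrderShipped": 2}


def upcast(record: dict) -> dict:
    """Apply upcasters one version at a time until the record is current."""
    event_type = record["type"]
    version = record["version"]
    payload = record["payload"]
    while version < CURRENT_VERSION[event_type]:
        payload = UPCASTERS[(event_type, version)](payload)
        version += 1
    return {"type": event_type, "version": version, "payload": payload}


old = {
    "type": "OrderShipped",
    "version": 1,
    "payload": {"order_id": "o-1", "tracking_number": "T123"},
}
current = upcast(old)
```

Chaining single-version steps keeps each upcaster small: a later version 3 would only need a `("OrderShipped", 2)` entry, and version 1 events would pass through both.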

CQRS (Command Query Responsibility Segregation)

Separate read and write models with dedicated command and query buses:

from cello import App
from cello.cqrs import (
    Command, Query, CommandBus, QueryBus,
    command_handler, query_handler, CqrsConfig
)

app = App()

# Configure CQRS
app.enable_cqrs(CqrsConfig(
    command_timeout=30,  # seconds
    query_timeout=10,
    enable_event_sync=True,
))

# Define commands (write operations)
class CreateOrderCommand(Command):
    customer_id: str
    items: list

class CancelOrderCommand(Command):
    order_id: str
    reason: str

# Define queries (read operations)
class GetOrderQuery(Query):
    order_id: str

class ListOrdersQuery(Query):
    customer_id: str
    status: str | None = None  # optional filter; this annotation syntax needs Python 3.10+
    limit: int = 50

# Command handlers (write side); Order here is your application's write model
@command_handler(CreateOrderCommand)
async def handle_create_order(command, db):
    order = Order.create(command.customer_id, command.items)
    await db.save(order)
    await db.publish_event("order_created", {"order_id": order.id})
    return order.id

@command_handler(CancelOrderCommand)
async def handle_cancel_order(command, db):
    order = await db.get(Order, command.order_id)
    order.cancel(command.reason)
    await db.save(order)
    await db.publish_event("order_cancelled", {"order_id": order.id})
    return True

# Query handlers (read side)
@query_handler(GetOrderQuery)
async def handle_get_order(query, read_db):
    return await read_db.get_order(query.order_id)

@query_handler(ListOrdersQuery)
async def handle_list_orders(query, read_db):
    return await read_db.list_orders(
        customer_id=query.customer_id,
        status=query.status,
        limit=query.limit,
    )

# Use in route handlers
@app.post("/orders")
async def create_order(request):
    data = request.json()
    command = CreateOrderCommand(
        customer_id=data["customer_id"],
        items=data["items"],
    )
    order_id = await request.app.command_bus.dispatch(command)
    return {"order_id": order_id, "status": "created"}

@app.get("/orders/{id}")
async def get_order(request):
    query = GetOrderQuery(order_id=request.params["id"])
    order = await request.app.query_bus.dispatch(query)
    return order

Features:

  • Command and Query base classes for typed operations
  • @command_handler and @query_handler decorators for handler registration
  • CommandBus and QueryBus for dispatching operations to registered handlers
  • CqrsConfig for timeout settings and event synchronization
  • Separate read/write database support for optimized performance
  • Event-driven synchronization between write and read models
  • Command validation before execution
  • Middleware support on command and query buses
  • Timeout configuration per bus type

Saga Pattern

Distributed transaction coordination with compensation logic:

from cello import App, Response
from cello.saga import Saga, SagaStep, SagaConfig, SagaResult

app = App()

# Configure saga support
app.enable_sagas(SagaConfig(
    storage="postgresql://localhost/sagas",
    max_retries=3,
    retry_delay=5,  # seconds
    timeout=300,  # 5 minutes total timeout
))

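# Define step actions and compensations
# (inventory_service, payment_service, and shipping_service are your application's own service clients)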
async def reserve_inventory(context):
    result = await inventory_service.reserve(
        context["order_id"], context["items"]
    )
    context["reservation_id"] = result.reservation_id
    return result

async def release_inventory(context):
    await inventory_service.release(context["reservation_id"])

async def charge_payment(context):
    result = await payment_service.charge(
        context["customer_id"], context["total"]
    )
    context["payment_id"] = result.payment_id
    return result

async def refund_payment(context):
    await payment_service.refund(context["payment_id"])

async def create_shipment(context):
    result = await shipping_service.create(
        context["order_id"], context["address"]
    )
    context["shipment_id"] = result.shipment_id
    return result

async def cancel_shipment(context):
    await shipping_service.cancel(context["shipment_id"])

# Define the saga
class OrderSaga(Saga):
    steps = [
        SagaStep(
            name="reserve_inventory",
            action=reserve_inventory,
            compensate=release_inventory,
        ),
        SagaStep(
            name="process_payment",
            action=charge_payment,
            compensate=refund_payment,
        ),
        SagaStep(
            name="ship_order",
            action=create_shipment,
            compensate=cancel_shipment,
        ),
    ]

# Use in route handlers
@app.post("/orders/{id}/fulfill")
async def fulfill_order(request):
    data = request.json()
    context = {
        "order_id": request.params["id"],
        "customer_id": data["customer_id"],
        "items": data["items"],
        "total": data["total"],
        "address": data["address"],
    }
    result: SagaResult = await request.app.saga_runner.execute(
        OrderSaga, context
    )
    if result.success:
        return {"status": "fulfilled", "steps_completed": result.steps_completed}
    else:
        return Response.json(
            {"status": "failed", "failed_step": result.failed_step, "error": str(result.error)},
            status=500,
        )
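
The core of the pattern above is the rollback order: when a step fails, the compensations of the steps that already completed run in reverse. The sketch below shows that control flow in plain asyncio. It is an illustration only, not Cello's saga runner (retries, persistence, and timeouts are omitted), and `run_saga` and the result dictionary are hypothetical.

```python
import asyncio


async def run_saga(steps, context):
    """Run (name, action, compensate) steps in order; on failure, undo in reverse."""
    completed = []
    for name, action, compensate in steps:
        try:
            await action(context)
        except Exception as error:
            # Only steps that finished are compensated, newest first.
            for _, _, undo in reversed(completed):
                await undo(context)
            return {
                "success": False,
                "failed_step": name,
                "error": error,
                "steps_completed": [n for n, _, _ in completed],
            }
        completed.append((name, action, compensate))
    return {
        "success": True,
        "failed_step": None,
        "error": None,
        "steps_completed": [n for n, _, _ in completed],
    }


async def demo():
    log = []  # records the order in which actions and compensations run

    def step(label, fail=False):
        async def action(context):
            if fail:
                raise RuntimeError(f"{label} failed")
            log.append(label)
        return action

    def undo(label):
        async def compensate(context):
            log.append(label)
        return compensate

    steps = [
        ("reserve_inventory", step("reserve"), undo("release")),
        ("process_payment", step("charge"), undo("refund")),
        ("ship_order", step("ship", fail=True), undo("cancel_shipment")),
    ]
    result = await run_saga(steps, {})
    return log, result
```

In `demo()`, shipping fails after inventory and payment succeed, so the log reads reserve, charge, refund, release. The failed step's own compensation is not run in this sketch because the step never completed.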

Features:

  • Saga base class for defining multi-step distributed transactions
  • SagaStep for individual steps with action and compensation functions
  • SagaConfig for storage, retries, delay, and timeout settings
  • SagaResult with success status, completed steps, and error details
  • Automatic compensation (rollback) when any step fails
  • Persistent saga state for crash recovery
  • Configurable retry logic with exponential backoff
  • Timeout support for long-running sagas
  • Step-by-step execution with context propagation
  • PostgreSQL and in-memory storage backends for saga state
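
These notes list exponential backoff but do not state the growth factor. As a worked sketch, assuming the delay doubles after each attempt and no jitter is added (both are assumptions, not confirmed Cello behavior), the SagaConfig values shown earlier (retry_delay=5, max_retries=3) would produce the following waits. `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(retry_delay: float, max_retries: int, factor: float = 2.0) -> list:
    """Delay before each retry: retry_delay * factor**attempt, attempt = 0, 1, ..."""
    return [retry_delay * factor ** attempt for attempt in range(max_retries)]


delays = backoff_delays(retry_delay=5, max_retries=3)
# delays == [5.0, 10.0, 20.0], 35 seconds of waiting in total
```

Under these assumptions the retries for one step add at most 35 seconds of waiting, well inside the 300-second saga timeout from the same configuration.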

Improvements

Performance

  • Event store writes in Rust - Event serialization and persistence run entirely in Rust for maximum throughput
  • Command/query bus dispatching in Rust - Bus routing and handler lookup offloaded to the Rust engine
  • Zero-copy event replay - Events passed as Bytes during aggregate reconstruction without copying
  • Saga state machine in Rust - Step execution, compensation, and retry logic handled by the Rust runtime

Observability

  • Event sourcing metrics - Metrics for events appended, events replayed, and snapshots created, exposed via Prometheus at /metrics
  • CQRS metrics - Command and query dispatch counts, latency histograms, and error rates at /metrics
  • Saga metrics - Saga execution count, step completion rates, compensation triggers, and timeout counts at /metrics
  • Health check integration - Event store connectivity and saga storage status included in /health endpoint

Developer Experience

  • Event browser - Built-in event stream viewer available at /events when in development mode
  • Saga dashboard - Visual saga execution status and step progress at /sagas in development mode
  • Typed command/query payloads - Full IDE autocompletion for command and query fields
  • Clear error messages - Descriptive errors for misconfigured event stores, failed compensations, and timeout conditions

Bug Fixes

  • Fixed GraphQL subscription disconnects under high message throughput (from v0.9.0)
  • Fixed gRPC reflection service not listing all registered methods after hot reload (from v0.9.0)
  • Fixed Kafka consumer group rebalancing causing duplicate message processing (from v0.9.0)
  • Fixed database connection pool not releasing connections on handler timeout (from v0.8.0)
  • Fixed health check endpoint returning 200 when event store is unreachable
  • Improved error message when saga compensation fails with a non-retryable error

Example

See the complete Advanced Patterns demo for a working application that uses Event Sourcing, CQRS, and the Saga Pattern together. A condensed version:

from cello import App, Response
from cello.eventsourcing import Aggregate, Event, event_handler, EventSourcingConfig
from cello.cqrs import Command, Query, command_handler, query_handler, CqrsConfig
from cello.saga import Saga, SagaStep, SagaConfig

app = App()

# Enable advanced patterns
app.enable_event_sourcing(EventSourcingConfig(storage="postgresql://localhost/events"))
app.enable_cqrs(CqrsConfig(enable_event_sync=True))
app.enable_sagas(SagaConfig(storage="postgresql://localhost/sagas"))

# Event Sourcing
class OrderCreated(Event):
    order_id: str
    customer_id: str

class Order(Aggregate):
    @event_handler(OrderCreated)
    def on_created(self, event):
        self.id = event.order_id
        self.status = "created"

# CQRS
class CreateOrderCommand(Command):
    customer_id: str
    items: list

@command_handler(CreateOrderCommand)
async def handle_create(command, db):
    order = Order.create(command.customer_id, command.items)
    await db.save(order)
    return order.id

class GetOrderQuery(Query):
    order_id: str

@query_handler(GetOrderQuery)
async def handle_get(query, read_db):
    return await read_db.get_order(query.order_id)

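# Saga (the step functions are the ones defined in the Saga Pattern section above)
class OrderSaga(Saga):
    steps = [
        SagaStep(name="reserve", action=reserve_inventory, compensate=release_inventory),
        SagaStep(name="charge", action=charge_payment, compensate=refund_payment),
    ]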

@app.post("/orders")
async def create_order(request):
    data = request.json()
    cmd = CreateOrderCommand(customer_id=data["customer_id"], items=data["items"])
    order_id = await request.app.command_bus.dispatch(cmd)
    return {"order_id": order_id}

@app.get("/orders/{id}")
async def get_order(request):
    query = GetOrderQuery(order_id=request.params["id"])
    return await request.app.query_bus.dispatch(query)

app.run()

Migration Guide

See the Migration Guide for detailed upgrade instructions.

Quick Migration

  1. Update your dependency:

    pip install --upgrade cello-framework
    

  2. Import new Advanced Pattern classes as needed:

    # Event Sourcing
    from cello.eventsourcing import Aggregate, Event, event_handler, EventStore, EventSourcingConfig, Snapshot
    
    # CQRS
    from cello.cqrs import Command, Query, CommandBus, QueryBus, command_handler, query_handler, CqrsConfig
    
    # Saga
    from cello.saga import Saga, SagaStep, SagaConfig, SagaResult
    

  3. Enable patterns on your app (optional - only if you need these features):

    app.enable_event_sourcing(EventSourcingConfig(storage="postgresql://localhost/events"))
    app.enable_cqrs(CqrsConfig(enable_event_sync=True))
    app.enable_sagas(SagaConfig(storage="postgresql://localhost/sagas"))
    

  4. No breaking changes from v0.9.0 - all existing code continues to work. Event Sourcing, CQRS, and Saga Pattern are purely additive features.

External Dependencies

Some v0.10.0 features require external services to be running:

| Feature        | Required Service          | Default Port |
|----------------|---------------------------|--------------|
| Event Sourcing | PostgreSQL (or in-memory) | 5432         |
| CQRS           | None (built-in)           | N/A          |
| Saga Pattern   | PostgreSQL (or in-memory) | 5432         |

If external services are not available, the framework will log a warning at startup but will not prevent the application from running. REST endpoints remain fully functional.
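
Because a missing service only produces a startup warning, it can be useful to check reachability yourself before enabling a pattern. The sketch below uses only the Python standard library; `service_reachable` is a hypothetical helper, not part of Cello. It confirms only that the TCP port accepts connections, not that PostgreSQL will accept your credentials.

```python
import socket


def service_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

For example, `service_reachable("localhost", 5432)` could gate a call to `app.enable_event_sourcing(...)` in environments where the database is optional.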


Contributors

Thanks to all contributors who made this release possible!


Full Changelog

See the complete changelog for all changes.