v0.10.0 Release Notes
Release Date: February 2026
This release introduces Advanced Patterns support, bringing first-class Event Sourcing, CQRS, and Saga Pattern capabilities to the Cello framework. These features enable Cello applications to implement sophisticated domain-driven designs and distributed transaction coordination while maintaining Rust-powered performance on the hot path.
Highlights
- Event Sourcing - Aggregate roots, event replay, snapshots, and event store integration for event-driven persistence
- CQRS - Command/query separation with dedicated buses, separate read/write models, and event-driven synchronization
- Saga Pattern - Distributed transaction coordination with step-by-step execution, compensation logic, and automatic rollback
New Features
Event Sourcing
Full-featured event sourcing with aggregate root pattern and event replay:
    from uuid import uuid4

    from cello import App
    from cello.eventsourcing import (
        Aggregate, Event, event_handler, EventStore,
        EventSourcingConfig, Snapshot
    )

    app = App()

    # Configure event store
    app.enable_event_sourcing(EventSourcingConfig(
        storage="postgresql://localhost/events",
        snapshot_interval=100,  # Snapshot every 100 events
        enable_replay=True,
    ))

    def generate_id():
        # Application-side helper (not part of Cello): returns a unique order ID
        return uuid4().hex

    # Define domain events
    class OrderCreated(Event):
        order_id: str
        customer_id: str
        items: list

    class OrderShipped(Event):
        order_id: str
        tracking_number: str

    class OrderCancelled(Event):
        order_id: str
        reason: str

    # Define aggregate root
    class Order(Aggregate):
        def __init__(self):
            super().__init__()
            self.status = None
            self.items = []
            self.tracking_number = None

        @event_handler(OrderCreated)
        def on_created(self, event):
            self.id = event.order_id
            self.customer_id = event.customer_id
            self.items = event.items
            self.status = "created"

        @event_handler(OrderShipped)
        def on_shipped(self, event):
            self.tracking_number = event.tracking_number
            self.status = "shipped"

        @event_handler(OrderCancelled)
        def on_cancelled(self, event):
            self.status = "cancelled"
            self.cancel_reason = event.reason

    # Use in route handlers
    @app.post("/orders")
    async def create_order(request):
        data = request.json()
        store = request.app.event_store
        event = OrderCreated(
            order_id=generate_id(),
            customer_id=data["customer_id"],
            items=data["items"],
        )
        await store.append(event)
        return {"order_id": event.order_id, "status": "created"}

    # Replay events to rebuild state
    @app.get("/orders/{id}")
    async def get_order(request):
        store = request.app.event_store
        order = await store.load(Order, request.params["id"])
        return {"id": order.id, "status": order.status, "items": order.items}

    # Snapshot support for performance
    @app.post("/orders/{id}/snapshot")
    async def snapshot_order(request):
        store = request.app.event_store
        await store.create_snapshot(Order, request.params["id"])
        return {"snapshot": "created"}
Features:
- `Aggregate` base class for defining domain aggregates with event handlers
- `Event` base class for typed domain events with automatic serialization
- `@event_handler` decorator for mapping events to state transitions
- `EventStore` for persisting and retrieving events with configurable storage backends
- `EventSourcingConfig` for storage URL, snapshot interval, and replay settings
- `Snapshot` support for optimized aggregate loading at scale
- Event replay to rebuild aggregate state from the event log
- Automatic snapshot creation at configurable intervals
- PostgreSQL, MySQL, and in-memory storage backends
- Event versioning and upcasting for schema evolution
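To make the interaction between event replay and snapshots concrete, here is a minimal sketch in plain Python. It does not use Cello's API: `Counter` and `InMemoryStore` are hypothetical stand-ins, and the snapshot policy shown (one snapshot every `snapshot_interval` appended events) is an assumption about how such an interval might behave, not a description of the Rust implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Counter:
    """Toy aggregate: a running total plus the number of events it reflects."""
    total: int = 0
    version: int = 0

    def apply(self, amount: int) -> None:
        self.total += amount
        self.version += 1


@dataclass
class InMemoryStore:
    """Hypothetical event store: an append-only log plus the latest snapshot."""
    snapshot_interval: int = 100
    events: list = field(default_factory=list)
    snapshot: tuple = (0, 0)  # (total, version) captured by the last snapshot

    def append(self, amount: int) -> None:
        self.events.append(amount)
        # Assumed policy: snapshot once every `snapshot_interval` events.
        # The total is recomputed from the log here only for brevity.
        if len(self.events) % self.snapshot_interval == 0:
            self.snapshot = (sum(self.events), len(self.events))

    def load(self) -> tuple:
        """Rebuild the aggregate; also return how many events were replayed."""
        total, version = self.snapshot
        aggregate = Counter(total=total, version=version)
        tail = self.events[version:]  # only events recorded after the snapshot
        for amount in tail:
            aggregate.apply(amount)
        return aggregate, len(tail)


store = InMemoryStore(snapshot_interval=100)
for _ in range(250):
    store.append(2)

aggregate, replayed = store.load()
print(aggregate.total, aggregate.version, replayed)  # 500 250 50
```

With 250 events and an interval of 100, the latest snapshot covers the first 200 events, so loading replays only the remaining 50 instead of the full log.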
CQRS (Command Query Responsibility Segregation)
Separate read and write models with dedicated command and query buses:
    from typing import Optional

    from cello import App
    from cello.cqrs import (
        Command, Query, CommandBus, QueryBus,
        command_handler, query_handler, CqrsConfig
    )

    app = App()

    # Configure CQRS
    app.enable_cqrs(CqrsConfig(
        command_timeout=30,  # seconds
        query_timeout=10,
        enable_event_sync=True,
    ))

    # Define commands (write operations)
    class CreateOrderCommand(Command):
        customer_id: str
        items: list

    class CancelOrderCommand(Command):
        order_id: str
        reason: str

    # Define queries (read operations)
    class GetOrderQuery(Query):
        order_id: str

    class ListOrdersQuery(Query):
        customer_id: str
        status: Optional[str] = None
        limit: int = 50

    # Command handlers (write side)
    # `Order` is the application's write-side domain model (not shown here)
    @command_handler(CreateOrderCommand)
    async def handle_create_order(command, db):
        order = Order.create(command.customer_id, command.items)
        await db.save(order)
        await db.publish_event("order_created", {"order_id": order.id})
        return order.id

    @command_handler(CancelOrderCommand)
    async def handle_cancel_order(command, db):
        order = await db.get(Order, command.order_id)
        order.cancel(command.reason)
        await db.save(order)
        await db.publish_event("order_cancelled", {"order_id": order.id})
        return True

    # Query handlers (read side)
    @query_handler(GetOrderQuery)
    async def handle_get_order(query, read_db):
        return await read_db.get_order(query.order_id)

    @query_handler(ListOrdersQuery)
    async def handle_list_orders(query, read_db):
        return await read_db.list_orders(
            customer_id=query.customer_id,
            status=query.status,
            limit=query.limit,
        )

    # Use in route handlers
    @app.post("/orders")
    async def create_order(request):
        data = request.json()
        command = CreateOrderCommand(
            customer_id=data["customer_id"],
            items=data["items"],
        )
        order_id = await request.app.command_bus.dispatch(command)
        return {"order_id": order_id, "status": "created"}

    @app.get("/orders/{id}")
    async def get_order(request):
        query = GetOrderQuery(order_id=request.params["id"])
        order = await request.app.query_bus.dispatch(query)
        return order
Features:
- `Command` and `Query` base classes for typed operations
- `@command_handler` and `@query_handler` decorators for handler registration
- `CommandBus` and `QueryBus` for dispatching operations to registered handlers
- `CqrsConfig` for timeout settings and event synchronization
- Separate read/write database support for optimized performance
- Event-driven synchronization between write and read models
- Command validation before execution
- Middleware support on command and query buses
- Timeout configuration per bus type
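For readers new to the bus idea, the following plain-Python sketch shows one way a bus can route a message to its registered handler and enforce a per-bus timeout. `SimpleBus` and its `register` decorator are hypothetical names for illustration only; they are not Cello's `CommandBus`, whose routing runs in the Rust engine.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CreateOrder:
    customer_id: str
    items: list


class SimpleBus:
    """Hypothetical bus: maps each message type to exactly one async handler."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self._handlers = {}

    def register(self, message_type):
        def decorator(func):
            if message_type in self._handlers:
                raise ValueError(f"handler already registered for {message_type.__name__}")
            self._handlers[message_type] = func
            return func
        return decorator

    async def dispatch(self, message):
        handler = self._handlers.get(type(message))
        if handler is None:
            raise LookupError(f"no handler for {type(message).__name__}")
        # Cancel the handler if it exceeds the bus-wide timeout.
        return await asyncio.wait_for(handler(message), timeout=self.timeout)


command_bus = SimpleBus(timeout=30)


@command_bus.register(CreateOrder)
async def handle_create_order(command):
    return f"order-for-{command.customer_id}"


result = asyncio.run(command_bus.dispatch(CreateOrder("c-1", ["widget"])))
print(result)  # order-for-c-1
```

Keeping one handler per message type makes dispatch a dictionary lookup, and a second bus instance with a shorter timeout can serve queries in the same way.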
Saga Pattern
Distributed transaction coordination with compensation logic:
    from cello import App, Response
    from cello.saga import Saga, SagaStep, SagaConfig, SagaResult

    app = App()

    # Configure saga support
    app.enable_sagas(SagaConfig(
        storage="postgresql://localhost/sagas",
        max_retries=3,
        retry_delay=5,  # seconds
        timeout=300,  # 5 minutes total timeout
    ))

    # Define step actions and compensations
    # inventory_service, payment_service, and shipping_service are
    # application-provided clients (not shown here)
    async def reserve_inventory(context):
        result = await inventory_service.reserve(
            context["order_id"], context["items"]
        )
        context["reservation_id"] = result.reservation_id
        return result

    async def release_inventory(context):
        await inventory_service.release(context["reservation_id"])

    async def charge_payment(context):
        result = await payment_service.charge(
            context["customer_id"], context["total"]
        )
        context["payment_id"] = result.payment_id
        return result

    async def refund_payment(context):
        await payment_service.refund(context["payment_id"])

    async def create_shipment(context):
        result = await shipping_service.create(
            context["order_id"], context["address"]
        )
        context["shipment_id"] = result.shipment_id
        return result

    async def cancel_shipment(context):
        await shipping_service.cancel(context["shipment_id"])

    # Define the saga
    class OrderSaga(Saga):
        steps = [
            SagaStep(
                name="reserve_inventory",
                action=reserve_inventory,
                compensate=release_inventory,
            ),
            SagaStep(
                name="process_payment",
                action=charge_payment,
                compensate=refund_payment,
            ),
            SagaStep(
                name="ship_order",
                action=create_shipment,
                compensate=cancel_shipment,
            ),
        ]

    # Use in route handlers
    @app.post("/orders/{id}/fulfill")
    async def fulfill_order(request):
        data = request.json()
        context = {
            "order_id": request.params["id"],
            "customer_id": data["customer_id"],
            "items": data["items"],
            "total": data["total"],
            "address": data["address"],
        }
        result: SagaResult = await request.app.saga_runner.execute(
            OrderSaga, context
        )
        if result.success:
            return {"status": "fulfilled", "steps_completed": result.steps_completed}
        else:
            return Response.json(
                {"status": "failed", "failed_step": result.failed_step, "error": str(result.error)},
                status=500,
            )
Features:
- `Saga` base class for defining multi-step distributed transactions
- `SagaStep` for individual steps with action and compensation functions
- `SagaConfig` for storage, retries, delay, and timeout settings
- `SagaResult` with success status, completed steps, and error details
- Automatic compensation (rollback) when any step fails
- Persistent saga state for crash recovery
- Configurable retry logic with exponential backoff
- Timeout support for long-running sagas
- Step-by-step execution with context propagation
- PostgreSQL and in-memory storage backends for saga state
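The compensation behaviour listed above can be illustrated with a small plain-Python sketch. `run_saga` is a hypothetical helper, not Cello's saga runner, and the result dictionary only loosely mirrors the `SagaResult` fields named in these notes. It assumes compensations run in reverse order over the steps that completed, which is the usual convention for sagas; retries, persistence, and timeouts are left out.

```python
import asyncio


async def run_saga(steps, context):
    """Run (name, action, compensate) steps in order; undo completed ones on failure."""
    completed = []  # (name, compensate) for every step that finished
    for name, action, compensate in steps:
        try:
            await action(context)
        except Exception as error:
            # Roll back newest-first; the failed step itself is not compensated.
            for _, undo in reversed(completed):
                await undo(context)
            return {
                "success": False,
                "steps_completed": [done for done, _ in completed],
                "failed_step": name,
                "error": error,
            }
        completed.append((name, compensate))
    return {
        "success": True,
        "steps_completed": [done for done, _ in completed],
        "failed_step": None,
        "error": None,
    }


log = []  # records the order in which actions and compensations run


async def reserve(ctx): log.append("reserve")
async def release(ctx): log.append("release")
async def charge(ctx): log.append("charge")
async def refund(ctx): log.append("refund")
async def ship(ctx): raise RuntimeError("carrier unavailable")
async def cancel(ctx): log.append("cancel")


steps = [
    ("reserve_inventory", reserve, release),
    ("process_payment", charge, refund),
    ("ship_order", ship, cancel),
]
result = asyncio.run(run_saga(steps, {}))
print(log)                    # ['reserve', 'charge', 'refund', 'release']
print(result["failed_step"])  # ship_order
```

Because the shipping step never completed, its compensation is skipped; only the payment and the inventory reservation are undone, in that order.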
Improvements
Performance
- Event store writes in Rust - Event serialization and persistence runs entirely in Rust for maximum throughput
- Command/query bus dispatching in Rust - Bus routing and handler lookup offloaded to the Rust engine
- Zero-copy event replay - Events passed as `Bytes` during aggregate reconstruction without copying
- Saga state machine in Rust - Step execution, compensation, and retry logic handled by the Rust runtime
Observability
- Event sourcing metrics - Events appended, events replayed, and snapshots created, exposed via Prometheus at `/metrics`
- CQRS metrics - Command and query dispatch counts, latency histograms, and error rates at `/metrics`
- Saga metrics - Saga execution count, step completion rates, compensation triggers, and timeout counts at `/metrics`
- Health check integration - Event store connectivity and saga storage status included in the `/health` endpoint
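As a rough illustration of the health check integration, the sketch below folds per-component reachability into a single HTTP status. `health_response` is a hypothetical helper; the response shape and the choice of 503 for an unhealthy component are assumptions, since these notes do not specify what `/health` returns.

```python
def health_response(checks):
    """Turn {component name: reachable?} into an (HTTP status, body) pair."""
    healthy = all(checks.values())
    body = {
        "status": "ok" if healthy else "degraded",
        "components": {name: "up" if ok else "down" for name, ok in checks.items()},
    }
    # Assumed convention: 503 signals probes that a dependency is unavailable.
    return (200 if healthy else 503), body


status, body = health_response({"event_store": False, "saga_storage": True})
print(status, body["components"]["event_store"])  # 503 down
```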
Developer Experience
- Event browser - Built-in event stream viewer available at `/events` when in development mode
- Saga dashboard - Visual saga execution status and step progress at `/sagas` in development mode
- Typed command/query payloads - Full IDE autocompletion for command and query fields
- Clear error messages - Descriptive errors for misconfigured event stores, failed compensations, and timeout conditions
Bug Fixes
- Fixed GraphQL subscription disconnects under high message throughput (from v0.9.0)
- Fixed gRPC reflection service not listing all registered methods after hot reload (from v0.9.0)
- Fixed Kafka consumer group rebalancing causing duplicate message processing (from v0.9.0)
- Fixed database connection pool not releasing connections on handler timeout (from v0.8.0)
- Fixed health check endpoint returning 200 when event store is unreachable
- Improved error message when saga compensation fails with a non-retryable error
Example
See the complete Advanced patterns demo example for a working application demonstrating all Advanced Patterns features.
    from cello import App, Response
    from cello.eventsourcing import Aggregate, Event, event_handler, EventSourcingConfig
    from cello.cqrs import Command, Query, command_handler, query_handler, CqrsConfig
    from cello.saga import Saga, SagaStep, SagaConfig

    app = App()

    # Enable advanced patterns
    app.enable_event_sourcing(EventSourcingConfig(storage="postgresql://localhost/events"))
    app.enable_cqrs(CqrsConfig(enable_event_sync=True))
    app.enable_sagas(SagaConfig(storage="postgresql://localhost/sagas"))

    # Event Sourcing
    class OrderCreated(Event):
        order_id: str
        customer_id: str

    class Order(Aggregate):
        @event_handler(OrderCreated)
        def on_created(self, event):
            self.id = event.order_id
            self.status = "created"

    # CQRS
    class CreateOrderCommand(Command):
        customer_id: str
        items: list

    @command_handler(CreateOrderCommand)
    async def handle_create(command, db):
        order = Order.create(command.customer_id, command.items)
        await db.save(order)
        return order.id

    class GetOrderQuery(Query):
        order_id: str

    @query_handler(GetOrderQuery)
    async def handle_get(query, read_db):
        return await read_db.get_order(query.order_id)

    # Saga
    # reserve_inventory, release_inventory, charge_payment, and refund_payment
    # are the step functions defined in the Saga Pattern section above
    class OrderSaga(Saga):
        steps = [
            SagaStep("reserve", reserve_inventory, compensate=release_inventory),
            SagaStep("charge", charge_payment, compensate=refund_payment),
        ]

    @app.post("/orders")
    async def create_order(request):
        data = request.json()
        cmd = CreateOrderCommand(customer_id=data["customer_id"], items=data["items"])
        order_id = await request.app.command_bus.dispatch(cmd)
        return {"order_id": order_id}

    @app.get("/orders/{id}")
    async def get_order(request):
        query = GetOrderQuery(order_id=request.params["id"])
        return await request.app.query_bus.dispatch(query)

    app.run()
Migration Guide
See the Migration Guide for detailed upgrade instructions.
Quick Migration
- Update your Cello dependency to v0.10.0.
- Import the new Advanced Patterns classes as needed:

      # Event Sourcing
      from cello.eventsourcing import Aggregate, Event, event_handler, EventStore, EventSourcingConfig, Snapshot

      # CQRS
      from cello.cqrs import Command, Query, CommandBus, QueryBus, command_handler, query_handler, CqrsConfig

      # Saga
      from cello.saga import Saga, SagaStep, SagaConfig, SagaResult

- Enable the patterns on your app (optional, only if you need these features) by calling `app.enable_event_sourcing(...)`, `app.enable_cqrs(...)`, and `app.enable_sagas(...)` with the matching config objects.
- No breaking changes from v0.9.0: all existing code continues to work. Event Sourcing, CQRS, and Saga Pattern are purely additive features.
External Dependencies
Some v0.10.0 features require external services to be running:
| Feature | Required Service | Default Port |
|---|---|---|
| Event Sourcing | PostgreSQL (or in-memory) | 5432 |
| CQRS | None (built-in) | N/A |
| Saga Pattern | PostgreSQL (or in-memory) | 5432 |
If external services are not available, the framework will log a warning at startup but will not prevent the application from running. REST endpoints remain fully functional.
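If you want to verify the services in the table yourself before starting the app, a check along these lines may help. It is a standard-library sketch, independent of Cello: `service_reachable` and `warn_if_unreachable` are hypothetical helpers that mimic the warn-but-continue behaviour described above, and the host and port are the defaults from the table.

```python
import logging
import socket

logger = logging.getLogger("startup")


def service_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False


def warn_if_unreachable(name: str, host: str, port: int) -> bool:
    """Log a warning instead of raising, so startup continues either way."""
    ok = service_reachable(host, port)
    if not ok:
        logger.warning("%s is not reachable at %s:%d; continuing without it", name, host, port)
    return ok


if __name__ == "__main__":
    warn_if_unreachable("PostgreSQL", "localhost", 5432)
```

A successful TCP connection only shows that something is listening on the port; it does not confirm credentials or that the expected database exists.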
Contributors
Thanks to all contributors who made this release possible!
Full Changelog
See the complete changelog for all changes.