v0.9.0 Release Notes¶

Release Date: February 2026

This release introduces API Protocol support, bringing first-class GraphQL, gRPC, and message queue integration to the Cello framework. These features enable Cello applications to serve multiple protocol types from a single codebase while maintaining Rust-powered performance on the hot path.

Highlights¶

  • GraphQL Support - Schema-first and code-first GraphQL with queries, mutations, subscriptions, and DataLoader for N+1 prevention
  • gRPC Support - Protocol buffer integration with bidirectional streaming, gRPC-Web, and reflection service
  • Message Queue Integration - Kafka and RabbitMQ adapters with consumer/producer decorators and dead letter queue handling

New Features¶

GraphQL Support¶

Full-featured GraphQL support with decorator-based schema definition:

from cello import App
from cello.graphql import Query, Mutation, Subscription, Schema, DataLoader

app = App()

# DataLoader for batching (prevents N+1 queries)
async def batch_load_users(ids):
    # One batched query for all requested ids, not one query per id
    # (db.get_users_by_ids is an illustrative batched accessor)
    return db.get_users_by_ids(ids)

user_loader = DataLoader(batch_fn=batch_load_users)

@Query
def users(info) -> list:
    return db.get_all_users()

@Query
def user(info, id: int) -> dict:
    return db.get_user(id)

@Mutation
def create_user(info, name: str, email: str) -> dict:
    return db.create_user(name, email)

@Subscription
async def user_created(info):
    async for event in event_stream("user_created"):
        yield event

# Build and mount schema
schema = Schema(
    queries=[users, user],
    mutations=[create_user],
    subscriptions=[user_created],
)
app.mount("/graphql", schema)

Features:

  • Decorator-based schema definition (@Query, @Mutation, @Subscription)
  • Schema builder to compose queries, mutations, and subscriptions
  • DataLoader for automatic batching and caching to prevent N+1 queries
  • WebSocket-based subscriptions for real-time updates
  • Schema introspection for tooling compatibility (GraphiQL, Apollo Studio)
  • Type inference from Python type hints
  • Federation support for microservice architectures
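To make the batching behavior concrete, here is a minimal, framework-free sketch of what a DataLoader does (this is not Cello's implementation): `load()` calls made in the same event-loop tick are coalesced into a single call to the batch function, and results are cached by key.

```python
import asyncio

class SimpleDataLoader:
    """Minimal DataLoader sketch: coalesces load() calls made in the
    same event-loop tick into one batch_fn call, caching by key."""

    def __init__(self, batch_fn):
        self.batch_fn = batch_fn
        self.cache = {}          # key -> Future
        self._pending = []       # (key, Future) waiting for the next batch
        self._scheduled = False

    def load(self, key):
        if key in self.cache:
            return self.cache[key]
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self.cache[key] = fut
        self._pending.append((key, fut))
        if not self._scheduled:
            self._scheduled = True
            # Run the batch after the current tick, so concurrent
            # resolvers get a chance to queue their keys first
            loop.call_soon(lambda: loop.create_task(self._dispatch()))
        return fut

    async def _dispatch(self):
        pending, self._pending = self._pending, []
        self._scheduled = False
        results = await self.batch_fn([key for key, _ in pending])
        for (_, fut), value in zip(pending, results):
            fut.set_result(value)

batches = []  # record each batch_fn call so the batching is observable

async def batch_load_users(ids):
    batches.append(list(ids))
    return [{"id": i, "name": f"user{i}"} for i in ids]

async def main():
    loader = SimpleDataLoader(batch_load_users)
    # Three concurrent loads -> a single batched backend call
    return await asyncio.gather(loader.load(1), loader.load(2), loader.load(3))

users = asyncio.run(main())
```

Three resolvers asking for three different users trigger one backend call with `[1, 2, 3]` instead of three separate queries; that is the N+1 prevention the feature list refers to.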

gRPC Support¶

Define gRPC services using Python classes with Rust-powered serialization:

from cello import App
from cello.grpc import GrpcService, grpc_method, GrpcRequest, GrpcResponse, GrpcConfig

app = App()

app.enable_grpc(GrpcConfig(
    port=50051,
    max_message_size=4 * 1024 * 1024,
    enable_reflection=True,
    enable_grpc_web=True,
))

class UserService(GrpcService):
    service_name = "myapp.UserService"

    @grpc_method
    async def GetUser(self, request: GrpcRequest) -> GrpcResponse:
        user = await db.get_user(request.get("id"))
        if user is None:
            return GrpcResponse(error="User not found", code=5)
        return GrpcResponse(data={"id": user.id, "name": user.name})

    @grpc_method
    async def ListUsers(self, request: GrpcRequest) -> GrpcResponse:
        users = await db.get_all_users()
        return GrpcResponse(data={"users": users})

app.add_grpc_service(UserService())

Features:

  • Class-based service definitions with GrpcService base class
  • @grpc_method decorator for individual RPC methods
  • GrpcRequest and GrpcResponse wrapper types
  • Configurable via GrpcConfig (port, message size, reflection, gRPC-Web)
  • gRPC-Web support for browser clients without a proxy
  • Reflection service for service discovery (compatible with grpcurl)
  • Bidirectional streaming support
  • gRPC status codes and error handling
  • Protocol buffer serialization handled in Rust for maximum performance
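The `code=5` in the `GetUser` example above is the standard gRPC NOT_FOUND status. For reference, the common status codes (these values come from the gRPC specification, not from Cello):

```python
from enum import IntEnum

class GrpcStatus(IntEnum):
    """Common gRPC status codes, as defined by the gRPC specification."""
    OK = 0
    CANCELLED = 1
    UNKNOWN = 2
    INVALID_ARGUMENT = 3
    DEADLINE_EXCEEDED = 4
    NOT_FOUND = 5
    ALREADY_EXISTS = 6
    PERMISSION_DENIED = 7
    RESOURCE_EXHAUSTED = 8
    UNIMPLEMENTED = 12
    INTERNAL = 13
    UNAVAILABLE = 14
    UNAUTHENTICATED = 16
```

So the not-found branch above could be written as `GrpcResponse(error="User not found", code=GrpcStatus.NOT_FOUND)` for readability.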

Message Queue Integration¶

Kafka¶

Decorator-based Kafka consumers and producers:

from cello import App
from cello.messaging import KafkaConfig, kafka_consumer, kafka_producer, Message, MessageResult, Producer

app = App()

app.enable_messaging(KafkaConfig(
    bootstrap_servers="localhost:9092",
    group_id="my-service",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
))

# Consumer: process messages from a topic
@kafka_consumer(topic="orders", group="order-processor")
async def process_order(message: Message):
    order = message.json()
    await fulfill_order(order)
    return MessageResult.ACK

# Producer decorator: auto-publish handler response to topic
@app.post("/orders")
@kafka_producer(topic="order-events")
async def create_order(request):
    order = request.json()
    saved = await db.save_order(order)
    return {"order_id": saved.id, "status": "created"}

# Manual producer for fine-grained control
producer = Producer(topic="notifications", config=KafkaConfig(bootstrap_servers="localhost:9092"))

@app.post("/notify")
async def notify(request):
    await producer.send({"type": "alert", "message": "New order received"})
    return {"status": "queued"}

Features:

  • @kafka_consumer decorator for topic subscriptions
  • @kafka_producer decorator for automatic message publishing
  • Producer class for manual message publishing
  • Message wrapper with .json(), .text(), .raw() accessors
  • MessageResult enum: ACK, NACK, RETRY, DEAD_LETTER
  • Consumer group management with automatic rebalancing
  • Dead letter queue handling for failed messages
  • SSL/SASL authentication support
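The `MessageResult` enum implies a dispatch loop roughly like the following. This is a plausible sketch, not Cello's actual retry policy: RETRY re-invokes the handler up to a retry limit, after which the message goes to the dead letter queue.

```python
from enum import Enum, auto

class MessageResult(Enum):
    """Stand-in for cello.messaging.MessageResult."""
    ACK = auto()
    NACK = auto()
    RETRY = auto()
    DEAD_LETTER = auto()

def run_consumer(handler, message, max_retries=3, dead_letters=None):
    """Sketch of the dispatch loop implied by MessageResult: ACK commits,
    NACK rejects without retry, RETRY re-invokes the handler, and
    DEAD_LETTER (or exhausted retries) routes the message to the DLQ."""
    dead_letters = dead_letters if dead_letters is not None else []
    for attempt in range(1, max_retries + 1):
        result = handler(message)
        if result is MessageResult.ACK:
            return "acked"
        if result is MessageResult.NACK:
            return "nacked"              # rejected, no retry
        if result is MessageResult.DEAD_LETTER:
            dead_letters.append(message)
            return "dead-lettered"
        # MessageResult.RETRY: fall through and try again
    dead_letters.append(message)         # retries exhausted
    return "dead-lettered"
```

A handler that returns RETRY twice and then ACK succeeds on the third attempt; a handler that always returns RETRY ends up in the dead letter queue.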

RabbitMQ¶

from cello.messaging import RabbitMQConfig

app.enable_rabbitmq(RabbitMQConfig(
    url="amqp://guest:guest@localhost:5672/",
    prefetch_count=10,
    exchange="events",
    exchange_type="topic",
))

Features:

  • AMQP 0-9-1 protocol support
  • Exchange types: direct, topic, fanout, headers
  • Configurable prefetch count for flow control
  • Dead letter exchange support
  • Message acknowledgment and rejection
  • Connection pooling and automatic reconnection
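Topic exchanges route on `.`-separated routing keys with two wildcards: `*` matches exactly one word and `#` matches zero or more words. These semantics come from AMQP 0-9-1 itself, not from Cello; a small reference implementation:

```python
def topic_matches(binding: str, routing_key: str) -> bool:
    """AMQP 0-9-1 topic-exchange matching: '.'-separated words,
    '*' matches exactly one word, '#' matches zero or more words."""
    def match(pat, key):
        if not pat:
            return not key
        head, rest = pat[0], pat[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, key[i:]) for i in range(len(key) + 1))
        if not key:
            return False
        return (head == "*" or head == key[0]) and match(rest, key[1:])
    return match(binding.split("."), routing_key.split("."))
```

So a queue bound with `orders.*` receives `orders.created` but not `orders.created.eu`, while `orders.#` receives both.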

Improvements¶

Performance¶

  • gRPC serialization in Rust - Protocol buffer encoding/decoding runs entirely in Rust, bypassing Python overhead
  • GraphQL query parsing in Rust - Query parsing and validation offloaded to the Rust engine
  • Zero-copy message passing - Kafka and RabbitMQ message bodies passed as Bytes without copying
  • Connection pool sharing - gRPC and HTTP share the same Tokio runtime for efficient resource usage

Observability¶

  • GraphQL metrics - Query count, latency, and error rate exposed via Prometheus at /metrics
  • gRPC metrics - Per-method request count, latency histogram, and error codes at /metrics
  • Messaging metrics - Consumer lag, messages processed, and error counts at /metrics
  • Health check integration - Kafka broker connectivity and gRPC server status included in /health endpoint
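Of these, consumer lag is the least self-explanatory. It is conventionally computed per partition as the distance between the latest offset in the log and the consumer group's committed offset, which is presumably what the messaging metrics report:

```python
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition consumer lag: how far the committed offset trails
    the latest offset in the log. Partitions the group has never
    committed to count their full log length as lag."""
    return {
        partition: end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in end_offsets
    }
```

A lag of 0 means the consumer is fully caught up on that partition; a steadily growing lag means consumers cannot keep pace with producers.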

Developer Experience¶

  • GraphQL playground - Built-in GraphiQL interface available at /graphql when in development mode
  • gRPC reflection - Service discovery via reflection for use with grpcurl and other tools
  • Typed message payloads - Message.json() returns typed dicts with IDE autocompletion
  • Clear error messages - Descriptive errors for misconfigured brokers, invalid schemas, and connection failures

Bug Fixes¶

  • Fixed database connection pool not releasing connections on handler timeout (from v0.8.0)
  • Fixed Redis hgetall returning empty dict for non-existent keys instead of None (from v0.8.0)
  • Fixed @transactional decorator not propagating exceptions correctly in nested async calls (from v0.8.0)
  • Fixed health check endpoint returning 200 when database connection is down
  • Fixed memory leak in long-running WebSocket connections with high message throughput
  • Improved error message when maturin develop is not run before importing cello

Example¶

See the complete API protocols demo example for a working application demonstrating all API Protocol features.

from cello import App, Response
from cello.graphql import Query, Mutation, Schema, DataLoader
from cello.grpc import GrpcService, grpc_method, GrpcConfig, GrpcRequest, GrpcResponse
from cello.messaging import KafkaConfig, kafka_consumer, Message, MessageResult

app = App()

# Enable protocols
app.enable_grpc(GrpcConfig(port=50051, enable_reflection=True))
app.enable_messaging(KafkaConfig(bootstrap_servers="localhost:9092", group_id="demo"))

# GraphQL
@Query
def users(info) -> list:
    return get_all_users()

schema = Schema(queries=[users])
app.mount("/graphql", schema)

# gRPC
class UserService(GrpcService):
    @grpc_method
    async def GetUser(self, request: GrpcRequest) -> GrpcResponse:
        return GrpcResponse(data={"id": 1, "name": "Alice"})

app.add_grpc_service(UserService())

# Kafka consumer
@kafka_consumer(topic="events", group="demo")
async def handle_event(message: Message):
    print(message.json())
    return MessageResult.ACK

app.run()

Migration Guide¶

See the Migration Guide for detailed upgrade instructions.

Quick Migration¶

  1. Update your dependency:

    pip install --upgrade cello-framework
    

  2. Import new API Protocol classes as needed:

    # GraphQL
    from cello.graphql import Query, Mutation, Subscription, Schema, DataLoader
    
    # gRPC
    from cello.grpc import GrpcService, grpc_method, GrpcConfig, GrpcRequest, GrpcResponse
    
    # Messaging
    from cello.messaging import KafkaConfig, kafka_consumer, kafka_producer, Message, MessageResult
    

  3. Enable protocols on your app (optional - only if you need these features):

    app.enable_grpc(GrpcConfig(port=50051))
    app.enable_messaging(KafkaConfig(bootstrap_servers="localhost:9092", group_id="my-app"))
    app.enable_rabbitmq(RabbitMQConfig(url="amqp://localhost:5672/"))
    

  4. No breaking changes from v0.8.0 - all existing code continues to work. GraphQL, gRPC, and messaging are purely additive features.

External Dependencies¶

Some v0.9.0 features require external services to be running:

Feature     Required Service       Default Port
GraphQL     None (built-in)        N/A
gRPC        None (built-in)        50051
Kafka       Apache Kafka broker    9092
RabbitMQ    RabbitMQ server        5672

If external services are not available, the framework will log a warning at startup but will not prevent the application from running. REST endpoints remain fully functional.
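The startup behavior described above amounts to a best-effort reachability probe. A minimal sketch of such a check (the function name and log message are illustrative, not Cello API):

```python
import socket

def service_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Best-effort TCP probe for an external dependency.
    Reports availability without ever raising, so an unreachable
    broker can be logged as a warning instead of aborting startup."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. at application startup:
# if not service_reachable("localhost", 9092):
#     log.warning("Kafka broker unreachable; messaging features inactive")
```

The probe only confirms that something accepts TCP connections on the port; protocol-level health (broker handshake, authentication) is checked later on first use.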


Contributors¶

Thanks to all contributors who made this release possible!


Full Changelog¶

See the complete changelog for all changes.