Message Queue Integration¶
Cello provides first-class support for message queues with decorator-based consumers, producers, and configuration for Kafka, RabbitMQ, and AWS SQS.
Quick Start¶
from cello import App, KafkaConfig, RabbitMQConfig, SqsConfig
from cello.messaging import kafka_consumer, kafka_producer, Message, MessageResult
app = App()
app.enable_messaging(KafkaConfig(brokers="localhost:9092", group_id="my-app"))
@kafka_consumer(topic="orders", group="order-processor")
async def process_order(message: Message):
order = message.json()
await fulfill_order(order)
return MessageResult.ACK
@app.post("/orders")
@kafka_producer(topic="order-events")
def create_order(request):
return {"order_id": 1, "status": "created"}
app.run()
Kafka¶
Consumer Decorator¶
Subscribe to Kafka topics with the @kafka_consumer decorator:
from cello.messaging import kafka_consumer, Message, MessageResult
@kafka_consumer(topic="user-events", group="processors", auto_commit=True)
async def handle_user_event(message: Message):
data = message.json()
print(f"Received: {data}")
return MessageResult.ACK
| Parameter | Default | Description |
|---|---|---|
topic | Required | Kafka topic to consume from |
group | "default" | Consumer group ID |
auto_commit | True | Auto-commit offsets |
Producer Decorator¶
Auto-publish handler return values to a Kafka topic:
from cello.messaging import kafka_producer
@app.post("/events")
@kafka_producer(topic="app-events")
def publish_event(request):
return {"type": "user_signup", "user_id": 42}
KafkaConfig¶
from cello import KafkaConfig
# Full configuration
config = KafkaConfig(
brokers="broker1:9092,broker2:9092",
group_id="my-service",
client_id="cello-app",
auto_commit=True,
session_timeout_ms=30000,
max_poll_records=500
)
# Local development
config = KafkaConfig.local() # localhost:9092
app.enable_messaging(config)
Manual Producer/Consumer¶
from cello.messaging import Producer, Consumer, Message
# Producer
producer = await Producer.connect(config)
await producer.send("my-topic", {"key": "value"})
await producer.send_batch([
{"topic": "t1", "value": "msg1"},
{"topic": "t1", "value": "msg2"},
])
await producer.close()
# Consumer
consumer = await Consumer.connect(config)
await consumer.subscribe(["topic1", "topic2"])
messages = await consumer.poll(timeout_ms=1000)
for msg in messages:
print(msg.text)
await consumer.commit(msg)
await consumer.close()
Message Class¶
Wrapper for consumed messages with convenient accessors:
message = Message(id="1", topic="orders", key="order-1", value='{"id": 1}')
# Access properties
message.id # "1"
message.topic # "orders"
message.key # "order-1"
message.text # '{"id": 1}' (string)
message.json() # {"id": 1} (parsed dict)
# Acknowledgment
message.ack() # Acknowledge message
message.nack() # Negative acknowledge
MessageResult¶
Constants for consumer return values:
from cello.messaging import MessageResult
MessageResult.ACK # "ack" - Successfully processed
MessageResult.NACK # "nack" - Processing failed
MessageResult.REJECT # "reject" - Reject permanently
MessageResult.REQUEUE # "requeue" - Requeue for retry
MessageResult.DEAD_LETTER # "dead_letter" - Send to DLQ
RabbitMQ¶
from cello import RabbitMQConfig
config = RabbitMQConfig(
url="amqp://guest:guest@localhost:5672/",
vhost="/",
prefetch_count=10,
heartbeat=60
)
# Local development
config = RabbitMQConfig.local() # amqp://guest:guest@localhost:5672
app.enable_rabbitmq(config)
| Option | Default | Description |
|---|---|---|
url | amqp://localhost | AMQP connection URL |
vhost | / | Virtual host |
prefetch_count | 10 | Prefetch count for consumers |
heartbeat | 60 | Heartbeat interval in seconds |
AWS SQS¶
from cello import SqsConfig
config = SqsConfig(
region="us-east-1",
queue_url="https://sqs.us-east-1.amazonaws.com/123456789/my-queue",
max_messages=10,
wait_time_secs=20
)
# Local development (LocalStack)
config = SqsConfig.local(queue_url="http://localhost:4566/000000000000/test-queue")
app.enable_sqs(config)
| Option | Default | Description |
|---|---|---|
region | us-east-1 | AWS region |
queue_url | Required | SQS queue URL |
endpoint_url | None | Custom endpoint (for LocalStack) |
max_messages | 10 | Max messages per poll |
wait_time_secs | 20 | Long poll wait time |
API Reference¶
| Class/Function | Description |
|---|---|
kafka_consumer | Decorator to subscribe handler to Kafka topic |
kafka_producer | Decorator to auto-publish handler returns |
KafkaConfig | Kafka broker configuration (Rust-backed) |
RabbitMQConfig | RabbitMQ connection configuration (Rust-backed) |
SqsConfig | AWS SQS configuration (Rust-backed) |
Message | Consumed message wrapper |
MessageResult | Consumer result constants |
Producer | Manual message producer |
Consumer | Manual message consumer |