API Gateway ExampleΒΆ
This example builds an API gateway that sits in front of multiple backend services. It handles authentication, rate limiting, circuit breaking, and request routing -- all using Cello's built-in middleware.
ArchitectureΒΆ
ββββββββββββββββββββββββββ
Client ββββββββββββ>β API Gateway β
β β
β 1. Rate Limiting β
β 2. JWT Authentication β
β 3. Circuit Breaker β
β 4. Route to Service β
βββββ¬βββββββββ¬βββββββββ¬βββ
| | |
ββββββββββΌβββ βββββΌβββββ βββΌβββββββββ
β Users β β Orders β β Products β
β Service β β Serviceβ β Service β
βββββββββββββ ββββββββββ ββββββββββββ
- Rate Limiting protects backends from traffic spikes (token bucket, 100 req/min)
- JWT Authentication validates Bearer tokens and injects claims into context
- Circuit Breaker stops forwarding requests to failing services
- Routing maps gateway paths to the correct backend service
Full Source CodeΒΆ
#!/usr/bin/env python3
"""
API Gateway with Cello.
Demonstrates rate limiting, JWT auth, circuit breaking, and
routing to multiple backend services -- all in a single process.
"""
from cello import App, Blueprint, Response
import json
import time
import hashlib
import hmac
import base64
app = App()
app.enable_cors()
app.enable_logging()
app.enable_compression()
# ===================================================================
# Configuration
# ===================================================================
JWT_SECRET = "super-secret-gateway-key"
RATE_LIMIT = 100 # requests per window
RATE_WINDOW = 60 # seconds
# ===================================================================
# In-Memory Rate Limiter
# ===================================================================
rate_limit_store = {}
def check_rate_limit(client_id):
"""Token bucket rate limiter. Returns (allowed, remaining, reset_at)."""
now = time.time()
window_start = int(now / RATE_WINDOW) * RATE_WINDOW
reset_at = window_start + RATE_WINDOW
key = f"{client_id}:{window_start}"
if key not in rate_limit_store:
rate_limit_store[key] = 0
# Cleanup old windows
for k in list(rate_limit_store):
if not k.endswith(f":{window_start}"):
del rate_limit_store[k]
rate_limit_store[key] += 1
count = rate_limit_store[key]
remaining = max(0, RATE_LIMIT - count)
return count <= RATE_LIMIT, remaining, int(reset_at)
# ===================================================================
# Simple JWT Helpers
# ===================================================================
def create_token(user_id, role="user"):
"""Create a simple JWT-like token for demonstration."""
payload = json.dumps({"sub": user_id, "role": role, "exp": int(time.time()) + 3600})
encoded = base64.urlsafe_b64encode(payload.encode()).decode()
sig = hmac.new(JWT_SECRET.encode(), encoded.encode(), hashlib.sha256).hexdigest()[:16]
return f"{encoded}.{sig}"
def verify_token(token):
"""Verify and decode a token. Returns claims dict or None."""
try:
parts = token.split(".")
if len(parts) != 2:
return None
encoded, sig = parts
expected_sig = hmac.new(JWT_SECRET.encode(), encoded.encode(), hashlib.sha256).hexdigest()[:16]
if not hmac.compare_digest(sig, expected_sig):
return None
payload = json.loads(base64.urlsafe_b64decode(encoded))
if payload.get("exp", 0) < time.time():
return None
return payload
except Exception:
return None
# ===================================================================
# Circuit Breaker
# ===================================================================
circuit_state = {} # service -> {"failures": int, "open_since": float | None}
FAILURE_THRESHOLD = 5
RESET_TIMEOUT = 30 # seconds
def circuit_check(service_name):
"""Check if circuit is open. Returns (allowed, state_name)."""
state = circuit_state.setdefault(service_name, {"failures": 0, "open_since": None})
if state["open_since"] is not None:
elapsed = time.time() - state["open_since"]
if elapsed < RESET_TIMEOUT:
return False, "open"
# Half-open: allow one probe
return True, "half-open"
return True, "closed"
def circuit_record_success(service_name):
"""Record a successful response, closing the circuit."""
circuit_state[service_name] = {"failures": 0, "open_since": None}
def circuit_record_failure(service_name):
"""Record a failure, potentially opening the circuit."""
state = circuit_state.setdefault(service_name, {"failures": 0, "open_since": None})
state["failures"] += 1
if state["failures"] >= FAILURE_THRESHOLD:
state["open_since"] = time.time()
# ===================================================================
# Backend Service Simulators
# ===================================================================
users_db = {
"1": {"id": "1", "name": "Alice", "email": "alice@example.com"},
"2": {"id": "2", "name": "Bob", "email": "bob@example.com"},
}
orders_db = {
"1": {"id": "1", "user_id": "1", "product": "Widget", "total": 29.99, "status": "shipped"},
"2": {"id": "2", "user_id": "2", "product": "Gadget", "total": 49.99, "status": "pending"},
}
products_db = {
"1": {"id": "1", "name": "Widget", "price": 29.99, "stock": 150},
"2": {"id": "2", "name": "Gadget", "price": 49.99, "stock": 75},
}
# ===================================================================
# Gateway Middleware
# ===================================================================
@app.before_request
def gateway_middleware(request):
"""Apply rate limiting and auth to all /api/* requests."""
# Skip non-API paths
if not request.path.startswith("/api/"):
return None
# Skip auth for token endpoint
if request.path == "/api/token":
return None
# --- Rate Limiting ---
client_ip = request.get_header("X-Forwarded-For") or "unknown"
allowed, remaining, reset_at = check_rate_limit(client_ip)
if not allowed:
resp = Response.json(
{"error": "Rate limit exceeded", "retry_after": reset_at - int(time.time())},
status=429,
)
resp.set_header("X-RateLimit-Limit", str(RATE_LIMIT))
resp.set_header("X-RateLimit-Remaining", "0")
resp.set_header("X-RateLimit-Reset", str(reset_at))
resp.set_header("Retry-After", str(reset_at - int(time.time())))
return resp
# Store rate limit info for after_request
request.context["rate_limit_remaining"] = remaining
request.context["rate_limit_reset"] = reset_at
# --- JWT Authentication ---
auth_header = request.get_header("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return Response.json({"error": "Missing Bearer token"}, status=401)
claims = verify_token(auth_header[7:])
if not claims:
return Response.json({"error": "Invalid or expired token"}, status=401)
request.context["user_id"] = claims["sub"]
request.context["role"] = claims.get("role", "user")
return None
# ===================================================================
# Token Endpoint (Public)
# ===================================================================
@app.post("/api/token")
def get_token(request):
"""Issue a JWT token (simplified, no password check)."""
data = request.json()
user_id = data.get("user_id", "anonymous")
role = data.get("role", "user")
token = create_token(user_id, role)
return {"token": token, "expires_in": 3600}
# ===================================================================
# Service Routes (Gateway -> Backend)
# ===================================================================
svc = Blueprint("/api/v1")
@svc.get("/users")
def route_users(request):
"""Route to user service."""
allowed, state = circuit_check("users")
if not allowed:
return Response.json({"error": "User service unavailable", "circuit": state}, status=503)
circuit_record_success("users")
return {"users": list(users_db.values())}
@svc.get("/users/{id}")
def route_user(request):
"""Route to user service for a single user."""
allowed, state = circuit_check("users")
if not allowed:
return Response.json({"error": "User service unavailable", "circuit": state}, status=503)
user = users_db.get(request.params["id"])
if not user:
circuit_record_success("users")
return Response.json({"error": "User not found"}, status=404)
circuit_record_success("users")
return user
@svc.get("/orders")
def route_orders(request):
"""Route to order service."""
allowed, state = circuit_check("orders")
if not allowed:
return Response.json({"error": "Order service unavailable", "circuit": state}, status=503)
# Scope to authenticated user (unless admin)
user_id = request.context.get("user_id")
role = request.context.get("role")
if role == "admin":
results = list(orders_db.values())
else:
results = [o for o in orders_db.values() if o["user_id"] == user_id]
circuit_record_success("orders")
return {"orders": results}
@svc.get("/products")
def route_products(request):
"""Route to product service."""
allowed, state = circuit_check("products")
if not allowed:
return Response.json({"error": "Product service unavailable", "circuit": state}, status=503)
circuit_record_success("products")
return {"products": list(products_db.values())}
@svc.get("/products/{id}")
def route_product(request):
"""Route to product service for a single product."""
allowed, state = circuit_check("products")
if not allowed:
return Response.json({"error": "Product service unavailable", "circuit": state}, status=503)
product = products_db.get(request.params["id"])
if not product:
circuit_record_success("products")
return Response.json({"error": "Product not found"}, status=404)
circuit_record_success("products")
return product
# ===================================================================
# Health & Status
# ===================================================================
@app.get("/health")
def health(request):
"""Gateway health check."""
services = {}
for name in ["users", "orders", "products"]:
_, state = circuit_check(name)
services[name] = state
return {"status": "healthy", "circuits": services}
@app.get("/")
def gateway_index(request):
"""Gateway service discovery."""
return {
"gateway": "Cello API Gateway",
"endpoints": [
{"path": "/api/token", "method": "POST", "auth": False},
{"path": "/api/v1/users", "method": "GET", "auth": True},
{"path": "/api/v1/orders", "method": "GET", "auth": True},
{"path": "/api/v1/products", "method": "GET", "auth": True},
{"path": "/health", "method": "GET", "auth": False},
],
}
# ===================================================================
# Register and Run
# ===================================================================
app.register_blueprint(svc)
if __name__ == "__main__":
app.run(host="127.0.0.1", port=8000)
Testing with curlΒΆ
Get a TokenΒΆ
# Obtain an access token
curl -X POST http://127.0.0.1:8000/api/token \
-H "Content-Type: application/json" \
-d '{"user_id": "1", "role": "admin"}'
Authenticated RequestsΒΆ
# Replace TOKEN with the value from the /api/token response
TOKEN="<paste-token-here>"
# List users
curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8000/api/v1/users
# Get a specific user
curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8000/api/v1/users/1
# List orders (admin sees all; regular user sees own orders)
curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8000/api/v1/orders
# List products
curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:8000/api/v1/products
Health CheckΒΆ
Key PatternsΒΆ
Rate LimitingΒΆ
The gateway enforces a fixed-window rate limit per client IP. Standard X-RateLimit-* headers are included in responses. When the limit is exceeded, a 429 Too Many Requests response is returned with a Retry-After header.
In production, use Cello's built-in RateLimitConfig with the token bucket or sliding window algorithm for more precise control.
JWT AuthenticationΒΆ
All /api/v1/* requests require a Bearer token. The gateway verifies the signature and expiration, then injects the user ID and role into request.context for downstream handlers. The /api/token endpoint is excluded from auth checks.
Circuit BreakerΒΆ
Each backend service has independent circuit breaker state. After 5 consecutive failures the circuit opens for 30 seconds, returning 503 Service Unavailable immediately. After the timeout, one probe request is allowed (half-open state). A successful probe closes the circuit.
Service RoutingΒΆ
The gateway maps public paths (/api/v1/users) to internal service logic. In a distributed deployment, these handlers would forward the request to separate microservices via HTTP or gRPC.
Next StepsΒΆ
- Multi-Tenant SaaS - Add tenant isolation
- Event Sourcing - Event-driven architecture patterns
- Microservices - Deploy services independently