Consuming messages
With the runtime
Register a handler for a URN with the @app.handler decorator, then start the
worker with app.run(). Your function receives the decoded data dict and the
meta block, regardless of which language produced the message. Add a third
parameter, message, to receive the full envelope (including trace_id).

    from babelqueue import BabelQueue

    app = BabelQueue("redis://localhost:6379/0", queue="orders")

    @app.handler("urn:babel:orders:created")
    def on_order_created(data, meta, message):
        trace_id = message["trace_id"]  # cross-service correlation
        print(f"[{trace_id}] order {data['order_id']} for {data['amount']}")
        # ... run scoring / enrichment / ML pipeline

    app.run()  # consume forever (Ctrl-C to stop)

With the codec only
Managing the broker yourself? Pull the bytes from your client, then decode and route on the URN:

    from babelqueue import EnvelopeCodec

    incoming = EnvelopeCodec.decode(body)  # body pulled from your broker
    urn = incoming["job"]                  # "urn:babel:orders:created"
    data = incoming["data"]                # {"order_id": 1042, ...}
    trace_id = incoming["trace_id"]        # correlate across services

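
The snippet above decodes the envelope but leaves the routing step to you. One way to route on the URN is a small dispatch table, sketched below. This is only an illustration, not part of the library: HANDLERS, handles, and dispatch are hypothetical names. json.loads stands in for EnvelopeCodec.decode so the sketch runs without a broker, which assumes a JSON wire format. The sample amount and trace_id values are made up.

```python
import json

# Hypothetical routing table (not part of babelqueue): URN -> handler function.
HANDLERS = {}


def handles(urn):
    """Register the decorated function as the handler for one URN."""
    def register(func):
        HANDLERS[urn] = func
        return func
    return register


@handles("urn:babel:orders:created")
def on_order_created(data, trace_id):
    return f"[{trace_id}] order {data['order_id']} for {data['amount']}"


def dispatch(envelope):
    """Route a decoded envelope to its handler; fail loudly on unknown URNs."""
    handler = HANDLERS.get(envelope["job"])
    if handler is None:
        raise LookupError(f"no handler for {envelope['job']}")
    return handler(envelope["data"], envelope["trace_id"])


# Stand-in for EnvelopeCodec.decode(body); assumes the envelope is JSON.
body = (
    b'{"job": "urn:babel:orders:created", '
    b'"data": {"order_id": 1042, "amount": 99.5}, '
    b'"trace_id": "abc123"}'
)
result = dispatch(json.loads(body))
print(result)
```

Raising on an unknown URN is a deliberate choice in this sketch: a silently dropped message is harder to trace than a failed one. Depending on your broker, you might instead dead-letter or requeue the message.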
Python is the AI/ML & data consumer in a polyglot topology: a Laravel app can
publish urn:babel:orders:created, and this Python worker enriches it or feeds it
into a model — reading the same canonical envelope, carrying the same trace_id
end to end.
That’s the whole loop: any producer, any consumer, one schema.