Consuming messages

With the runtime

Register a handler for a URN with the @app.handler decorator, then start the worker with app.run(). Your function receives the decoded data dict and the meta block — regardless of which language produced the message. Add a third message parameter to get the full envelope (including trace_id).

from babelqueue import BabelQueue

app = BabelQueue("redis://localhost:6379/0", queue="orders")

@app.handler("urn:babel:orders:created")
def on_order_created(data, meta, message):
    trace_id = message["trace_id"]  # cross-service correlation
    print(f"[{trace_id}] order {data['order_id']} for {data['amount']}")
    # ... run scoring / enrichment / ML pipeline

app.run()  # consume forever (Ctrl-C to stop)

With the codec only

Managing the broker yourself? Pull the bytes from your client, then decode and route on the URN:

from babelqueue import EnvelopeCodec

incoming = EnvelopeCodec.decode(body)   # body pulled from your broker
urn      = incoming["job"]              # "urn:babel:orders:created"
data     = incoming["data"]            # {"order_id": 1042, ...}
trace_id = incoming["trace_id"]        # correlate across services

Python is the AI/ML & data consumer in a polyglot topology: a Laravel app can publish urn:babel:orders:created, and this Python worker enriches it or feeds it into a model — reading the same canonical envelope, carrying the same trace_id end to end.

That’s the whole loop: any producer, any consumer, one schema.