Celery adapter
The Celery adapter builds a BabelQueue runtime on your existing Celery broker, so your Celery app can produce and consume canonical envelopes that interoperate with every other BabelQueue SDK — no separate broker, no second worker.
Install
pip install "babelqueue[celery]"
The [celery] extra adds celery>=5. The core stays zero-dependency; celery is
imported lazily.
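As an illustration of what a lazy import of an optional dependency can look like, here is a minimal sketch. It is not BabelQueue's actual code: the helper name `load_optional` is hypothetical, and `json` stands in for `celery` so the snippet runs without Celery installed.

```python
import importlib


def load_optional(module_name, extra):
    """Import module_name only when called; name the extra to install if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required here; install it with "
            f'pip install "babelqueue[{extra}]"'
        ) from exc


# Nothing is imported until the helper is actually called.
# "json" is a stand-in for celery (assumption, for a runnable demo).
mod = load_optional("json", "celery")
print(mod.dumps({"ok": True}))
```

With this pattern, importing the core package never touches the optional dependency; only code paths that need it pay the import cost or see the install hint.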
Build a runtime on the Celery app
from_celery(celery_app, **kwargs) reads the broker URL off the Celery app and
returns a BabelQueue runtime. Extra kwargs (queue, max_attempts,
dead_letter, on_unknown_urn, …) are forwarded to the runtime:
from celery import Celery
from babelqueue.celery import from_celery, install_worker
celery_app = Celery("svc", broker="redis://localhost:6379/0")
bq = from_celery(celery_app, queue="orders", dead_letter=True)
# Consume a canonical envelope (produced by any SDK)
@bq.handler("urn:babel:orders:created")
def on_created(data, meta):
    print("order", data["order_id"])
# Produce a canonical envelope
bq.publish("urn:babel:orders:created", {"order_id": 1042})
A handler takes (data, meta), or (data, meta, message) to also receive the full
envelope dict (including trace_id).
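To make the two handler shapes concrete, the sketch below shows one way a dispatcher could choose between them by inspecting the handler's parameter count. This is an assumption about the mechanism, not BabelQueue's implementation: `call_handler` is a hypothetical helper, and the envelope keys other than `trace_id` (`urn`, `data`, `meta`) are illustrative; the real shape is defined by the wire contract.

```python
import inspect


def call_handler(handler, message):
    """Call handler as (data, meta) or (data, meta, message), based on its arity."""
    data = message.get("data")
    meta = message.get("meta", {})
    n_params = len(inspect.signature(handler).parameters)
    if n_params >= 3:
        return handler(data, meta, message)
    return handler(data, meta)


def two_arg(data, meta):
    return data["order_id"]


def three_arg(data, meta, message):
    # The third argument is the whole envelope, so trace_id is reachable here.
    return message["trace_id"]


# Hypothetical envelope used only for this demo.
envelope = {
    "urn": "urn:babel:orders:created",
    "trace_id": "t-1",
    "data": {"order_id": 1042},
    "meta": {},
}

print(call_handler(two_arg, envelope))
print(call_handler(three_arg, envelope))
```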
Drain inside the Celery worker
install_worker(celery_app, babel_app=None, *, queue=None, **kwargs) registers a
Celery bootstep that, when celery worker boots, runs the BabelQueue consume
loop in a background daemon thread — so envelopes are drained alongside your normal
Celery tasks:
install_worker(celery_app, bq, queue="orders")
Then start the worker as usual:
celery -A svc worker
If you omit babel_app, it is built via from_celery(celery_app, **kwargs) for you.
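The idea of draining messages in a background daemon thread can be sketched with the standard library alone. This is a simplified illustration, not the bootstep that install_worker registers: the in-memory `queue.Queue` stands in for the broker, and `consume_loop`, `incoming`, `handled`, and `stop` are hypothetical names.

```python
import queue
import threading

incoming = queue.Queue()   # stand-in for the broker queue (assumption)
handled = []               # records what the loop processed
stop = threading.Event()   # lets the host ask the loop to exit


def consume_loop():
    """Drain messages until asked to stop."""
    while not stop.is_set():
        try:
            msg = incoming.get(timeout=0.1)
        except queue.Empty:
            continue
        handled.append(msg)
        incoming.task_done()


# daemon=True means the thread does not keep the host process alive on exit.
worker = threading.Thread(target=consume_loop, name="bq-consumer", daemon=True)
worker.start()

incoming.put({"order_id": 1042})
incoming.join()            # block until the message has been processed
stop.set()
worker.join(timeout=1)
```

Because the loop runs in its own thread, the host process (here the main thread, in the adapter the Celery worker) stays free to do its regular work while messages are drained.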
See the wire contract for the envelope your handlers receive.