Celery adapter

The Celery adapter builds a BabelQueue runtime on your existing Celery broker, so your Celery app can produce and consume canonical envelopes that interoperate with every other BabelQueue SDK — no separate broker, no second worker.

Install

pip install "babelqueue[celery]"

The [celery] extra adds celery>=5. The core stays zero-dependency; celery is imported lazily.

Build a runtime on the Celery app

from_celery(celery_app, **kwargs) reads the broker URL off the Celery app and returns a BabelQueue runtime. Extra kwargs (queue, max_attempts, dead_letter, on_unknown_urn, …) are forwarded to the runtime:

from celery import Celery
from babelqueue.celery import from_celery, install_worker

celery_app = Celery("svc", broker="redis://localhost:6379/0")

bq = from_celery(celery_app, queue="orders", dead_letter=True)

# Consume a canonical envelope (produced by any SDK)
@bq.handler("urn:babel:orders:created")
def on_created(data, meta):
    print("order", data["order_id"])

# Produce a canonical envelope
bq.publish("urn:babel:orders:created", {"order_id": 1042})

A handler is (data, meta) — or (data, meta, message) to receive the full envelope dict (including trace_id).

Drain inside the Celery worker

install_worker(celery_app, babel_app=None, *, queue=None, **kwargs) registers a Celery bootstep that, when celery worker boots, runs the BabelQueue consume loop in a background daemon thread — so envelopes are drained alongside your normal Celery tasks:

install_worker(celery_app, bq, queue="orders")
celery -A svc worker

If you omit babel_app, it is built via from_celery(celery_app, **kwargs) for you.

See the wire contract for the envelope your handlers receive.