Adapters & transports
The @babelqueue/core codec is framework-agnostic. Thin packages wire it into common
Node stacks: @babelqueue/bullmq (BullMQ jobs) and @babelqueue/nestjs (a NestJS
module built on the BullMQ adapter), plus broker transports — @babelqueue/sqs (Amazon
SQS), @babelqueue/azure-service-bus (Azure Service Bus) and @babelqueue/pulsar
(Apache Pulsar).
BullMQ — @babelqueue/bullmq
npm install @babelqueue/bullmq bullmq
bullmq ^5 is a peer dependency; @babelqueue/core is pulled in for you.
It exports two functions:
publish(queue, urn, data, options?) → Promise<string>: adds a BullMQ job whose name is the URN and whose data is the canonical envelope; returns meta.id. options is { traceId?, jobsOptions? } (jobsOptions is BullMQ’s own: delay, attempts, backoff…).
processor(handlers, options?) → (job) => Promise: a BullMQ processor function that validates each envelope, resolves its URN and routes to handlers[urn]. options.onUnknownUrn(envelope, job) handles URNs with no mapped handler.
Produce
import { Queue } from "bullmq";
import { publish } from "@babelqueue/bullmq";
const queue = new Queue("orders", { connection: { host: "localhost", port: 6379 } });
const id = await publish(queue, "urn:babel:orders:created", { order_id: 1042 });
Consume
import { Worker } from "bullmq";
import { processor } from "@babelqueue/bullmq";
new Worker(
  "orders",
  processor(
    {
      "urn:babel:orders:created": async (env, job) => {
        console.log(env.data.order_id, env.trace_id);
      },
    },
    { onUnknownUrn: (env, job) => console.warn("no handler for", job.name) },
  ),
  { connection: { host: "localhost", port: 6379 } },
);
A handler is (envelope, job) => unknown | Promise<unknown>. A non-conformant
envelope is rejected (BullMQ then retries/fails per its options); an unmapped URN
throws unless onUnknownUrn is supplied.
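The routing rule described above can be sketched without BullMQ at all. What follows is a minimal illustration, not the adapter's actual source: resolveHandler, sketchProcessor and JobLike are hypothetical names, reading the URN from job.name is an assumption based on the statement that the job name carries the URN, and envelope validation is left out entirely.

```typescript
// Illustrative sketch only; not the real @babelqueue/bullmq implementation.
// JobLike is a hypothetical stand-in for a BullMQ Job.
interface JobLike {
  name: string; // the URN, per the publish() description
  data: unknown; // the canonical envelope (validation omitted in this sketch)
}

type Handler = (envelope: unknown, job: JobLike) => unknown | Promise<unknown>;

interface SketchOptions {
  onUnknownUrn?: Handler;
}

// Pick the handler for a URN, fall back to onUnknownUrn, otherwise throw.
function resolveHandler(
  handlers: Record<string, Handler>,
  urn: string,
  options: SketchOptions = {},
): Handler {
  // Own-property check so names like "toString" are not mistaken for handlers.
  const mapped = Object.prototype.hasOwnProperty.call(handlers, urn)
    ? handlers[urn]
    : undefined;
  if (mapped) return mapped;
  if (options.onUnknownUrn) return options.onUnknownUrn;
  throw new Error(`no handler mapped for URN ${urn}`);
}

// Wrap the lookup in a (job) => Promise function, the shape a Worker expects.
function sketchProcessor(
  handlers: Record<string, Handler>,
  options: SketchOptions = {},
): (job: JobLike) => Promise<unknown> {
  return async (job) => resolveHandler(handlers, job.name, options)(job.data, job);
}
```

Because the lookup throws inside an async function, an unmapped URN surfaces as a rejected promise, which is how a processor would signal failure to the worker in this sketch.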
NestJS — @babelqueue/nestjs
npm install @babelqueue/nestjs @nestjs/common bullmq
@nestjs/common ^10 || ^11 and bullmq ^5 are peers; it builds on
@babelqueue/bullmq.
Register the module and inject the publisher:
import { Module } from "@nestjs/common";
import { BabelQueueModule } from "@babelqueue/nestjs";
@Module({
  imports: [
    BabelQueueModule.forRoot({
      queue: "orders",
      connection: { host: "localhost", port: 6379 },
    }),
  ],
})
export class AppModule {}

import { Injectable } from "@nestjs/common";
import { BabelQueuePublisher } from "@babelqueue/nestjs";
@Injectable()
export class Orders {
  constructor(private readonly babelQueue: BabelQueuePublisher) {}
  create() {
    return this.babelQueue.publish("urn:babel:orders:created", { order_id: 1042 });
  }
}
forRoot({ queue, connection, queueOptions? }) provides an injectable
BabelQueuePublisher (publish(urn, data, { traceId? }) → Promise<string>) over the
BullMQ queue. For consuming, build a plain BullMQ Worker with the processor
re-exported from @babelqueue/nestjs:
import { Worker } from "bullmq";
import { processor } from "@babelqueue/nestjs";
new Worker(
  "orders",
  processor({ "urn:babel:orders:created": async (env) => { /* ... */ } }),
  { connection: { host: "localhost", port: 6379 } },
);
Amazon SQS — @babelqueue/sqs
npm install @babelqueue/sqs @aws-sdk/client-sqs
@aws-sdk/client-sqs is an optional peer — you provide the SQS client (the aggregated
SQS class satisfies the transport structurally). It sends the canonical envelope as the
MessageBody with the §3 MessageAttributes,
and consumes by routing each message to a handler by URN.
Produce
import { SQS } from "@aws-sdk/client-sqs";
import { SqsPublisher } from "@babelqueue/sqs";
const sqs = new SQS({ region: "eu-central-1" });
const url = "https://sqs.eu-central-1.amazonaws.com/123456789012/orders";
const id = await new SqsPublisher(sqs, url).publish("urn:babel:orders:created", { order_id: 1042 });
publish(urn, data, { traceId? }) returns the message's meta.id. For FIFO queues,
construct the publisher as new SqsPublisher(sqs, url, { fifo: true }); the queue URL
must end in .fifo.
Consume
import { SqsConsumer } from "@babelqueue/sqs";
const consumer = new SqsConsumer(
  sqs,
  url,
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
  { onUnknownUrn: (env, msg) => {}, onError: (err, env, msg) => {} },
);
await consumer.poll(); // receive one batch, route, delete handled; loop this
A throwing handler leaves the message for SQS to redeliver after the visibility timeout
(at-least-once); attempts is reconciled to ApproximateReceiveCount − 1. Point the
client’s endpoint at LocalStack/ElasticMQ for local testing.
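The receive-count rule above can be written as a small pure function. This is a sketch rather than the package's code: the name attemptsFromReceiveCount is hypothetical, and treating a missing or malformed attribute as a first delivery is an assumption. SQS reports ApproximateReceiveCount as a string in the message's Attributes map, and only when the receive call requests it.

```typescript
// Hypothetical helper: derive the envelope's attempts from SQS system attributes.
function attemptsFromReceiveCount(
  attributes: Record<string, string> | undefined,
): number {
  const raw = attributes?.ApproximateReceiveCount;
  const count = raw === undefined ? NaN : Number.parseInt(raw, 10);
  // Assumption: a missing or malformed count is treated as a first delivery.
  if (!Number.isFinite(count) || count < 1) return 0;
  // The first receive reports 1, so attempts is the count minus one.
  return count - 1;
}

console.log(attemptsFromReceiveCount({ ApproximateReceiveCount: "1" })); // 0
console.log(attemptsFromReceiveCount({ ApproximateReceiveCount: "3" })); // 2
```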
Azure Service Bus — @babelqueue/azure-service-bus
npm install @babelqueue/azure-service-bus @azure/service-bus
@azure/service-bus is an optional peer — you provide the sender/receiver (a
ServiceBusSender / ServiceBusReceiver satisfies the adapter structurally). It sends the
canonical envelope as the message body with the native §4 projection (subject = URN,
correlationId = trace_id, messageId = meta.id, plus the bq- application
properties), and consumes by routing each message to a handler by URN.
import { ServiceBusClient } from "@azure/service-bus";
import { AsbPublisher, AsbConsumer } from "@babelqueue/azure-service-bus";
const client = new ServiceBusClient(connectionString); // or (namespace, credential)
// produce
const id = await new AsbPublisher(client.createSender("orders"))
  .publish("urn:babel:orders:created", { order_id: 1042 });
// consume (PeekLock)
const consumer = new AsbConsumer(
  client.createReceiver("orders"),
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
  { onError: (err) => console.error(err) },
);
await consumer.run();
Delayed delivery: publish(urn, data, { delayMs: 300000 }) → native
scheduledEnqueueTimeUtc. A throwing handler abandons the message (the broker redelivers,
incrementing deliveryCount); attempts is reconciled to
max(body.attempts, deliveryCount − 1). See the
Azure Service Bus binding.
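The two rules in this paragraph are simple arithmetic and can be sketched as follows. The helper names scheduledEnqueueTime and reconcileAsbAttempts are hypothetical, and leaving attempts unchanged when deliveryCount is absent is an assumption; the real adapter may behave differently.

```typescript
// Hypothetical helper: turn a relative delay into the absolute UTC time
// that would be set as scheduledEnqueueTimeUtc on the outgoing message.
function scheduledEnqueueTime(delayMs: number, now: Date = new Date()): Date {
  return new Date(now.getTime() + delayMs);
}

// Hypothetical helper: max(body.attempts, deliveryCount - 1), as stated above.
function reconcileAsbAttempts(
  bodyAttempts: number,
  deliveryCount: number | undefined,
): number {
  // Assumption: without a deliveryCount, keep the value carried in the body.
  if (deliveryCount === undefined) return bodyAttempts;
  return Math.max(bodyAttempts, deliveryCount - 1);
}

const base = new Date("2024-01-01T00:00:00.000Z");
console.log(scheduledEnqueueTime(300000, base).toISOString()); // 2024-01-01T00:05:00.000Z
console.log(reconcileAsbAttempts(0, 3)); // 2
console.log(reconcileAsbAttempts(5, 2)); // 5
```

Taking the maximum means a count already recorded in the body is never lowered by a smaller broker-side counter.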
Apache Pulsar — @babelqueue/pulsar
npm install @babelqueue/pulsar pulsar-client
pulsar-client is an optional peer — you provide the producer/consumer (a Producer /
Consumer satisfies the adapter structurally). It sends the canonical envelope as the
message payload with the §5 property projection (bq-job = URN, bq-trace-id = trace_id,
bq-message-id = meta.id, plus bq-schema-version / bq-source-lang / bq-attempts,
all string→string), and consumes by routing each message to a handler by URN.
import Pulsar from "pulsar-client";
import { PulsarPublisher, PulsarConsumer } from "@babelqueue/pulsar";
const client = new Pulsar.Client({ serviceUrl: "pulsar://localhost:6650" });
// produce
const producer = await client.createProducer({ topic: "orders" });
const id = await new PulsarPublisher(producer)
  .publish("urn:babel:orders:created", { order_id: 1042 });
// consume (Shared subscription)
const sub = await client.subscribe({
  topic: "orders",
  subscription: "babelqueue",
  subscriptionType: "Shared",
});
const consumer = new PulsarConsumer(
  sub,
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
  { onError: (err) => console.error(err) },
);
await consumer.run();
Delayed delivery: publish(urn, data, { delayMs: 300000 }) → native deliverAfter. A
throwing handler negativeAcknowledges the message (the broker redelivers, incrementing
getRedeliveryCount()); attempts is reconciled to max(body.attempts, redeliveryCount) —
the redelivery count is 0-based, so no −1. See the
Apache Pulsar binding.
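As a rough illustration of the property projection and the 0-based reconciliation described for Pulsar, consider the sketch below. EnvelopeSubset, pulsarProperties and reconcilePulsarAttempts are hypothetical names, and mapping bq-attempts from the envelope's attempts field is an assumption. bq-schema-version and bq-source-lang are left out because this section does not say which envelope fields feed them.

```typescript
// Hypothetical envelope subset; only fields named in the text are modelled.
interface EnvelopeSubset {
  trace_id: string;
  attempts: number;
  meta: { id: string };
}

// Build the string-to-string property map for an outgoing Pulsar message.
function pulsarProperties(
  urn: string,
  env: EnvelopeSubset,
): Record<string, string> {
  return {
    "bq-job": urn,
    "bq-trace-id": env.trace_id,
    "bq-message-id": env.meta.id,
    // Assumption: bq-attempts carries the envelope's attempts, stringified.
    "bq-attempts": String(env.attempts),
  };
}

// max(body.attempts, redeliveryCount): the count is 0-based, so no "- 1".
function reconcilePulsarAttempts(
  bodyAttempts: number,
  redeliveryCount: number,
): number {
  return Math.max(bodyAttempts, redeliveryCount);
}

console.log(reconcilePulsarAttempts(0, 0)); // 0
console.log(reconcilePulsarAttempts(1, 3)); // 3
```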
Whatever a Node service produces is the canonical envelope, so it is consumed natively by any other BabelQueue SDK — see the wire contract.