Adapters & transports

The @babelqueue/core codec is framework-agnostic. Thin packages wire it into common Node stacks: @babelqueue/bullmq (BullMQ jobs) and @babelqueue/nestjs (a NestJS module built on the BullMQ adapter), plus broker transports — @babelqueue/sqs (Amazon SQS), @babelqueue/azure-service-bus (Azure Service Bus) and @babelqueue/pulsar (Apache Pulsar).

BullMQ — @babelqueue/bullmq

npm install @babelqueue/bullmq bullmq

bullmq ^5 is a peer dependency; @babelqueue/core is pulled in for you.

It exports two functions:

  • publish(queue, urn, data, options?) → Promise<string> — adds a BullMQ job whose name is the URN and whose data is the canonical envelope; returns meta.id. options is { traceId?, jobsOptions? } (jobsOptions is BullMQ’s own — delay, attempts, backoff…).
  • processor(handlers, options?) → (job) => Promise — a BullMQ processor function that validates each envelope, resolves its URN and routes to handlers[urn]. options.onUnknownUrn(envelope, job) handles URNs with no mapped handler.

Produce

import { Queue } from "bullmq";
import { publish } from "@babelqueue/bullmq";

const queue = new Queue("orders", { connection: { host: "localhost", port: 6379 } });

const id = await publish(queue, "urn:babel:orders:created", { order_id: 1042 });

Consume

import { Worker } from "bullmq";
import { processor } from "@babelqueue/bullmq";

new Worker(
  "orders",
  processor(
    {
      "urn:babel:orders:created": async (env, job) => {
        console.log(env.data.order_id, env.trace_id);
      },
    },
    { onUnknownUrn: (env, job) => console.warn("no handler for", job.name) },
  ),
  { connection: { host: "localhost", port: 6379 } },
);

A handler is (envelope, job) => unknown | Promise<unknown>. A non-conformant envelope is rejected (BullMQ then retries/fails per its options); an unmapped URN throws unless onUnknownUrn is supplied.
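The routing rule described above can be sketched in a few lines. This is a minimal illustration, not the package's implementation: pickHandler and route are hypothetical names, the URN is passed in explicitly rather than resolved from the envelope, and envelope validation is left out.

```typescript
// Hypothetical sketch of URN routing; not the actual @babelqueue/bullmq source.
type Handler<E, J> = (envelope: E, job: J) => unknown | Promise<unknown>;

interface RouteOptions<E, J> {
  // Fallback for URNs with no mapped handler (mirrors options.onUnknownUrn).
  onUnknownUrn?: Handler<E, J>;
}

// Returns the handler mapped to the URN, else the fallback, else throws.
function pickHandler<E, J>(
  handlers: Record<string, Handler<E, J>>,
  urn: string,
  options: RouteOptions<E, J> = {},
): Handler<E, J> {
  const handler = Object.prototype.hasOwnProperty.call(handlers, urn)
    ? handlers[urn]
    : undefined;
  if (handler) return handler;
  if (options.onUnknownUrn) return options.onUnknownUrn;
  throw new Error(`no handler mapped for URN "${urn}"`);
}

// Invokes the selected handler; a thrown error or rejection propagates to the caller.
async function route<E, J>(
  handlers: Record<string, Handler<E, J>>,
  urn: string,
  envelope: E,
  job: J,
  options: RouteOptions<E, J> = {},
): Promise<unknown> {
  return pickHandler(handlers, urn, options)(envelope, job);
}
```

Because the error propagates out of the processor function, BullMQ sees a failed job and applies its own retry settings.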

NestJS — @babelqueue/nestjs

npm install @babelqueue/nestjs @nestjs/common bullmq

@nestjs/common ^10 || ^11 and bullmq ^5 are peers; it builds on @babelqueue/bullmq.

Register the module and inject the publisher:

import { Module } from "@nestjs/common";
import { BabelQueueModule } from "@babelqueue/nestjs";

@Module({
  imports: [
    BabelQueueModule.forRoot({
      queue: "orders",
      connection: { host: "localhost", port: 6379 },
    }),
  ],
})
export class AppModule {}

import { Injectable } from "@nestjs/common";
import { BabelQueuePublisher } from "@babelqueue/nestjs";

@Injectable()
export class Orders {
  constructor(private readonly babelQueue: BabelQueuePublisher) {}

  create() {
    return this.babelQueue.publish("urn:babel:orders:created", { order_id: 1042 });
  }
}

forRoot({ queue, connection, queueOptions? }) provides an injectable BabelQueuePublisher (publish(urn, data, { traceId? }) → Promise<string>) over the BullMQ queue. For consuming, build a plain BullMQ Worker with the processor re-exported from @babelqueue/nestjs:

import { Worker } from "bullmq";
import { processor } from "@babelqueue/nestjs";

new Worker(
  "orders",
  processor({ "urn:babel:orders:created": async (env) => { /* ... */ } }),
  { connection: { host: "localhost", port: 6379 } },
);

Amazon SQS — @babelqueue/sqs

npm install @babelqueue/sqs @aws-sdk/client-sqs

@aws-sdk/client-sqs is an optional peer — you provide the SQS client (the aggregated SQS class satisfies the transport structurally). It sends the canonical envelope as the MessageBody with the §3 MessageAttributes, and consumes by routing each message to a handler by URN.

Produce

import { SQS } from "@aws-sdk/client-sqs";
import { SqsPublisher } from "@babelqueue/sqs";

const sqs = new SQS({ region: "eu-central-1" });
const url = "https://sqs.eu-central-1.amazonaws.com/123456789012/orders";

const id = await new SqsPublisher(sqs, url).publish("urn:babel:orders:created", { order_id: 1042 });

publish(urn, data, { traceId? }) returns the message meta.id. For FIFO queues, construct the publisher with the fifo option: new SqsPublisher(sqs, url, { fifo: true }) (the queue URL must end in .fifo).

Consume

import { SqsConsumer } from "@babelqueue/sqs";

const consumer = new SqsConsumer(
  sqs,
  url,
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
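  {
    // log instead of silently swallowing unmapped URNs and handler errors
    onUnknownUrn: (env, msg) => console.warn("no handler mapped for this URN"),
    onError: (err, env, msg) => console.error(err),
  },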
);

await consumer.poll(); // receive one batch, route, delete handled; loop this

A throwing handler leaves the message for SQS to redeliver after the visibility timeout (at-least-once); attempts is reconciled to ApproximateReceiveCount − 1. Point the client’s endpoint at LocalStack/ElasticMQ for local testing.
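Since poll() handles a single batch, a long-running consumer needs a loop around it. The following is a minimal sketch of such a loop, assuming only that poll() returns a promise (as the await above suggests). The Pollable interface, pollUntilStopped, the shouldStop callback and the backoff delay are hypothetical helpers, not part of @babelqueue/sqs.

```typescript
// Hypothetical polling loop; only the poll() method is taken from the text above.
interface Pollable {
  poll(): Promise<unknown>;
}

// Polls until shouldStop() returns true and resolves with the number of
// batches that completed without error.
async function pollUntilStopped(
  consumer: Pollable,
  shouldStop: () => boolean,
  onError: (err: unknown) => void = (err) => console.error(err),
  backoffMs = 1000,
): Promise<number> {
  let batches = 0;
  while (!shouldStop()) {
    try {
      await consumer.poll();
      batches++;
    } catch (err) {
      // A failed receive (network error, throttling) should not end the loop;
      // report it and wait briefly before trying again.
      onError(err);
      await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
    }
  }
  return batches;
}
```

A service would typically pass a flag that its shutdown handler sets, for example pollUntilStopped(consumer, () => shuttingDown), so the loop exits after the batch in flight.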

Azure Service Bus — @babelqueue/azure-service-bus

npm install @babelqueue/azure-service-bus @azure/service-bus

@azure/service-bus is an optional peer — you provide the sender/receiver (a ServiceBusSender / ServiceBusReceiver satisfies the adapter structurally). It sends the canonical envelope as the message body with the native §4 projection (subject = URN, correlationId = trace_id, messageId = meta.id, plus the bq- application properties), and consumes by routing each message to a handler by URN.

import { ServiceBusClient } from "@azure/service-bus";
import { AsbPublisher, AsbConsumer } from "@babelqueue/azure-service-bus";

const connectionString = "<your-connection-string>"; // placeholder: supply your own
const client = new ServiceBusClient(connectionString); // or (namespace, credential)

// produce
const id = await new AsbPublisher(client.createSender("orders"))
  .publish("urn:babel:orders:created", { order_id: 1042 });

// consume (PeekLock)
const consumer = new AsbConsumer(
  client.createReceiver("orders"),
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
  { onError: (err) => console.error(err) },
);
await consumer.run();

Delayed delivery: publish(urn, data, { delayMs: 300000 }) → native scheduledEnqueueTimeUtc. A throwing handler abandons the message (the broker redelivers, incrementing deliveryCount); attempts is reconciled to max(body.attempts, deliveryCount − 1). See the Azure Service Bus binding.
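The two rules in the paragraph above are plain arithmetic, and can be sketched as follows. The function names are hypothetical and this is not the adapter's source. It assumes the delay is added to the current time to produce the scheduled enqueue time, and it takes the attempts formula literally from the text.

```typescript
// Hypothetical helpers illustrating the rules described above.

// delayMs -> the absolute UTC instant to use as scheduledEnqueueTimeUtc.
function scheduledEnqueueTime(delayMs: number, now: Date = new Date()): Date {
  return new Date(now.getTime() + delayMs);
}

// attempts = max(body.attempts, deliveryCount - 1), as stated in the text.
function reconcileAttempts(bodyAttempts: number, deliveryCount: number): number {
  return Math.max(bodyAttempts, deliveryCount - 1);
}
```

Taking the maximum means the reconciled value never drops below the attempts count already recorded in the envelope body.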

Apache Pulsar — @babelqueue/pulsar

npm install @babelqueue/pulsar pulsar-client

pulsar-client is an optional peer — you provide the producer/consumer (a Producer / Consumer satisfies the adapter structurally). It sends the canonical envelope as the message payload with the §5 property projection (bq-job = URN, bq-trace-id = trace_id, bq-message-id = meta.id, plus bq-schema-version / bq-source-lang / bq-attempts, all string→string), and consumes by routing each message to a handler by URN.

import Pulsar from "pulsar-client";
import { PulsarPublisher, PulsarConsumer } from "@babelqueue/pulsar";

const client = new Pulsar.Client({ serviceUrl: "pulsar://localhost:6650" });

// produce
const producer = await client.createProducer({ topic: "orders" });
const id = await new PulsarPublisher(producer)
  .publish("urn:babel:orders:created", { order_id: 1042 });

// consume (Shared subscription)
const sub = await client.subscribe({
  topic: "orders",
  subscription: "babelqueue",
  subscriptionType: "Shared",
});
const consumer = new PulsarConsumer(
  sub,
  {
    "urn:babel:orders:created": async (env, message) => {
      console.log(env.data.order_id, env.trace_id);
    },
  },
  { onError: (err) => console.error(err) },
);
await consumer.run();

Delayed delivery: publish(urn, data, { delayMs: 300000 }) → native deliverAfter. A throwing handler negativeAcknowledges the message (the broker redelivers, incrementing getRedeliveryCount()); attempts is reconciled to max(body.attempts, redeliveryCount) — the redelivery count is 0-based, so no −1. See the Apache Pulsar binding.
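The property projection and the attempts rule for Pulsar can be sketched as below. The property keys come from the description above; everything else is an assumption made for illustration: the ProjectionInput shape, its field names, and both function names are hypothetical and do not reflect the adapter's internals.

```typescript
// Hypothetical input shape; the real envelope field names may differ.
interface ProjectionInput {
  urn: string;
  traceId: string;
  messageId: string;
  schemaVersion: string | number;
  sourceLang: string;
  attempts: number;
}

// Builds the string -> string property map described above.
function toPulsarProperties(input: ProjectionInput): Record<string, string> {
  return {
    "bq-job": input.urn,
    "bq-trace-id": input.traceId,
    "bq-message-id": input.messageId,
    // Pulsar properties are strings, so non-string values are stringified.
    "bq-schema-version": String(input.schemaVersion),
    "bq-source-lang": input.sourceLang,
    "bq-attempts": String(input.attempts),
  };
}

// attempts = max(body.attempts, redeliveryCount); the count is 0-based, so no -1.
function reconcilePulsarAttempts(bodyAttempts: number, redeliveryCount: number): number {
  return Math.max(bodyAttempts, redeliveryCount);
}
```

Keeping every value a string is what lets consumers written in other languages read the properties without type negotiation.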

Whatever a Node service produces is the canonical envelope, so it is consumed natively by any other BabelQueue SDK — see the wire contract.