Apache Pulsar transport

com.babelqueue:babelqueue-pulsar is the Apache Pulsar transport built on the Java core. It sends the canonical envelope as the message payload, together with the §5 property projection, and consumes by routing each message to a handler by URN. A message it produces can therefore be consumed by any other BabelQueue SDK, and vice versa.

Install

Maven:

<dependency>
    <groupId>com.babelqueue</groupId>
    <artifactId>babelqueue-pulsar</artifactId>
    <version>1.0.0</version>
</dependency>

Requirements: Java 17+. It pulls babelqueue-core and org.apache.pulsar:pulsar-client transitively. You supply the Pulsar Producer / Consumer.

Produce

import com.babelqueue.pulsar.PulsarPublisher;
import org.apache.pulsar.client.api.*;
import java.util.Map;

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = client.newProducer().topic("orders").create();

String id = PulsarPublisher.create(producer)
    .publish("urn:babel:orders:created", Map.of("order_id", 1042L));

publish(urn, data) returns the message's meta.id. Overloads additionally accept a traceId and a relative Duration delay, which maps to Pulsar's native deliverAfter.

Consume

import com.babelqueue.pulsar.PulsarConsumer;
import org.apache.pulsar.client.api.*;

Consumer<byte[]> sub = client.newConsumer()
    .topic("orders").subscriptionName("babelqueue")
    .subscriptionType(SubscriptionType.Shared).subscribe();

PulsarConsumer consumer = PulsarConsumer.builder(sub)
    .handler("urn:babel:orders:created", (envelope, message) -> {
        // envelope.data(), envelope.traceId(), envelope.attempts() ...
    })
    .onError((error, envelope, message) -> error.printStackTrace())
    .build();

while (running) {
    consumer.poll();
}

When a handler throws, the consumer calls negativeAcknowledge on the message. The broker then redelivers it and increments getRedeliveryCount(), which gives at-least-once delivery. With a native DeadLetterPolicy configured, the message eventually moves to the cross-language <queue>.dlq topic. The consumer routes purely on the bq-job property, so it never decodes a message it cannot handle.
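The routing rule above can be illustrated with a small, dependency-free sketch. Everything in it (UrnRouterSketch, Handler, Outcome) is a hypothetical name chosen for illustration, not the babelqueue-pulsar API, and the UNHANDLED outcome is an assumption: the text only states that such a message is never decoded, not what happens to it afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of URN routing; none of these names come from babelqueue-pulsar.
final class UrnRouterSketch {

    /** What the caller would do with the broker message afterwards. */
    enum Outcome { ACK, NACK, UNHANDLED }

    /** Receives the raw payload; a real handler would receive a decoded envelope. */
    interface Handler {
        void handle(byte[] payload) throws Exception;
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    UrnRouterSketch handler(String urn, Handler handler) {
        handlers.put(urn, handler);
        return this;
    }

    /** Looks only at the bq-job property; the payload is touched only if a handler exists. */
    Outcome dispatch(Map<String, String> properties, byte[] payload) {
        Handler handler = handlers.get(properties.get("bq-job"));
        if (handler == null) {
            return Outcome.UNHANDLED; // assumption: left to the caller, nothing decoded
        }
        try {
            handler.handle(payload);
            return Outcome.ACK;       // caller would acknowledge the message
        } catch (Exception e) {
            return Outcome.NACK;      // caller would negativeAcknowledge, broker redelivers
        }
    }
}
```

In this sketch a throwing handler yields NACK, which corresponds to the negativeAcknowledge path described above, and a message whose bq-job has no registered handler is returned as UNHANDLED without its payload being read.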

Contract mapping (§5)

| Envelope | Apache Pulsar |
| --- | --- |
| body | message payload (byte-identical across SDKs) |
| job (URN) | property bq-job (consumer routes on this) |
| trace_id | property bq-trace-id |
| meta.id | property bq-message-id |
| meta.schema_version | property bq-schema-version |
| meta.lang | property bq-source-lang |
| meta.created_at | publish time (mirror; body authoritative) |
| attempts | property bq-attempts, reconciled to max(body, getRedeliveryCount()) |
| reserve / ack / retry | acknowledge / negativeAcknowledge |
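As a rough illustration of the mapping above, the property projection could be built as a plain string map. This is a minimal sketch under stated assumptions: PropertyProjectionSketch and its parameter list are hypothetical, every field is assumed to be present, and only the bq-* property names are taken from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; only the bq-* property names come from the contract mapping.
final class PropertyProjectionSketch {

    /** Projects envelope fields onto string properties (Pulsar properties are string to string). */
    static Map<String, String> project(String job, String traceId, String messageId,
                                       int schemaVersion, String sourceLang, int attempts) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bq-job", job);                                        // consumer routes on this
        props.put("bq-trace-id", traceId);
        props.put("bq-message-id", messageId);
        props.put("bq-schema-version", Integer.toString(schemaVersion)); // numbers travel as strings
        props.put("bq-source-lang", sourceLang);
        props.put("bq-attempts", Integer.toString(attempts));
        return props;
    }
}
```

A map like this could be handed to the Pulsar client's TypedMessageBuilder.properties(Map<String, String>) when building the message; meta.created_at is not projected here because the table mirrors it through the publish time.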

Pulsar properties are string→string, so bq-attempts carries the contract attempts as a string and is authoritative. getRedeliveryCount() is 0-based, so the reconciliation uses it directly, with no −1 adjustment. The Pulsar Producer / Consumer / Message interfaces are mocked with Mockito 5, so the tests need no Pulsar broker and no network. The envelope is unchanged (schema_version stays 1); the Apache Pulsar transport is purely additive.
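The attempts reconciliation described above comes down to a single max. The sketch below is illustrative only: AttemptsSketch is a hypothetical name, and falling back to 0 for a missing or malformed bq-attempts value is an assumption, not documented behaviour.

```java
// Hypothetical sketch of the attempts reconciliation; not the library's code.
final class AttemptsSketch {

    /** Parses the string-valued bq-attempts property; assumption: bad or missing values count as 0. */
    static int parseAttempts(String raw) {
        if (raw == null) {
            return 0;
        }
        try {
            return Math.max(0, Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /** getRedeliveryCount() is 0-based, so it is compared directly, with no -1 adjustment. */
    static int reconcile(int bodyAttempts, int redeliveryCount) {
        return Math.max(bodyAttempts, redeliveryCount);
    }
}
```

For example, a first delivery with attempts 0 in the body and a redelivery count of 0 stays at 0, while the same message after two broker redeliveries reconciles to 2.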