Apache Pulsar transport
com.babelqueue:babelqueue-pulsar is an Apache Pulsar transport built on the Java core. It
sends the canonical envelope as the message payload, together with the §5 property
projection, and consumes by routing each message to a handler by URN. A message it produces
can therefore be consumed by any other BabelQueue SDK, and vice versa.
Install
Maven:
<dependency>
  <groupId>com.babelqueue</groupId>
  <artifactId>babelqueue-pulsar</artifactId>
  <version>1.0.0</version>
</dependency>
Requirements: Java 17+. It pulls babelqueue-core and org.apache.pulsar:pulsar-client
transitively. You supply the Pulsar Producer / Consumer.
Produce
import com.babelqueue.pulsar.PulsarPublisher;
import org.apache.pulsar.client.api.*;
import java.util.Map;
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = client.newProducer().topic("orders").create();
String id = PulsarPublisher.create(producer)
    .publish("urn:babel:orders:created", Map.of("order_id", 1042L));
publish(urn, data) returns the message's meta.id; overloads accept a traceId and a relative
Duration delay, which maps to Pulsar's native deliverAfter.
Consume
import com.babelqueue.pulsar.PulsarConsumer;
import org.apache.pulsar.client.api.*;
Consumer<byte[]> sub = client.newConsumer()
    .topic("orders").subscriptionName("babelqueue")
    .subscriptionType(SubscriptionType.Shared).subscribe();
PulsarConsumer consumer = PulsarConsumer.builder(sub)
    .handler("urn:babel:orders:created", (envelope, message) -> {
        // envelope.data(), envelope.traceId(), envelope.attempts() ...
    })
    .onError((error, envelope, message) -> error.printStackTrace())
    .build();
while (running) {
    consumer.poll();
}
When a handler throws, the consumer negatively acknowledges the message
(negativeAcknowledge): the broker redelivers it and increments getRedeliveryCount(), which
gives at-least-once delivery. With a native DeadLetterPolicy configured, the message
eventually moves to the cross-language <queue>.dlq topic. The consumer routes purely on the
bq-job property, so it never decodes a message it cannot handle.
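The routing rule can be illustrated with a minimal, standard-library-only sketch. It is not the PulsarConsumer implementation: the `UrnRouter` class, its `Handler` interface, and the `dispatch` method are hypothetical names introduced here, and the message properties are modelled as a plain `Map<String, String>`. The point it shows is that the handler lookup uses only the `bq-job` property, so the payload stays untouched when no handler is registered.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for URN routing; not the library's PulsarConsumer. */
final class UrnRouter {

    /** Hypothetical handler type that receives the raw, still-encoded payload. */
    interface Handler {
        void handle(byte[] payload);
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    /** Registers a handler for one job URN. */
    UrnRouter handler(String urn, Handler handler) {
        handlers.put(urn, handler);
        return this;
    }

    /**
     * Looks up the handler from the bq-job property alone.
     * Returns false without reading the payload when the property is missing
     * or no handler is registered for that URN.
     */
    boolean dispatch(Map<String, String> properties, byte[] payload) {
        String urn = properties.get("bq-job");
        Handler handler = (urn == null) ? null : handlers.get(urn);
        if (handler == null) {
            return false;
        }
        handler.handle(payload);
        return true;
    }
}
```

What the real transport does with a message that has no matching handler is not covered by this sketch; here `dispatch` simply reports `false` and leaves that decision to the caller.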
Contract mapping (§5)
| Envelope | Apache Pulsar |
|---|---|
| body | message payload (byte-identical across SDKs) |
| job (URN) | property bq-job (consumer routes on this) |
| trace_id | property bq-trace-id |
| meta.id | property bq-message-id |
| meta.schema_version | property bq-schema-version |
| meta.lang | property bq-source-lang |
| meta.created_at | publish time (mirror; body authoritative) |
| attempts | property bq-attempts, reconciled to max(body, getRedeliveryCount()) |
| reserve / ack / retry | acknowledge / negativeAcknowledge |
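As a rough illustration of the property rows in this table, the sketch below projects a few envelope fields onto a string-to-string map. The `PropertyProjection` class and its trimmed-down `Envelope` record are hypothetical stand-ins (the real envelope type lives in babelqueue-core and may look different); only the `bq-*` property names come from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the property projection; not the library's code. */
final class PropertyProjection {

    /** Simplified envelope for illustration only; field names are assumptions. */
    record Envelope(String job, String traceId, String id,
                    int schemaVersion, String lang, int attempts) {}

    /** Builds the string-to-string properties listed in the mapping table. */
    static Map<String, String> project(Envelope e) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bq-job", e.job());
        props.put("bq-trace-id", e.traceId());
        props.put("bq-message-id", e.id());
        // Numeric fields are rendered as strings because properties are string-valued.
        props.put("bq-schema-version", Integer.toString(e.schemaVersion()));
        props.put("bq-source-lang", e.lang());
        props.put("bq-attempts", Integer.toString(e.attempts()));
        return props;
    }
}
```

With the Pulsar client, a map like this could be attached through `TypedMessageBuilder.properties(Map<String, String>)`; whether the transport does exactly that internally is an assumption.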
Pulsar properties are string-to-string, so bq-attempts carries the contract attempts and is
authoritative; getRedeliveryCount() is 0-based, so the reconciliation uses it directly,
with no −1 adjustment. In the tests, the Pulsar Producer / Consumer / Message interfaces
are mocked with Mockito 5, so they need no Pulsar broker and no network. The envelope is
unchanged (schema_version stays 1); the Apache Pulsar transport is purely additive.
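The attempts reconciliation from the mapping table, max(body, getRedeliveryCount()), comes down to a single comparison. The sketch below is illustrative only: `AttemptsReconciler` and `reconcile` are hypothetical names, and the two inputs are passed as plain integers rather than read from a Pulsar message.

```java
/** Hypothetical sketch of the attempts reconciliation; not the library's code. */
final class AttemptsReconciler {

    /**
     * Returns the larger of the attempts recorded in the envelope body and the
     * broker's redelivery count. The redelivery count is 0-based, so it is
     * compared as-is, without subtracting 1.
     */
    static int reconcile(int bodyAttempts, int redeliveryCount) {
        return Math.max(bodyAttempts, redeliveryCount);
    }
}
```

For example, `reconcile(1, 3)` yields 3 (the broker has redelivered more often than the body records), while `reconcile(4, 0)` yields 4 (a first delivery of a message whose body already counts four attempts).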