Apache Pulsar transport
BabelQueue.Pulsar is an Apache Pulsar transport on the .NET core. It sends the
canonical envelope as the message payload with the
§5 property projection, and consumes by
routing each message to a handler by URN — so a message it produces is consumed by any
other BabelQueue SDK, and vice-versa.
Install
dotnet add package BabelQueue.Pulsar
Requirements: .NET 8. It pulls BabelQueue.Core and DotPulsar (a pure-C# Pulsar
client) transitively.
Produce
using DotPulsar;
using DotPulsar.Extensions;
using BabelQueue.Pulsar;
await using var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650")).Build();
await using var producer = client.NewProducer(Schema.ByteArray).Topic("orders").Create();
var id = await new PulsarPublisher(producer)
.PublishAsync("urn:babel:orders:created", new Dictionary<string, object?> { ["order_id"] = 1042 });
PublishAsync returns the message meta.id; pass a traceId to continue a trace, or a
delay (TimeSpan) to schedule native delayed delivery (DeliverAtTime).
Consume
await using var consumer = client.NewConsumer(Schema.ByteArray)
.Topic("orders").SubscriptionName("babelqueue")
.SubscriptionType(SubscriptionType.Shared).Create();
var handlers = new Dictionary<string, BabelHandler>
{
["urn:babel:orders:created"] = (envelope, message, ct) =>
{
// envelope.Data, envelope.TraceId, envelope.Attempts ...
return Task.CompletedTask;
},
};
var babel = new PulsarConsumer(consumer, handlers, new PulsarConsumerOptions
{
OnError = (err, env, msg) => Console.Error.WriteLine(err),
});
await babel.RunAsync(cancellationToken);
A throwing handler negativeAcknowledges the message — the broker redelivers it and
increments RedeliveryCount (at-least-once); with a native DeadLetterPolicy it eventually
moves to the cross-language <queue>.dlq topic. The consumer routes purely on the bq-job
property, so it never decodes a message it cannot handle.
Contract mapping (§5)
| Envelope | Apache Pulsar |
|---|---|
| body | message payload (byte-identical across SDKs) |
job (URN) | property bq-job (consumer routes on this) |
trace_id | property bq-trace-id |
meta.id | property bq-message-id |
meta.schema_version | property bq-schema-version |
meta.lang | property bq-source-lang |
meta.created_at | PublishTime (mirror; body authoritative) |
attempts | property bq-attempts, reconciled to max(body, RedeliveryCount) |
| reserve / ack / retry | Acknowledge / redeliver |
Pulsar properties are string→string, so bq-attempts carries the contract attempts and is
authoritative; RedeliveryCount is 0-based, so the reconciliation maps it directly with
no −1. The DotPulsar IProducer / IConsumer / IMessage interfaces are mockable, so
the unit tests use Moq — no Pulsar, no network. The envelope is unchanged (schema_version
stays 1); Apache Pulsar is purely additive.