Apache Pulsar transport

BabelQueue.Pulsar is an Apache Pulsar transport on the .NET core. It sends the canonical envelope as the message payload with the §5 property projection, and consumes by routing each message to a handler by URN — so a message it produces is consumed by any other BabelQueue SDK, and vice-versa.

Install

dotnet add package BabelQueue.Pulsar

Requirements: .NET 8. It pulls BabelQueue.Core and DotPulsar (a pure-C# Pulsar client) transitively.

Produce

using DotPulsar;
using DotPulsar.Extensions;
using BabelQueue.Pulsar;

await using var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650")).Build();

await using var producer = client.NewProducer(Schema.ByteArray).Topic("orders").Create();
var id = await new PulsarPublisher(producer)
    .PublishAsync("urn:babel:orders:created", new Dictionary<string, object?> { ["order_id"] = 1042 });

PublishAsync returns the message meta.id; pass a traceId to continue a trace, or a delay (TimeSpan) to schedule native delayed delivery (DeliverAtTime).

Consume

await using var consumer = client.NewConsumer(Schema.ByteArray)
    .Topic("orders").SubscriptionName("babelqueue")
    .SubscriptionType(SubscriptionType.Shared).Create();

var handlers = new Dictionary<string, BabelHandler>
{
    ["urn:babel:orders:created"] = (envelope, message, ct) =>
    {
        // envelope.Data, envelope.TraceId, envelope.Attempts ...
        return Task.CompletedTask;
    },
};
var babel = new PulsarConsumer(consumer, handlers, new PulsarConsumerOptions
{
    OnError = (err, env, msg) => Console.Error.WriteLine(err),
});
await babel.RunAsync(cancellationToken);

A throwing handler negativeAcknowledges the message — the broker redelivers it and increments RedeliveryCount (at-least-once); with a native DeadLetterPolicy it eventually moves to the cross-language <queue>.dlq topic. The consumer routes purely on the bq-job property, so it never decodes a message it cannot handle.

Contract mapping (§5)

EnvelopeApache Pulsar
bodymessage payload (byte-identical across SDKs)
job (URN)property bq-job (consumer routes on this)
trace_idproperty bq-trace-id
meta.idproperty bq-message-id
meta.schema_versionproperty bq-schema-version
meta.langproperty bq-source-lang
meta.created_atPublishTime (mirror; body authoritative)
attemptsproperty bq-attempts, reconciled to max(body, RedeliveryCount)
reserve / ack / retryAcknowledge / redeliver

Pulsar properties are string→string, so bq-attempts carries the contract attempts and is authoritative; RedeliveryCount is 0-based, so the reconciliation maps it directly with no −1. The DotPulsar IProducer / IConsumer / IMessage interfaces are mockable, so the unit tests use Moq — no Pulsar, no network. The envelope is unchanged (schema_version stays 1); Apache Pulsar is purely additive.