Amazon SQS transport

BabelQueue.Sqs is an Amazon SQS transport on the .NET core. It sends the canonical envelope as the MessageBody with the §3 SQS MessageAttributes, and consumes by routing each message to a handler by URN — so a message it produces is consumed by any other BabelQueue SDK, and vice-versa.

Install

dotnet add package BabelQueue.Sqs

Requirements: .NET 8. It pulls BabelQueue.Core and the AWS SDK for .NET (AWSSDK.SQS) transitively. You supply an IAmazonSQS client, so its credentials/region come from your standard AWS configuration.

Produce

using Amazon.SQS;
using BabelQueue.Sqs;

IAmazonSQS sqs = new AmazonSQSClient(); // your AWS config / credentials
var url = "https://sqs.eu-central-1.amazonaws.com/123456789012/orders";

var publisher = new SqsPublisher(sqs, url);
string id = await publisher.PublishAsync(
    "urn:babel:orders:created",
    new Dictionary<string, object?> { ["order_id"] = 1042 });

PublishAsync returns the message meta.id; pass a traceId to continue an existing trace. FIFO queues: new SqsPublisher(sqs, url, fifo: true) (the queue URL must end in .fifo).

Consume

Map URNs to handlers; PollAsync receives one batch, routes each message, and deletes the ones handled. A throwing handler leaves the message for SQS to redeliver after the visibility timeout (at-least-once):

using Amazon.SQS;
using BabelQueue;
using BabelQueue.Sqs;

var handlers = new Dictionary<string, BabelHandler>
{
    ["urn:babel:orders:created"] = (envelope, message, ct) =>
    {
        var orderId = envelope.Data["order_id"];
        // ... use envelope.TraceId, envelope.Data
        return Task.CompletedTask;
    },
};

var consumer = new SqsConsumer(sqs, url, handlers, new SqsConsumerOptions
{
    OnUnknownUrn = (envelope, message, ct) => Task.CompletedTask, // no handler for this URN
    OnError = (error, envelope, message) => { /* loop never stops */ },
});

while (running)
{
    await consumer.PollAsync();
}

A handler is BabelHandlerdelegate Task BabelHandler(Envelope, Message, CancellationToken). attempts is reconciled to ApproximateReceiveCount − 1 for the handler (never lowering a runtime-incremented count). Point the AmazonSQSClient’s endpoint at LocalStack/ElasticMQ for local testing.

Because the wire format is the canonical envelope, the message a handler receives may have been produced by any BabelQueue SDK.