Amazon SQS transport
BabelQueue.Sqs is an Amazon SQS transport on the .NET core. It sends the
canonical envelope as the MessageBody with the
§3 SQS MessageAttributes, and consumes by
routing each message to a handler by URN — so a message it produces is consumed by any
other BabelQueue SDK, and vice-versa.
Install
dotnet add package BabelQueue.Sqs
Requirements: .NET 8. It pulls BabelQueue.Core and the AWS SDK for .NET
(AWSSDK.SQS) transitively. You supply an IAmazonSQS client, so its credentials/region
come from your standard AWS configuration.
Produce
using Amazon.SQS;
using BabelQueue.Sqs;
IAmazonSQS sqs = new AmazonSQSClient(); // your AWS config / credentials
var url = "https://sqs.eu-central-1.amazonaws.com/123456789012/orders";
var publisher = new SqsPublisher(sqs, url);
string id = await publisher.PublishAsync(
"urn:babel:orders:created",
new Dictionary<string, object?> { ["order_id"] = 1042 });
PublishAsync returns the message meta.id; pass a traceId to continue an existing
trace. FIFO queues: new SqsPublisher(sqs, url, fifo: true) (the queue URL must end in
.fifo).
Consume
Map URNs to handlers; PollAsync receives one batch, routes each message, and deletes the
ones handled. A throwing handler leaves the message for SQS to redeliver after the
visibility timeout (at-least-once):
using Amazon.SQS;
using BabelQueue;
using BabelQueue.Sqs;
var handlers = new Dictionary<string, BabelHandler>
{
["urn:babel:orders:created"] = (envelope, message, ct) =>
{
var orderId = envelope.Data["order_id"];
// ... use envelope.TraceId, envelope.Data
return Task.CompletedTask;
},
};
var consumer = new SqsConsumer(sqs, url, handlers, new SqsConsumerOptions
{
OnUnknownUrn = (envelope, message, ct) => Task.CompletedTask, // no handler for this URN
OnError = (error, envelope, message) => { /* loop never stops */ },
});
while (running)
{
await consumer.PollAsync();
}
A handler is BabelHandler — delegate Task BabelHandler(Envelope, Message, CancellationToken).
attempts is reconciled to ApproximateReceiveCount − 1 for the handler (never lowering a
runtime-incremented count). Point the AmazonSQSClient’s endpoint at LocalStack/ElasticMQ
for local testing.
Because the wire format is the canonical envelope, the message a handler receives may have been produced by any BabelQueue SDK.