Runtime & transports

The Go core is primarily a codec, but it also ships an optional App runtime, a Transport interface and an in-memory transport, all without adding dependencies. Broker drivers live in separate modules so the core stays dependency-free: …/redis (built on go-redis), …/amqp (built on amqp091-go), …/sqs (built on aws-sdk-go-v2), …/azureservicebus (built on azservicebus) and …/pulsar (built on the pure-Go pulsar-client-go).

The App runtime

func NewApp(transport Transport, opts ...AppOption) *App

Options:

  • WithDefaultQueue(string) (default "default")
  • WithMaxAttempts(int) (default 3)
  • WithUnknownURNStrategy(string) (default StrategyFail)
  • WithDeadLetter(bool)
  • WithDeadLetterQueue(string)
  • WithDeadLetterSuffix(string) (default ".dlq")
  • WithPollTimeout(time.Duration)

Methods:

  • Handle(urn string, handler Handler): registers a handler for a URN. Handler is func(ctx context.Context, env Envelope) error; returning nil acks the message, returning an error triggers retry/dead-letter.
  • Publish(ctx context.Context, urn string, data map[string]any, opts ...Option) (string, error): builds, encodes and publishes the canonical envelope; returns meta.id.
  • Consume(ctx context.Context, queue ...string) error: blocks and routes messages by URN until ctx is cancelled; one bad message never stops the loop. Run(ctx) is an alias for Consume on the default queue.
  • Drain(ctx context.Context, queue string, max int) (int, error): processes up to max messages (or until the queue is empty) and returns the count. Handy for tests and one-shot workers.

Unknown-URN strategies are the string constants StrategyFail, StrategyDelete, StrategyRelease, StrategyDeadLetter.
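
To make the routing and Drain semantics above concrete, here is a minimal standard-library sketch. It is not the library's implementation: the message, router and drain names are hypothetical, the handler drops the context argument for brevity, and two behaviours are assumptions (max <= 0 meaning "until empty", and a failed message still counting toward the returned total). Unroutable messages are only recorded here; the real strategies are not modelled.

```go
package main

import (
	"errors"
	"fmt"
)

// errMissingOrderID is a sentinel error used by the example handler.
var errMissingOrderID = errors.New("missing order_id")

// message is a hypothetical stand-in for a decoded envelope.
type message struct {
	URN  string
	Data map[string]any
}

// handlerFunc mirrors the documented Handler shape without the context argument.
type handlerFunc func(m message) error

// router maps URNs to handlers, like repeated Handle calls would.
type router struct {
	handlers map[string]handlerFunc
	failed   []message // messages whose handler failed or that had no handler
}

func newRouter() *router {
	return &router{handlers: make(map[string]handlerFunc)}
}

func (r *router) handle(urn string, h handlerFunc) { r.handlers[urn] = h }

// drain takes up to max messages off the queue (max <= 0 is assumed to mean
// "until empty") and returns how many it took plus the remaining queue.
// A failing or unroutable message is recorded and the loop carries on.
func (r *router) drain(queue []message, max int) (int, []message) {
	n := 0
	for len(queue) > 0 && (max <= 0 || n < max) {
		m := queue[0]
		queue = queue[1:]
		n++
		h, ok := r.handlers[m.URN]
		if !ok {
			r.failed = append(r.failed, m)
			continue
		}
		if err := h(m); err != nil {
			r.failed = append(r.failed, m)
		}
	}
	return n, queue
}

func main() {
	r := newRouter()
	r.handle("urn:babel:orders:created", func(m message) error {
		if m.Data["order_id"] == nil {
			return errMissingOrderID
		}
		return nil
	})
	queue := []message{
		{URN: "urn:babel:orders:created", Data: map[string]any{"order_id": 1042}},
		{URN: "urn:babel:orders:created", Data: map[string]any{}},
		{URN: "urn:babel:unknown"},
	}
	n, rest := r.drain(queue, 2)
	fmt.Println("processed:", n, "left:", len(rest), "failed:", len(r.failed))
}
```

The second message fails its handler, yet the loop moves on, which is the property the Consume description calls out.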

In-memory example

package main

import (
	"context"
	"fmt"
	"log"

	babelqueue "github.com/babelqueue/babelqueue-go"
)

func main() {
	ctx := context.Background()
	app := babelqueue.NewApp(babelqueue.NewInMemoryTransport(),
		babelqueue.WithDefaultQueue("orders"),
		babelqueue.WithDeadLetter(true),
	)

	// Route every message with this URN to the handler; returning nil acks it.
	app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
		fmt.Println(env.Data["order_id"], env.TraceID)
		return nil
	})

	// Publish returns meta.id, which this example does not need.
	if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
		log.Fatal(err)
	}
	// Drain returns once it is done; Consume(ctx) blocks for a long-running worker.
	if _, err := app.Drain(ctx, "orders", 0); err != nil {
		log.Fatal(err)
	}
}

Redis transport

go get github.com/babelqueue/babelqueue-go/redis

Reliable-queue pattern (RPUSH to produce, BLMOVE to a :processing list to reserve, LREM to ack). Create it with a URL or your own client:

import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/redis"
)

tr, err := redis.New("redis://localhost:6379/0") // or redis.NewWithClient(client)
if err != nil {
	return err
}
defer tr.Close()

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil // handle env.Data
})

if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks

Option: redis.WithProcessingSuffix(string) (default ":processing").
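
The reliable-queue pattern named at the top of this section (RPUSH, BLMOVE, LREM) can be sketched without a Redis server. The fakeRedis type below is a hypothetical in-memory stand-in, not part of the redis module; it never blocks, it is not safe for concurrent use, and the LEFT/RIGHT direction of the move is an assumption, since the text does not state it.

```go
package main

import "fmt"

// fakeRedis is an in-memory stand-in for the few Redis list commands the
// pattern needs. It is illustrative only.
type fakeRedis struct {
	lists map[string][]string
}

func newFakeRedis() *fakeRedis {
	return &fakeRedis{lists: make(map[string][]string)}
}

// rpush appends value to the tail of the list (RPUSH key value).
func (r *fakeRedis) rpush(key, value string) {
	r.lists[key] = append(r.lists[key], value)
}

// lmoveLeftRight pops the head of src and pushes it onto the tail of dst in
// one step: the non-blocking core of BLMOVE src dst LEFT RIGHT.
// ok is false when src is empty.
func (r *fakeRedis) lmoveLeftRight(src, dst string) (value string, ok bool) {
	items := r.lists[src]
	if len(items) == 0 {
		return "", false
	}
	value = items[0]
	r.lists[src] = items[1:]
	r.lists[dst] = append(r.lists[dst], value)
	return value, true
}

// lrem removes the first occurrence of value (LREM key 1 value) and reports
// how many elements were removed.
func (r *fakeRedis) lrem(key, value string) int {
	items := r.lists[key]
	for i, v := range items {
		if v == value {
			// The full slice expression forces a copy so the tail is not overwritten in place.
			r.lists[key] = append(items[:i:i], items[i+1:]...)
			return 1
		}
	}
	return 0
}

func main() {
	r := newFakeRedis()
	const queue, processing = "orders", "orders:processing"

	r.rpush(queue, `{"order_id":1042}`) // produce

	msg, ok := r.lmoveLeftRight(queue, processing) // reserve
	if !ok {
		fmt.Println("queue empty")
		return
	}
	fmt.Println("reserved:", msg, "in flight:", len(r.lists[processing]))

	r.lrem(processing, msg) // ack
	fmt.Println("in flight after ack:", len(r.lists[processing]))
}
```

Because a reserved message sits in the processing list until it is acked, a worker that crashes mid-handler leaves the message recoverable rather than lost.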

RabbitMQ (AMQP) transport

go get github.com/babelqueue/babelqueue-go/amqp

Durable queue, persistent delivery, basic.get + manual ack (at-least-once). The connection is lazy — it (re)connects on first use:

import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/amqp"
)

tr := amqp.New("amqp://guest:guest@localhost:5672/")
defer tr.Close()

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx)

Each published message carries the AMQP properties defined by the contract, so consumers can route without parsing the body: type = URN, correlationId = trace_id, messageId = meta.id, the x-attempts / x-schema-version / x-source-lang headers, content type application/json and persistent delivery mode.
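
As an illustration of that projection, the sketch below maps a flattened envelope onto the listed properties. Every type and function name here (envelopeMeta, amqpProperties, project) is hypothetical and does not come from the amqp module, and the header value types (an int for x-attempts, strings for the others) are assumptions. The one protocol fact relied on is that delivery mode 2 means persistent in AMQP 0-9-1.

```go
package main

import "fmt"

// envelopeMeta is a hypothetical flattened view of the envelope fields the
// text names.
type envelopeMeta struct {
	URN           string
	TraceID       string
	ID            string // meta.id
	Attempts      int
	SchemaVersion string
	SourceLang    string
}

// amqpProperties holds the subset of AMQP basic properties the text lists.
type amqpProperties struct {
	Type          string
	CorrelationID string
	MessageID     string
	ContentType   string
	DeliveryMode  uint8 // 2 = persistent in AMQP 0-9-1
	Headers       map[string]any
}

// project copies envelope fields onto message properties so a consumer can
// route on Type without decoding the JSON body.
func project(m envelopeMeta) amqpProperties {
	return amqpProperties{
		Type:          m.URN,
		CorrelationID: m.TraceID,
		MessageID:     m.ID,
		ContentType:   "application/json",
		DeliveryMode:  2,
		Headers: map[string]any{
			"x-attempts":       m.Attempts,
			"x-schema-version": m.SchemaVersion,
			"x-source-lang":    m.SourceLang,
		},
	}
}

func main() {
	p := project(envelopeMeta{
		URN:           "urn:babel:orders:created",
		TraceID:       "trace-1",
		ID:            "msg-1",
		Attempts:      0,
		SchemaVersion: "1",
		SourceLang:    "go",
	})
	fmt.Println(p.Type, p.CorrelationID, p.MessageID, p.Headers["x-source-lang"])
}
```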

Amazon SQS transport

go get github.com/babelqueue/babelqueue-go/sqs

A babelqueue.Transport over the official AWS SDK (aws-sdk-go-v2): the canonical envelope is the MessageBody, projected onto native MessageAttributes; a visibility-timeout reserve → DeleteMessage ack gives at-least-once. Build it with your AWS config (region from the standard chain) or your own client:

import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/sqs"
)

tr, err := sqs.New(ctx, sqs.WithRegion("eu-central-1")) // or sqs.NewWithClient(client)
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks

Options: WithEndpoint (LocalStack/ElasticMQ), WithQueueURLPrefix (skip GetQueueUrl), WithWaitTimeSeconds, WithVisibilityTimeout, WithFIFO, WithMessageGroupID, WithContentDedup, WithClient. Each message carries the §3 MessageAttributes (bq-job = URN, bq-trace-id = trace_id, bq-message-id = meta.id, plus bq-schema-version / bq-source-lang / bq-created-at) so a consumer can route without decoding the body; attempts is reconciled to ApproximateReceiveCount − 1. See the SQS binding.

Azure Service Bus transport

go get github.com/babelqueue/babelqueue-go/azureservicebus

A babelqueue.Transport over the official Azure SDK (azservicebus): the canonical envelope is the message Body, projected onto native fields (Subject = URN, CorrelationID = trace_id, MessageID = meta.id) plus the bq- application properties; a PeekLock reserve → CompleteMessage ack gives at-least-once. Build it from a connection string, your own *azservicebus.Client, or an injected client:

import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/azureservicebus"
)

tr, err := azureservicebus.New(azureservicebus.WithConnectionString(connStr))
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks

Options: WithAzureClient(*azservicebus.Client) (e.g. a namespace + token credential), WithClient, WithMaxWaitTime. attempts is reconciled to max(body.attempts, DeliveryCount − 1). Requires Go 1.23+ (the Azure SDK floor). See the Azure Service Bus binding.

Apache Pulsar transport

go get github.com/babelqueue/babelqueue-go/pulsar

A babelqueue.Transport over the pure-Go apache/pulsar-client-go (no CGo, no libpulsar): the canonical envelope is the message payload, projected onto native Pulsar message properties (bq-job = URN, bq-trace-id = trace_id, bq-message-id = meta.id, plus bq-schema-version / bq-source-lang / bq-attempts, all string→string); a receive → Ack gives at-least-once. Build it from a service URL, your own pulsar.Client, or an injected client:

import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/pulsar"
)

tr, err := pulsar.New(pulsar.WithURL("pulsar://localhost:6650"))
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks

Options: WithPulsarClient(pulsar.Client), WithClient, WithSubscription (default babelqueue), WithSubscriptionType (default Shared), WithTopicPrefix, WithMaxWaitTime. attempts is reconciled to max(body.attempts, RedeliveryCount) — the redelivery count is 0-based, so no −1. Requires Go 1.23+. See the Apache Pulsar binding.
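
The three attempts-reconciliation rules quoted in the SQS, Azure Service Bus and Pulsar sections differ only in whether the broker's counter starts at 1 or 0. The sketch below restates them as plain functions for side-by-side comparison; the function names are hypothetical and this is not the transports' actual code.

```go
package main

import "fmt"

// maxInt returns the larger of two ints (kept explicit so the sketch does not
// depend on the built-in max added in Go 1.21).
func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// sqsAttempts follows the SQS rule: ApproximateReceiveCount is 1 on the first
// delivery, so attempts is the count minus one.
func sqsAttempts(approximateReceiveCount int) int {
	return approximateReceiveCount - 1
}

// serviceBusAttempts follows the Azure Service Bus rule:
// max(body.attempts, DeliveryCount - 1).
func serviceBusAttempts(bodyAttempts, deliveryCount int) int {
	return maxInt(bodyAttempts, deliveryCount-1)
}

// pulsarAttempts follows the Pulsar rule: the redelivery count is 0-based,
// so it is used as-is: max(body.attempts, RedeliveryCount).
func pulsarAttempts(bodyAttempts, redeliveryCount int) int {
	return maxInt(bodyAttempts, redeliveryCount)
}

func main() {
	// First delivery on each broker: all three rules yield 0 attempts.
	fmt.Println(sqsAttempts(1), serviceBusAttempts(0, 1), pulsarAttempts(0, 0))
	// Broker-side redelivery the body does not know about yet.
	fmt.Println(sqsAttempts(3), serviceBusAttempts(0, 3), pulsarAttempts(0, 2))
	// The body already records more attempts than the broker has seen.
	fmt.Println(serviceBusAttempts(4, 1), pulsarAttempts(4, 0))
}
```

Taking the maximum in the last two rules means a message republished by the runtime (which resets the broker's counter) does not lose the attempts already recorded in its body.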

Retry, attempts & dead-letter

When a handler returns an error, the top-level attempts counter is incremented and the message is re-published to the same queue. This repeats until WithMaxAttempts is reached. After that, the message is routed to the dead-letter queue (by default the source queue name plus .dlq) with a dead_letter block if WithDeadLetter(true) is set; otherwise it is dropped. See the wire contract.
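
The decision described above can be sketched as a single function over in-memory queues. This is an illustration, not the runtime's code: the msg type, the onFailure function and the DeadLetterReason field (a stand-in for the dead_letter block) are hypothetical, and the boundary is one reading of the text, namely that a message is retried while the incremented attempts value is still below the maximum.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for an envelope with its attempts counter.
type msg struct {
	URN              string
	Attempts         int
	DeadLetterReason string // stand-in for the dead_letter block
}

// onFailure applies the retry rule after a handler error: increment attempts,
// re-publish to the same queue while attempts is below maxAttempts, otherwise
// dead-letter (if enabled) or drop. It returns what happened.
func onFailure(queues map[string][]msg, queue string, m msg, maxAttempts int, deadLetter bool, reason string) string {
	m.Attempts++
	switch {
	case m.Attempts < maxAttempts:
		queues[queue] = append(queues[queue], m)
		return "retried"
	case deadLetter:
		m.DeadLetterReason = reason
		dlq := queue + ".dlq" // default suffix from the options list
		queues[dlq] = append(queues[dlq], m)
		return "dead-lettered"
	default:
		return "dropped"
	}
}

func main() {
	queues := map[string][]msg{
		"orders": {{URN: "urn:babel:orders:created"}},
	}
	// Simulate a handler that fails every time, with the default of 3 attempts.
	for len(queues["orders"]) > 0 {
		m := queues["orders"][0]
		queues["orders"] = queues["orders"][1:]
		fmt.Println(onFailure(queues, "orders", m, 3, true, "handler error"))
	}
	dead := queues["orders.dlq"]
	fmt.Println("dead-lettered:", len(dead), "attempts:", dead[0].Attempts)
}
```

Under this reading, a handler that always fails runs three times with the default WithMaxAttempts(3) before the message leaves the source queue.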