Runtime & transports
The Go core is a codec, but it also ships an optional, still zero-dependency
App runtime plus a Transport interface and an in-memory transport. Broker
drivers live in separate modules so the core stays dependency-free:
…/redis (built on go-redis), …/amqp (built on amqp091-go), …/sqs (built on
aws-sdk-go-v2), …/azureservicebus (built on azservicebus) and …/pulsar (built on the
pure-Go pulsar-client-go).
The App runtime
func NewApp(transport Transport, opts ...AppOption) *App
Options: WithDefaultQueue(string) (default "default"), WithMaxAttempts(int)
(default 3), WithUnknownURNStrategy(string) (default StrategyFail),
WithDeadLetter(bool), WithDeadLetterQueue(string), WithDeadLetterSuffix(string)
(default ".dlq"), WithPollTimeout(time.Duration).
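The variadic opts ...AppOption parameter suggests the functional-options pattern. The sketch below shows how such options might layer over the documented defaults. The config struct, the lowercase option names, and the func(*config) representation are hypothetical stand-ins, not the library's actual types, and the false default for dead-lettering is an assumption the source does not state.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the App's internal settings.
type config struct {
	defaultQueue     string
	maxAttempts      int
	deadLetter       bool // assumed to default to false; the source does not say
	deadLetterSuffix string
}

// option mirrors the shape an AppOption might have (an assumption).
type option func(*config)

func withDefaultQueue(q string) option { return func(c *config) { c.defaultQueue = q } }
func withMaxAttempts(n int) option     { return func(c *config) { c.maxAttempts = n } }
func withDeadLetter(on bool) option    { return func(c *config) { c.deadLetter = on } }

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) config {
	c := config{defaultQueue: "default", maxAttempts: 3, deadLetterSuffix: ".dlq"}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withDefaultQueue("orders"), withDeadLetter(true))
	fmt.Printf("%+v\n", c)
}
```

Options left out keep their defaults, which is why a call such as NewApp(transport) with no options is valid.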
Methods:
- Handle(urn string, handler Handler): register a handler. Handler is func(ctx, env Envelope) error; returning an error triggers retry/dead-letter, nil acks.
- Publish(ctx, urn string, data map[string]any, opts ...Option) (string, error): builds, encodes and publishes the canonical envelope; returns meta.id.
- Consume(ctx, queue ...string) error: blocks, routing by URN until ctx is cancelled (one bad message never stops the loop). Run(ctx) is the alias on the default queue.
- Drain(ctx, queue string, max int) (int, error): processes up to max messages (or until the queue is empty) and returns the count. Handy for tests / one-shot workers.
Unknown-URN strategies are the string constants StrategyFail, StrategyDelete,
StrategyRelease, StrategyDeadLetter.
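The source names the four strategies but does not spell out what each one does. The sketch below shows one plausible reading of URN routing with a fallback strategy. The constant values, the route function, the action strings, and the meaning attached to each strategy are all assumptions for illustration, not the library's behaviour.

```go
package main

import "fmt"

// Hypothetical values; the source only gives the constant names.
const (
	strategyFail       = "fail"
	strategyDelete     = "delete"
	strategyRelease    = "release"
	strategyDeadLetter = "dead_letter"
)

type handler func(data map[string]any) error

// route returns the action a consume loop might take for one message.
func route(handlers map[string]handler, urn string, data map[string]any, strategy string) string {
	if h, ok := handlers[urn]; ok {
		if err := h(data); err != nil {
			return "retry" // handler error: retry/dead-letter path
		}
		return "ack"
	}
	// No handler registered for this URN: fall back to the strategy.
	switch strategy {
	case strategyDelete:
		return "ack" // assumed: acknowledge and discard
	case strategyRelease:
		return "requeue" // assumed: hand the message back untouched
	case strategyDeadLetter:
		return "dead-letter" // assumed: skip retries, go to the dead-letter queue
	default: // strategyFail
		return "retry" // assumed: same path as a handler error
	}
}

func main() {
	handlers := map[string]handler{
		"urn:babel:orders:created": func(map[string]any) error { return nil },
	}
	fmt.Println(route(handlers, "urn:babel:orders:created", nil, strategyFail))
	fmt.Println(route(handlers, "urn:babel:orders:unknown", nil, strategyDeadLetter))
}
```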
In-memory example
package main

import (
	"context"
	"fmt"
	"log"

	babelqueue "github.com/babelqueue/babelqueue-go"
)

func main() {
	ctx := context.Background()

	app := babelqueue.NewApp(babelqueue.NewInMemoryTransport(),
		babelqueue.WithDefaultQueue("orders"),
		babelqueue.WithDeadLetter(true),
	)

	app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
		fmt.Println(env.Data["order_id"], env.TraceID)
		return nil
	})

	// Publish returns (meta.id, error); check the error rather than dropping it.
	if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
		log.Fatal(err)
	}

	// Drain processes what is queued and returns; Consume(ctx) blocks for a long-running worker.
	if _, err := app.Drain(ctx, "orders", 0); err != nil {
		log.Fatal(err)
	}
}
Redis transport
go get github.com/babelqueue/babelqueue-go/redis
Reliable-queue pattern (RPUSH to produce, BLMOVE to a :processing list to
reserve, LREM to ack). Create it with a URL or your own client:
import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/redis"
)

tr, err := redis.New("redis://localhost:6379/0") // or redis.NewWithClient(client)
if err != nil {
	return err
}
defer tr.Close()

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil // handle env.Data
})

// Publish returns (meta.id, error); surface the error instead of discarding it.
if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks
Option: redis.WithProcessingSuffix(string) (default ":processing").
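The reliable-queue pattern described above can be modelled without a Redis server. The sketch below uses Go slices in place of Redis lists to show why a reserved message is not lost if the worker dies before acking: it stays in the processing list. The lists type and its methods are hypothetical, the move is non-blocking (the transport uses the blocking BLMOVE), and the LEFT to RIGHT direction is an assumption.

```go
package main

import "fmt"

// lists models Redis lists as Go slices, keyed by list name.
type lists map[string][]string

// rpush appends to the tail, like RPUSH.
func (l lists) rpush(key, v string) { l[key] = append(l[key], v) }

// lmove pops the head of src and appends it to the tail of dst,
// like a non-blocking LMOVE src dst LEFT RIGHT.
func (l lists) lmove(src, dst string) (string, bool) {
	if len(l[src]) == 0 {
		return "", false
	}
	v := l[src][0]
	l[src] = l[src][1:]
	l[dst] = append(l[dst], v)
	return v, true
}

// lrem removes the first occurrence of v, like LREM key 1 v.
func (l lists) lrem(key, v string) bool {
	for i, x := range l[key] {
		if x == v {
			l[key] = append(l[key][:i], l[key][i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	l := lists{}
	l.rpush("orders", `{"order_id":1042}`)           // produce
	msg, _ := l.lmove("orders", "orders:processing") // reserve
	fmt.Println("queued:", len(l["orders"]), "in flight:", len(l["orders:processing"]))
	l.lrem("orders:processing", msg) // ack
	fmt.Println("in flight after ack:", len(l["orders:processing"]))
}
```

Between reserve and ack the message exists only in the processing list, so a recovery job can move stale entries back to the main queue.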
RabbitMQ (AMQP) transport
go get github.com/babelqueue/babelqueue-go/amqp
Durable queue, persistent delivery, basic.get + manual ack (at-least-once). The
connection is lazy — it (re)connects on first use:
import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/amqp"
)

tr := amqp.New("amqp://guest:guest@localhost:5672/")
defer tr.Close()

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

// Publish returns (meta.id, error); surface the error instead of discarding it.
if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks
Each published message carries the contract's AMQP properties so consumers can route
without parsing the body: type = URN, correlationId = trace_id, messageId =
meta.id, plus the x-attempts / x-schema-version / x-source-lang headers. The content
type is application/json and the delivery mode is persistent.
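The property mapping above can be written out as a small projection function. This is a sketch only: the envelope and amqpProps structs are hypothetical, trimmed-down types, and carrying every header as a string is an assumption (the real transport may use native AMQP table types). Delivery mode 2 is the AMQP 0-9-1 value for persistent.

```go
package main

import (
	"fmt"
	"strconv"
)

// envelope is a hypothetical, trimmed-down view of the canonical envelope.
type envelope struct {
	URN           string
	TraceID       string
	ID            string // meta.id
	Attempts      int
	SchemaVersion string
	SourceLang    string
}

// amqpProps is a hypothetical container for the properties listed above.
type amqpProps struct {
	Type          string
	CorrelationID string
	MessageID     string
	ContentType   string
	DeliveryMode  uint8
	Headers       map[string]string
}

// project copies routing-relevant envelope fields onto message properties.
func project(e envelope) amqpProps {
	return amqpProps{
		Type:          e.URN,
		CorrelationID: e.TraceID,
		MessageID:     e.ID,
		ContentType:   "application/json",
		DeliveryMode:  2, // persistent in AMQP 0-9-1
		Headers: map[string]string{
			"x-attempts":       strconv.Itoa(e.Attempts),
			"x-schema-version": e.SchemaVersion,
			"x-source-lang":    e.SourceLang,
		},
	}
}

func main() {
	p := project(envelope{
		URN: "urn:babel:orders:created", TraceID: "trace-1", ID: "msg-1",
		Attempts: 0, SchemaVersion: "1", SourceLang: "go",
	})
	// A consumer can pick a handler from p.Type alone, without decoding the body.
	fmt.Println(p.Type, p.CorrelationID, p.MessageID, p.Headers["x-attempts"])
}
```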
Amazon SQS transport
go get github.com/babelqueue/babelqueue-go/sqs
A babelqueue.Transport over the official AWS SDK (aws-sdk-go-v2): the canonical
envelope is the MessageBody, projected onto native MessageAttributes; a
visibility-timeout reserve → DeleteMessage ack gives at-least-once. Build it with your
AWS config (region from the standard chain) or your own client:
import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/sqs"
)

tr, err := sqs.New(ctx, sqs.WithRegion("eu-central-1")) // or sqs.NewWithClient(client)
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

// Publish returns (meta.id, error); surface the error instead of discarding it.
if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks
Options: WithEndpoint (LocalStack/ElasticMQ), WithQueueURLPrefix (skip GetQueueUrl),
WithWaitTimeSeconds, WithVisibilityTimeout, WithFIFO, WithMessageGroupID,
WithContentDedup, WithClient. Each message carries the §3 MessageAttributes
(bq-job = URN, bq-trace-id = trace_id, bq-message-id = meta.id, plus
bq-schema-version / bq-source-lang / bq-created-at) so a consumer can route without
decoding the body; attempts is reconciled to ApproximateReceiveCount − 1. See the
SQS binding.
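The attempts reconciliation above is a one-line calculation once the receive count is parsed. SQS reports ApproximateReceiveCount as a decimal string in the message's system attributes, so a sketch might look like the following. The function name and the fallback to 0 for a missing or malformed value are assumptions, not the transport's documented behaviour.

```go
package main

import (
	"fmt"
	"strconv"
)

// attemptsFromReceiveCount derives attempts from the ApproximateReceiveCount
// system attribute: the first delivery has a count of 1, i.e. 0 prior attempts.
func attemptsFromReceiveCount(attrs map[string]string) int {
	n, err := strconv.Atoi(attrs["ApproximateReceiveCount"])
	if err != nil || n < 1 {
		return 0 // missing or malformed: treated as a first delivery (assumption)
	}
	return n - 1
}

func main() {
	first := map[string]string{"ApproximateReceiveCount": "1"}
	third := map[string]string{"ApproximateReceiveCount": "3"}
	fmt.Println(attemptsFromReceiveCount(first), attemptsFromReceiveCount(third))
}
```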
Azure Service Bus transport
go get github.com/babelqueue/babelqueue-go/azureservicebus
A babelqueue.Transport over the official Azure SDK (azservicebus): the canonical
envelope is the message Body, projected onto native fields (Subject = URN,
CorrelationID = trace_id, MessageID = meta.id) plus the bq- application
properties; a PeekLock reserve → CompleteMessage ack gives at-least-once. Build it from a
connection string, your own *azservicebus.Client, or an injected client:
import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/azureservicebus"
)

tr, err := azureservicebus.New(azureservicebus.WithConnectionString(connStr))
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

// Publish returns (meta.id, error); surface the error instead of discarding it.
if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks
Options: WithAzureClient(*azservicebus.Client) (e.g. a namespace + token credential),
WithClient, WithMaxWaitTime. attempts is reconciled to
max(body.attempts, DeliveryCount − 1). Requires Go 1.23+ (the Azure SDK floor). See
the Azure Service Bus binding.
Apache Pulsar transport
go get github.com/babelqueue/babelqueue-go/pulsar
A babelqueue.Transport over the pure-Go apache/pulsar-client-go (no CGo, no
libpulsar): the canonical envelope is the message payload, projected onto native Pulsar
message properties (bq-job = URN, bq-trace-id = trace_id, bq-message-id = meta.id,
plus bq-schema-version / bq-source-lang / bq-attempts, all string→string); a
receive → Ack gives at-least-once. Build it from a service URL, your own pulsar.Client,
or an injected client:
import (
	"context"

	babelqueue "github.com/babelqueue/babelqueue-go"
	"github.com/babelqueue/babelqueue-go/pulsar"
)

tr, err := pulsar.New(pulsar.WithURL("pulsar://localhost:6650"))
if err != nil {
	return err
}

app := babelqueue.NewApp(tr, babelqueue.WithDefaultQueue("orders"))
app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error {
	return nil
})

// Publish returns (meta.id, error); surface the error instead of discarding it.
if _, err := app.Publish(ctx, "urn:babel:orders:created", map[string]any{"order_id": 1042}); err != nil {
	return err
}
return app.Consume(ctx) // blocks
Options: WithPulsarClient(pulsar.Client), WithClient, WithSubscription (default
babelqueue), WithSubscriptionType (default Shared), WithTopicPrefix,
WithMaxWaitTime. attempts is reconciled to max(body.attempts, RedeliveryCount) — the
redelivery count is 0-based, so no −1. Requires Go 1.23+. See the
Apache Pulsar binding.
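The Pulsar rule differs from the Azure Service Bus rule only in the −1 offset, because Azure's DeliveryCount starts at 1 on the first delivery while Pulsar's redelivery count starts at 0. A minimal sketch of both rules side by side follows. The function names are hypothetical, and plain int is used for the broker counters to keep the arithmetic simple.

```go
package main

import "fmt"

// maxInt avoids relying on the max builtin, which needs Go 1.21+.
func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// reconcileAzure applies max(body.attempts, DeliveryCount - 1).
func reconcileAzure(bodyAttempts, deliveryCount int) int {
	return maxInt(bodyAttempts, deliveryCount-1)
}

// reconcilePulsar applies max(body.attempts, RedeliveryCount): no offset,
// because the redelivery count is already 0 on the first delivery.
func reconcilePulsar(bodyAttempts, redeliveryCount int) int {
	return maxInt(bodyAttempts, redeliveryCount)
}

func main() {
	// First delivery on each broker: both rules agree on zero attempts.
	fmt.Println(reconcileAzure(0, 1), reconcilePulsar(0, 0))
	// Broker redelivered twice without the body being updated.
	fmt.Println(reconcileAzure(0, 3), reconcilePulsar(0, 2))
}
```

Taking the maximum means whichever counter is further ahead wins: the broker's own redelivery counter or the attempts value carried in the envelope body.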
Retry, attempts & dead-letter
On a handler error the top-level attempts counter is incremented and the message
is re-published to the same queue until WithMaxAttempts is reached; after that, if
WithDeadLetter(true) is set, it is routed to the dead-letter queue (default: the
source queue + .dlq) with a dead_letter block — otherwise it is dropped. See the
wire contract.