Documentation
¶
Overview ¶
Package warren is a modern, ergonomic Go client for AMQP 0-9-1 (RabbitMQ).
It wraps github.com/rabbitmq/amqp091-go with a generics-based, type-safe API that handles the production-grade concerns every team rebuilds on top of the low-level driver: supervised reconnect with publisher confirms, pluggable codecs over typed messages, channel pooling, centralized topology declaration, built-in observability (metrics, logging, OpenTelemetry), and common patterns (RPC, delayed messages, batch consume and publish, dead-letter routing).
See SPEC.md in the repository root for the complete public API surface.
Index ¶
- Variables
- func AMQPCode(err error) (uint16, bool)
- func DefaultConnectionName() string
- func IsPermanent(err error) bool
- func IsTransient(err error) bool
- type Batch
- type BatchConsumer
- type BatchConsumerBuilder
- func (b *BatchConsumerBuilder[M]) Args(args Headers) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Build() (*BatchConsumer[M], error)
- func (b *BatchConsumerBuilder[M]) ChannelQoS() *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Codec(c codec.Codec) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Exclusive() *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) FlushAfter(d time.Duration) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) HandlerTimeout(d time.Duration) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) HandlerTimeoutVerdict(v TimeoutVerdict) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) MaxRedeliveries(n int) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Metrics(cm metrics.ConsumerMetrics) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) OnCancel(fn func(reason string)) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Prefetch(count uint16) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) PrefetchBytes(_ uint) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Priority(p int) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Queue(name string) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Size(n uint) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Tag(consumerTag string) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) TopologyHint(q Queue) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) Tracer(t otel.Tracer) *BatchConsumerBuilder[M]
- func (b *BatchConsumerBuilder[M]) WithoutMetrics() *BatchConsumerBuilder[M]
- type BatchFixture
- type BatchHandler
- type Binding
- type Caller
- type CallerBuilder
- func (b *CallerBuilder[Req, Resp]) Build() (*Caller[Req, Resp], error)
- func (b *CallerBuilder[Req, Resp]) Codec(c codec.Codec) *CallerBuilder[Req, Resp]
- func (b *CallerBuilder[Req, Resp]) Exchange(name string) *CallerBuilder[Req, Resp]
- func (b *CallerBuilder[Req, Resp]) Prefetch(count uint16) *CallerBuilder[Req, Resp]
- func (b *CallerBuilder[Req, Resp]) RoutingKey(rk string) *CallerBuilder[Req, Resp]
- func (b *CallerBuilder[Req, Resp]) UseExclusiveReplyQueue() *CallerBuilder[Req, Resp]
- type Connection
- func (c *Connection) AuthenticatedUser() string
- func (c *Connection) Close(ctx context.Context) error
- func (c *Connection) ConConnAt(idx int) *managedConn
- func (c *Connection) ForceReconnect() error
- func (c *Connection) Health(ctx context.Context) error
- func (c *Connection) NumConConns() int
- func (c *Connection) NumPubConns() int
- func (c *Connection) PubConnAt(idx int) *managedConn
- type Consumer
- func (c *Consumer[M]) Close(_ context.Context) error
- func (c *Consumer[M]) Consume(ctx context.Context, h Handler[M]) error
- func (c *Consumer[M]) ConsumeRaw(ctx context.Context, h RawHandler[M]) error
- func (c *Consumer[M]) Health(ctx context.Context) (*ConsumerHealth, error)
- func (c *Consumer[M]) Pause(ctx context.Context) error
- func (c *Consumer[M]) Resume(ctx context.Context) error
- type ConsumerBuilder
- func (b *ConsumerBuilder[M]) AllowMissingDLX() *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Args(args Headers) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) AutoAck() *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Build() (*Consumer[M], error)
- func (b *ConsumerBuilder[M]) ChannelQoS() *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Codec(c codec.Codec) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Concurrency(n uint) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Exclusive() *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) HandlerTimeout(d time.Duration) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) HandlerTimeoutVerdict(v TimeoutVerdict) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) MaxInFlightBytes(n int64) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) MaxRedeliveries(n int) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Metrics(cm metrics.ConsumerMetrics) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) OnCancel(fn func(reason string)) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Prefetch(count uint16) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) PrefetchBytes(_ uint) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Priority(p int) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Queue(name string) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Tag(consumerTag string) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Topology(t *Topology) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) TopologyHint(q Queue) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) Tracer(t otel.Tracer) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) WithDedupe(store DedupeStore, ttl time.Duration) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) WithQueueDepthSampler(interval time.Duration) *ConsumerBuilder[M]
- func (b *ConsumerBuilder[M]) WithoutMetrics() *ConsumerBuilder[M]
- type ConsumerHealth
- type DeadLetter
- type DedupeStore
- type Delivery
- func (d *Delivery[M]) Ack() error
- func (d *Delivery[M]) AckIf(handlerErr error) error
- func (d *Delivery[M]) Body() *M
- func (d *Delivery[M]) CorrelationID() string
- func (d *Delivery[M]) DeathCount() int
- func (d *Delivery[M]) DeathCountByReason(reason string) int
- func (d *Delivery[M]) DeathReasons() []string
- func (d *Delivery[M]) DeliveryTag() uint64
- func (d *Delivery[M]) Headers() Headers
- func (d *Delivery[M]) MessageID() string
- func (d *Delivery[M]) Nack(requeue bool) error
- func (d *Delivery[M]) Redelivered() bool
- func (d *Delivery[M]) Timestamp() time.Time
- type DeliveryFixture
- type DeliveryMode
- type Exchange
- type ExchangeBinding
- type ExchangeKind
- type Handler
- type Headers
- type JitterStrategy
- type Message
- type Option
- func WithAddr(addr string) Option
- func WithAddrs(addrs []string) Option
- func WithAuth(username, password string) Option
- func WithChannelMax(n uint16) Option
- func WithChannelPoolSize(n int) Option
- func WithClientProperties(props map[string]any) Option
- func WithConnectDelay(d time.Duration) Option
- func WithConnectionName(name string) Option
- func WithConsumerConnections(n int) Option
- func WithDialer(fn func(network, addr string) (net.Conn, error)) Option
- func WithFrameMax(n uint32) Option
- func WithHeartbeat(d time.Duration) Option
- func WithLogger(l log.Logger) Option
- func WithMetrics(m metrics.ClientMetrics) Option
- func WithOnBlocked(fn func(reason string)) Option
- func WithOnReconnect(fn func()) Option
- func WithOnResubscribe(fn func(queue string)) Option
- func WithOnTopologyDegraded(fn func(error)) Option
- func WithPublisherConnections(n int) Option
- func WithReconnectBackoff(p RetryPolicy) Option
- func WithReconnectBarrierTimeout(d time.Duration) Option
- func WithSASLMechanism(m SASLMechanism) Option
- func WithTLSConfig(cfg *tls.Config) Option
- func WithTracer(t otel.Tracer) Option
- func WithVHost(vhost string) Option
- func WithoutMetrics() Option
- type OverflowPolicy
- type PublishResult
- type Publisher
- type PublisherBuilder
- func (b *PublisherBuilder[M]) Build() (*Publisher[M], error)
- func (b *PublisherBuilder[M]) Codec(c codec.Codec) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) ConfirmTimeout(d time.Duration) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) Exchange(name string) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) Mandatory() *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) MaxMessageSizeBytes(n int) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) Metrics(pm metrics.PublisherMetrics) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) OnReturn(cb func(Return)) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) PublishBatchMaxSize(n int) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) PublishRetry(p RetryPolicy) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) PublishTimeout(d time.Duration) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) RoutingKey(rk string) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) StampUserID() *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) Tracer(t otel.Tracer) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) WithPublishRateLimit(perSec int) *PublisherBuilder[M]
- func (b *PublisherBuilder[M]) WithoutMetrics() *PublisherBuilder[M]
- type Queue
- type QueueType
- type RawHandler
- type Replier
- type ReplierBuilder
- func (b *ReplierBuilder[Req, Resp]) AllowMissingDLX() *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) Build() (*Replier[Req, Resp], error)
- func (b *ReplierBuilder[Req, Resp]) Codec(c codec.Codec) *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) ConfirmTimeout(d time.Duration) *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) Metrics(cm metrics.ConsumerMetrics) *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) OnError(fn func(ctx context.Context, req Req, err error)) *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) Queue(name string) *ReplierBuilder[Req, Resp]
- func (b *ReplierBuilder[Req, Resp]) Topology(t *Topology) *ReplierBuilder[Req, Resp]
- type ReplyHandler
- type RetryPolicy
- type Return
- type ReturnedProperties
- type SASLMechanism
- type TimeoutVerdict
- type Topology
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotConnected is returned when Publish or Consume is called before Dial. ErrNotConnected = errors.New("warren: not connected") // ErrAlreadyClosed is returned when an operation is attempted on a resource that // has already been closed — either a Connection closed twice, or Ack/Nack/AckIf // called on a Delivery whose owning Consumer was shut down via Close(ctx). ErrAlreadyClosed = errors.New("warren: already closed") // ErrAlreadyResolved is returned by the second (and any later) Ack/Nack/AckIf // on a Delivery[M] that has already emitted its verdict frame — including a // late handler verdict after a HandlerTimeout fired. The resolved-once guard // is a single atomic CAS: only the winner emits a frame; losers are no-ops // returning this sentinel. It prevents a second basic.ack/nack frame that // would channel-close with PRECONDITION_FAILED (406) and take out every // in-flight handler on that channel. See SPEC §6.3. ErrAlreadyResolved = errors.New("warren: delivery already resolved") // ErrShutdown is returned when an operation is attempted while the connection is shutting down. ErrShutdown = errors.New("warren: client is shutting down") // ErrChannelClosed is returned when the broker closes the channel (e.g. after a protocol error). ErrChannelClosed = errors.New("warren: channel closed") // ErrConnectionBlocked is returned when the broker blocks the connection due to a memory or disk alarm. ErrConnectionBlocked = errors.New("warren: connection blocked by broker") // ErrChannelPoolExhausted is returned when ctx is cancelled before a semaphore // token becomes available. It wraps ctx.Err() so callers can distinguish a // voluntary cancellation (context.Canceled) from a deadline (context.DeadlineExceeded) // via errors.Is. Note: IsTransient treats this error as transient EXCEPT when the // wrapped ctx.Err() is context.Canceled — an upstream cancellation will never // succeed on retry, so IsTransient returns false for it (T54). A deadline // (context.DeadlineExceeded) remains transient. ErrChannelPoolExhausted = errors.New("warren: channel pool exhausted") // ErrConfirmTimeout is returned when the broker does not confirm a publish within the deadline. ErrConfirmTimeout = errors.New("warren: publisher confirm timeout") // ErrUnroutable is returned when a mandatory publish has no matching binding (basic.return received). ErrUnroutable = errors.New("warren: mandatory publish was returned") // ErrPublishNacked is returned when the broker sends basic.nack (e.g. overflow=reject-publish or // reject-publish-dlx). A disk/memory alarm does NOT nack — it raises connection.blocked, surfaced as // ErrConnectionBlocked, not ErrPublishNacked. ErrPublishNacked = errors.New("warren: broker nacked publish") // ErrPartialBatch is returned when one or more messages in a PublishBatch fail. ErrPartialBatch = errors.New("warren: batch publish partially failed") // ErrBatchTooLarge is returned when PublishBatch is called with more messages than PublishBatchMaxSize allows. ErrBatchTooLarge = errors.New("warren: PublishBatch exceeds max in-flight budget") // ErrMessageTooLarge is returned when an encoded message body exceeds the // publisher's MaxMessageSizeBytes guardrail. The publish is rejected locally // before any channel is opened — protecting the publisher from OOM and the // broker from frame fragmentation pressure. Classified as permanent: the same // body will never succeed on retry. The broker-side equivalent (reply code // 311, ErrContentTooLarge) only fires after the payload has been allocated // and partially sent; this local guard avoids that round-trip. ErrMessageTooLarge = errors.New("warren: message body exceeds MaxMessageSizeBytes") // ErrRateLimited is returned when a publish cannot acquire a local rate-limit // token before its context is cancelled (WithPublishRateLimit). It wraps // ctx.Err() so callers can still distinguish a deadline from a cancellation via // errors.Is. Classified transient: the same publish may succeed once the local // token bucket refills. Throttled-but-completed publishes do NOT return this // error — they only increment publisher_rate_limited_total. ErrRateLimited = errors.New("warren: publish rate limited") // ErrRequeue signals the consumer handler that the message should be nacked with requeue=true. ErrRequeue = errors.New("warren: nack with requeue") // ErrPoison signals the consumer handler that the message should be nacked without requeue. ErrPoison = errors.New("warren: poison message (nack no requeue)") // ErrMaxRedeliveries is returned when a message exceeds the MaxRedeliveries limit. ErrMaxRedeliveries = errors.New("warren: max redeliveries exceeded") // ErrConsumerCancelled is returned when the broker cancels the consumer via basic.cancel // (e.g. the queue was deleted or the exclusive lock was revoked). ErrConsumerCancelled = errors.New("warren: consumer cancelled by broker (basic.cancel)") // ErrInvalidMessage is returned when the message payload cannot be encoded or decoded, // or when a Message field value violates a SPEC constraint. ErrInvalidMessage = errors.New("warren: invalid message payload") // ErrTopologyMismatch is returned when Topology.Declare finds an existing queue or exchange // whose properties conflict with the requested declaration. It wraps ErrPreconditionFailed. ErrTopologyMismatch = errors.New("warren: topology mismatch") // ErrTopologyRedeclareFailed is returned when the reconnect barrier cannot redeclare the topology. // The connection enters a degraded state; this error is permanent until a successful redeclare. ErrTopologyRedeclareFailed = errors.New("warren: topology redeclare failed") // ErrReconnecting is returned while the connection is inside the synchronous reconnect barrier // (redeclare → re-subscribe → WithOnReconnect). Publish blocks until the barrier clears. Transient. ErrReconnecting = errors.New("warren: connection reconnecting") // ErrCallTimeout is returned when an RPC call exceeds its deadline. ErrCallTimeout = errors.New("warren: rpc call timed out") // ErrInvalidOptions is returned when a builder option value violates a SPEC constraint. ErrInvalidOptions = errors.New("warren: invalid options") // ErrContentTooLarge wraps AMQP reply code 311 (content-too-large). Channel-level. ErrContentTooLarge = errors.New("warren: content too large (311)") // ErrConnectionForced wraps AMQP reply code 320 (connection-forced). Connection-level. ErrConnectionForced = errors.New("warren: connection forced (320)") // ErrInvalidPath wraps AMQP reply code 402 (invalid-path). Connection-level. Permanent. ErrInvalidPath = errors.New("warren: invalid path (402)") // ErrAccessRefused wraps AMQP reply code 403 (access-refused). Channel-level. Permanent. ErrAccessRefused = errors.New("warren: access refused (403)") // ErrNotFound wraps AMQP reply code 404 (not-found). Channel-level. Permanent. ErrNotFound = errors.New("warren: not found (404)") // ErrResourceLocked wraps AMQP reply code 405 (resource-locked). Channel-level. Permanent. ErrResourceLocked = errors.New("warren: resource locked (405)") // ErrPreconditionFailed wraps AMQP reply code 406 (precondition-failed). Channel-level. Permanent. ErrPreconditionFailed = errors.New("warren: precondition failed (406)") // ErrFrameError wraps AMQP reply code 501 (frame-error). Connection-level. Permanent. ErrFrameError = errors.New("warren: frame error (501)") // ErrSyntaxError wraps AMQP reply code 502 (syntax-error). Connection-level. Permanent. ErrSyntaxError = errors.New("warren: syntax error (502)") // ErrCommandInvalid wraps AMQP reply code 503 (command-invalid). Connection-level. Permanent. ErrCommandInvalid = errors.New("warren: command invalid (503)") // ErrChannelError wraps AMQP reply code 504 (channel-error). Connection-level. Transient. ErrChannelError = errors.New("warren: channel error (504)") // ErrUnexpectedFrame wraps AMQP reply code 505 (unexpected-frame). Connection-level. Permanent. ErrUnexpectedFrame = errors.New("warren: unexpected frame (505)") // ErrResourceError wraps AMQP reply code 506 (resource-error). Connection-level. Permanent by default. // Resource errors cover both transient (disk pressure) and permanent (FD exhaustion) // conditions; retrying blindly amplifies pressure. Callers that know their workload // can re-classify by wrapping with ErrTransient explicitly. ErrResourceError = errors.New("warren: resource error (506)") // ErrNotAllowed wraps AMQP reply code 530 (not-allowed). Connection-level. Permanent. ErrNotAllowed = errors.New("warren: not allowed (530)") // ErrNotImplemented wraps AMQP reply code 540 (not-implemented). Connection-level. Permanent. ErrNotImplemented = errors.New("warren: not implemented (540)") // ErrInternalError wraps AMQP reply code 541 (internal-error). Connection-level. Transient. ErrInternalError = errors.New("warren: internal error (541)") // ErrTransient is a sentinel that callers can wrap around any error to mark it as retryable. // IsTransient returns true for any error in the chain that wraps ErrTransient. ErrTransient = errors.New("warren: transient error") // ErrPermanent is a sentinel that callers can wrap around any error to mark it as non-retryable. // IsPermanent returns true for any error in the chain that wraps ErrPermanent. ErrPermanent = errors.New("warren: permanent error") )
Functions ¶
func AMQPCode ¶
AMQPCode returns the AMQP reply code embedded in err (if any) and true on success. Returns (0, false) otherwise.
Recognised codes:
- Channel/connection close codes: 311, 320, 402-406, 501-506, 530, 540, 541.
- basic.return codes: 312 (NO_ROUTE), 313 (NO_CONSUMERS). These are NOT channel-close codes; the library surfaces them by wrapping ErrUnroutable with an internal codeError carrying the originating code.
func DefaultConnectionName ¶
func DefaultConnectionName() string
DefaultConnectionName returns the default connection name in the format "<binary>-<hostname>-<pid>". It is called by Dial when WithConnectionName is not provided. Exported so tests can verify the format without dialling.
func IsPermanent ¶
IsPermanent reports whether err is classified as non-retryable. True for: ErrPermanent wraps; ErrTopologyRedeclareFailed; ErrMessageTooLarge; AMQP codes 311, 402, 403, 404, 405, 406, 501, 502, 503, 505, 506, 530, 540.
func IsTransient ¶
IsTransient reports whether err is classified as retryable. True for: ErrTransient wraps; ErrChannelPoolExhausted; ErrPublishNacked; ErrConnectionBlocked; ErrConfirmTimeout; ErrChannelClosed; ErrReconnecting; ErrRateLimited; AMQP codes 320, 504, 541.
Note on ErrContentTooLarge (311): NOT transient. A payload that exceeds frame-max will fail on every retry unchanged — retrying it burns connections without any chance of success.
Note on ErrResourceError (506): NOT transient by default — see its godoc.
Note on context.Canceled: NEVER transient, even when the error also wraps a transient sentinel (e.g. ErrChannelPoolExhausted observed mid-cancellation). An upstream request cancellation will fail identically on every retry, so a PublishRetry would burn connections without any chance of success (T54). context.DeadlineExceeded is deliberately NOT special-cased — a timeout may succeed on a subsequent attempt.
Types ¶
type Batch ¶
type Batch[M any] struct { // contains filtered or unexported fields }
Batch holds a set of decoded messages accumulated before being dispatched to a BatchHandler. The framework emits a single acknowledgement frame covering all messages in the batch (via AMQP multiple=true) after the handler returns.
Auto-verdict semantics ¶
After the handler returns the framework checks whether the handler (or any delivery within the batch) already issued an ack/nack. If not, it applies the auto-verdict:
- nil return → single basic.ack(multiple=true) on the highest delivery-tag
- ErrRequeue-wrapped error → single basic.nack(multiple=true, requeue=true)
- any other error → single basic.nack(multiple=true, requeue=false)
If the handler calls Batch.Ack, Batch.Nack, or Delivery.Ack/Nack from Deliveries(), the auto-verdict is suppressed (idempotent guard).
Tests fabricate a fake batch with NewBatchFixture.
func NewBatchFixture ¶
func NewBatchFixture[M any](f BatchFixture[M]) *Batch[M]
NewBatchFixture builds a *Batch[M] from f for unit tests. Each entry is constructed with NewDeliveryFixture, so the same acknowledgement caveat applies: the batch's Ack/Nack do not reach a broker.
func (*Batch[M]) Ack ¶
Ack acknowledges all messages in the batch with a single AMQP basic.ack (multiple=true) on the highest delivery-tag. Subsequent calls to Ack, Nack, or the auto-verdict are no-ops (idempotent guard).
func (*Batch[M]) Deliveries ¶
Deliveries returns the slice of *Delivery[M] for per-message inspection or manual acknowledgement. Calling Ack or Nack on any returned delivery suppresses the batch-level auto-verdict (idempotent guard).
func (*Batch[M]) Messages ¶
func (b *Batch[M]) Messages() []M
Messages returns the decoded payloads for all messages in the batch.
func (*Batch[M]) Nack ¶
Nack negatively acknowledges all messages in the batch with a single AMQP basic.nack (multiple=true) on the highest delivery-tag. requeue=true re-queues all messages; requeue=false routes them to the DLX (or drops them). Subsequent calls to Ack, Nack, or the auto-verdict are no-ops.
type BatchConsumer ¶
type BatchConsumer[M any] struct { // contains filtered or unexported fields }
BatchConsumer consumes AMQP messages from a single queue in batches, decoding each payload to M via the configured codec, and dispatching accumulated groups to a BatchHandler[M].
Batches are flushed when Size messages have accumulated or when the FlushAfter timer fires (whichever comes first). Each batch is dispatched sequentially; run multiple BatchConsumer[M] instances for parallelism.
Use BatchConsumerFor[M](conn) to build a batch consumer.
func (*BatchConsumer[M]) Close ¶
func (c *BatchConsumer[M]) Close(_ context.Context) error
Close signals the batch consumer to stop accepting new deliveries.
func (*BatchConsumer[M]) Consume ¶
func (c *BatchConsumer[M]) Consume(ctx context.Context, h BatchHandler[M]) error
Consume starts accumulating messages from the configured queue and dispatching batches to h. It blocks until ctx is cancelled.
Cancelling ctx flushes any pending batch before returning; set HandlerTimeout to bound shutdown latency when batch handlers may block indefinitely.
May only be called once per consumer; create a new consumer via Build() to restart.
type BatchConsumerBuilder ¶
type BatchConsumerBuilder[M any] struct { // contains filtered or unexported fields }
BatchConsumerBuilder configures and builds a BatchConsumer[M].
All option methods follow a last-wins policy: calling the same method twice keeps only the final value.
func BatchConsumerFor ¶
func BatchConsumerFor[M any](conn *Connection) *BatchConsumerBuilder[M]
BatchConsumerFor returns a builder for a BatchConsumer[M] tied to conn.
func (*BatchConsumerBuilder[M]) Args ¶
func (b *BatchConsumerBuilder[M]) Args(args Headers) *BatchConsumerBuilder[M]
Args sets extra arguments forwarded in the basic.consume frame. When Priority is also set, the typed x-priority value is layered on top (Priority wins).
func (*BatchConsumerBuilder[M]) Build ¶
func (b *BatchConsumerBuilder[M]) Build() (*BatchConsumer[M], error)
Build constructs and returns a BatchConsumer[M]. Returns an error if the builder state is invalid.
func (*BatchConsumerBuilder[M]) ChannelQoS ¶
func (b *BatchConsumerBuilder[M]) ChannelQoS() *BatchConsumerBuilder[M]
ChannelQoS applies QoS per channel (global=false) rather than per consumer.
func (*BatchConsumerBuilder[M]) Codec ¶
func (b *BatchConsumerBuilder[M]) Codec(c codec.Codec) *BatchConsumerBuilder[M]
Codec sets the message codec. Default: JSON (lax).
func (*BatchConsumerBuilder[M]) Exclusive ¶
func (b *BatchConsumerBuilder[M]) Exclusive() *BatchConsumerBuilder[M]
Exclusive requests exclusive consumer access to the queue (the basic.consume exclusive flag). While set, the broker refuses any other consumer on the same queue with ACCESS_REFUSED (surfaced as ErrAccessRefused).
func (*BatchConsumerBuilder[M]) FlushAfter ¶
func (b *BatchConsumerBuilder[M]) FlushAfter(d time.Duration) *BatchConsumerBuilder[M]
FlushAfter sets a time-based flush trigger. When the first message of a new batch arrives the timer starts; when it fires the batch is dispatched even if fewer than Size messages have accumulated. Default: 0 (no timer-based flush).
func (*BatchConsumerBuilder[M]) HandlerTimeout ¶
func (b *BatchConsumerBuilder[M]) HandlerTimeout(d time.Duration) *BatchConsumerBuilder[M]
HandlerTimeout sets a per-batch ctx deadline. Zero (default) means no deadline. When the deadline expires the handler ctx is cancelled and the batch verdict is determined by HandlerTimeoutVerdict (default: TimeoutNackNoRequeue).
func (*BatchConsumerBuilder[M]) HandlerTimeoutVerdict ¶
func (b *BatchConsumerBuilder[M]) HandlerTimeoutVerdict(v TimeoutVerdict) *BatchConsumerBuilder[M]
HandlerTimeoutVerdict sets the ack/nack action when HandlerTimeout fires. Default: TimeoutNackNoRequeue (message goes to DLX or is dropped).
func (*BatchConsumerBuilder[M]) MaxRedeliveries ¶
func (b *BatchConsumerBuilder[M]) MaxRedeliveries(n int) *BatchConsumerBuilder[M]
MaxRedeliveries caps the number of times a message can be redelivered. Default 0 = unbounded. Counter B increments per message when the whole batch verdict is Nack(requeue=true).
func (*BatchConsumerBuilder[M]) Metrics ¶
func (b *BatchConsumerBuilder[M]) Metrics(cm metrics.ConsumerMetrics) *BatchConsumerBuilder[M]
Metrics sets the ConsumerMetrics recorder. Default: NoOp.
func (*BatchConsumerBuilder[M]) OnCancel ¶
func (b *BatchConsumerBuilder[M]) OnCancel(fn func(reason string)) *BatchConsumerBuilder[M]
OnCancel registers a callback invoked when the broker cancels the consumer via basic.cancel. The reason is the cancelled consumer's tag (the only datum the frame carries). After OnCancel fires, Consume returns ErrConsumerCancelled; the library does not auto-redeclare the queue. When unset, a warning is logged.
func (*BatchConsumerBuilder[M]) Prefetch ¶
func (b *BatchConsumerBuilder[M]) Prefetch(count uint16) *BatchConsumerBuilder[M]
Prefetch sets the per-channel prefetch count (basic.qos count). Default: 64.
func (*BatchConsumerBuilder[M]) PrefetchBytes ¶
func (b *BatchConsumerBuilder[M]) PrefetchBytes(_ uint) *BatchConsumerBuilder[M]
PrefetchBytes is a no-op on RabbitMQ; preserved for AMQP 0-9-1 protocol parity.
func (*BatchConsumerBuilder[M]) Priority ¶
func (b *BatchConsumerBuilder[M]) Priority(p int) *BatchConsumerBuilder[M]
Priority sets the x-priority consumer argument.
func (*BatchConsumerBuilder[M]) Queue ¶
func (b *BatchConsumerBuilder[M]) Queue(name string) *BatchConsumerBuilder[M]
Queue sets the AMQP queue name to consume from.
func (*BatchConsumerBuilder[M]) Size ¶
func (b *BatchConsumerBuilder[M]) Size(n uint) *BatchConsumerBuilder[M]
Size sets the maximum number of messages accumulated before a batch is flushed. Default: 100. A flush also fires if FlushAfter elapses before Size is reached.
func (*BatchConsumerBuilder[M]) Tag ¶
func (b *BatchConsumerBuilder[M]) Tag(consumerTag string) *BatchConsumerBuilder[M]
Tag sets the consumer tag. Default: auto-generated "ctag-<uuidv7>" at Build time.
func (*BatchConsumerBuilder[M]) TopologyHint ¶
func (b *BatchConsumerBuilder[M]) TopologyHint(q Queue) *BatchConsumerBuilder[M]
TopologyHint provides queue metadata that modifies counter B behaviour.
func (*BatchConsumerBuilder[M]) Tracer ¶
func (b *BatchConsumerBuilder[M]) Tracer(t otel.Tracer) *BatchConsumerBuilder[M]
Tracer sets the OTel tracer for consume spans.
func (*BatchConsumerBuilder[M]) WithoutMetrics ¶
func (b *BatchConsumerBuilder[M]) WithoutMetrics() *BatchConsumerBuilder[M]
WithoutMetrics disables all consumer metrics (last-wins against Metrics).
type BatchFixture ¶
type BatchFixture[M any] struct { // Deliveries are the per-message fixtures composing the batch, in order. Deliveries []DeliveryFixture[M] // contains filtered or unexported fields }
BatchFixture is the keyed-literal input to NewBatchFixture. It fabricates a Batch from a slice of per-message fixtures, in order.
type BatchHandler ¶
BatchHandler is the function signature for batch message handlers.
Return nil to ack the whole batch (single basic.ack, multiple=true). Return a wrapped ErrRequeue to nack with requeue=true. Return any other error to nack without requeue (DLX-bound).
Manual acking via Batch.Ack, Batch.Nack, or individual Delivery.Ack/Nack suppresses the auto-verdict: the framework will not emit a second acknowledgement frame.
type Binding ¶
type Binding struct {
Exchange string
Queue string
RoutingKey string
// NoWait sends the bind without waiting for the broker's reply. This
// downgrades mismatch detection to asynchronous: Declare returns nil even
// on a conflicting bind, and the broker reports the conflict (e.g.
// ErrPreconditionFailed) out-of-band on a channel Declare has already
// closed, so it is generally not observable by the caller. Leave
// NoWait=false if you rely on Declare surfacing ErrTopologyMismatch.
NoWait bool
Args map[string]any
}
Binding declares an AMQP queue binding.
type Caller ¶
type Caller[Req, Resp any] struct { // contains filtered or unexported fields }
Caller performs synchronous request/reply RPC over AMQP. Req is the request payload type, Resp the response payload type; both are encoded/decoded with the configured codec.
Channel ownership (SPEC §6.7). Because the direct reply-to pseudo-queue is channel-scoped — replies are delivered only on the channel that issued the basic.consume — a Caller does NOT use the rotating publisher channel pool. It holds one dedicated channel (pinned to a consumer-role TCP connection, like a Consumer) that both consumes amq.rabbitmq.reply-to and publishes the requests, so the reply routes back to it. Concurrent Call invocations share that channel and are demultiplexed by CorrelationID. If the channel closes (reconnect), in-flight calls resolve with ErrChannelClosed and the next Call transparently reopens and re-subscribes.
Use CallerFor[Req, Resp](conn) to build a Caller.
func (*Caller[Req, Resp]) Call ¶
Call publishes req and blocks until the matching reply arrives, the ctx deadline fires (ErrCallTimeout), or the dedicated channel closes (ErrChannelClosed). It auto-stamps a fresh CorrelationID and the reply address on the request, encodes req with the configured codec, and decodes the reply body into Resp.
At-least-once: a Replier may send a reply more than once (it acks the request only after the reply confirms; a crash in that window causes a redelivery and a second reply). Treat responses as at-least-once and dedupe by CorrelationID if your handler is not naturally idempotent.
type CallerBuilder ¶
type CallerBuilder[Req, Resp any] struct { // contains filtered or unexported fields }
CallerBuilder configures and builds a Caller[Req, Resp].
All option methods follow a last-wins policy: calling the same method twice keeps only the final value.
func CallerFor ¶
func CallerFor[Req, Resp any](conn *Connection) *CallerBuilder[Req, Resp]
CallerFor returns a builder for a Caller[Req, Resp] tied to conn.
func (*CallerBuilder[Req, Resp]) Build ¶
func (b *CallerBuilder[Req, Resp]) Build() (*Caller[Req, Resp], error)
Build constructs and returns a Caller[Req, Resp]. It pins the Caller to a consumer-role TCP connection (by stable hash of a generated caller id, like a Consumer) and validates the option combination. Returns ErrInvalidOptions on an invalid configuration.
func (*CallerBuilder[Req, Resp]) Codec ¶
func (b *CallerBuilder[Req, Resp]) Codec(c codec.Codec) *CallerBuilder[Req, Resp]
Codec sets the message codec used to encode requests and decode replies. Default: JSON (lax — accepts unknown fields per Postel's Law).
func (*CallerBuilder[Req, Resp]) Exchange ¶
func (b *CallerBuilder[Req, Resp]) Exchange(name string) *CallerBuilder[Req, Resp]
Exchange sets the AMQP exchange the request is published to. Default: "" (the default exchange, which routes by queue name via RoutingKey).
func (*CallerBuilder[Req, Resp]) Prefetch ¶
func (b *CallerBuilder[Req, Resp]) Prefetch(count uint16) *CallerBuilder[Req, Resp]
Prefetch sets the basic.qos prefetch count on the reply consumer. It is only honoured with UseExclusiveReplyQueue: RabbitMQ rejects basic.qos on the direct reply-to pseudo-queue, so Build returns ErrInvalidOptions if Prefetch is set without UseExclusiveReplyQueue.
func (*CallerBuilder[Req, Resp]) RoutingKey ¶
func (b *CallerBuilder[Req, Resp]) RoutingKey(rk string) *CallerBuilder[Req, Resp]
RoutingKey sets the routing key every request is published with. For the common case (default exchange) this is the request queue name the Replier consumes.
func (*CallerBuilder[Req, Resp]) UseExclusiveReplyQueue ¶
func (b *CallerBuilder[Req, Resp]) UseExclusiveReplyQueue() *CallerBuilder[Req, Resp]
UseExclusiveReplyQueue switches from the channel-scoped direct reply-to pseudo-queue to a real exclusive, auto-delete reply queue declared per Caller, with regular ack semantics. It costs one extra declare per Caller but re-enables Prefetch and survives more failure modes. Default: direct reply-to.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection manages a pool of supervised AMQP TCP connections split by role: publisher connections and consumer connections.
Each TCP socket has its own reconnect supervisor (exponential backoff), synchronous reconnect barrier (topology + consumer state fully restored before traffic resumes), and degraded-state machine (persistent topology failures).
Default pool sizes: 2 publisher connections, 2 consumer connections. Configure with WithPublisherConnections / WithConsumerConnections.
func Dial ¶
func Dial(ctx context.Context, options ...Option) (*Connection, error)
Dial establishes a supervised pool of AMQP connections and returns the Connection. It opens WithPublisherConnections + WithConsumerConnections TCP sockets (default 2+2), validates options, and attempts each connection with the configured backoff policy.
Partial-pool-connect policy (T67): Dial succeeds once at least ONE connection per role connects; sockets that fail their initial connect are brought up by their supervisor under the reconnect backoff, and the reduced-capacity boot is surfaced via connection_degraded_total{reason="boot_reduced_capacity"} plus a warning log. Dial fails only when an entire role gets zero connections, or when ctx is cancelled.
Validation errors (ErrInvalidOptions) are returned synchronously. Network errors cause Dial to retry up to the configured Retries limit per socket; when an entire role is exhausted (or ctx cancelled), the last network error is returned.
func (*Connection) AuthenticatedUser ¶
func (c *Connection) AuthenticatedUser() string
AuthenticatedUser returns the identity authenticated during Dial.
For SASLPlain this is the username from WithAuth. For SASLExternal it is the Common Name of the first client certificate. The value is set before Dial returns and does not change.
func (*Connection) Close ¶
func (c *Connection) Close(ctx context.Context) error
Close drains in-flight work and shuts down all TCP connections in the pool. Returns ErrAlreadyClosed if called more than once.
func (*Connection) ConConnAt ¶
func (c *Connection) ConConnAt(idx int) *managedConn
ConConnAt returns the consumer-role managed connection at the given index. Used by Consumer (T18) to pin subscriptions.
func (*Connection) ForceReconnect ¶
func (c *Connection) ForceReconnect() error
ForceReconnect triggers a manual reconnect cycle on every socket without restarting the process. Intended as an operator escape hatch for recovering from a degraded state. Returns ErrAlreadyClosed if the connection is shut down.
func (*Connection) Health ¶
func (c *Connection) Health(ctx context.Context) error
Health verifies broker responsiveness by opening and immediately closing a temporary AMQP channel on the first publisher connection (SPEC §6.1).
It returns, in precedence order: ErrAlreadyClosed if the Connection is shut down; ErrNotConnected if the connection pool is empty; ctx.Err() if ctx is already done; ErrReconnecting if a reconnect barrier is active; ErrTopologyRedeclareFailed if the connection is in a degraded topology state; ErrNotConnected if the socket is not yet established; otherwise any error from the channel open/close round-trip, classified through the AMQP reply-code sentinels (errors.Is against ErrAccessRefused, ErrNotFound, etc. — SPEC §6.3).
func (*Connection) NumConConns ¶
func (c *Connection) NumConConns() int
NumConConns returns the number of consumer TCP connections in the pool.
func (*Connection) NumPubConns ¶
func (c *Connection) NumPubConns() int
NumPubConns returns the number of publisher TCP connections in the pool.
func (*Connection) PubConnAt ¶
func (c *Connection) PubConnAt(idx int) *managedConn
PubConnAt returns the publisher-role managed connection at the given index. Used by Publisher (T12) to acquire channels.
type Consumer ¶
type Consumer[M any] struct { // contains filtered or unexported fields }
Consumer receives AMQP messages from a single queue, decodes each payload to M via the configured codec, and dispatches to a Handler[M] or RawHandler[M].
Use ConsumerFor[M](conn) to build a consumer. Each consumer may only be started once; create a new consumer via Build() to restart.
func (*Consumer[M]) Consume ¶
Consume starts consuming from the configured queue, decoding each message and dispatching to h. It blocks until ctx is cancelled.
The consumer automatically acks (nil return), nacks without requeue (any non-ErrRequeue error), or nacks with requeue (errors.Is(err, ErrRequeue)). May only be called once per consumer; create a new consumer to restart. Cancelling ctx waits for all in-flight handlers to return; set HandlerTimeout to bound shutdown latency when handlers may block indefinitely.
func (*Consumer[M]) ConsumeRaw ¶
func (c *Consumer[M]) ConsumeRaw(ctx context.Context, h RawHandler[M]) error
ConsumeRaw starts consuming, passing the full Delivery envelope to h. The raw handler is responsible for calling d.Ack(), d.Nack(), or d.AckIf() to acknowledge the delivery. The consumer does NOT auto-ack on handler return.
Exception — HandlerTimeout: if HandlerTimeout is configured and the deadline fires, the consumer still issues a Nack automatically to prevent unacknowledged deliveries from accumulating. The handler is free to call Ack/Nack before the deadline; if it does so, the library's Nack on timeout will be a no-op (broker de-duplicates).
Use ConsumeRaw to access envelope fields (Headers, Redelivered, DeathCount) or to implement custom ack strategies. For most workloads, Consume is simpler.
May only be called once per consumer; create a new consumer to restart. Cancelling ctx waits for all in-flight handlers to return; set HandlerTimeout to bound shutdown latency when handlers may block indefinitely.
func (*Consumer[M]) Health ¶
func (c *Consumer[M]) Health(ctx context.Context) (*ConsumerHealth, error)
Health verifies the consumer's pinned connection and, when healthy, returns a snapshot of the consumer's runtime state. On a connection error it returns (nil, err): the connection liveness check is the gate, and a zeroed snapshot alongside an error would be misleading (T53).
func (*Consumer[M]) Pause ¶
Pause issues a local basic.cancel so the broker stops delivering to this consumer, without closing the channel — in-flight handlers and their acks on that channel are unaffected, and the broker holds subsequent messages on the queue. Use it for graceful draining (e.g. a Kubernetes preStop hook) ahead of Close. Pause is idempotent: a second call while paused is a no-op. It errors before Consume/ConsumeRaw has started or after Close. Resume undoes it (T53).
func (*Consumer[M]) Resume ¶
Resume re-issues basic.consume on the consumer's existing channel after a Pause, handing the running loop a fresh subscription. It is idempotent: a call while not paused is a no-op. It errors before Consume/ConsumeRaw has started or after Close. The ctx scopes only this call (its cancellation aborts the re-subscribe handshake); the resulting subscription is bound to the consumer lifetime — the ctx passed to Consume/ConsumeRaw — not to this ctx, so a request-scoped Resume ctx cannot silently stop delivery (T53).
If the ctx is cancelled mid-handshake (after the basic.consume is issued but before the loop adopts it), Resume rolls the subscription back with a local basic.cancel and leaves the consumer paused, so the call is a clean no-op-retry rather than leaving an orphaned broker subscription.
type ConsumerBuilder ¶
type ConsumerBuilder[M any] struct { // contains filtered or unexported fields }
ConsumerBuilder configures and builds a Consumer[M].
All option methods follow a last-wins policy: calling the same method twice keeps only the final value.
func ConsumerFor ¶
func ConsumerFor[M any](conn *Connection) *ConsumerBuilder[M]
ConsumerFor returns a builder for a Consumer[M] tied to conn.
func (*ConsumerBuilder[M]) AllowMissingDLX ¶
func (b *ConsumerBuilder[M]) AllowMissingDLX() *ConsumerBuilder[M]
AllowMissingDLX opts out of the Topology DLX-presence warning, acknowledging that a poison drop on this queue (after MaxRedeliveries) is intentional and silent (still surfaced via consumer_drop_no_dlx_total). Use it when the source queue is intentionally declared without a dead-letter exchange.
func (*ConsumerBuilder[M]) Args ¶
func (b *ConsumerBuilder[M]) Args(args Headers) *ConsumerBuilder[M]
Args sets extra arguments forwarded in the basic.consume frame (the consumer argument table, e.g. broker-specific consumer options). When Priority is also set, the typed x-priority value is layered on top, so Priority wins over any x-priority slipped through Args.
func (*ConsumerBuilder[M]) AutoAck ¶
func (b *ConsumerBuilder[M]) AutoAck() *ConsumerBuilder[M]
AutoAck enables the AMQP no-ack flag on basic.consume, which tells the broker to consider every delivery already acknowledged before the client sees it. This is a real AMQP feature, exposed for protocol fidelity, but it changes critical semantics:
- Handler error semantics are bypassed. nil/error/ErrRequeue/ErrPoison returns all become no-ops. A handler that panics or errors silently drops the message.
- No redelivery on consumer crash. If the consumer dies mid-handle, the broker has already removed the message; it will not be redelivered to another consumer. Use only when at-most-once delivery is acceptable.
- No backpressure via prefetch. With AutoAck, prefetch loses its ack-gating effect. The broker streams as fast as the channel will carry, and slow handlers can OOM the consumer.
- DLX / MaxRedeliveries do not engage. Both depend on Nacks the client never sends.
Use AutoAck only for genuinely fire-and-forget streams (e.g., high-volume telemetry where occasional drops are acceptable). For everything else, leave it off and let the error-driven semantics work.
func (*ConsumerBuilder[M]) Build ¶
func (b *ConsumerBuilder[M]) Build() (*Consumer[M], error)
Build constructs and returns a Consumer[M]. Returns an error if the builder state is invalid.
func (*ConsumerBuilder[M]) ChannelQoS ¶
func (b *ConsumerBuilder[M]) ChannelQoS() *ConsumerBuilder[M]
ChannelQoS applies QoS at channel scope (basic.qos global=true) rather than per consumer. This is the RabbitMQ-recommended setting; the broker ignores the per-consumer distinction and applies prefetch at channel scope in any case.
func (*ConsumerBuilder[M]) Codec ¶
func (b *ConsumerBuilder[M]) Codec(c codec.Codec) *ConsumerBuilder[M]
Codec sets the message codec. Default: JSON (strict).
func (*ConsumerBuilder[M]) Concurrency ¶
func (b *ConsumerBuilder[M]) Concurrency(n uint) *ConsumerBuilder[M]
Concurrency sets the number of handler goroutines run in parallel. Default: 1.
func (*ConsumerBuilder[M]) Exclusive ¶
func (b *ConsumerBuilder[M]) Exclusive() *ConsumerBuilder[M]
Exclusive requests exclusive consumer access to the queue (the basic.consume exclusive flag). While an exclusive consumer is attached, the broker refuses any other consumer on the same queue with ACCESS_REFUSED (surfaced as ErrAccessRefused). Use this for active/standby topologies where exactly one worker must hold the queue.
func (*ConsumerBuilder[M]) HandlerTimeout ¶
func (b *ConsumerBuilder[M]) HandlerTimeout(d time.Duration) *ConsumerBuilder[M]
HandlerTimeout sets a per-message ctx deadline. Zero (default) means no deadline. When the deadline expires the handler ctx is cancelled and HandlerTimeoutVerdict decides whether to nack with or without requeue.
func (*ConsumerBuilder[M]) HandlerTimeoutVerdict ¶
func (b *ConsumerBuilder[M]) HandlerTimeoutVerdict(v TimeoutVerdict) *ConsumerBuilder[M]
HandlerTimeoutVerdict sets the ack/nack action when HandlerTimeout fires. Default: TimeoutNackNoRequeue (message goes to DLX or is dropped).
func (*ConsumerBuilder[M]) MaxInFlightBytes ¶
func (b *ConsumerBuilder[M]) MaxInFlightBytes(n int64) *ConsumerBuilder[M]
MaxInFlightBytes caps the sum of in-flight message body sizes (the local memory guardrail, T50). Once concurrently-dispatched handlers hold n bytes of payload, the consumer stops pulling new deliveries — pausing prefetch refill — until a handler returns and frees its bytes. This bounds heap use where prefetch × concurrency × body-size would otherwise risk an OOM, independent of the message-count backpressure that Prefetch provides.
n <= 0 (the default) disables the guardrail. n is a soft ceiling, not a hard reject: a single message larger than n is dispatched alone when nothing else is in flight (rather than deadlocking forever), so peak resident payload memory can briefly reach max(n, largest single body) — size n with that headroom in mind. The current reserved total is exported as the consumer_inflight_bytes{queue} gauge.
func (*ConsumerBuilder[M]) MaxRedeliveries ¶
func (b *ConsumerBuilder[M]) MaxRedeliveries(n int) *ConsumerBuilder[M]
MaxRedeliveries caps the number of times a message can be redelivered before it is dead-lettered. Default 0 = unbounded.
Two complementary counters enforce the ceiling:
Counter A (cross-process): reads x-death headers; bounds DLX-bounce loops that survive consumer restarts. Fires BEFORE the handler is called. With MaxRedeliveries(n), counter A short-circuits when death_count >= n, so the handler is invoked for death_count = 0, 1, …, n-1 (exactly n times).
Counter B (in-process, process-local): counts consecutive ErrRequeue returns for the same MessageID on the current channel. Resets on channel close. Fires AFTER the handler returns ErrRequeue for the (n+1)-th time, rewriting the verdict to Nack(false). The handler is therefore called n+1 times before counter B dead-letters the message (one more than counter A, because the final ErrRequeue return triggers the rewrite after the call).
Example with MaxRedeliveries(3):
Counter A: handler called for death_count=0,1,2; short-circuit on death_count=3. Counter B: handler called 4 times (3 ErrRequeue stored, fires on 4th return).
When the source queue is a quorum queue with DeliveryLimit > 0 (declared via TopologyHint), counter B is auto-disabled: the broker is authoritative. Counter A still runs as a safety net. See TopologyHint.
func (*ConsumerBuilder[M]) Metrics ¶
func (b *ConsumerBuilder[M]) Metrics(cm metrics.ConsumerMetrics) *ConsumerBuilder[M]
Metrics sets the ConsumerMetrics recorder. Default: NoOp.
func (*ConsumerBuilder[M]) OnCancel ¶
func (b *ConsumerBuilder[M]) OnCancel(fn func(reason string)) *ConsumerBuilder[M]
OnCancel registers a callback invoked when the broker cancels the consumer via basic.cancel (e.g. the queue was deleted or an exclusive lock was revoked). The reason is the cancelled consumer's tag — the only datum the AMQP basic.cancel frame carries; it is not a free-form description. After OnCancel fires, Consume returns ErrConsumerCancelled (the library does not auto-redeclare the queue). When OnCancel is unset, the library logs a warning instead. Always wire OnCancel in production code: a silently dying consumer is worse than a leaked deletion.
func (*ConsumerBuilder[M]) Prefetch ¶
func (b *ConsumerBuilder[M]) Prefetch(count uint16) *ConsumerBuilder[M]
Prefetch sets the per-channel prefetch count (basic.qos count). Default: 64.
func (*ConsumerBuilder[M]) PrefetchBytes ¶
func (b *ConsumerBuilder[M]) PrefetchBytes(_ uint) *ConsumerBuilder[M]
PrefetchBytes is a no-op on RabbitMQ; preserved for AMQP 0-9-1 protocol parity.
func (*ConsumerBuilder[M]) Priority ¶
func (b *ConsumerBuilder[M]) Priority(p int) *ConsumerBuilder[M]
Priority sets the x-priority consumer argument. Higher values are preferred when multiple consumers are attached to the same queue (active/standby topology).
func (*ConsumerBuilder[M]) Queue ¶
func (b *ConsumerBuilder[M]) Queue(name string) *ConsumerBuilder[M]
Queue sets the AMQP queue name to consume from.
func (*ConsumerBuilder[M]) Tag ¶
func (b *ConsumerBuilder[M]) Tag(consumerTag string) *ConsumerBuilder[M]
Tag sets the consumer tag. Default: auto-generated "ctag-<uuidv7>" at Build time.
func (*ConsumerBuilder[M]) Topology ¶
func (b *ConsumerBuilder[M]) Topology(t *Topology) *ConsumerBuilder[M]
Topology wires the declared Topology so Build can validate that the source queue has a dead-letter exchange when MaxRedeliveries > 0. Without a DLX, a poison message that exceeds MaxRedeliveries is Nack(false)'d and silently dropped by the broker; Build warns (and consumer_drop_no_dlx_total counts the drops) unless AllowMissingDLX is set — parity with the Replier check (T65).
func (*ConsumerBuilder[M]) TopologyHint ¶
func (b *ConsumerBuilder[M]) TopologyHint(q Queue) *ConsumerBuilder[M]
TopologyHint provides queue metadata that modifies counter B behaviour. Currently used to detect quorum queues with DeliveryLimit > 0, which disable the in-process counter B (broker handles redelivery bounding via x-delivery-limit).
Call TopologyHint after MaxRedeliveries so the carve-out takes effect.
func (*ConsumerBuilder[M]) Tracer ¶
func (b *ConsumerBuilder[M]) Tracer(t otel.Tracer) *ConsumerBuilder[M]
Tracer sets the OTel tracer for consume spans.
func (*ConsumerBuilder[M]) WithDedupe ¶
func (b *ConsumerBuilder[M]) WithDedupe(store DedupeStore, ttl time.Duration) *ConsumerBuilder[M]
WithDedupe enables native consumer-side deduplication keyed by MessageID, backed by store (T55). It abstracts the manual dedupe pattern (SPEC §6.2.1) off the handler: before each delivery, the middleware asks store.Seen(id) and, on a hit, acks the message WITHOUT invoking the handler; after the handler returns nil (success), it calls store.Mark(id, ttl) so future redeliveries of the same MessageID are recognised. A handler error is never marked, so the message is reprocessed on redelivery.
Failure mode is fail-OPEN: if store.Seen or store.Mark returns an error, the middleware logs a warning and processes the message anyway, trading a possible duplicate for availability — consistent with the at-least-once contract. The store must therefore be treated as a best-effort cache, not a correctness gate; handlers with non-idempotent side-effects should still guard themselves.
Deliveries without a MessageID cannot be deduped and are passed straight to the handler. ttl is the retention window passed to store.Mark; size it to cover the maximum plausible duplicate gap (broker outage + reconnect + retry budget — 15 minutes suits most workloads, SPEC §6.2.1). Only the Consume path is wrapped; ConsumeRaw handlers manage their own acks and are unaffected. store==nil (the default) disables the middleware.
Seen runs before the handler under the per-delivery handler context, so a configured HandlerTimeout bounds it. Mark runs after a successful handler under a context DETACHED from that deadline (handler trace/span values are preserved, but cancellation and the handler deadline are replaced with a fixed grace bound), so a near-exhausted HandlerTimeout — or a shutdown that already cancelled the handler context — cannot silently skip recording the id and fail open to a future duplicate. The grace bound still caps Mark so a wedged store cannot block consumer shutdown. Keep store calls fast regardless of which side of the handler they run on.
func (*ConsumerBuilder[M]) WithQueueDepthSampler ¶
func (b *ConsumerBuilder[M]) WithQueueDepthSampler(interval time.Duration) *ConsumerBuilder[M]
WithQueueDepthSampler enables a background goroutine that periodically reads the broker-side message backlog for the consumer's source queue, and for its conventional "<queue>.dlq" dead-letter queue, via a passive queue declare — exporting the native queue_depth{queue} and dlq_depth{dlq} gauges. These are the leading "work is piling up" / "poison is accumulating" signals that the per-message handler metrics cannot show.
interval is the polling period; interval <= 0 (the default) disables the sampler. Each sample runs on its own short-lived channel per probe, so a passive declare on a missing queue — which the broker answers by closing the channel — never disturbs the delivery channel. The dlq_depth gauge is emitted only when "<queue>.dlq" actually exists (a 404 is skipped, not reported as zero); the source queue_depth is likewise skipped if the queue itself is gone. Sampling stops when the Consume / ConsumeRaw context is cancelled.
Size interval against broker load: each tick costs one or two lightweight queue.declare-passive round-trips on the consumer's connection. A few seconds is typical; sub-second polling on a large cluster adds avoidable broker load, so an interval below 100ms is clamped to a 100ms floor with a one-time warning. While a whole sample fails to reach the broker — the source queue is gone, or the socket is down mid-reconnect — the sampler backs off exponentially (capped at 30s, never below interval) and returns to interval on the first sample that emits a gauge, so a permanently-missing queue does not probe at full rate forever. The DLQ name follows warren's own DeadLetter convention ("<queue>.dlq"); a DLQ under a different name is not sampled.
The gauges hold their last sampled value while the consumer runs: a sample that cannot reach the broker (e.g. mid-reconnect) is skipped rather than zeroed. When the consumer stops, both series are removed from the registry, so a long-lived process that cycles consumers over distinct queue names does not accumulate stale frozen series. Alert on rate/derivative or pair with consumer liveness rather than reading a single point as "current".
func (*ConsumerBuilder[M]) WithoutMetrics ¶
func (b *ConsumerBuilder[M]) WithoutMetrics() *ConsumerBuilder[M]
WithoutMetrics disables all consumer metrics (last-wins against Metrics).
type ConsumerHealth ¶
type ConsumerHealth struct {
// Active is true when the consumer is started, its consume loop has not exited,
// it is not closed, and it is not paused — i.e. it is receiving and dispatching
// deliveries. It flips to false when the loop exits for any reason (ctx cancel,
// a broker basic.cancel, or a fatal subscribe error), so a probe wired to Active
// will not keep a silently-dead consumer in rotation.
Active bool
// Paused is true between Pause and Resume.
Paused bool
// LastDeliveryAt is the wall-clock time the most recent delivery was received
// from the broker; the zero Time if none has arrived yet. A LastDeliveryAt that
// stops advancing on a queue that should be busy is a liveness signal.
LastDeliveryAt time.Time
// InFlightHandlers is the number of handler invocations currently executing.
InFlightHandlers int
}
ConsumerHealth is a point-in-time snapshot of a consumer's runtime state, suitable for building Kubernetes liveness/readiness probes (T53). Health returns it only when the pinned connection is healthy; on a connection error Health returns (nil, err), since a snapshot would carry no meaningful state.
type DeadLetter ¶
type DeadLetter struct {
// Source is the name of the source queue that routes dead letters.
Source string
// Exchange is the name of the dead-letter exchange. Defaults to "<Source>.dlx".
Exchange string
// RoutingKey is the routing key for dead letters. Empty means the original key.
RoutingKey string
// TTL is a per-message TTL (x-message-ttl) applied to the source queue.
TTL time.Duration
// MaxLength is the max number of messages (x-max-length) on the source queue.
MaxLength int
// MaxLengthBytes is the max byte capacity (x-max-length-bytes).
MaxLengthBytes int
// Overflow controls what happens when the queue is full (x-overflow).
Overflow OverflowPolicy
// DLQMaxLength caps the auto-declared <Source>.dlq by message count
// (x-max-length). Zero applies defaultDLQMaxLength unless DLQUnbounded.
DLQMaxLength int
// DLQMessageTTL caps the auto-declared <Source>.dlq message lifetime
// (x-message-ttl) — the personal-data retention control (GDPR 5(1)(e) /
// LGPD Art. 16). Zero applies defaultDLQMessageTTL unless DLQUnbounded.
DLQMessageTTL time.Duration
// DLQOverflow sets x-overflow on the auto-declared DLQ. Empty defaults to
// OverflowDropHead (keep the most recent failures when full).
DLQOverflow OverflowPolicy
// DLQUnbounded opts the auto-declared <Source>.dlq OUT of all default bounds
// (no x-max-length, no x-message-ttl, no x-overflow). Use ONLY when an
// external retention policy manages the DLQ — an unbounded DLQ can fill disk
// and trip a broker-wide connection.blocked alarm (SRE-03 / ST-08).
DLQUnbounded bool
}
DeadLetter describes a dead-letter topology entry. Topology.Declare expands it into the required x-dead-letter-* queue args, a DLX exchange, and a DLQ during the in-memory pre-pass (Step 1), so the broker sees the args on the source queue's first declare and never needs a re-declare.
When the source queue is a quorum queue, Declare also injects x-dead-letter-strategy=at-least-once so messages are preserved during dead-lettering (SPEC §10 decision 52). Set x-dead-letter-strategy in the source Queue.Args to override this default.
at-least-once requires x-overflow=reject-publish; the broker silently accepts any overflow but does not honour at-least-once otherwise, so Declare couples the two client-side: Overflow left empty is auto-set to reject-publish (with a warning), and an Overflow of drop-head or reject-publish-dlx is rejected with ErrInvalidOptions. Note the cost: reject-publish blocks publishers (ErrPublishNacked / ErrConnectionBlocked) when the source queue is full, and at-least-once retains each dead-lettered message in SOURCE-QUEUE memory until the DLX consumer acknowledges it — an unconsumed DLX can therefore grow the source queue's memory footprint. Size the source queue (DeliveryLimit, TTL, MaxLength) and the DLQ, and keep a consumer attached to the DLX.
type DedupeStore ¶
type DedupeStore interface {
// Seen reports whether id was already recorded via Mark and is still within
// its retention window. A true result acks the delivery without invoking the
// handler.
Seen(ctx context.Context, id string) (bool, error)
// Mark records id as processed, retaining it for at least ttl. It is called
// only after the handler returns nil (success).
Mark(ctx context.Context, id string, ttl time.Duration) error
}
DedupeStore is the backing store for the WithDedupe consumer middleware (T55). Implementations may be an in-memory LRU, Redis, or any TTL cache; they must be safe for concurrent use.
The middleware fails OPEN: any error returned from Seen or Mark causes the message to be processed anyway (with a logged warning), so a store outage degrades to plain at-least-once rather than dropping or stalling deliveries. See ConsumerBuilder.WithDedupe and SPEC §6.2.1.
type Delivery ¶
type Delivery[M any] struct { // contains filtered or unexported fields }
Delivery wraps a broker-delivered message with its decoded payload [M].
Tests fabricate a fake delivery with NewDeliveryFixture.
func NewDeliveryFixture ¶
func NewDeliveryFixture[M any](f DeliveryFixture[M]) *Delivery[M]
NewDeliveryFixture builds a *Delivery[M] from f for unit tests. The returned delivery is not bound to a live channel, so Ack/Nack/AckIf return an error rather than reaching a broker; fixtures are for exercising Body, Headers, and the metadata/x-death accessors, not acknowledgement mechanics.
func (*Delivery[M]) Ack ¶
Ack acknowledges the delivery to the broker. Returns ErrAlreadyClosed if the owning consumer was shut down, ErrAlreadyResolved if a verdict (Ack/Nack/AckIf or a HandlerTimeout verdict) was already emitted for this delivery (a no-op — no second frame), ErrChannelClosed if the underlying channel closed before the ack reached the broker.
func (*Delivery[M]) AckIf ¶
AckIf applies the standard handler error-mapping semantics:
- nil → Ack
- errors.Is(err, ErrRequeue) → Nack(requeue=true)
- any other error → Nack(requeue=false)
func (*Delivery[M]) Body ¶
func (d *Delivery[M]) Body() *M
Body returns a pointer to the decoded message payload.
func (*Delivery[M]) CorrelationID ¶
CorrelationID returns the correlation identifier from the AMQP properties.
func (*Delivery[M]) DeathCount ¶
DeathCount returns the sum of x-death counts for reason ∈ {rejected, delivery-limit} matching the delivery's current queue. Returns 0 if the header is absent or malformed.
The count is scoped to the queue this consumer reads from. x-death entries are keyed on the queue where each death occurred, so on a dead-letter queue (where the entries are keyed on the source queue) this returns 0 — read the raw x-death header via Headers() for cross-queue death inspection. It is non-zero on a same-queue retry loop, where the message is dead-lettered from and routed back to this queue.
func (*Delivery[M]) DeathCountByReason ¶
DeathCountByReason returns the total x-death count for a specific reason string (e.g. "rejected", "expired", "maxlen", "delivery-limit") for the current queue. Reason separators are normalised, so the delivery-limit count resolves whether you pass the documented "delivery-limit" or the broker's raw "delivery_limit".
func (*Delivery[M]) DeathReasons ¶
DeathReasons returns the unique x-death reasons in declaration order for the current queue. Useful for custom redelivery policies that need all reasons.
func (*Delivery[M]) DeliveryTag ¶
DeliveryTag is the broker-assigned sequential identifier for this delivery.
func (*Delivery[M]) MessageID ¶
MessageID returns the application-level message identifier from the AMQP properties.
func (*Delivery[M]) Nack ¶
Nack negatively acknowledges the delivery. requeue=true re-queues the message; requeue=false routes it to the DLX (or drops it). Returns ErrAlreadyClosed if the owning consumer was shut down, ErrAlreadyResolved if a verdict was already emitted for this delivery (a no-op — no second frame), ErrChannelClosed if the underlying channel closed before the nack reached the broker.
func (*Delivery[M]) Redelivered ¶
Redelivered reports whether the broker has previously attempted to deliver this message.
type DeliveryFixture ¶
type DeliveryFixture[M any] struct { // Body is the decoded payload returned by Delivery[M].Body(). A nil Body // is allowed and mirrors a decode that produced no value. Body *M // Queue is the queue the delivery is attributed to; it scopes x-death // accounting (DeathCount, DeathCountByReason, DeathReasons). Queue string // Headers populates the AMQP header table (Delivery.Headers()), including // any "x-death" entries the fixture wants the Death* accessors to observe. Headers Headers // MessageID maps to the AMQP message-id property (Delivery.MessageID()). MessageID string // CorrelationID maps to the AMQP correlation-id property (Delivery.CorrelationID()). CorrelationID string // ContentType maps to the AMQP content-type property. ContentType string // Timestamp maps to the AMQP timestamp property (Delivery.Timestamp()). Timestamp time.Time // Redelivered sets the redelivered flag (Delivery.Redelivered()). Redelivered bool // DeliveryTag sets the broker delivery tag (Delivery.DeliveryTag()). DeliveryTag uint64 // contains filtered or unexported fields }
DeliveryFixture is the keyed-literal input to NewDeliveryFixture. It fabricates a Delivery for unit tests — including consumer/raw/batch handler tests, in this package or downstream — without a live broker (SPEC §10 decision 9, GA-09).
Only keyed literals compile from outside the package; the trailing guard field rejects positional literals so future fields are non-breaking.
type DeliveryMode ¶
type DeliveryMode uint8
DeliveryMode controls AMQP delivery persistence. The zero value is DeliveryModePersistent so a zero-valued Message[M] defaults to durable.
const ( // DeliveryModePersistent is the default; messages survive broker restarts. DeliveryModePersistent DeliveryMode = iota // DeliveryModeTransient messages are kept only in-memory and are lost on broker restart. DeliveryModeTransient )
type Exchange ¶
type Exchange struct {
Name string
Kind ExchangeKind
Durable bool
AutoDelete bool
Internal bool
// NoWait sends the declare without waiting for the broker's reply. This
// downgrades mismatch detection to asynchronous: Declare returns nil even
// on a conflicting redeclare, and the broker reports the conflict (e.g.
// ErrPreconditionFailed) out-of-band on a channel Declare has already
// closed, so it is generally not observable by the caller. Leave
// NoWait=false if you rely on Declare surfacing ErrTopologyMismatch.
NoWait bool
// AlternateExchange names the server-side catch-all exchange for messages
// this exchange cannot route (the broker's `alternate-exchange` argument).
// It is the platform-level unroutable safety net (T68 / EDA-01): a mis-routed
// publish WITHOUT Mandatory() vanishes silently, and per-publish discipline
// does not scale across many producers — the alternate exchange catches the
// unroutable message server-side regardless. The zero value (empty) preserves
// today's behaviour. Declare the named exchange (and bind a catch-all queue
// to it) in the same Topology. Set the field, not Args["alternate-exchange"].
AlternateExchange string
Args map[string]any
}
Exchange declares an AMQP exchange.
func DelayedTopic ¶
DelayedTopic returns the Exchange literal for a topic-routed delayed-message exchange: Kind=ExchangeDelayed with the x-delayed-type=topic argument the rabbitmq_delayed_message_exchange plugin requires. Declare it via a Topology and publish to it with Message[M].Delay set to schedule delivery.
Plugin requirement. ExchangeDelayed needs the rabbitmq_delayed_message_exchange plugin enabled on every broker node; declaring one against a broker without the plugin fails with a command-invalid channel error.
Durability caveat (load-bearing). The exchange is declared Durable so its definition survives a broker restart, but the plugin stores SCHEDULED messages in a node-local, non-replicated table. A publisher confirm means "accepted for scheduling", not "will be delivered": if the owning node fails before the delay elapses, the scheduled message is lost silently — even with durable topology and confirms on. This is the one path where a confirmed publish can still be lost. For delays that must survive node failure, prefer a durable (ideally quorum) queue with x-message-ttl plus a DLX (see Message.Delay).
type ExchangeBinding ¶
type ExchangeBinding struct {
// Source is the upstream exchange messages are published to / arrive at.
Source string
// Destination is the downstream exchange that receives matching messages.
Destination string
// RoutingKey filters which messages are forwarded (matched per Source's kind).
RoutingKey string
// NoWait skips the broker confirmation (see the NoWait caveat on Binding).
NoWait bool
Args map[string]any
}
ExchangeBinding declares an exchange-to-exchange binding (exchange.bind): messages routed to Source that match RoutingKey are forwarded to Destination. This enables layered ingest→per-domain fan-out without flattening the topology.
type ExchangeKind ¶
type ExchangeKind string
ExchangeKind is the AMQP exchange type string passed to exchange.declare.
const ( // ExchangeDirect routes messages to queues whose binding key matches the routing key exactly. ExchangeDirect ExchangeKind = "direct" // ExchangeFanout routes messages to all bound queues regardless of routing key. ExchangeFanout ExchangeKind = "fanout" // ExchangeTopic routes messages to queues whose binding key pattern matches the routing key. ExchangeTopic ExchangeKind = "topic" // ExchangeHeaders routes messages based on header attributes instead of routing key. ExchangeHeaders ExchangeKind = "headers" // ExchangeDelayed routes messages via the rabbitmq_delayed_message_exchange // plugin (requires the plugin; set Args["x-delayed-type"] to the underlying // routing kind). Durability caveat (load-bearing): the plugin stores scheduled // messages in a node-local, non-replicated table that is NOT a quorum/durable // queue, so a confirmed delayed publish can still be lost if the owning node // fails before the delay elapses — even with durable topology and confirms on. // For delays that must survive node failure, prefer a durable (ideally quorum) // queue with x-message-ttl plus a DLX. See Message.Delay and SPEC §6.5. ExchangeDelayed ExchangeKind = "x-delayed-message" )
type Handler ¶
Handler is the function signature for typed message handlers. Return nil to ack, ErrRequeue to nack with requeue, or any other error to nack without requeue.
type Headers ¶
Headers is an AMQP field-table. Values must be one of the types supported by the amqp091-go encoder: bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time, map[string]any, []any, or nil. int and uint literals auto-coerce to int64/uint64. Any other Go type causes Publish to return ErrInvalidMessage.
type JitterStrategy ¶
type JitterStrategy int
JitterStrategy selects how NextBackoff perturbs the exponential delay to de-synchronize a fleet that failed together — the defence against the "thundering herd" that hammers a recovering broker when every client retries in lockstep. The zero value is JitterFull, the SRE-recommended default, so a RetryPolicy that does not set Jitter gets the strongest spreading for free.
const ( // JitterFull spreads each delay uniformly across the whole exponential // window: random(0, exp), clamped to [Min, Max]. This is the default (zero // value) and the AWS-recommended strategy ("Exponential Backoff And Jitter") // — two clients that failed at the same instant pick independent delays // across the entire window instead of clustering near one value, so a // recovering broker sees retries arrive smoothly rather than in a spike. JitterFull JitterStrategy = iota // JitterEqual keeps half the exponential delay and jitters the other half: // exp/2 + random(0, exp/2), clamped to [Min, Max]. A tighter spread than full // jitter — choose it when a guaranteed minimum progress per attempt matters // more than maximal de-correlation. JitterEqual // JitterNone disables jitter: NextBackoff returns the pure exponential delay, // deterministic for a given attempt. Intended for tests and reproductions; // avoid it in production, where a synchronized fleet retrying in lockstep can // stampede a recovering broker. JitterNone )
type Message ¶
type Message[M any] struct { Body *M // MessageID identifies the message for at-least-once dedupe. Left empty, it // defaults to a UUID v7 (RFC 9562) generated at publish time. It is // load-bearing: PublishRetry, the reconnect barrier, and confirm timeouts can // all redeliver, so consumers MUST dedupe by MessageID (SPEC §6.2.1). Do not // disable it to save the per-publish UUID generation. MessageID string CorrelationID string ReplyTo string Type string AppID string // UserID, when set, must equal the connection's authenticated user: RabbitMQ // closes the channel with a 406 (PRECONDITION_FAILED) if it does not. To turn // that footgun into a local error, Publish validates UserID client-side — a // non-empty value that differs from the authenticated user returns // ErrInvalidMessage without writing the publish frame. Leave it empty, or use // the publisher's StampUserID() option to stamp the authenticated user for you. UserID string // ContentType is the MIME type of the body (e.g. "application/json"). // Default: set from codec.ContentType() when empty. // See ContentEncoding for the transfer-encoding counterpart. ContentType string // ContentEncoding is the transfer encoding applied on top of the codec output // (e.g. "gzip", "deflate"). Default: "" (identity). Set only when you wrap the // codec's output with a compressor or similar transform. ContentEncoding string // Headers is the AMQP field-table. Supported value types: // bool, int8/16/32/64, uint8/16/32/64, float32/64, string, []byte, // time.Time, nil, Headers (nested), []any. // int and uint literals auto-coerce to int64/uint64. // Any other Go type returns ErrInvalidMessage at publish time. Headers Headers // Priority is the AMQP basic.properties.priority octet (wire range 0–255). // A priority queue's effective range is its x-max-priority (1–255; RabbitMQ // recommends <=10); a Priority above that maximum is silently clamped down to // it. Values 0–9 are the common convention, not a protocol limit. Priority on a // non-priority queue has no effect, and quorum queues do not support priorities // at all (x-max-priority is rejected on a quorum declare). Priority uint8 Timestamp time.Time // Expiration is the per-message TTL. The publisher serialises it as ASCII // milliseconds in the AMQP shortstr wire format. The broker interprets "0" as // "expire immediately", so a non-zero duration shorter than 1ms (which would // round to "0") is rejected at publish time with ErrInvalidMessage; the minimum // non-zero TTL is 1ms. A negative duration is likewise rejected with // ErrInvalidMessage rather than silently published with no TTL. A zero // Expiration means "no per-message TTL". Expiration time.Duration // DeliveryMode controls AMQP delivery persistence. The zero value is // DeliveryModePersistent so Message[M]{} defaults to durable delivery. DeliveryMode DeliveryMode // RabbitMQ extensions. // // Delay schedules the message for future delivery via the // rabbitmq_delayed_message_exchange plugin: a positive value is emitted as the // x-delay header (milliseconds, signed 32-bit) and is honored only when the // message is published to an ExchangeDelayed exchange (see DelayedTopic). A zero // Delay means no delay; a negative Delay (≤ -1 ms) or one above the ~24.8-day // ceiling is rejected at publish time with ErrInvalidMessage. Sub-millisecond // magnitudes round to zero. // // Durability caveat (load-bearing): the plugin stores scheduled messages in a // node-local, non-replicated table, so a confirmed delayed publish can still be // lost if the owning node fails before the delay elapses — even with durable // topology and confirms on. For delays that must survive node failure, prefer a // durable (ideally quorum) queue with x-message-ttl plus a DLX. See DelayedTopic. Delay time.Duration }
Message is a typed AMQP message. M is the payload type; Body holds a pointer to the decoded or to-be-encoded value.
Zero-value defaults applied by applyDefaults:
- MessageID is a UUID v7 (RFC 9562) when left empty.
- Timestamp is time.Now() when zero.
- ContentType is set from the codec when empty.
- DeliveryMode zero value is DeliveryModePersistent (durable).
type Option ¶
type Option func(*connOptions)
Option configures a Connection during Dial.
func WithAddr ¶
WithAddr sets the primary AMQP URI (e.g. "amqp://user:pass@host:5672/vhost"). If not provided, defaults to "amqp://guest:guest@localhost/".
func WithAddrs ¶
WithAddrs sets a cluster-failover list of AMQP URIs. When set, this overrides WithAddr.
Each TCP socket in the pool walks its OWN shuffled permutation of the list: the first reachable node on that permutation wins and sticks for the life of the socket. The per-socket shuffle (seeded per process and per (role, index)) spreads the initial connections across the cluster instead of every socket — and every client process — stampeding the first URI. On a disconnect, reconnect rotates round-robin to the next URI in that socket's permutation (wrapping at the end), so a downed node is skipped on the following attempt instead of being retried in place.
All URIs should share the same scheme (amqp:// or amqps://); the TLS, SASL, and credential settings configured on the Connection apply to whichever node is dialled.
func WithAuth ¶
WithAuth sets PLAIN credentials. Ignored when WithSASLMechanism(SASLExternal) is in effect (a Dial-time warning is emitted).
func WithChannelMax ¶
WithChannelMax sets the maximum number of AMQP channels to negotiate with the broker. Zero (the default) lets the server choose the limit.
func WithChannelPoolSize ¶
WithChannelPoolSize sets the number of pre-opened channels per publisher TCP connection. Default is 8.
The value must be ≥ 1 and must not exceed the channel-max ceiling, otherwise Dial returns ErrInvalidOptions. When WithChannelMax is set explicitly this is checked synchronously before any socket is opened; when WithChannelMax is 0 (server-driven, RabbitMQ defaults to 2047) it is checked against the broker-negotiated value once the handshake completes.
func WithClientProperties ¶
WithClientProperties merges additional key-value pairs into the AMQP client-properties table sent during connection.open.
func WithConnectDelay ¶
WithConnectDelay introduces a fixed pause before the first connection attempt. Useful when co-starting alongside a broker container.
func WithConnectionName ¶
WithConnectionName sets a human-readable name shown in the broker's management UI and in log lines. Defaults to "<binary>-<hostname>-<pid>".
func WithConsumerConnections ¶
WithConsumerConnections sets the number of dedicated consumer TCP connections. Default is 2 (T07d). Setting n=1 causes Dial to log a warning.
func WithDialer ¶
WithDialer sets a custom net.Conn factory used instead of warren's default keepalive dialer. Useful for testing, proxies, or Unix-socket transports.
Half-open-socket hardening (T72 / SRE-09): the default dialer enables TCP keepalive so a write to a dead peer fails promptly rather than blocking up to ConfirmTimeout. For even faster write-side failure on Linux, supply a dialer that sets TCP_USER_TIMEOUT via net.Dialer.Control, e.g.:
WithDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 15 * time.Second,
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
// TCP_USER_TIMEOUT (Linux): cap how long a write may stay unacked.
_ = unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, 15000)
})
}}).Dial)
func WithFrameMax ¶
WithFrameMax sets the maximum AMQP frame size in bytes.
Recommended sizing tiers:
- 4 096 B — AMQP-spec minimum; never go below this.
- 131 072 B (128 KiB) — default amqp091-go value; good for most workloads.
- 1 048 576 B (1 MiB) — high-throughput bulk transfers.
- 104 857 600 B (100 MiB) — hard ceiling; values above this risk OOM on broker and client. Use chunked publishing instead of large frames.
Zero lets the server choose. Values in [1, 4095] are rejected at Dial time with ErrInvalidOptions (AMQP spec §2.3.5 — frame minimum is 4 096 bytes).
func WithHeartbeat ¶
WithHeartbeat sets the AMQP heartbeat interval negotiated with the broker.
Partition detection: the broker (and amqp091-go) drops the connection after missing two heartbeats, so the time to notice a half-open TCP link is roughly 2× the interval. Size the interval to the detection latency you can tolerate:
- 5 s — high-throughput / low-latency services (≈10 s detection)
- 30 s — batch / low-priority workloads (≈60 s detection)
- 60 s — battery-constrained clients or behind an idle-timeout load balancer (≈120 s detection)
WithHeartbeat(0) — the zero value, also the default when the option is not set — uses the server's negotiated default (~10 s on RabbitMQ); it is not a request to disable heartbeats. A negative value disables heartbeats entirely; Dial emits a warning because disabled heartbeats prevent detection of half-open TCP connections.
func WithLogger ¶
WithLogger sets the logger used for connection lifecycle events. Defaults to log.NoOpLogger when not provided.
func WithMetrics ¶
func WithMetrics(m metrics.ClientMetrics) Option
WithMetrics sets the ClientMetrics implementation. Defaults to NoOpClientMetrics when not provided.
func WithOnBlocked ¶
WithOnBlocked registers a callback fired when the broker sends a connection.blocked notification (e.g. due to a memory or disk alarm). The reason string is the human-readable explanation from the broker.
func WithOnReconnect ¶
func WithOnReconnect(fn func()) Option
WithOnReconnect registers a callback fired after each successful reconnect barrier (topology redeclared, consumers re-subscribed). The callback runs synchronously inside the reconnect barrier before traffic resumes.
func WithOnResubscribe ¶
WithOnResubscribe registers a callback fired once per consumer re-subscribe after a reconnect, with the queue name that was re-subscribed. It runs inside the reconnect barrier alongside the consumer_resubscribed_total metric, after the replacement subscription is installed and before delivery resumes.
Each consumer pinned to a reconnecting socket fires the callback once per reconnect; keep it fast and non-blocking — a slow callback delays delivery resumption for that consumer. Use it to refresh per-subscription state (e.g. reset an in-process dedupe window) on reconnect.
func WithOnTopologyDegraded ¶
WithOnTopologyDegraded registers a callback fired exactly once each time the connection enters the degraded state (topology redeclare failed after reconnect). The callback receives the redeclare error. It is re-armed on successful recovery so it fires again on the next degraded transition.
func WithPublisherConnections ¶
WithPublisherConnections sets the number of dedicated publisher TCP connections. Default is 2 (T07d). Setting n=1 causes Dial to log a warning: a single socket is a full-availability gap during reconnect.
func WithReconnectBackoff ¶
func WithReconnectBackoff(p RetryPolicy) Option
WithReconnectBackoff configures the exponential-backoff policy for reconnect attempts. Zero-value fields in p use RetryPolicy defaults (Min=1 s, Max=30 s, Factor=2.0, unlimited retries).
func WithReconnectBarrierTimeout ¶
WithReconnectBarrierTimeout caps the synchronous reconnect barrier. With the default PublishTimeout=0 and a context.Background(), a publisher would otherwise block indefinitely behind a "half-alive" broker that accepts the socket but stalls on queue.declare (e.g. Khepri Raft-quorum recovery, RMQ-17). On cap, blocked Publish calls return ErrReconnecting (transient, retryable), and the barrier force-reconnects the socket — with WithAddrs the rotation re-dials a different node. ConfirmTimeout does NOT cover the barrier (the frame is still unwritten), so this is a distinct, necessary mechanism (DS-02). A non-positive d resets to the default. See SPEC §6.1.
func WithSASLMechanism ¶
func WithSASLMechanism(m SASLMechanism) Option
WithSASLMechanism selects the SASL mechanism. The default is SASLPlain. SASLExternal delegates authentication to the TLS client certificate and requires WithTLSConfig with at least one certificate and an amqps:// URI.
func WithTLSConfig ¶
WithTLSConfig sets the TLS configuration for amqps:// connections. Required when using WithSASLMechanism(SASLExternal).
func WithTracer ¶
WithTracer stores a connection-level OTel tracer. It is reserved for future connection-level spans and currently drives none; publish/consume spans are enabled per builder via PublisherBuilder.Tracer / ConsumerBuilder.Tracer. Defaults to otel.NoOpTracer when not provided.
func WithoutMetrics ¶
func WithoutMetrics() Option
WithoutMetrics disables all metric emission for this connection (equivalent to WithMetrics(metrics.NoOpClientMetrics{})).
type OverflowPolicy ¶
type OverflowPolicy string
OverflowPolicy sets the x-overflow queue argument on a source queue with a DeadLetter or a max-length cap. An empty value means the broker default (drop-head).
const ( // OverflowDropHead is the broker default; drops the oldest message when the queue is full. OverflowDropHead OverflowPolicy = "drop-head" // OverflowRejectPublish rejects publisher confirms (ErrPublishNacked) when the queue is full. OverflowRejectPublish OverflowPolicy = "reject-publish" // OverflowRejectPublishDLX dead-letters the overflow message instead of dropping it. OverflowRejectPublishDLX OverflowPolicy = "reject-publish-dlx" )
type PublishResult ¶
type PublishResult struct {
Err error
}
PublishResult holds the outcome for a single message in a PublishBatch call. Err is nil when the message was confirmed by the broker; non-nil for ErrInvalidMessage, ErrPublishNacked, ErrUnroutable, or ErrChannelClosed.
type Publisher ¶
type Publisher[M any] struct { // contains filtered or unexported fields }
Publisher publishes typed AMQP messages to the broker.
Publisher is safe for concurrent use by multiple goroutines.
func (*Publisher[M]) Close ¶
Close drains all in-flight Publish calls and releases pool resources. Returns ErrAlreadyClosed if called more than once.
func (*Publisher[M]) Publish ¶
Publish encodes msg and sends it to the broker. It blocks until the broker sends a publisher confirm (basic.ack or basic.nack).
Publish is safe for concurrent use by multiple goroutines.
func (*Publisher[M]) PublishBatch ¶
func (p *Publisher[M]) PublishBatch(ctx context.Context, msgs []Message[M]) ([]PublishResult, error)
PublishBatch publishes all messages in msgs on a single AMQP channel, preserving input order (RabbitMQ's per-channel ordering guarantee). It never short-circuits: even if some messages fail client-side validation, valid messages are still published and confirmed.
An empty batch (len(msgs) == 0) returns (nil, nil) without contacting the broker.
If len(msgs) exceeds the configured PublishBatchMaxSize (default 1024), PublishBatch returns (nil, ErrBatchTooLarge) immediately without any broker work.
Per-message outcomes are in []PublishResult, one slot per input. Result.Err may be:
- nil (broker confirmed and routed)
- ErrInvalidMessage (client-side header validation, nil Body, or encode failure)
- ErrUnroutable (broker returned the message via basic.return — mandatory+no binding)
- ErrPublishNacked (broker sent basic.nack, e.g. overflow=reject-publish)
- ErrChannelClosed (channel died before confirm arrived)
- ErrConfirmTimeout (no confirm received within the configured ConfirmTimeout)
If any message fails, the overall error wraps ErrPartialBatch. Note that when a connection-level error occurs (e.g. ErrReconnecting, ErrChannelPoolExhausted), results is nil and err is the connection-level error — no per-message results are available because no messages were sent to the broker.
Mandatory delivery ¶
PublishBatch fully supports publishers configured with Mandatory(). When a message has no matching binding the broker sends basic.return (before the basic.ack). The result for that slot is ErrUnroutable (wrapped with the broker reply code so AMQPCode can retrieve it). Messages without a routing failure are unaffected. Correlation is performed by MessageID: applyDefaults stamps a UUIDv7 when Message.MessageID is empty, so every message always has a unique key.
Each message in a mandatory batch must have a unique MessageID, because the return correlation keys on it: two messages sharing an explicit MessageID would make the second returnTagMap.Store overwrite the first, mis-attributing ErrUnroutable. PublishBatch enforces this for Mandatory() publishers — a batch containing duplicate explicit MessageIDs is rejected with ErrInvalidMessage before any broker work (no message is published). Empty MessageIDs are auto-stamped with a unique UUIDv7 by applyDefaults, so only caller-supplied IDs can collide; leaving MessageID empty always passes. The check is scoped to mandatory publishers: a non-mandatory batch never receives basic.return frames, so there is no correlation map to corrupt and duplicate IDs are allowed.
PublishTimeout ¶
PublishTimeout configured on the publisher is NOT applied to PublishBatch. If a per-batch deadline is needed, wrap ctx with context.WithTimeout before calling PublishBatch.
Channel-close recovery ¶
Per-message ErrChannelClosed does NOT distinguish "broker persisted" from "broker did not receive". Retry produces duplicates when the broker persisted but the ack was lost. PublishRetry does NOT apply to PublishBatch — chunking and partial-retry are the caller's responsibility because the right strategy is workload-specific. Consumers MUST be idempotent per SPEC §6.2.1.
PublishRetry ¶
PublishRetry configured on the publisher is intentionally ignored for PublishBatch. Retry semantics across a multi-message batch require the caller to understand which messages were persisted vs lost, so automatic retry would produce uncontrolled duplicates. Use PublishRetry only with Publish (single message).
type PublisherBuilder ¶
type PublisherBuilder[M any] struct { // contains filtered or unexported fields }
PublisherBuilder configures and builds a Publisher[M].
All option methods follow a last-wins policy: calling the same method twice keeps only the final value.
func PublisherFor ¶
func PublisherFor[M any](conn *Connection) *PublisherBuilder[M]
PublisherFor returns a builder for a Publisher[M] tied to conn.
func (*PublisherBuilder[M]) Build ¶
func (b *PublisherBuilder[M]) Build() (*Publisher[M], error)
Build constructs and returns a Publisher[M]. Returns an error if the builder state is invalid.
func (*PublisherBuilder[M]) Codec ¶
func (b *PublisherBuilder[M]) Codec(c codec.Codec) *PublisherBuilder[M]
Codec sets the message codec. Default: JSON (lax by default — accepts unknown fields per Postel's Law so producer-first deploys do not poison v1 consumers' DLQs). Use codec.NewJSONStrict for consumer-side schema enforcement.
func (*PublisherBuilder[M]) ConfirmTimeout ¶
func (b *PublisherBuilder[M]) ConfirmTimeout(d time.Duration) *PublisherBuilder[M]
ConfirmTimeout sets the deadline for receiving a publisher confirm (basic.ack or basic.nack) after a publish. Default: 30 s. Zero disables the confirm deadline (discouraged; the publisher may block indefinitely if the broker never confirms).
func (*PublisherBuilder[M]) Exchange ¶
func (b *PublisherBuilder[M]) Exchange(name string) *PublisherBuilder[M]
Exchange sets the AMQP exchange name. Default: "" (default exchange).
func (*PublisherBuilder[M]) Mandatory ¶
func (b *PublisherBuilder[M]) Mandatory() *PublisherBuilder[M]
Mandatory sets the AMQP mandatory flag on every publish. A mandatory publish that cannot be routed to any queue triggers a basic.return frame and Publish returns ErrUnroutable (OnReturn fires first if set).
func (*PublisherBuilder[M]) MaxMessageSizeBytes ¶
func (b *PublisherBuilder[M]) MaxMessageSizeBytes(n int) *PublisherBuilder[M]
MaxMessageSizeBytes caps the encoded body size each Publish accepts. Publishes whose serialised body exceeds n bytes are rejected locally with ErrMessageTooLarge — protecting the publisher from OOM and the broker from frame fragmentation pressure (the broker-side equivalent, reply code 311 CONTENT_TOO_LARGE, only fires after the payload has been allocated and partially sent).
Default: 16 MiB (16 * 1024 * 1024). Pass 0 to disable the guardrail (discouraged for production paths). Negative values fail Build with ErrInvalidOptions.
The cap is enforced against the encoded body, not the in-memory Message[M], so it matches what travels on the wire. ErrMessageTooLarge is classified permanent (IsPermanent == true): the same payload will never fit on retry.
func (*PublisherBuilder[M]) Metrics ¶
func (b *PublisherBuilder[M]) Metrics(pm metrics.PublisherMetrics) *PublisherBuilder[M]
Metrics sets the PublisherMetrics recorder. Default: NoOp.
func (*PublisherBuilder[M]) OnReturn ¶
func (b *PublisherBuilder[M]) OnReturn(cb func(Return)) *PublisherBuilder[M]
OnReturn registers a callback that fires synchronously before Publish unblocks when a mandatory publish is returned by the broker (basic.return). The callback receives the full Return including properties and reply code. Last-wins: calling OnReturn twice keeps only the second callback.
Goroutine and blocking contract: the callback runs inline on the publisher channel's return-demux goroutine, which is driven by the AMQP connection's single reader goroutine (the basic.return frame is delivered over an unbuffered channel, so the reader stalls until the callback returns). A slow or blocking callback therefore stalls confirm and return processing for every publisher sharing that connection — keep it fast and non-blocking, and hand heavy work off to your own goroutine. (The cross-cutting callback-invocation goroutine contract is tracked by T144.)
func (*PublisherBuilder[M]) PublishBatchMaxSize ¶
func (b *PublisherBuilder[M]) PublishBatchMaxSize(n int) *PublisherBuilder[M]
PublishBatchMaxSize sets the per-call cap for PublishBatch (T22). Default: 1024. Passing more messages than the cap in a single PublishBatch call returns ErrBatchTooLarge immediately, before any broker work; the caller should chunk.
Sizing trade-off: the cap bounds the confirm tracker memory held per call (one outstanding entry per in-flight delivery tag) and the confirm-window worst case. A deeper window pipelines more publishes before the single confirm round-trip, which raises throughput against fast or remote brokers at the cost of more tracker memory held for the duration of the call. Raise it for higher throughput; lower it to bound per-call memory.
This is a per-call cap, NOT a sliding in-flight window across calls: Publisher[M] does not throttle concurrent PublishBatch invocations against one another. Validated at PublishBatch-time only.
func (*PublisherBuilder[M]) PublishRetry ¶
func (b *PublisherBuilder[M]) PublishRetry(p RetryPolicy) *PublisherBuilder[M]
PublishRetry configures automatic retry of publishes that fail with a transient error (IsTransient(err) == true). Permanent errors are never retried. Each retry attempt increments the mandatory metric publisher_retry_total{exchange, reason}.
Retries can produce duplicates. Consumers MUST be idempotent (dedupe by MessageID). See SPEC §6.2.1.
func (*PublisherBuilder[M]) PublishTimeout ¶
func (b *PublisherBuilder[M]) PublishTimeout(d time.Duration) *PublisherBuilder[M]
PublishTimeout sets an end-to-end deadline that bounds pool acquisition + write + confirm + blocked-connection wait + reconnect barrier. Zero (default) means the caller context is the only deadline. When both PublishTimeout and the caller context have deadlines, the shorter one wins.
func (*PublisherBuilder[M]) RoutingKey ¶
func (b *PublisherBuilder[M]) RoutingKey(rk string) *PublisherBuilder[M]
RoutingKey sets the default routing key used on every Publish call.
func (*PublisherBuilder[M]) StampUserID ¶
func (b *PublisherBuilder[M]) StampUserID() *PublisherBuilder[M]
StampUserID auto-sets Message[M].UserID to conn.AuthenticatedUser() on every Publish call. Use this to avoid manually populating UserID when the broker validates the stamp. Last-wins against a previous StampUserID() call.
Note: for SASL EXTERNAL with a dynamic GetClientCertificate callback, the authenticated user is resolved once at Dial() time and may not reflect a certificate rotated after that. In that configuration, set stampUserID=false and populate Message.UserID manually from the current certificate's CN.
func (*PublisherBuilder[M]) Tracer ¶
func (b *PublisherBuilder[M]) Tracer(t otel.Tracer) *PublisherBuilder[M]
Tracer sets the OTel tracer for publish spans.
func (*PublisherBuilder[M]) WithPublishRateLimit ¶
func (b *PublisherBuilder[M]) WithPublishRateLimit(perSec int) *PublisherBuilder[M]
WithPublishRateLimit caps the sustained publish rate to perSec messages per second via a local token bucket, protecting the broker from an accidental runaway publish loop. The bucket tolerates a burst of up to perSec messages, then paces the rest evenly. Each throttled attempt increments publisher_rate_limited_total{exchange}.
A publish that cannot acquire a token before its context is cancelled (the caller's ctx or the PublishTimeout deadline) returns ErrRateLimited, which is transient and wraps the originating ctx error. A throttled-but-completed publish returns nil — the limiter only delays it.
Each broker attempt acquires a token: when PublishRetry is configured, every retry of a single Publish call paces against the bucket too (retries are real broker traffic, so the guardrail covers them), and publisher_rate_limited_total increments once per throttled attempt rather than once per Publish call.
perSec <= 0 (the default) disables the limiter. PublishBatch is not rate-limited (mirroring PublishRetry's single-message scoping — see PublishBatch's godoc). A perSec approaching 1e9 loses precision as the emission interval rounds toward the nanosecond floor, so the effective ceiling saturates near one grant per nanosecond — far above any real broker, since this is a runaway-loop guardrail rather than a precise shaper. Last-wins.
func (*PublisherBuilder[M]) WithoutMetrics ¶
func (b *PublisherBuilder[M]) WithoutMetrics() *PublisherBuilder[M]
WithoutMetrics disables all publisher metrics (last-wins against Metrics).
type Queue ¶
type Queue struct {
Name string
Durable bool
Exclusive bool
AutoDelete bool
// NoWait sends the declare without waiting for the broker's reply. This
// downgrades mismatch detection to asynchronous: Declare returns nil even
// on a conflicting redeclare, and the broker reports the conflict (e.g.
// ErrPreconditionFailed) out-of-band on a channel Declare has already
// closed, so it is generally not observable by the caller. Leave
// NoWait=false if you rely on Declare surfacing ErrTopologyMismatch.
NoWait bool
// Type selects the queue implementation (classic, quorum, stream).
// An empty value means the broker default (classic).
Type QueueType
// DeliveryLimit is the broker-enforced redelivery cap for quorum queues
// (maps to x-delivery-limit). Non-zero on a non-quorum queue is rejected by
// Topology.validate.
//
// The meaning of zero is broker-version dependent and a poison-loop footgun
// either way, so set it explicitly: on RabbitMQ 4.x a zero DeliveryLimit
// takes the broker default of 20 (the message is silently dropped/dead-
// lettered at the 21st delivery), while on RabbitMQ 3.13 a zero
// DeliveryLimit is genuinely unbounded (an unhandled poison message loops
// forever). Topology.Declare emits a version-aware warning when a quorum
// queue is declared with DeliveryLimit==0.
DeliveryLimit int
// SingleActiveConsumer maps to x-single-active-consumer.
// Not allowed on stream queues.
SingleActiveConsumer bool
// MaxPriority sets x-max-priority. Only valid on classic queues.
MaxPriority int
Args map[string]any
}
Queue declares an AMQP queue.
type QueueType ¶
type QueueType string
QueueType selects the RabbitMQ queue implementation via the x-queue-type queue argument. An empty value means the broker default (classic).
const ( // QueueTypeClassic is the default RabbitMQ queue type. QueueTypeClassic QueueType = "classic" // QueueTypeQuorum is the replicated, durable queue type recommended for production. QueueTypeQuorum QueueType = "quorum" // QueueTypeStream is available for declaration in v0.1; native stream consume is v0.2. QueueTypeStream QueueType = "stream" )
type RawHandler ¶
RawHandler is the function signature for handlers that need full delivery access. The Delivery carries the decoded body plus all AMQP envelope fields.
type Replier ¶
type Replier[Req, Resp any] struct { // contains filtered or unexported fields }
Replier serves request/reply RPC: it consumes requests from a queue, runs a ReplyHandler, and publishes the response to the request's ReplyTo with the matching CorrelationID.
At-least-once reply ordering (SPEC §6.7). For a successful handler the Replier publishes the reply, AWAITS its broker confirm, and only THEN acks the request. If the reply publish fails (ErrPublishNacked, ErrConfirmTimeout, ErrChannelClosed) the request is Nack(false)'d so it routes to the request queue's DLX (if any) and the caller observes ErrCallTimeout on its ctx deadline. A crash between the reply confirm and the request ack causes the broker to redeliver the request and the Replier to send a SECOND reply — callers MUST treat replies as at-least-once and dedupe by CorrelationID.
Handler errors never produce an error envelope on the wire. A handler that returns a non-nil error triggers Nack(false) on the request and invokes the OnError hook; the caller just times out on its ctx. Without a DLX on the request queue, Nack(false) is a silent drop — the mandatory metric replier_drop_no_dlx_total makes it observable, and OnError is the only client-side signal. Configure a DLX on the request queue if you need failed requests preserved for forensics.
Use ReplierFor[Req, Resp](conn) to build a Replier.
func (*Replier[Req, Resp]) Health ¶
Health reports whether the Replier's consumer connection is healthy.
func (*Replier[Req, Resp]) Serve ¶
func (r *Replier[Req, Resp]) Serve(ctx context.Context, h ReplyHandler[Req, Resp]) error
Serve consumes requests and serves them with h until ctx is cancelled, at which point it waits for in-flight handlers to finish and returns. Serve may only be called once per Replier (the underlying consumer is single-use; build a new Replier to restart).
See the Replier type docs for the at-least-once reply ordering and the crash-between-confirm-and-ack window that callers must dedupe against by CorrelationID.
type ReplierBuilder ¶
type ReplierBuilder[Req, Resp any] struct { // contains filtered or unexported fields }
ReplierBuilder configures and builds a Replier[Req, Resp].
All option methods follow a last-wins policy: calling the same method twice keeps only the final value.
func ReplierFor ¶
func ReplierFor[Req, Resp any](conn *Connection) *ReplierBuilder[Req, Resp]
ReplierFor returns a builder for a Replier[Req, Resp] tied to conn.
func (*ReplierBuilder[Req, Resp]) AllowMissingDLX ¶
func (b *ReplierBuilder[Req, Resp]) AllowMissingDLX() *ReplierBuilder[Req, Resp]
AllowMissingDLX opts out of the Topology DLX-presence validation, acknowledging that a handler error or reply-publish failure on this queue is a silent drop (still surfaced via OnError and replier_drop_no_dlx_total). Use it when the request queue is intentionally declared without a dead-letter exchange.
func (*ReplierBuilder[Req, Resp]) Build ¶
func (b *ReplierBuilder[Req, Resp]) Build() (*Replier[Req, Resp], error)
Build constructs and returns a Replier[Req, Resp]. It validates DLX presence against any wired Topology, builds the internal request consumer, and pins the confirm-tracked reply publisher to a publisher-role connection. Returns ErrInvalidOptions on an invalid configuration.
Silent-drop failure mode: a handler error or a failed reply publish makes the Replier Nack(false) the request. With a DLX on the request queue the request is preserved there; without one it is dropped. Configure a DLX (and wire it via Topology so this method can validate its presence) if you need failed requests kept for forensics. The mandatory metric replier_drop_no_dlx_total and the OnError hook are the only signals when no DLX is configured.
func (*ReplierBuilder[Req, Resp]) Codec ¶
func (b *ReplierBuilder[Req, Resp]) Codec(c codec.Codec) *ReplierBuilder[Req, Resp]
Codec sets the codec used to decode requests and encode replies. Default: JSON.
func (*ReplierBuilder[Req, Resp]) ConfirmTimeout ¶
func (b *ReplierBuilder[Req, Resp]) ConfirmTimeout(d time.Duration) *ReplierBuilder[Req, Resp]
ConfirmTimeout bounds how long the Replier waits for the broker confirm of a reply publish before treating it as failed (and Nack(false)'ing the request). Default: 30 s.
func (*ReplierBuilder[Req, Resp]) Metrics ¶
func (b *ReplierBuilder[Req, Resp]) Metrics(cm metrics.ConsumerMetrics) *ReplierBuilder[Req, Resp]
Metrics sets the ConsumerMetrics recorder (which carries replier_drop_no_dlx_total). Default: NoOp.
func (*ReplierBuilder[Req, Resp]) OnError ¶
func (b *ReplierBuilder[Req, Resp]) OnError(fn func(ctx context.Context, req Req, err error)) *ReplierBuilder[Req, Resp]
OnError registers a hook invoked when a successful reply cannot be produced or addressed: the handler returns a non-nil error, the response fails to encode, or the request carries no ReplyTo address. (A reply that encodes and is published but never confirms is NOT reported here — that is a transport failure, not a handler one.) In every case the request is Nack(false)'d (so it routes to a DLX if configured, or is dropped if not) and no error envelope is sent to the caller — the caller observes ErrCallTimeout once its ctx expires.
The silent-drop failure mode is load-bearing: without a DLX on the request queue, Nack(false) is a drop and OnError is the only client-side signal. Log, metric, or alert from it. The mandatory metric replier_drop_no_dlx_total increments on every such drop even if OnError is not wired.
func (*ReplierBuilder[Req, Resp]) Queue ¶
func (b *ReplierBuilder[Req, Resp]) Queue(name string) *ReplierBuilder[Req, Resp]
Queue sets the request queue the Replier consumes from. Required.
func (*ReplierBuilder[Req, Resp]) Topology ¶
func (b *ReplierBuilder[Req, Resp]) Topology(t *Topology) *ReplierBuilder[Req, Resp]
Topology supplies the Topology used to declare the request queue so Build can statically validate that the queue has a DeadLetter entry. When it does not, Build returns ErrInvalidOptions unless AllowMissingDLX opts out. When the request queue is declared out-of-band (no Topology wired), the library cannot detect a missing DLX statically and the replier_drop_no_dlx_total metric plus OnError remain the only signal.
type ReplyHandler ¶
ReplyHandler is the function signature a Replier serves: it maps a decoded request to a response (or an error, which the Replier surfaces via OnError and nacks without requeue — never an error envelope on the wire). Defined here so both the Caller (T29) and Replier (T30) sides share one declaration.
type RetryPolicy ¶
type RetryPolicy struct {
// Min is the minimum backoff duration. Defaults to 1s when zero. It is also
// the floor every jitter strategy clamps up to, so no attempt returns less.
Min time.Duration
// Max is the maximum backoff duration. Defaults to 30s when zero.
Max time.Duration
// Factor is the exponential multiplier applied per failed attempt.
// Defaults to 2.0 when not a positive number (zero, negative, or NaN).
Factor float64
// Retries is the maximum number of consecutive failed attempts before the
// reconnect loop gives up. Zero means unlimited retries.
Retries int
// Jitter selects how the exponential delay is perturbed to de-synchronize a
// recovering fleet. The zero value is JitterFull (SRE-recommended); set
// JitterNone for deterministic timing in tests.
Jitter JitterStrategy
}
RetryPolicy configures exponential backoff with jitter for the reconnect loop and publish retries. Zero values are replaced with safe defaults at call time: Min=1s, Max=30s, Factor=2.0, Jitter=JitterFull.
func (RetryPolicy) NextBackoff ¶
func (p RetryPolicy) NextBackoff(n int) time.Duration
NextBackoff returns the backoff duration for attempt n (1-indexed). The pure exponential delay is Min*Factor^(n-1) capped at Max; the configured Jitter strategy then perturbs it, and the result is always clamped to the effective [Min, Max]. Min is the floor and wins when Max < Min: a degenerate config returns Min rather than a value below it. The result is always finite and non-negative for any input — a non-positive or NaN Factor falls back to the 2.0 default, and an out-of-range attempt n saturates to Min (n <= 0) or Max (very large n) rather than under- or overflowing.
type Return ¶
type Return struct {
ReplyCode uint16
ReplyText string
Exchange string
RoutingKey string
Properties ReturnedProperties
}
Return carries the broker's basic.return frame for a mandatory publish that could not be routed to any queue. OnReturn callbacks receive this value synchronously before the corresponding Publish call unblocks.
type ReturnedProperties ¶
type ReturnedProperties struct {
ContentType string
ContentEncoding string
Headers Headers
DeliveryMode DeliveryMode
Priority uint8
CorrelationID string
ReplyTo string
// Expiration is the per-message TTL encoded as milliseconds in the wire
// frame. Zero means no per-message TTL was set.
Expiration time.Duration
MessageID string
Timestamp time.Time
Type string
UserID string
AppID string
}
ReturnedProperties mirrors the 13 AMQP basic.properties fields carried in a basic.return frame. It has the same semantics as the corresponding Message[M] fields; see those godoc entries for value constraints.
type SASLMechanism ¶
type SASLMechanism string
SASLMechanism selects the SASL mechanism for the AMQP 0-9-1 handshake. The default is SASLPlain (username + password). SASLExternal delegates authentication to the TLS client certificate; WithAuth becomes a no-op and emits a Dial-time warning.
const ( // SASLPlain authenticates with username and password via the PLAIN mechanism. SASLPlain SASLMechanism = "PLAIN" // SASLExternal authenticates via TLS client certificate; requires amqps:// and a client cert. SASLExternal SASLMechanism = "EXTERNAL" )
type TimeoutVerdict ¶
type TimeoutVerdict uint8
TimeoutVerdict decides the ack/nack action when a handler exceeds its HandlerTimeout. The zero value is TimeoutNackNoRequeue so that a misconfigured handler does not create an infinite requeue loop.
const ( // TimeoutNackNoRequeue is the default; the message goes to the DLX (or is dropped). TimeoutNackNoRequeue TimeoutVerdict = iota // TimeoutNackRequeue requeues the message; subject to MaxRedeliveries / x-delivery-limit. TimeoutNackRequeue )
type Topology ¶
type Topology struct {
Exchanges []Exchange
Queues []Queue
Bindings []Binding
DeadLetters []DeadLetter
// ExchangeBindings declares exchange→exchange bindings (exchange.bind) for
// layered fan-out topologies (T69 / EDA-03). It is a SEPARATE slice — Binding
// (exchange→queue) is intentionally not reshaped (GA-05). The declare-once /
// deep-snapshot semantics extend to ExchangeBindings.
ExchangeBindings []ExchangeBinding
}
Topology describes the AMQP exchanges, queues, bindings, and dead-letter rules to be declared on the broker. Declare it once and reuse via AttachTo(conn) so the reconnect barrier can redeclare the full topology after a reconnect.
Topology.Declare is NOT concurrency-safe with itself or with AttachTo. Recommended pattern: call Declare once during application startup (sync.Once), then call AttachTo on the same Topology to hook into reconnect.
func (*Topology) AttachTo ¶
func (t *Topology) AttachTo(conn *Connection) error
AttachTo registers a deep snapshot of t as a reconnect redeclare callback on conn. On every reconnect cycle, the snapshot is passed to Topology.Declare inside the synchronous reconnect barrier — before publishers resume and before consumers re-issue basic.consume.
Snapshots are keyed by the pointer address of t. Calling AttachTo(conn) with the same *Topology pointer a second time replaces the prior snapshot (useful when the caller edits the topology and wants the new shape on the next reconnect). Calling AttachTo(conn) with a different pointer appends a new snapshot; all registered snapshots fire in registration order.
Returns ErrInvalidOptions if the topology fails validation. The recommended pattern is to call Declare first (which also validates), then AttachTo on the same pointer — that way validation errors surface at startup, not on reconnect.
Topology.Declare and AttachTo are NOT concurrency-safe with each other.
func (*Topology) Declare ¶
func (t *Topology) Declare(ctx context.Context, conn *Connection) error
Declare validates the topology, expands DLX entries in-memory (Step 1), then opens a temporary channel and emits exchange → queue → binding declares in that order (Step 2). It is idempotent: re-declaring the same shape returns nil. A conflicting redeclare returns ErrTopologyMismatch (which also satisfies errors.Is(err, ErrPreconditionFailed)).
When any entry sets NoWait=true, Declare cannot detect a conflict on that entry synchronously and returns nil even on a mismatch; see the NoWait field docs.
Topology.Declare is NOT concurrency-safe with itself or with AttachTo. Recommended pattern: call Declare exactly once at application startup (e.g. protected by sync.Once), then call AttachTo for reconnect hooks.
Source Files
¶
- batch_consumer.go
- batch_consumer_builder.go
- channelpool.go
- connection.go
- consumer.go
- consumer_builder.go
- delay.go
- delivery.go
- doc.go
- errors.go
- fixture.go
- message.go
- options_connection.go
- publisher.go
- publisher_builder.go
- publisher_rate_limiter.go
- retry.go
- rpc.go
- rpc_caller_builder.go
- rpc_replier.go
- rpc_replier_builder.go
- topology.go
- types.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package codec provides message encoding and decoding for AMQP publishers and consumers.
|
Package codec provides message encoding and decoding for AMQP publishers and consumers. |
|
Package conformance holds AMQP 0-9-1 wire-protocol conformance tests for warren.
|
Package conformance holds AMQP 0-9-1 wire-protocol conformance tests for warren. |
|
examples
|
|
|
batch_consume
command
Package main demonstrates how to consume AMQP messages in batches using the Warren library's BatchConsumer[M], with both size-based and timer-based flush triggers.
|
Package main demonstrates how to consume AMQP messages in batches using the Warren library's BatchConsumer[M], with both size-based and timer-based flush triggers. |
|
batch_publish
command
Package main demonstrates how to publish messages in batches using the Warren library's PublishBatch — a single always-all pipeline that preserves per-channel message ordering and returns a []PublishResult slice.
|
Package main demonstrates how to publish messages in batches using the Warren library's PublishBatch — a single always-all pipeline that preserves per-channel message ordering and returns a []PublishResult slice. |
|
consume
command
Package main demonstrates how to consume typed AMQP messages using the Warren library with per-message handler verdict, MaxRedeliveries enforcement, and HandlerTimeout.
|
Package main demonstrates how to consume typed AMQP messages using the Warren library with per-message handler verdict, MaxRedeliveries enforcement, and HandlerTimeout. |
|
deadletter
command
Package main demonstrates dead-letter topology expansion using the Warren library.
|
Package main demonstrates dead-letter topology expansion using the Warren library. |
|
delayed
command
Package main demonstrates delayed message delivery using the Warren library and the RabbitMQ rabbitmq_delayed_message_exchange plugin.
|
Package main demonstrates delayed message delivery using the Warren library and the RabbitMQ rabbitmq_delayed_message_exchange plugin. |
|
idempotent_consume
command
Package main demonstrates the canonical at-least-once dedupe pattern from SPEC §6.2.1: an idempotent consumer that keys a dedupe store on Delivery.MessageID so a message redelivered by PublishRetry, the reconnect barrier, a confirm timeout, or a consumer crash is processed by the business handler exactly once.
|
Package main demonstrates the canonical at-least-once dedupe pattern from SPEC §6.2.1: an idempotent consumer that keys a dedupe store on Delivery.MessageID so a message redelivered by PublishRetry, the reconnect barrier, a confirm timeout, or a consumer crash is processed by the business handler exactly once. |
|
ordered_consume
command
Package main demonstrates strict per-queue ordering with consumer failover, the SPEC §6.3 pattern: Queue.SingleActiveConsumer=true + Consumer.Concurrency(1).
|
Package main demonstrates strict per-queue ordering with consumer failover, the SPEC §6.3 pattern: Queue.SingleActiveConsumer=true + Consumer.Concurrency(1). |
|
otel
command
Package main demonstrates OpenTelemetry trace propagation through Warren: a publish span and a consume span that share one trace, linked parent → child across the broker via W3C TraceContext headers (SPEC §6.9).
|
Package main demonstrates OpenTelemetry trace propagation through Warren: a publish span and a consume span that share one trace, linked parent → child across the broker via W3C TraceContext headers (SPEC §6.9). |
|
publish
command
Package main demonstrates how to publish a typed AMQP message using the Warren library with publisher confirms, mandatory routing, and retry policy.
|
Package main demonstrates how to publish a typed AMQP message using the Warren library with publisher confirms, mandatory routing, and retry policy. |
|
rpc
command
Package main demonstrates synchronous request/reply RPC over AMQP using the Warren library's Caller and Replier over RabbitMQ direct reply-to.
|
Package main demonstrates synchronous request/reply RPC over AMQP using the Warren library's Caller and Replier over RabbitMQ direct reply-to. |
|
topology
command
Package main demonstrates topology declaration, idempotency, and reconnect redeclare using the Warren library.
|
Package main demonstrates topology declaration, idempotency, and reconnect redeclare using the Warren library. |
|
internal
|
|
|
amqperror
Package amqperror translates *amqp091.Error values (delivered by the broker on channel/connection close) into wrapped chains of the reply-code sentinels declared in the root warren package.
|
Package amqperror translates *amqp091.Error values (delivered by the broker on channel/connection close) into wrapped chains of the reply-code sentinels declared in the root warren package. |
|
amqptest
Package amqptest is an internal testcontainers-go helper that spins up a RabbitMQ broker for warren's own integration suites (the root and example *_integration_test.go files).
|
Package amqptest is an internal testcontainers-go helper that spins up a RabbitMQ broker for warren's own integration suites (the root and example *_integration_test.go files). |
|
cmd/covercheck
command
Command covercheck enforces per-package and critical-path coverage floors against a Go coverage profile, failing (exit 1) when any package drops below its threshold.
|
Command covercheck enforces per-package and critical-path coverage floors against a Go coverage profile, failing (exit 1) when any package drops below its threshold. |
|
confirms
Package confirms manages publisher confirmations for a single AMQP channel.
|
Package confirms manages publisher confirmations for a single AMQP channel. |
|
connpool
Package connpool provides pure helper functions for the multi-TCP-connection pool used by Connection (T07d).
|
Package connpool provides pure helper functions for the multi-TCP-connection pool used by Connection (T07d). |
|
reconnect
Package reconnect provides a supervised reconnect loop with configurable exponential backoff.
|
Package reconnect provides a supervised reconnect loop with configurable exponential backoff. |
|
redact
Package redact is the mandatory credential-redaction choke-point for the warren library.
|
Package redact is the mandatory credential-redaction choke-point for the warren library. |
|
Package log provides the Logger interface and three adapters (NoOp, Slog, Std) used by the warren library for structured log emission.
|
Package log provides the Logger interface and three adapters (NoOp, Slog, Std) used by the warren library for structured log emission. |
|
Package otel provides the Tracer interface used by Publisher and Consumer to emit OpenTelemetry spans, and a Propagator that injects and extracts W3C TraceContext headers from AMQP message headers.
|
Package otel provides the Tracer interface used by Publisher and Consumer to emit OpenTelemetry spans, and a Propagator that injects and extracts W3C TraceContext headers from AMQP message headers. |