rabbitmq

v0.1.1
Published: Mar 10, 2026 License: MIT Imports: 11 Imported by: 0

README

rabbitwrap

A production-ready RabbitMQ client wrapper for Go with automatic reconnection, publisher confirms, consumer middleware, and a fluent API.

Installation

go get github.com/KARTIKrocks/rabbitwrap

Requires Go 1.22+.

Features

  • Auto-reconnection with exponential backoff for connections, publishers, and consumers
  • Publisher confirms for reliable message delivery
  • Consumer middleware (logging, recovery, retry — or bring your own)
  • Concurrent consumers with configurable worker goroutines
  • Graceful shutdown waits for in-flight handlers to complete
  • Message builder with fluent API
  • Batch publishing support
  • Dead letter queue and quorum queue support
  • TLS support
  • Health checks via conn.IsHealthy()
  • Structured logging via pluggable Logger interface
  • Thread-safe — connections and publishers safe for concurrent use

Quick Start

import rabbitmq "github.com/KARTIKrocks/rabbitwrap"

config := rabbitmq.DefaultConfig().
    WithHost("localhost", 5672).
    WithCredentials("guest", "guest").
    WithLogger(rabbitmq.NewStdLogger())

conn, err := rabbitmq.NewConnection(config)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

See examples/basic/main.go for a complete working example.

Connection

Basic Connection
config := rabbitmq.DefaultConfig().
    WithHost("localhost", 5672).
    WithCredentials("guest", "guest")

conn, err := rabbitmq.NewConnection(config)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

conn.OnConnect(func() {
    log.Println("Connected to RabbitMQ")
})

conn.OnDisconnect(func(err error) {
    log.Printf("Disconnected: %v", err)
})
Connection with URL
config := rabbitmq.DefaultConfig().
    WithURL("amqp://user:pass@localhost:5672/vhost")
TLS Connection
config := rabbitmq.DefaultConfig().
    WithHost("localhost", 5671).
    WithTLS(&tls.Config{MinVersion: tls.VersionTLS12})
Reconnection with Exponential Backoff
config := rabbitmq.DefaultConfig().
    WithReconnect(
        1*time.Second,   // initial delay
        60*time.Second,  // max delay
        0,               // max attempts (0 = unlimited)
    )

The delay doubles on each attempt: 1s, 2s, 4s, 8s, ... up to the max delay.
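
The schedule above can be sketched with the standard library alone. This assumes plain doubling with a hard cap and no jitter; `backoffDelay` is a hypothetical helper written for illustration, not a function exported by rabbitwrap.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before a reconnect attempt (attempt 0 is the
// first retry): initial doubled once per earlier attempt, capped at max.
func backoffDelay(initial, max time.Duration, attempt int) time.Duration {
	d := initial
	for i := 0; i < attempt; i++ {
		if d >= max/2 {
			return max // doubling again would reach or pass the cap
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Same values as the WithReconnect example: 1s initial, 60s max.
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, backoffDelay(time.Second, 60*time.Second, attempt))
	}
}
```

With a 1s initial delay and a 60s cap, the seventh retry would be 64s under pure doubling, so it is clamped to the cap.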

Logging
// Use built-in standard logger
config := rabbitmq.DefaultConfig().
    WithLogger(rabbitmq.NewStdLogger())

// Or implement the Logger interface for your framework
type Logger interface {
    Debugf(format string, args ...any)
    Infof(format string, args ...any)
    Warnf(format string, args ...any)
    Errorf(format string, args ...any)
}
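
As one possible way to satisfy this interface, the sketch below forwards the printf-style calls to the standard library's `log/slog`. The `Logger` interface is copied locally so the example compiles on its own; `slogAdapter`, `newSlogAdapter`, and `capture` are hypothetical names, not part of rabbitwrap.

```go
package main

import (
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
)

// Logger mirrors the interface listed above.
type Logger interface {
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

// slogAdapter formats the message first, then hands it to slog at the matching level.
type slogAdapter struct{ l *slog.Logger }

func (a slogAdapter) Debugf(format string, args ...any) { a.l.Debug(fmt.Sprintf(format, args...)) }
func (a slogAdapter) Infof(format string, args ...any)  { a.l.Info(fmt.Sprintf(format, args...)) }
func (a slogAdapter) Warnf(format string, args ...any)  { a.l.Warn(fmt.Sprintf(format, args...)) }
func (a slogAdapter) Errorf(format string, args ...any) { a.l.Error(fmt.Sprintf(format, args...)) }

// newSlogAdapter writes text records to w, including debug-level ones.
func newSlogAdapter(w io.Writer) Logger {
	h := slog.NewTextHandler(w, &slog.HandlerOptions{Level: slog.LevelDebug})
	return slogAdapter{l: slog.New(h)}
}

// capture runs fn against an adapter backed by an in-memory buffer
// and returns everything that was logged.
func capture(fn func(Logger)) string {
	var sb strings.Builder
	fn(newSlogAdapter(&sb))
	return sb.String()
}

func main() {
	logger := newSlogAdapter(os.Stderr)
	logger.Infof("connected to %s:%d", "localhost", 5672)
	fmt.Print(capture(func(l Logger) { l.Warnf("retry %d", 2) }))
}
```

A value built this way could then be passed to `WithLogger` in place of `rabbitmq.NewStdLogger()`.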

Publishing Messages

Basic Publisher
pubConfig := rabbitmq.DefaultPublisherConfig().
    WithExchange("my-exchange").
    WithRoutingKey("my-key")

publisher, err := rabbitmq.NewPublisher(conn, pubConfig)
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()

// Publish text message
err = publisher.PublishText(ctx, "Hello, World!")

// Publish JSON message
err = publisher.PublishJSON(ctx, map[string]any{
    "user_id": 123,
    "action":  "login",
})

// Publish with custom message
msg := rabbitmq.NewMessage([]byte("data")).
    WithPriority(5).
    WithHeader("trace-id", "abc123")

err = publisher.Publish(ctx, msg)

Publishers automatically re-establish their channel when the connection recovers.

Publish to Specific Exchange/Key
err = publisher.PublishWithKey(ctx, "different-key", msg)
err = publisher.PublishToExchange(ctx, "other-exchange", "key", msg)
Batch Publishing
batch := rabbitmq.NewBatchPublisher(publisher)

batch.Add(rabbitmq.NewTextMessage("message 1"))
batch.Add(rabbitmq.NewTextMessage("message 2"))
batch.AddWithKey("specific-key", rabbitmq.NewTextMessage("message 3"))

err = batch.PublishAndClear(ctx)
Publisher Confirms
pubConfig := rabbitmq.DefaultPublisherConfig().
    WithConfirmMode(true, 5*time.Second)

publisher, err := rabbitmq.NewPublisher(conn, pubConfig)
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()

err = publisher.Publish(ctx, msg)
if errors.Is(err, rabbitmq.ErrNack) {
    // Message was not acknowledged by broker
}
if errors.Is(err, rabbitmq.ErrTimeout) {
    // Confirmation timed out
}
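
One way to act on these two errors is to republish a bounded number of times. The sketch below is self-contained: `errNack` and `errTimeout` stand in for `rabbitmq.ErrNack` and `rabbitmq.ErrTimeout`, and `publishWithRetry` and `sendWithFailures` are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-ins for rabbitmq.ErrNack and rabbitmq.ErrTimeout.
var (
	errNack    = errors.New("rabbitmq: message was nacked")
	errTimeout = errors.New("rabbitmq: operation timeout")
)

// publishWithRetry calls publish until it succeeds, fails with an error that
// is neither a nack nor a timeout, runs out of attempts, or ctx is done.
func publishWithRetry(ctx context.Context, attempts int, wait time.Duration, publish func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = publish(ctx); err == nil {
			return nil
		}
		if !errors.Is(err, errNack) && !errors.Is(err, errTimeout) {
			return err // not a confirm failure; retrying is unlikely to help
		}
		if i == attempts-1 {
			break // no point sleeping after the last attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
	return fmt.Errorf("publish failed after %d attempts: %w", attempts, err)
}

// sendWithFailures simulates a publish that is nacked `failures` times before
// succeeding, and reports how many times it was called (at most 3 attempts).
func sendWithFailures(failures int) (calls int, err error) {
	err = publishWithRetry(context.Background(), 3, time.Millisecond, func(context.Context) error {
		calls++
		if calls <= failures {
			return errNack
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(sendWithFailures(2))
	fmt.Println(sendWithFailures(5))
}
```

A confirmation timeout does not prove the broker dropped the message, so republishing can produce duplicates; consumers should tolerate them.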

Consuming Messages

Basic Consumer
consConfig := rabbitmq.DefaultConsumerConfig().
    WithQueue("my-queue").
    WithPrefetch(10, 0)

consumer, err := rabbitmq.NewConsumer(conn, consConfig)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

err = consumer.Consume(ctx, func(ctx context.Context, d *rabbitmq.Delivery) error {
    log.Printf("Received: %s", d.Text())
    return nil // return nil to ack, error to nack
})

Consumers automatically resume consuming after the connection recovers.

Concurrent Consumers

Process messages in parallel with multiple worker goroutines:

consConfig := rabbitmq.DefaultConsumerConfig().
    WithQueue("my-queue").
    WithPrefetch(50, 0).
    WithConcurrency(5).
    WithGracefulShutdown(true)

On Close(), the consumer waits for all in-flight handlers to finish. Use CloseWithContext to set a shutdown deadline:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
consumer.CloseWithContext(ctx)
Manual Message Handling
deliveryCh, err := consumer.Start(ctx)
if err != nil {
    log.Fatal(err)
}

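for delivery := range deliveryCh {
    // process stands in for your own handling logic.
    if err := process(delivery); err != nil {
        delivery.Nack(false, true) // negative-ack and requeue
        continue
    }
    delivery.Ack(false) // ack this delivery only
}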
Consumer Middleware

Middleware wraps the message handler, executing in order (outermost first):

consConfig := rabbitmq.DefaultConsumerConfig().
    WithQueue("my-queue").
    WithMiddleware(
        rabbitmq.LoggingMiddleware(rabbitmq.NewStdLogger()),
        rabbitmq.RecoveryMiddleware(func(r any) {
            log.Printf("recovered from panic: %v", r)
        }),
        rabbitmq.RetryMiddleware(3, 1*time.Second),
    )
Built-in Middleware
Middleware                          Description
LoggingMiddleware(logger)           Logs message processing with duration
RecoveryMiddleware(onPanic)         Recovers from panics in handlers
RetryMiddleware(maxRetries, delay)  Retries failed message processing
Custom Middleware
func TracingMiddleware(tracer Tracer) rabbitmq.Middleware {
    return func(next rabbitmq.MessageHandler) rabbitmq.MessageHandler {
        return func(ctx context.Context, d *rabbitmq.Delivery) error {
            span := tracer.StartSpan("process_message")
            defer span.End()
            return next(ctx, d)
        }
    }
}
Composing Middleware
combined := rabbitmq.Chain(mw1, mw2, mw3)
handler := combined(myHandler)
Error Handling
consConfig := rabbitmq.DefaultConsumerConfig().
    WithQueue("my-queue").
    WithRequeueOnError(true).
    WithErrorHandler(func(err error) {
        log.Printf("Consumer error: %v", err)
    })

Health Checks

if conn.IsHealthy() {
    // Connection is open and responsive
}

if conn.IsClosed() {
    // Connection has been closed
}
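
A common use for these checks is a readiness endpoint. The sketch below assumes only that the connection exposes `IsHealthy() bool`, as in the method listing; `healthChecker`, `statusFor`, `healthHandler`, and `fakeConn` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"net/http"
)

// healthChecker is the single method this sketch relies on.
type healthChecker interface{ IsHealthy() bool }

// statusFor maps the probe result to an HTTP status code.
func statusFor(h healthChecker) int {
	if h.IsHealthy() {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

// healthHandler exposes the probe result over HTTP.
func healthHandler(h healthChecker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(statusFor(h))
	})
}

// fakeConn is a test double standing in for a real connection.
type fakeConn bool

func (f fakeConn) IsHealthy() bool { return bool(f) }

func main() {
	fmt.Println(statusFor(fakeConn(true)), statusFor(fakeConn(false)))
	// In a real service: http.Handle("/healthz", healthHandler(conn))
	_ = healthHandler(fakeConn(true))
}
```

According to the method documentation, IsHealthy opens and immediately closes a channel as its probe, so it is probably better suited to periodic readiness checks than to a per-request hot path.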

Queue and Exchange Management

Declare Queue
info, err := consumer.DeclareQueue("my-queue", true, false, false, nil)

// With configuration
queueConfig := rabbitmq.DefaultQueueConfig("my-queue").
    WithDurable(true).
    WithDeadLetter("dlx-exchange", "dlx-key").
    WithMessageTTL(24 * time.Hour).
    WithMaxLength(10000)

info, err = consumer.DeclareQueueWithConfig(queueConfig)

// Quorum queue for high availability
queueConfig = rabbitmq.DefaultQueueConfig("ha-queue").WithQuorum()
info, err = consumer.DeclareQueueWithConfig(queueConfig)
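
For orientation, RabbitMQ expresses these queue options as optional `x-` arguments on the queue declaration. The sketch below builds such an argument table by hand; that rabbitwrap's `QueueConfig` produces exactly this table is an assumption, and `queueArgs` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// queueArgs builds the RabbitMQ queue arguments for dead-lettering,
// per-queue message TTL, maximum length, and quorum queue type.
func queueArgs(dlx, dlKey string, ttl time.Duration, maxLen int, quorum bool) map[string]any {
	args := map[string]any{}
	if dlx != "" {
		args["x-dead-letter-exchange"] = dlx
	}
	if dlKey != "" {
		args["x-dead-letter-routing-key"] = dlKey
	}
	if ttl > 0 {
		args["x-message-ttl"] = ttl.Milliseconds() // the broker expects milliseconds
	}
	if maxLen > 0 {
		args["x-max-length"] = maxLen
	}
	if quorum {
		args["x-queue-type"] = "quorum"
	}
	return args
}

func main() {
	// Same values as the configuration example above.
	fmt.Println(queueArgs("dlx-exchange", "dlx-key", 24*time.Hour, 10000, false))
}
```

A table like this could also be passed as the final `args` parameter of `DeclareQueue` when the builder methods do not cover an option.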
Declare Exchange
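// Declare with positional arguments
err = publisher.DeclareExchange("my-exchange", rabbitmq.ExchangeTopic, true, false, nil)

// Or declare with configuration. A different name is used here because the
// broker rejects redeclaring an existing exchange with another type.
exchangeConfig := rabbitmq.DefaultExchangeConfig("my-fanout-exchange", rabbitmq.ExchangeFanout).
    WithDurable(true)
err = consumer.DeclareExchange(exchangeConfig)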
Bind/Unbind Queue
err = consumer.BindQueue("my-queue", "my-exchange", "routing.key", nil)
err = consumer.UnbindQueue("my-queue", "my-exchange", "routing.key", nil)
Delete/Purge
deletedMsgs, err := consumer.DeleteQueue("my-queue", false, false)
purgedMsgs, err := consumer.PurgeQueue("my-queue")
err = consumer.DeleteExchange("my-exchange", false)

Message Types

// Binary
msg := rabbitmq.NewMessage([]byte("binary data"))

// Text
msg = rabbitmq.NewTextMessage("Hello, World!")

// JSON
msg, err := rabbitmq.NewJSONMessage(map[string]any{"key": "value"})
Message Options
msg := rabbitmq.NewMessage(data).
    WithContentType("application/json").
    WithDeliveryMode(rabbitmq.Persistent).
    WithPriority(5).
    WithCorrelationID("request-123").
    WithReplyTo("reply-queue").
    WithMessageID("msg-001").
    WithType("user.created").
    WithAppID("my-app").
    WithTTL(1 * time.Hour).
    WithHeader("trace-id", "abc").
    WithHeaders(map[string]any{"key": "value"})

Sentinel Errors

rabbitmq.ErrConnectionClosed  // Connection is closed
rabbitmq.ErrChannelClosed     // Channel is closed
rabbitmq.ErrPublishFailed     // Publish operation failed
rabbitmq.ErrConsumeFailed     // Consume operation failed
rabbitmq.ErrInvalidConfig     // Invalid configuration
rabbitmq.ErrNotConnected      // Not connected
rabbitmq.ErrTimeout           // Operation timeout
rabbitmq.ErrNack              // Message was nacked
rabbitmq.ErrMaxReconnects     // Max reconnection attempts reached
rabbitmq.ErrShuttingDown      // Shutting down

if errors.Is(err, rabbitmq.ErrConnectionClosed) {
    // Handle...
}
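
Because `errors.Is` also matches a sentinel that has been wrapped with `%w`, callers can branch on these values even when an error carries extra context. The sketch below uses local stand-ins for four of the sentinels; the `action` classification and the `wrap` helper are hypothetical, and whether rabbitwrap itself wraps its sentinels is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the corresponding rabbitmq sentinel errors.
var (
	errConnectionClosed = errors.New("rabbitmq: connection closed")
	errInvalidConfig    = errors.New("rabbitmq: invalid configuration")
	errMaxReconnects    = errors.New("rabbitmq: max reconnection attempts reached")
	errShuttingDown     = errors.New("rabbitmq: shutting down")
)

// wrap adds context while keeping the sentinel reachable through errors.Is.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// action suggests what a caller might do next for a given error.
func action(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errShuttingDown),
		errors.Is(err, errMaxReconnects),
		errors.Is(err, errInvalidConfig):
		return "stop" // waiting will not fix these
	case errors.Is(err, errConnectionClosed):
		return "wait-for-reconnect"
	default:
		return "report"
	}
}

func main() {
	fmt.Println(action(wrap("publish order", errConnectionClosed)))
	fmt.Println(action(errMaxReconnects))
	fmt.Println(action(errors.New("something else")))
}
```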

Development

# Run unit tests
make test

# Run go vet + golangci-lint + staticcheck
make check

# Run integration tests (requires Docker)
make test-integration

# Start RabbitMQ locally
make docker-up

Thread Safety

  • Connection — safe for concurrent use
  • Publisher — safe for concurrent use
  • Consumer — use one goroutine per consumer; create multiple consumers for parallel processing

Contributing

See CONTRIBUTING.md.

License

MIT

Documentation

Overview

Package rabbitmq provides a simplified, production-ready wrapper around the official RabbitMQ Go client (amqp091-go).

It offers connection management with automatic reconnection, publisher confirms for reliable message delivery, consumer management with prefetch and manual acknowledgment, a fluent message builder API, batch publishing, dead letter queue support, and TLS support.

Connection

Create a connection with automatic reconnection using exponential backoff:

config := rabbitmq.DefaultConfig().
    WithHost("localhost", 5672).
    WithCredentials("guest", "guest").
    WithReconnect(1*time.Second, 60*time.Second, 0)

conn, err := rabbitmq.NewConnection(config)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

Publishing

Publish messages with publisher confirms enabled by default:

publisher, err := rabbitmq.NewPublisher(conn,
    rabbitmq.DefaultPublisherConfig().
        WithExchange("my-exchange").
        WithRoutingKey("my-key"),
)
if err != nil {
    log.Fatal(err)
}
defer publisher.Close()

// Text message
err = publisher.PublishText(ctx, "Hello, World!")

// JSON message
err = publisher.PublishJSON(ctx, map[string]any{"event": "user.created"})

// Custom message with headers
msg := rabbitmq.NewMessage([]byte("data")).
    WithPriority(5).
    WithHeader("trace-id", "abc123")
err = publisher.Publish(ctx, msg)

Consuming

Consume messages with automatic ack/nack handling and middleware:

consumer, err := rabbitmq.NewConsumer(conn,
    rabbitmq.DefaultConsumerConfig().
        WithQueue("my-queue").
        WithPrefetch(10, 0).
        WithMiddleware(
            rabbitmq.LoggingMiddleware(rabbitmq.NewStdLogger()),
            rabbitmq.RecoveryMiddleware(func(r any) {
                log.Printf("panic: %v", r)
            }),
        ),
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

err = consumer.Consume(ctx, func(ctx context.Context, d *rabbitmq.Delivery) error {
    log.Printf("Received: %s", d.Text())
    return nil // return nil to ack, error to nack
})

Middleware

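The package includes built-in middleware for common patterns:

  • LoggingMiddleware: logs each message processing with its duration
  • RecoveryMiddleware: recovers from panics in handlers
  • RetryMiddleware: retries failed message processing

Custom middleware can be created by writing a function of the Middleware type: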

func MyMiddleware() rabbitmq.Middleware {
    return func(next rabbitmq.MessageHandler) rabbitmq.MessageHandler {
        return func(ctx context.Context, d *rabbitmq.Delivery) error {
            // before
            err := next(ctx, d)
            // after
            return err
        }
    }
}

Package rabbitmq provides a simplified interface for RabbitMQ messaging with support for publishers, consumers, exchanges, and queues.

Variables

var (
	ErrConnectionClosed = errors.New("rabbitmq: connection closed")
	ErrChannelClosed    = errors.New("rabbitmq: channel closed")
	ErrPublishFailed    = errors.New("rabbitmq: publish failed")
	ErrConsumeFailed    = errors.New("rabbitmq: consume failed")
	ErrInvalidConfig    = errors.New("rabbitmq: invalid configuration")
	ErrNotConnected     = errors.New("rabbitmq: not connected")
	ErrTimeout          = errors.New("rabbitmq: operation timeout")
	ErrNack             = errors.New("rabbitmq: message was nacked")
	ErrMaxReconnects    = errors.New("rabbitmq: max reconnection attempts reached")
	ErrShuttingDown     = errors.New("rabbitmq: shutting down")
)

Sentinel errors for RabbitMQ operations.

Types

type BatchPublisher

type BatchPublisher struct {
	// contains filtered or unexported fields
}

BatchPublisher enables batch publishing.

func NewBatchPublisher

func NewBatchPublisher(publisher *Publisher) *BatchPublisher

NewBatchPublisher creates a new batch publisher.

func (*BatchPublisher) Add

func (b *BatchPublisher) Add(msg *Message) *BatchPublisher

Add adds a message to the batch.

func (*BatchPublisher) AddToExchange

func (b *BatchPublisher) AddToExchange(exchange, routingKey string, msg *Message) *BatchPublisher

AddToExchange adds a message to a specific exchange.

func (*BatchPublisher) AddWithKey

func (b *BatchPublisher) AddWithKey(routingKey string, msg *Message) *BatchPublisher

AddWithKey adds a message with a specific routing key.

func (*BatchPublisher) Clear

func (b *BatchPublisher) Clear()

Clear clears the batch.

func (*BatchPublisher) Publish

func (b *BatchPublisher) Publish(ctx context.Context) error

Publish publishes all messages in the batch.

func (*BatchPublisher) PublishAndClear

func (b *BatchPublisher) PublishAndClear(ctx context.Context) error

PublishAndClear publishes all messages and clears the batch.

func (*BatchPublisher) Size

func (b *BatchPublisher) Size() int

Size returns the number of messages in the batch.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel wraps an AMQP channel.

func (*Channel) Close

func (c *Channel) Close() error

Close closes the channel.

func (*Channel) Raw

func (c *Channel) Raw() *amqp.Channel

Raw returns the underlying amqp.Channel.

func (*Channel) SetQos

func (c *Channel) SetQos(prefetchCount, prefetchSize int, global bool) error

SetQos sets the quality of service.

type Config

type Config struct {
	// URL is the AMQP connection URL.
	URL string

	// Host is the RabbitMQ host (used if URL is empty).
	Host string

	// Port is the RabbitMQ port (default: 5672).
	Port int

	// Username for authentication (default: "guest").
	Username string

	// Password for authentication (default: "guest").
	Password string

	// VHost is the virtual host (default: "/").
	VHost string

	// TLS configuration for secure connections.
	TLS *tls.Config

	// Heartbeat interval (default: 10s).
	Heartbeat time.Duration

	// ConnectionTimeout for establishing connection (default: 30s).
	ConnectionTimeout time.Duration

	// ReconnectDelay is the initial delay between reconnection attempts (default: 1s).
	// The delay increases exponentially up to ReconnectDelayMax.
	ReconnectDelay time.Duration

	// ReconnectDelayMax is the maximum delay between reconnection attempts (default: 60s).
	ReconnectDelayMax time.Duration

	// MaxReconnectAttempts is the maximum reconnection attempts (0 = unlimited).
	MaxReconnectAttempts int

	// Logger for connection events. Defaults to a no-op logger.
	Logger Logger
}

Config holds the RabbitMQ connection configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default RabbitMQ configuration.

func (Config) WithCredentials

func (c Config) WithCredentials(username, password string) Config

WithCredentials returns a new config with the specified credentials.

func (Config) WithHeartbeat

func (c Config) WithHeartbeat(heartbeat time.Duration) Config

WithHeartbeat returns a new config with the specified heartbeat.

func (Config) WithHost

func (c Config) WithHost(host string, port int) Config

WithHost returns a new config with the specified host and port.

func (Config) WithLogger

func (c Config) WithLogger(logger Logger) Config

WithLogger returns a new config with the specified logger.

func (Config) WithReconnect

func (c Config) WithReconnect(initialDelay, maxDelay time.Duration, maxAttempts int) Config

WithReconnect returns a new config with reconnection settings.

func (Config) WithTLS

func (c Config) WithTLS(config *tls.Config) Config

WithTLS returns a new config with TLS enabled.

func (Config) WithURL

func (c Config) WithURL(url string) Config

WithURL returns a new config with the specified URL.

func (Config) WithVHost

func (c Config) WithVHost(vhost string) Config

WithVHost returns a new config with the specified virtual host.
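
All of the With methods above have value receivers and return a Config, so each call works on a copy. The sketch below shows the consequence with a cut-down stand-in type; `config`, `withHost`, `withVHost`, and the staging host name are hypothetical.

```go
package main

import "fmt"

// config is a cut-down stand-in for rabbitmq.Config.
type config struct {
	Host  string
	Port  int
	VHost string
}

// withHost has a value receiver: it changes a copy and returns that copy.
func (c config) withHost(host string, port int) config {
	c.Host, c.Port = host, port
	return c
}

// withVHost follows the same pattern.
func (c config) withVHost(vhost string) config {
	c.VHost = vhost
	return c
}

func main() {
	base := config{Host: "localhost", Port: 5672, VHost: "/"}
	staging := base.withHost("mq.staging.internal", 5671).withVHost("staging")

	fmt.Println(base)    // base keeps its original values
	fmt.Println(staging) // staging carries the overrides
}
```

A shared base configuration can therefore be specialised per environment without the variants affecting each other. Pointer-typed and interface-typed fields such as TLS and Logger are still shared between copies, since only the reference is copied.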

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection manages the RabbitMQ connection with auto-reconnect.

func NewConnection

func NewConnection(config Config) (*Connection, error)

NewConnection creates a new RabbitMQ connection.

func (*Connection) Channel

func (c *Connection) Channel() (*Channel, error)

Channel creates a new channel.

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection.

func (*Connection) IsClosed

func (c *Connection) IsClosed() bool

IsClosed returns true if the connection is closed.

func (*Connection) IsHealthy

func (c *Connection) IsHealthy() bool

IsHealthy returns true if the connection is open and responsive. It attempts to create and immediately close a channel as a health probe.

func (*Connection) OnConnect

func (c *Connection) OnConnect(fn func())

OnConnect sets the connection callback.

func (*Connection) OnDisconnect

func (c *Connection) OnDisconnect(fn func(error))

OnDisconnect sets the disconnection callback.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer consumes messages from RabbitMQ.

func NewConsumer

func NewConsumer(conn *Connection, config ConsumerConfig) (*Consumer, error)

NewConsumer creates a new consumer.

func (*Consumer) BindExchange

func (c *Consumer) BindExchange(destination, source, routingKey string, args map[string]any) error

BindExchange binds an exchange to another exchange.

func (*Consumer) BindQueue

func (c *Consumer) BindQueue(queue, exchange, routingKey string, args map[string]any) error

BindQueue binds a queue to an exchange.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer. If GracefulShutdown is enabled (default), it waits for all in-flight message handlers to complete before closing.

func (*Consumer) CloseWithContext

func (c *Consumer) CloseWithContext(ctx context.Context) error

CloseWithContext closes the consumer with a context for controlling the graceful shutdown timeout. If the context is cancelled before handlers complete, the consumer closes immediately.
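
The wait-or-deadline behaviour described here is commonly built from a `sync.WaitGroup` and a `select`. The following is a sketch of that general pattern, not rabbitwrap's actual implementation; `waitOrDeadline` and `drained` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrDeadline blocks until wg is done or ctx ends, whichever comes first.
func waitOrDeadline(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drained simulates one in-flight handler that runs for `work` and reports
// whether it finished before the shutdown `deadline`.
func drained(work, deadline time.Duration) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()
	return waitOrDeadline(ctx, &wg) == nil
}

func main() {
	fmt.Println(drained(10*time.Millisecond, time.Second))
	fmt.Println(drained(time.Second, 10*time.Millisecond))
}
```

In this pattern the handlers themselves are not interrupted when the deadline passes; the caller simply stops waiting, and the helper goroutine exits once the handlers eventually return.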

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler MessageHandler) error

Consume starts consuming and calls handler for each message. The handler is automatically wrapped with any configured middleware. Consumption automatically resumes after connection recovery. If Concurrency > 1, multiple goroutines process messages in parallel.
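
The parallel mode can be pictured as a fixed pool of goroutines reading from one delivery channel, each handling one message at a time. The sketch below illustrates that pattern with strings in place of deliveries; it is an assumption about the general shape, not the package's code, and `runWorkers` and `countHandled` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts n goroutines that each pull from deliveries and call
// handle sequentially. It returns once the channel is closed and every
// in-flight handler has returned.
func runWorkers(n int, deliveries <-chan string, handle func(string)) {
	if n < 1 {
		n = 1
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range deliveries {
				handle(d)
			}
		}()
	}
	wg.Wait()
}

// countHandled pushes total messages through n workers and returns how many
// were handled.
func countHandled(n, total int) int64 {
	ch := make(chan string)
	var handled atomic.Int64
	go func() {
		for i := 0; i < total; i++ {
			ch <- fmt.Sprintf("msg-%d", i)
		}
		close(ch)
	}()
	runWorkers(n, ch, func(string) { handled.Add(1) })
	return handled.Load()
}

func main() {
	fmt.Println(countHandled(5, 100))
}
```

With manual acknowledgment the broker delivers at most the prefetch count of unacknowledged messages, so a prefetch value at least as large as the concurrency (50 and 5 in the README example) keeps every worker supplied. Handlers that run in parallel must also guard any shared state, as the atomic counter does here.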

func (*Consumer) DeclareExchange

func (c *Consumer) DeclareExchange(config ExchangeConfig) error

DeclareExchange declares an exchange.

func (*Consumer) DeclareQueue

func (c *Consumer) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]any) (QueueInfo, error)

DeclareQueue declares a queue.

func (*Consumer) DeclareQueueWithConfig

func (c *Consumer) DeclareQueueWithConfig(config QueueConfig) (QueueInfo, error)

DeclareQueueWithConfig declares a queue with the given configuration.

func (*Consumer) DeleteExchange

func (c *Consumer) DeleteExchange(name string, ifUnused bool) error

DeleteExchange deletes an exchange.

func (*Consumer) DeleteQueue

func (c *Consumer) DeleteQueue(queue string, ifUnused, ifEmpty bool) (int, error)

DeleteQueue deletes a queue.

func (*Consumer) IsClosed

func (c *Consumer) IsClosed() bool

IsClosed returns true if the consumer is closed.

func (*Consumer) PurgeQueue

func (c *Consumer) PurgeQueue(queue string) (int, error)

PurgeQueue removes all messages from a queue.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) (<-chan *Delivery, error)

Start starts consuming messages and returns a delivery channel. The delivery channel is automatically re-established on reconnection.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop stops consuming without closing the underlying channel. Call Close to release all resources.

func (*Consumer) UnbindExchange

func (c *Consumer) UnbindExchange(destination, source, routingKey string, args map[string]any) error

UnbindExchange unbinds an exchange from another exchange.

func (*Consumer) UnbindQueue

func (c *Consumer) UnbindQueue(queue, exchange, routingKey string, args map[string]any) error

UnbindQueue unbinds a queue from an exchange.

type ConsumerConfig

type ConsumerConfig struct {
	// Queue is the queue to consume from.
	Queue string

	// ConsumerTag is the consumer identifier.
	ConsumerTag string

	// AutoAck enables automatic message acknowledgment.
	AutoAck bool

	// Exclusive makes this an exclusive consumer.
	Exclusive bool

	// NoLocal prevents consuming messages published on the same connection.
	NoLocal bool

	// NoWait doesn't wait for server confirmation.
	NoWait bool

	// Args are additional arguments.
	Args map[string]any

	// PrefetchCount is the number of messages to prefetch.
	PrefetchCount int

	// PrefetchSize is the prefetch size in bytes.
	PrefetchSize int

	// RequeueOnError requeues messages when the handler returns an error.
	RequeueOnError bool

	// Concurrency is the number of goroutines processing messages (default: 1).
	// Each goroutine calls the handler sequentially. Increase for parallel processing.
	Concurrency int

	// GracefulShutdown waits for in-flight message handlers to complete on Close (default: true).
	GracefulShutdown bool

	// OnError is called when an error occurs.
	OnError ErrorHandler

	// Middleware is applied to the message handler in order.
	Middleware []Middleware
}

ConsumerConfig holds consumer-specific configuration.

func DefaultConsumerConfig

func DefaultConsumerConfig() ConsumerConfig

DefaultConsumerConfig returns a default consumer configuration.

func (ConsumerConfig) WithAutoAck

func (c ConsumerConfig) WithAutoAck(autoAck bool) ConsumerConfig

WithAutoAck returns a new config with auto-ack setting.

func (ConsumerConfig) WithConcurrency

func (c ConsumerConfig) WithConcurrency(n int) ConsumerConfig

WithConcurrency returns a new config with the specified number of handler goroutines.

func (ConsumerConfig) WithConsumerTag

func (c ConsumerConfig) WithConsumerTag(tag string) ConsumerConfig

WithConsumerTag returns a new config with the specified consumer tag.

func (ConsumerConfig) WithErrorHandler

func (c ConsumerConfig) WithErrorHandler(handler ErrorHandler) ConsumerConfig

WithErrorHandler returns a new config with the specified error handler.

func (ConsumerConfig) WithExclusive

func (c ConsumerConfig) WithExclusive(exclusive bool) ConsumerConfig

WithExclusive returns a new config with exclusive setting.

func (ConsumerConfig) WithGracefulShutdown

func (c ConsumerConfig) WithGracefulShutdown(enabled bool) ConsumerConfig

WithGracefulShutdown returns a new config with graceful shutdown setting. When enabled, Close waits for in-flight message handlers to complete.

func (ConsumerConfig) WithMiddleware

func (c ConsumerConfig) WithMiddleware(mw ...Middleware) ConsumerConfig

WithMiddleware returns a new config with the specified middleware.

func (ConsumerConfig) WithPrefetch

func (c ConsumerConfig) WithPrefetch(count, size int) ConsumerConfig

WithPrefetch returns a new config with prefetch settings.

func (ConsumerConfig) WithQueue

func (c ConsumerConfig) WithQueue(queue string) ConsumerConfig

WithQueue returns a new config with the specified queue.

func (ConsumerConfig) WithRequeueOnError

func (c ConsumerConfig) WithRequeueOnError(requeue bool) ConsumerConfig

WithRequeueOnError returns a new config with requeue on error setting.

type Delivery

type Delivery struct {
	*Message

	// Exchange is the exchange the message was published to.
	Exchange string

	// RoutingKey is the routing key.
	RoutingKey string

	// Redelivered indicates if this is a redelivery.
	Redelivered bool

	// DeliveryTag is the delivery tag for acknowledgment.
	DeliveryTag uint64

	// ConsumerTag is the consumer identifier.
	ConsumerTag string
	// contains filtered or unexported fields
}

Delivery represents a received message.

func (*Delivery) Ack

func (d *Delivery) Ack(multiple bool) error

Ack acknowledges the message.

func (*Delivery) Nack

func (d *Delivery) Nack(multiple, requeue bool) error

Nack negatively acknowledges the message.

func (*Delivery) Reject

func (d *Delivery) Reject(requeue bool) error

Reject rejects the message.

type DeliveryMode

type DeliveryMode uint8

DeliveryMode represents the message delivery mode.

const (
	Transient  DeliveryMode = 1
	Persistent DeliveryMode = 2
)

Supported delivery modes.

type ErrorHandler

type ErrorHandler func(err error)

ErrorHandler handles errors.

type ExchangeConfig

type ExchangeConfig struct {
	// Name is the exchange name.
	Name string

	// Type is the exchange type.
	Type ExchangeType

	// Durable makes the exchange survive broker restarts.
	Durable bool

	// AutoDelete deletes the exchange when it has no remaining bindings.
	AutoDelete bool

	// Internal makes the exchange internal.
	Internal bool

	// Args are additional arguments.
	Args map[string]any
}

ExchangeConfig holds exchange declaration configuration.

func DefaultExchangeConfig

func DefaultExchangeConfig(name string, exchangeType ExchangeType) ExchangeConfig

DefaultExchangeConfig returns a default exchange configuration.

func (ExchangeConfig) WithAutoDelete

func (c ExchangeConfig) WithAutoDelete(autoDelete bool) ExchangeConfig

WithAutoDelete returns a new config with auto-delete setting.

func (ExchangeConfig) WithDurable

func (c ExchangeConfig) WithDurable(durable bool) ExchangeConfig

WithDurable returns a new config with durable setting.

func (ExchangeConfig) WithInternal

func (c ExchangeConfig) WithInternal(internal bool) ExchangeConfig

WithInternal returns a new config with internal setting.

type ExchangeType

type ExchangeType string

ExchangeType represents the type of exchange.

const (
	ExchangeDirect  ExchangeType = "direct"
	ExchangeFanout  ExchangeType = "fanout"
	ExchangeTopic   ExchangeType = "topic"
	ExchangeHeaders ExchangeType = "headers"
)

Supported exchange types.

type Logger

type Logger interface {
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

Logger is the interface for logging within rabbitwrap. Implement this interface to integrate with your logging framework.

func NewStdLogger

func NewStdLogger() Logger

NewStdLogger returns a Logger that writes to the standard library logger.

type Message

type Message struct {
	// Body is the message body.
	Body []byte

	// ContentType is the MIME type.
	ContentType string

	// ContentEncoding is the encoding.
	ContentEncoding string

	// DeliveryMode indicates persistence.
	DeliveryMode DeliveryMode

	// Priority is the message priority (0-9).
	Priority uint8

	// CorrelationID is for request-reply patterns.
	CorrelationID string

	// ReplyTo is the reply queue name.
	ReplyTo string

	// Expiration is the message TTL.
	Expiration string

	// MessageID is a unique message identifier.
	MessageID string

	// Timestamp is the message timestamp.
	Timestamp time.Time

	// Type is the message type name.
	Type string

	// UserID is the creating user.
	UserID string

	// AppID is the creating application.
	AppID string

	// Headers are custom headers.
	Headers map[string]any
}

Message represents a RabbitMQ message.

func NewJSONMessage

func NewJSONMessage(v any) (*Message, error)

NewJSONMessage creates a new JSON message.

func NewMessage

func NewMessage(body []byte) *Message

NewMessage creates a new message with the given body.

func NewTextMessage

func NewTextMessage(text string) *Message

NewTextMessage creates a new text message.

func (*Message) JSON

func (m *Message) JSON(v any) error

JSON unmarshals the body into the provided value.

func (*Message) Text

func (m *Message) Text() string

Text returns the body as a string.

func (*Message) WithAppID

func (m *Message) WithAppID(appID string) *Message

WithAppID sets the application ID.

func (*Message) WithContentType

func (m *Message) WithContentType(contentType string) *Message

WithContentType sets the content type.

func (*Message) WithCorrelationID

func (m *Message) WithCorrelationID(id string) *Message

WithCorrelationID sets the correlation ID.

func (*Message) WithDeliveryMode

func (m *Message) WithDeliveryMode(mode DeliveryMode) *Message

WithDeliveryMode sets the delivery mode.

func (*Message) WithExpiration

func (m *Message) WithExpiration(expiration string) *Message

WithExpiration sets the expiration (TTL).

func (*Message) WithHeader

func (m *Message) WithHeader(key string, value any) *Message

WithHeader adds a custom header.

func (*Message) WithHeaders

func (m *Message) WithHeaders(headers map[string]any) *Message

WithHeaders sets multiple headers.

func (*Message) WithMessageID

func (m *Message) WithMessageID(id string) *Message

WithMessageID sets the message ID.

func (*Message) WithPriority

func (m *Message) WithPriority(priority uint8) *Message

WithPriority sets the priority.

func (*Message) WithReplyTo

func (m *Message) WithReplyTo(replyTo string) *Message

WithReplyTo sets the reply-to queue.

func (*Message) WithTTL

func (m *Message) WithTTL(ttl time.Duration) *Message

WithTTL sets the TTL as a duration.
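
In RabbitMQ, the per-message expiration property is a string holding a whole number of milliseconds, which is why the Expiration field above is a string. The conversion from a duration might look like the sketch below; `expirationFromTTL` is a hypothetical helper, and that WithTTL performs exactly this conversion is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// expirationFromTTL renders a TTL as a decimal string of milliseconds.
// Negative durations are clamped to zero.
func expirationFromTTL(ttl time.Duration) string {
	if ttl < 0 {
		ttl = 0
	}
	return strconv.FormatInt(ttl.Milliseconds(), 10)
}

func main() {
	fmt.Println(expirationFromTTL(time.Hour))
	fmt.Println(expirationFromTTL(1500 * time.Millisecond))
}
```

Sub-millisecond precision is lost in the conversion, since `Duration.Milliseconds` truncates.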

func (*Message) WithType

func (m *Message) WithType(t string) *Message

WithType sets the message type.

type MessageHandler

type MessageHandler func(ctx context.Context, delivery *Delivery) error

MessageHandler handles received messages.

type Middleware

type Middleware func(next MessageHandler) MessageHandler

Middleware wraps a MessageHandler, returning a new MessageHandler. Middleware is applied in the order provided — the first middleware in the slice is the outermost wrapper.

func Chain

func Chain(mw ...Middleware) Middleware

Chain composes multiple middleware into a single Middleware. Middleware is applied left-to-right: Chain(A, B, C)(handler) == A(B(C(handler))).

func LoggingMiddleware

func LoggingMiddleware(logger Logger) Middleware

LoggingMiddleware logs each message processing with its duration.

func RecoveryMiddleware

func RecoveryMiddleware(onPanic func(recovered any)) Middleware

RecoveryMiddleware recovers from panics in the message handler, calls the provided callback with the recovered value, and returns an error so the message is not silently acknowledged as successful.
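
A middleware with this contract is typically written with a deferred `recover` and a named error result. The following is a sketch of that technique using local stand-in types, not the package's source; `recoverMW` and `runPanicking` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for rabbitmq.Delivery, MessageHandler, and Middleware.
type delivery struct{ body string }
type handler func(ctx context.Context, d *delivery) error
type middleware func(next handler) handler

// recoverMW converts a panic in next into an error after notifying onPanic.
func recoverMW(onPanic func(recovered any)) middleware {
	return func(next handler) handler {
		return func(ctx context.Context, d *delivery) (err error) {
			defer func() {
				if r := recover(); r != nil {
					if onPanic != nil {
						onPanic(r)
					}
					// The named result lets the deferred function set the error.
					err = fmt.Errorf("handler panicked: %v", r)
				}
			}()
			return next(ctx, d)
		}
	}
}

// runPanicking wraps a handler that always panics and returns what the
// callback saw together with the error handed back to the caller.
func runPanicking() (seen any, err error) {
	h := recoverMW(func(r any) { seen = r })(func(context.Context, *delivery) error {
		panic("boom")
	})
	err = h(context.Background(), &delivery{})
	return seen, err
}

func main() {
	fmt.Println(runPanicking())
}
```

Returning an error rather than nil matters because, in the Consume examples, a nil return acks the message; a swallowed panic would otherwise acknowledge a message that was never processed.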

func RetryMiddleware

func RetryMiddleware(maxRetries int, delay time.Duration) Middleware

RetryMiddleware retries failed message processing up to maxRetries times with the given delay between attempts.
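
To make the retry semantics concrete, the sketch below implements a retry middleware over local stand-in types. It assumes one initial attempt followed by up to maxRetries retries with a fixed delay, and it stops early when the context is done; the package may count attempts or handle cancellation differently. `retryMW` and `attemptsUntilSuccess` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for rabbitmq.Delivery, MessageHandler, and Middleware.
type delivery struct{ body string }
type handler func(ctx context.Context, d *delivery) error
type middleware func(next handler) handler

// retryMW calls next once and then retries up to maxRetries more times,
// sleeping for delay between attempts.
func retryMW(maxRetries int, delay time.Duration) middleware {
	return func(next handler) handler {
		return func(ctx context.Context, d *delivery) error {
			err := next(ctx, d)
			for attempt := 0; err != nil && attempt < maxRetries; attempt++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(delay):
				}
				err = next(ctx, d)
			}
			return err
		}
	}
}

// attemptsUntilSuccess runs a handler that fails `failures` times before
// succeeding and reports how often it was called.
func attemptsUntilSuccess(failures, maxRetries int) (calls int, err error) {
	h := retryMW(maxRetries, time.Millisecond)(func(context.Context, *delivery) error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	})
	err = h(context.Background(), &delivery{})
	return calls, err
}

func main() {
	fmt.Println(attemptsUntilSuccess(2, 3))
	fmt.Println(attemptsUntilSuccess(10, 3))
}
```

In-process retries like this keep the message unacknowledged and occupy the worker goroutine for the whole retry window, so long delays reduce throughput.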

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes messages to RabbitMQ.

func NewPublisher

func NewPublisher(conn *Connection, config PublisherConfig) (*Publisher, error)

NewPublisher creates a new publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher.

func (*Publisher) DeclareExchange

func (p *Publisher) DeclareExchange(name string, kind ExchangeType, durable, autoDelete bool, args map[string]any) error

DeclareExchange declares an exchange.

func (*Publisher) IsClosed

func (p *Publisher) IsClosed() bool

IsClosed returns true if the publisher is closed.

func (*Publisher) NotifyReturn

func (p *Publisher) NotifyReturn(handler func(Return))

NotifyReturn registers a handler for undeliverable messages. The handler is called when the Mandatory or Immediate flag is set and the broker cannot route or deliver the message. Note that RabbitMQ 3.0 and later no longer support the Immediate flag.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, msg *Message) error

Publish publishes a message.

func (*Publisher) PublishDelayed

func (p *Publisher) PublishDelayed(ctx context.Context, msg *Message, delay time.Duration) error

PublishDelayed publishes a message with a delay using TTL and dead letter exchange.

func (*Publisher) PublishJSON

func (p *Publisher) PublishJSON(ctx context.Context, v any) error

PublishJSON publishes a JSON message.

func (*Publisher) PublishText

func (p *Publisher) PublishText(ctx context.Context, text string) error

PublishText publishes a text message.

func (*Publisher) PublishToExchange

func (p *Publisher) PublishToExchange(ctx context.Context, exchange, routingKey string, msg *Message) error

PublishToExchange publishes a message to a specific exchange with routing key.

func (*Publisher) PublishToKeys

func (p *Publisher) PublishToKeys(ctx context.Context, routingKeys []string, msg *Message) error

PublishToKeys publishes a message to multiple routing keys on the configured exchange.

func (*Publisher) PublishWithKey

func (p *Publisher) PublishWithKey(ctx context.Context, routingKey string, msg *Message) error

PublishWithKey publishes a message with a specific routing key.

type PublisherConfig

type PublisherConfig struct {
	// Exchange is the exchange to publish to.
	Exchange string

	// RoutingKey is the default routing key.
	RoutingKey string

	// Mandatory makes the server return unroutable messages.
	Mandatory bool

	// Immediate makes the server return messages when no consumer is available.
	Immediate bool

	// ConfirmMode enables publisher confirms.
	ConfirmMode bool

	// ConfirmTimeout is the timeout for waiting for confirms.
	ConfirmTimeout time.Duration
}

PublisherConfig holds publisher-specific configuration.

func DefaultPublisherConfig

func DefaultPublisherConfig() PublisherConfig

DefaultPublisherConfig returns a default publisher configuration.

func (PublisherConfig) WithConfirmMode

func (c PublisherConfig) WithConfirmMode(enabled bool, timeout time.Duration) PublisherConfig

WithConfirmMode returns a new config with confirm mode settings.

func (PublisherConfig) WithExchange

func (c PublisherConfig) WithExchange(exchange string) PublisherConfig

WithExchange returns a new config with the specified exchange.

func (PublisherConfig) WithImmediate

func (c PublisherConfig) WithImmediate(immediate bool) PublisherConfig

WithImmediate returns a new config with immediate flag set.

func (PublisherConfig) WithMandatory

func (c PublisherConfig) WithMandatory(mandatory bool) PublisherConfig

WithMandatory returns a new config with mandatory flag set.

func (PublisherConfig) WithRoutingKey

func (c PublisherConfig) WithRoutingKey(key string) PublisherConfig

WithRoutingKey returns a new config with the specified routing key.
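The With methods take a value receiver and return a PublisherConfig, so each call yields a modified copy and the original stays unchanged. The sketch below shows that pattern with a cut-down local stand-in type (only three fields) so it compiles without the package; the method bodies are assumed.

```go
package main

import "fmt"

// PublisherConfig is a local stand-in with a subset of the documented fields.
type PublisherConfig struct {
	Exchange   string
	RoutingKey string
	Mandatory  bool
}

// Each method mutates its own copy of the receiver and returns it.
func (c PublisherConfig) WithExchange(exchange string) PublisherConfig {
	c.Exchange = exchange
	return c
}

func (c PublisherConfig) WithRoutingKey(key string) PublisherConfig {
	c.RoutingKey = key
	return c
}

func (c PublisherConfig) WithMandatory(mandatory bool) PublisherConfig {
	c.Mandatory = mandatory
	return c
}

// derive builds two configs from one shared base.
func derive() (base, orders, audit PublisherConfig) {
	base = PublisherConfig{}.WithExchange("events")
	orders = base.WithRoutingKey("order.created").WithMandatory(true)
	audit = base.WithRoutingKey("audit.log")
	return base, orders, audit
}

func main() {
	base, orders, audit := derive()
	fmt.Printf("%+v\n", base)   // prints {Exchange:events RoutingKey: Mandatory:false}
	fmt.Printf("%+v\n", orders) // prints {Exchange:events RoutingKey:order.created Mandatory:true}
	fmt.Printf("%+v\n", audit)  // prints {Exchange:events RoutingKey:audit.log Mandatory:false}
}
```

This makes it safe to keep a shared base config and derive per-publisher variants from it.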

type QueueConfig

type QueueConfig struct {
	// Name is the queue name.
	Name string

	// Durable makes the queue survive broker restarts.
	Durable bool

	// AutoDelete deletes the queue when its last consumer unsubscribes.
	AutoDelete bool

	// Exclusive makes the queue exclusive to this connection.
	Exclusive bool

	// Args are additional arguments.
	Args map[string]any

	// DeadLetterExchange for dead letter routing.
	DeadLetterExchange string

	// DeadLetterRoutingKey for dead letter routing.
	DeadLetterRoutingKey string

	// MessageTTL is the default message TTL.
	MessageTTL time.Duration

	// MaxLength is the maximum number of messages.
	MaxLength int

	// MaxLengthBytes is the maximum queue size in bytes.
	MaxLengthBytes int

	// Quorum enables quorum queue type for high availability.
	Quorum bool
}

QueueConfig holds queue declaration configuration.

func DefaultQueueConfig

func DefaultQueueConfig(name string) QueueConfig

DefaultQueueConfig returns a default queue configuration.

func (QueueConfig) WithAutoDelete

func (c QueueConfig) WithAutoDelete(autoDelete bool) QueueConfig

WithAutoDelete returns a new config with auto-delete setting.

func (QueueConfig) WithDeadLetter

func (c QueueConfig) WithDeadLetter(exchange, routingKey string) QueueConfig

WithDeadLetter returns a new config with dead letter settings.

func (QueueConfig) WithDurable

func (c QueueConfig) WithDurable(durable bool) QueueConfig

WithDurable returns a new config with durable setting.

func (QueueConfig) WithExclusive

func (c QueueConfig) WithExclusive(exclusive bool) QueueConfig

WithExclusive returns a new config with exclusive setting.

func (QueueConfig) WithMaxLength

func (c QueueConfig) WithMaxLength(maxLength int) QueueConfig

WithMaxLength returns a new config with max length.

func (QueueConfig) WithMaxLengthBytes

func (c QueueConfig) WithMaxLengthBytes(maxBytes int) QueueConfig

WithMaxLengthBytes returns a new config with max length in bytes.

func (QueueConfig) WithMessageTTL

func (c QueueConfig) WithMessageTTL(ttl time.Duration) QueueConfig

WithMessageTTL returns a new config with message TTL.

func (QueueConfig) WithQuorum

func (c QueueConfig) WithQuorum() QueueConfig

WithQuorum enables quorum queue type for high availability across a cluster.

type QueueInfo

type QueueInfo struct {
	Name      string
	Messages  int
	Consumers int
}

QueueInfo holds queue information.

type Return

type Return struct {
	amqp.Return
}

Return represents an undeliverable message returned by the broker. This happens when Mandatory or Immediate flags are set and the message cannot be routed or delivered.

Directories

Path: Synopsis
examples/basic (command): Package main demonstrates basic rabbitwrap usage.
examples/publisher (command): Package main demonstrates publishing messages with rabbitwrap.
