Documentation
¶
Overview ¶
Package rabbitmq provides a simplified, production-ready wrapper around the official RabbitMQ Go client (amqp091-go).
It offers connection management with automatic reconnection, publisher confirms for reliable message delivery, consumer management with prefetch and manual acknowledgment, a fluent message builder API, batch publishing, dead letter queue support, and TLS support.
Connection ¶
Create a connection with automatic reconnection using exponential backoff:
config := rabbitmq.DefaultConfig().
WithHost("localhost", 5672).
WithCredentials("guest", "guest").
WithReconnect(1*time.Second, 60*time.Second, 0)
conn, err := rabbitmq.NewConnection(config)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
Publishing ¶
Publish messages with publisher confirms enabled by default:
publisher, err := rabbitmq.NewPublisher(conn,
rabbitmq.DefaultPublisherConfig().
WithExchange("my-exchange").
WithRoutingKey("my-key"),
)
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
// Text message
err = publisher.PublishText(ctx, "Hello, World!")
// JSON message
err = publisher.PublishJSON(ctx, map[string]any{"event": "user.created"})
// Custom message with headers
msg := rabbitmq.NewMessage([]byte("data")).
WithPriority(5).
WithHeader("trace-id", "abc123")
err = publisher.Publish(ctx, msg)
Consuming ¶
Consume messages with automatic ack/nack handling and middleware:
consumer, err := rabbitmq.NewConsumer(conn,
rabbitmq.DefaultConsumerConfig().
WithQueue("my-queue").
WithPrefetch(10, 0).
WithMiddleware(
rabbitmq.LoggingMiddleware(rabbitmq.NewStdLogger()),
rabbitmq.RecoveryMiddleware(func(r any) {
log.Printf("panic: %v", r)
}),
),
)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.Consume(ctx, func(ctx context.Context, d *rabbitmq.Delivery) error {
log.Printf("Received: %s", d.Text())
return nil // return nil to ack, error to nack
})
Middleware ¶
The package includes built-in middleware for common patterns:
- LoggingMiddleware logs message processing with duration
- RecoveryMiddleware recovers from panics in handlers
- RetryMiddleware retries failed message processing with delay
Custom middleware can be created by implementing the Middleware type:
func MyMiddleware() rabbitmq.Middleware {
return func(next rabbitmq.MessageHandler) rabbitmq.MessageHandler {
return func(ctx context.Context, d *rabbitmq.Delivery) error {
// before
err := next(ctx, d)
// after
return err
}
}
}
Package rabbitmq provides a simplified interface for RabbitMQ messaging with support for publishers, consumers, exchanges, and queues.
Index ¶
- Variables
- type BatchPublisher
- func (b *BatchPublisher) Add(msg *Message) *BatchPublisher
- func (b *BatchPublisher) AddToExchange(exchange, routingKey string, msg *Message) *BatchPublisher
- func (b *BatchPublisher) AddWithKey(routingKey string, msg *Message) *BatchPublisher
- func (b *BatchPublisher) Clear()
- func (b *BatchPublisher) Publish(ctx context.Context) error
- func (b *BatchPublisher) PublishAndClear(ctx context.Context) error
- func (b *BatchPublisher) Size() int
- type Channel
- type Config
- func (c Config) WithCredentials(username, password string) Config
- func (c Config) WithHeartbeat(heartbeat time.Duration) Config
- func (c Config) WithHost(host string, port int) Config
- func (c Config) WithLogger(logger Logger) Config
- func (c Config) WithReconnect(initialDelay, maxDelay time.Duration, maxAttempts int) Config
- func (c Config) WithTLS(config *tls.Config) Config
- func (c Config) WithURL(url string) Config
- func (c Config) WithVHost(vhost string) Config
- type Connection
- type Consumer
- func (c *Consumer) BindExchange(destination, source, routingKey string, args map[string]any) error
- func (c *Consumer) BindQueue(queue, exchange, routingKey string, args map[string]any) error
- func (c *Consumer) Close() error
- func (c *Consumer) CloseWithContext(ctx context.Context) error
- func (c *Consumer) Consume(ctx context.Context, handler MessageHandler) error
- func (c *Consumer) DeclareExchange(config ExchangeConfig) error
- func (c *Consumer) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]any) (QueueInfo, error)
- func (c *Consumer) DeclareQueueWithConfig(config QueueConfig) (QueueInfo, error)
- func (c *Consumer) DeleteExchange(name string, ifUnused bool) error
- func (c *Consumer) DeleteQueue(queue string, ifUnused, ifEmpty bool) (int, error)
- func (c *Consumer) IsClosed() bool
- func (c *Consumer) PurgeQueue(queue string) (int, error)
- func (c *Consumer) Start(ctx context.Context) (<-chan *Delivery, error)
- func (c *Consumer) Stop()
- func (c *Consumer) UnbindExchange(destination, source, routingKey string, args map[string]any) error
- func (c *Consumer) UnbindQueue(queue, exchange, routingKey string, args map[string]any) error
- type ConsumerConfig
- func (c ConsumerConfig) WithAutoAck(autoAck bool) ConsumerConfig
- func (c ConsumerConfig) WithConcurrency(n int) ConsumerConfig
- func (c ConsumerConfig) WithConsumerTag(tag string) ConsumerConfig
- func (c ConsumerConfig) WithErrorHandler(handler ErrorHandler) ConsumerConfig
- func (c ConsumerConfig) WithExclusive(exclusive bool) ConsumerConfig
- func (c ConsumerConfig) WithGracefulShutdown(enabled bool) ConsumerConfig
- func (c ConsumerConfig) WithMiddleware(mw ...Middleware) ConsumerConfig
- func (c ConsumerConfig) WithPrefetch(count, size int) ConsumerConfig
- func (c ConsumerConfig) WithQueue(queue string) ConsumerConfig
- func (c ConsumerConfig) WithRequeueOnError(requeue bool) ConsumerConfig
- type Delivery
- type DeliveryMode
- type ErrorHandler
- type ExchangeConfig
- type ExchangeType
- type Logger
- type Message
- func (m *Message) JSON(v any) error
- func (m *Message) Text() string
- func (m *Message) WithAppID(appID string) *Message
- func (m *Message) WithContentType(contentType string) *Message
- func (m *Message) WithCorrelationID(id string) *Message
- func (m *Message) WithDeliveryMode(mode DeliveryMode) *Message
- func (m *Message) WithExpiration(expiration string) *Message
- func (m *Message) WithHeader(key string, value any) *Message
- func (m *Message) WithHeaders(headers map[string]any) *Message
- func (m *Message) WithMessageID(id string) *Message
- func (m *Message) WithPriority(priority uint8) *Message
- func (m *Message) WithReplyTo(replyTo string) *Message
- func (m *Message) WithTTL(ttl time.Duration) *Message
- func (m *Message) WithType(t string) *Message
- type MessageHandler
- type Middleware
- type Publisher
- func (p *Publisher) Close() error
- func (p *Publisher) DeclareExchange(name string, kind ExchangeType, durable, autoDelete bool, args map[string]any) error
- func (p *Publisher) IsClosed() bool
- func (p *Publisher) NotifyReturn(handler func(Return))
- func (p *Publisher) Publish(ctx context.Context, msg *Message) error
- func (p *Publisher) PublishDelayed(ctx context.Context, msg *Message, delay time.Duration) error
- func (p *Publisher) PublishJSON(ctx context.Context, v any) error
- func (p *Publisher) PublishText(ctx context.Context, text string) error
- func (p *Publisher) PublishToExchange(ctx context.Context, exchange, routingKey string, msg *Message) error
- func (p *Publisher) PublishToKeys(ctx context.Context, routingKeys []string, msg *Message) error
- func (p *Publisher) PublishWithKey(ctx context.Context, routingKey string, msg *Message) error
- type PublisherConfig
- func (c PublisherConfig) WithConfirmMode(enabled bool, timeout time.Duration) PublisherConfig
- func (c PublisherConfig) WithExchange(exchange string) PublisherConfig
- func (c PublisherConfig) WithImmediate(immediate bool) PublisherConfig
- func (c PublisherConfig) WithMandatory(mandatory bool) PublisherConfig
- func (c PublisherConfig) WithRoutingKey(key string) PublisherConfig
- type QueueConfig
- func (c QueueConfig) WithAutoDelete(autoDelete bool) QueueConfig
- func (c QueueConfig) WithDeadLetter(exchange, routingKey string) QueueConfig
- func (c QueueConfig) WithDurable(durable bool) QueueConfig
- func (c QueueConfig) WithExclusive(exclusive bool) QueueConfig
- func (c QueueConfig) WithMaxLength(maxLength int) QueueConfig
- func (c QueueConfig) WithMaxLengthBytes(maxBytes int) QueueConfig
- func (c QueueConfig) WithMessageTTL(ttl time.Duration) QueueConfig
- func (c QueueConfig) WithQuorum() QueueConfig
- type QueueInfo
- type Return
Constants ¶
This section is empty.
Variables ¶
var ( ErrConnectionClosed = errors.New("rabbitmq: connection closed") ErrChannelClosed = errors.New("rabbitmq: channel closed") ErrPublishFailed = errors.New("rabbitmq: publish failed") ErrConsumeFailed = errors.New("rabbitmq: consume failed") ErrInvalidConfig = errors.New("rabbitmq: invalid configuration") ErrNotConnected = errors.New("rabbitmq: not connected") ErrTimeout = errors.New("rabbitmq: operation timeout") ErrNack = errors.New("rabbitmq: message was nacked") ErrMaxReconnects = errors.New("rabbitmq: max reconnection attempts reached") ErrShuttingDown = errors.New("rabbitmq: shutting down") )
Sentinel errors for RabbitMQ operations.
Functions ¶
This section is empty.
Types ¶
type BatchPublisher ¶
type BatchPublisher struct {
// contains filtered or unexported fields
}
BatchPublisher enables batch publishing.
func NewBatchPublisher ¶
func NewBatchPublisher(publisher *Publisher) *BatchPublisher
NewBatchPublisher creates a new batch publisher.
func (*BatchPublisher) Add ¶
func (b *BatchPublisher) Add(msg *Message) *BatchPublisher
Add adds a message to the batch.
func (*BatchPublisher) AddToExchange ¶
func (b *BatchPublisher) AddToExchange(exchange, routingKey string, msg *Message) *BatchPublisher
AddToExchange adds a message to a specific exchange.
func (*BatchPublisher) AddWithKey ¶
func (b *BatchPublisher) AddWithKey(routingKey string, msg *Message) *BatchPublisher
AddWithKey adds a message with a specific routing key.
func (*BatchPublisher) Publish ¶
func (b *BatchPublisher) Publish(ctx context.Context) error
Publish publishes all messages in the batch.
func (*BatchPublisher) PublishAndClear ¶
func (b *BatchPublisher) PublishAndClear(ctx context.Context) error
PublishAndClear publishes all messages and clears the batch.
func (*BatchPublisher) Size ¶
func (b *BatchPublisher) Size() int
Size returns the number of messages in the batch.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel wraps an AMQP channel.
type Config ¶
type Config struct {
// URL is the AMQP connection URL.
URL string
// Host is the RabbitMQ host (used if URL is empty).
Host string
// Port is the RabbitMQ port (default: 5672).
Port int
// Username for authentication (default: "guest").
Username string
// Password for authentication (default: "guest").
Password string
// VHost is the virtual host (default: "/").
VHost string
// TLS configuration for secure connections.
TLS *tls.Config
// Heartbeat interval (default: 10s).
Heartbeat time.Duration
// ConnectionTimeout for establishing connection (default: 30s).
ConnectionTimeout time.Duration
// ReconnectDelay is the initial delay between reconnection attempts (default: 1s).
// The delay increases exponentially up to ReconnectDelayMax.
ReconnectDelay time.Duration
// ReconnectDelayMax is the maximum delay between reconnection attempts (default: 60s).
ReconnectDelayMax time.Duration
// MaxReconnectAttempts is the maximum reconnection attempts (0 = unlimited).
MaxReconnectAttempts int
// Logger for connection events. Defaults to a no-op logger.
Logger Logger
}
Config holds the RabbitMQ connection configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a default RabbitMQ configuration.
func (Config) WithCredentials ¶
WithCredentials returns a new config with the specified credentials.
func (Config) WithHeartbeat ¶
WithHeartbeat returns a new config with the specified heartbeat.
func (Config) WithLogger ¶
WithLogger returns a new config with the specified logger.
func (Config) WithReconnect ¶
WithReconnect returns a new config with reconnection settings.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection manages the RabbitMQ connection with auto-reconnect.
func NewConnection ¶
func NewConnection(config Config) (*Connection, error)
NewConnection creates a new RabbitMQ connection.
func (*Connection) Channel ¶
func (c *Connection) Channel() (*Channel, error)
Channel creates a new channel.
func (*Connection) IsClosed ¶
func (c *Connection) IsClosed() bool
IsClosed returns true if the connection is closed.
func (*Connection) IsHealthy ¶
func (c *Connection) IsHealthy() bool
IsHealthy returns true if the connection is open and responsive. It attempts to create and immediately close a channel as a health probe.
func (*Connection) OnConnect ¶
func (c *Connection) OnConnect(fn func())
OnConnect sets the connection callback.
func (*Connection) OnDisconnect ¶
func (c *Connection) OnDisconnect(fn func(error))
OnDisconnect sets the disconnection callback.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer consumes messages from RabbitMQ.
func NewConsumer ¶
func NewConsumer(conn *Connection, config ConsumerConfig) (*Consumer, error)
NewConsumer creates a new consumer.
func (*Consumer) BindExchange ¶
BindExchange binds an exchange to another exchange.
func (*Consumer) Close ¶
Close closes the consumer. If GracefulShutdown is enabled (default), it waits for all in-flight message handlers to complete before closing.
func (*Consumer) CloseWithContext ¶
CloseWithContext closes the consumer with a context for controlling the graceful shutdown timeout. If the context is cancelled before handlers complete, the consumer closes immediately.
func (*Consumer) Consume ¶
func (c *Consumer) Consume(ctx context.Context, handler MessageHandler) error
Consume starts consuming and calls handler for each message. The handler is automatically wrapped with any configured middleware. Consumption automatically resumes after connection recovery. If Concurrency > 1, multiple goroutines process messages in parallel.
func (*Consumer) DeclareExchange ¶
func (c *Consumer) DeclareExchange(config ExchangeConfig) error
DeclareExchange declares an exchange.
func (*Consumer) DeclareQueue ¶
func (c *Consumer) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]any) (QueueInfo, error)
DeclareQueue declares a queue.
func (*Consumer) DeclareQueueWithConfig ¶
func (c *Consumer) DeclareQueueWithConfig(config QueueConfig) (QueueInfo, error)
DeclareQueueWithConfig declares a queue with the given configuration.
func (*Consumer) DeleteExchange ¶
DeleteExchange deletes an exchange.
func (*Consumer) DeleteQueue ¶
DeleteQueue deletes a queue.
func (*Consumer) PurgeQueue ¶
PurgeQueue removes all messages from a queue.
func (*Consumer) Start ¶
Start starts consuming messages and returns a delivery channel. The delivery channel is automatically re-established on reconnection.
func (*Consumer) Stop ¶
func (c *Consumer) Stop()
Stop stops consuming without closing the underlying channel. Call Close to release all resources.
type ConsumerConfig ¶
type ConsumerConfig struct {
// Queue is the queue to consume from.
Queue string
// ConsumerTag is the consumer identifier.
ConsumerTag string
// AutoAck enables automatic message acknowledgment.
AutoAck bool
// Exclusive makes this an exclusive consumer.
Exclusive bool
// NoLocal prevents consuming messages published on same connection.
NoLocal bool
// NoWait doesn't wait for server confirmation.
NoWait bool
// Args are additional arguments.
Args map[string]any
// PrefetchCount is the number of messages to prefetch.
PrefetchCount int
// PrefetchSize is the prefetch size in bytes.
PrefetchSize int
// RequeueOnError requeues messages when handler returns error.
RequeueOnError bool
// Concurrency is the number of goroutines processing messages (default: 1).
// Each goroutine calls the handler sequentially. Increase for parallel processing.
Concurrency int
// GracefulShutdown waits for in-flight message handlers to complete on Close (default: true).
GracefulShutdown bool
// OnError is called when an error occurs.
OnError ErrorHandler
// Middleware is applied to the message handler in order.
Middleware []Middleware
}
ConsumerConfig holds consumer-specific configuration.
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() ConsumerConfig
DefaultConsumerConfig returns a default consumer configuration.
func (ConsumerConfig) WithAutoAck ¶
func (c ConsumerConfig) WithAutoAck(autoAck bool) ConsumerConfig
WithAutoAck returns a new config with auto-ack setting.
func (ConsumerConfig) WithConcurrency ¶
func (c ConsumerConfig) WithConcurrency(n int) ConsumerConfig
WithConcurrency returns a new config with the specified number of handler goroutines.
func (ConsumerConfig) WithConsumerTag ¶
func (c ConsumerConfig) WithConsumerTag(tag string) ConsumerConfig
WithConsumerTag returns a new config with the specified consumer tag.
func (ConsumerConfig) WithErrorHandler ¶
func (c ConsumerConfig) WithErrorHandler(handler ErrorHandler) ConsumerConfig
WithErrorHandler returns a new config with the specified error handler.
func (ConsumerConfig) WithExclusive ¶
func (c ConsumerConfig) WithExclusive(exclusive bool) ConsumerConfig
WithExclusive returns a new config with exclusive setting.
func (ConsumerConfig) WithGracefulShutdown ¶
func (c ConsumerConfig) WithGracefulShutdown(enabled bool) ConsumerConfig
WithGracefulShutdown returns a new config with graceful shutdown setting. When enabled, Close waits for in-flight message handlers to complete.
func (ConsumerConfig) WithMiddleware ¶
func (c ConsumerConfig) WithMiddleware(mw ...Middleware) ConsumerConfig
WithMiddleware returns a new config with the specified middleware.
func (ConsumerConfig) WithPrefetch ¶
func (c ConsumerConfig) WithPrefetch(count, size int) ConsumerConfig
WithPrefetch returns a new config with prefetch settings.
func (ConsumerConfig) WithQueue ¶
func (c ConsumerConfig) WithQueue(queue string) ConsumerConfig
WithQueue returns a new config with the specified queue.
func (ConsumerConfig) WithRequeueOnError ¶
func (c ConsumerConfig) WithRequeueOnError(requeue bool) ConsumerConfig
WithRequeueOnError returns a new config with requeue on error setting.
type Delivery ¶
type Delivery struct {
*Message
// Exchange is the exchange the message was published to.
Exchange string
// RoutingKey is the routing key.
RoutingKey string
// Redelivered indicates if this is a redelivery.
Redelivered bool
// DeliveryTag is the delivery tag for acknowledgment.
DeliveryTag uint64
// ConsumerTag is the consumer identifier.
ConsumerTag string
// contains filtered or unexported fields
}
Delivery represents a received message.
type DeliveryMode ¶
type DeliveryMode uint8
DeliveryMode represents the message delivery mode.
const ( Transient DeliveryMode = 1 Persistent DeliveryMode = 2 )
Supported delivery modes.
type ExchangeConfig ¶
type ExchangeConfig struct {
// Name is the exchange name.
Name string
// Type is the exchange type.
Type ExchangeType
// Durable makes the exchange survive broker restarts.
Durable bool
// AutoDelete deletes the exchange when no bindings.
AutoDelete bool
// Internal makes the exchange internal.
Internal bool
// Args are additional arguments.
Args map[string]any
}
ExchangeConfig holds exchange declaration configuration.
func DefaultExchangeConfig ¶
func DefaultExchangeConfig(name string, exchangeType ExchangeType) ExchangeConfig
DefaultExchangeConfig returns a default exchange configuration.
func (ExchangeConfig) WithAutoDelete ¶
func (c ExchangeConfig) WithAutoDelete(autoDelete bool) ExchangeConfig
WithAutoDelete returns a new config with auto-delete setting.
func (ExchangeConfig) WithDurable ¶
func (c ExchangeConfig) WithDurable(durable bool) ExchangeConfig
WithDurable returns a new config with durable setting.
func (ExchangeConfig) WithInternal ¶
func (c ExchangeConfig) WithInternal(internal bool) ExchangeConfig
WithInternal returns a new config with internal setting.
type ExchangeType ¶
type ExchangeType string
ExchangeType represents the type of exchange.
const ( ExchangeDirect ExchangeType = "direct" ExchangeFanout ExchangeType = "fanout" ExchangeTopic ExchangeType = "topic" ExchangeHeaders ExchangeType = "headers" )
Supported exchange types.
type Logger ¶
type Logger interface {
Debugf(format string, args ...any)
Infof(format string, args ...any)
Warnf(format string, args ...any)
Errorf(format string, args ...any)
}
Logger is the interface for logging within rabbitwrap. Implement this interface to integrate with your logging framework.
func NewStdLogger ¶
func NewStdLogger() Logger
NewStdLogger returns a Logger that writes to the standard library logger.
type Message ¶
type Message struct {
// Body is the message body.
Body []byte
// ContentType is the MIME type.
ContentType string
// ContentEncoding is the encoding.
ContentEncoding string
// DeliveryMode indicates persistence.
DeliveryMode DeliveryMode
// Priority is the message priority (0-9).
Priority uint8
// CorrelationID is for request-reply patterns.
CorrelationID string
// ReplyTo is the reply queue name.
ReplyTo string
// Expiration is the message TTL.
Expiration string
// MessageID is a unique message identifier.
MessageID string
// Timestamp is the message timestamp.
Timestamp time.Time
// Type is the message type name.
Type string
// UserID is the creating user.
UserID string
// AppID is the creating application.
AppID string
// Headers are custom headers.
Headers map[string]any
}
Message represents a RabbitMQ message.
func NewJSONMessage ¶
NewJSONMessage creates a new JSON message.
func NewMessage ¶
NewMessage creates a new message with the given body.
func NewTextMessage ¶
NewTextMessage creates a new text message.
func (*Message) WithContentType ¶
WithContentType sets the content type.
func (*Message) WithCorrelationID ¶
WithCorrelationID sets the correlation ID.
func (*Message) WithDeliveryMode ¶
func (m *Message) WithDeliveryMode(mode DeliveryMode) *Message
WithDeliveryMode sets the delivery mode.
func (*Message) WithExpiration ¶
WithExpiration sets the expiration (TTL).
func (*Message) WithHeader ¶
WithHeader adds a custom header.
func (*Message) WithHeaders ¶
WithHeaders sets multiple headers.
func (*Message) WithMessageID ¶
WithMessageID sets the message ID.
func (*Message) WithPriority ¶
WithPriority sets the priority.
func (*Message) WithReplyTo ¶
WithReplyTo sets the reply-to queue.
type MessageHandler ¶
MessageHandler handles received messages.
type Middleware ¶
type Middleware func(next MessageHandler) MessageHandler
Middleware wraps a MessageHandler, returning a new MessageHandler. Middleware is applied in the order provided — the first middleware in the slice is the outermost wrapper.
func Chain ¶
func Chain(mw ...Middleware) Middleware
Chain composes multiple middleware into a single Middleware. Middleware is applied left-to-right: Chain(A, B, C)(handler) == A(B(C(handler))).
func LoggingMiddleware ¶
func LoggingMiddleware(logger Logger) Middleware
LoggingMiddleware logs each message processing with its duration.
func RecoveryMiddleware ¶
func RecoveryMiddleware(onPanic func(recovered any)) Middleware
RecoveryMiddleware recovers from panics in the message handler, calls the provided callback with the recovered value, and returns an error so the message is not silently acknowledged as successful.
func RetryMiddleware ¶
func RetryMiddleware(maxRetries int, delay time.Duration) Middleware
RetryMiddleware retries failed message processing up to maxRetries times with the given delay between attempts.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher publishes messages to RabbitMQ.
func NewPublisher ¶
func NewPublisher(conn *Connection, config PublisherConfig) (*Publisher, error)
NewPublisher creates a new publisher.
func (*Publisher) DeclareExchange ¶
func (p *Publisher) DeclareExchange(name string, kind ExchangeType, durable, autoDelete bool, args map[string]any) error
DeclareExchange declares an exchange.
func (*Publisher) NotifyReturn ¶
NotifyReturn registers a handler for undeliverable messages. This is called when the Mandatory or Immediate flags are set and the broker cannot route or deliver the message.
func (*Publisher) PublishDelayed ¶
PublishDelayed publishes a message with a delay using TTL and dead letter exchange.
func (*Publisher) PublishJSON ¶
PublishJSON publishes a JSON message.
func (*Publisher) PublishText ¶
PublishText publishes a text message.
func (*Publisher) PublishToExchange ¶
func (p *Publisher) PublishToExchange(ctx context.Context, exchange, routingKey string, msg *Message) error
PublishToExchange publishes a message to a specific exchange with routing key.
func (*Publisher) PublishToKeys ¶
PublishToKeys publishes a message to multiple routing keys on the configured exchange.
type PublisherConfig ¶
type PublisherConfig struct {
// Exchange is the exchange to publish to.
Exchange string
// RoutingKey is the default routing key.
RoutingKey string
// Mandatory makes the server return unroutable messages.
Mandatory bool
// Immediate makes the server return messages when no consumer is available.
Immediate bool
// ConfirmMode enables publisher confirms.
ConfirmMode bool
// ConfirmTimeout is the timeout for waiting for confirms.
ConfirmTimeout time.Duration
}
PublisherConfig holds publisher-specific configuration.
func DefaultPublisherConfig ¶
func DefaultPublisherConfig() PublisherConfig
DefaultPublisherConfig returns a default publisher configuration.
func (PublisherConfig) WithConfirmMode ¶
func (c PublisherConfig) WithConfirmMode(enabled bool, timeout time.Duration) PublisherConfig
WithConfirmMode returns a new config with confirm mode settings.
func (PublisherConfig) WithExchange ¶
func (c PublisherConfig) WithExchange(exchange string) PublisherConfig
WithExchange returns a new config with the specified exchange.
func (PublisherConfig) WithImmediate ¶
func (c PublisherConfig) WithImmediate(immediate bool) PublisherConfig
WithImmediate returns a new config with immediate flag set.
func (PublisherConfig) WithMandatory ¶
func (c PublisherConfig) WithMandatory(mandatory bool) PublisherConfig
WithMandatory returns a new config with mandatory flag set.
func (PublisherConfig) WithRoutingKey ¶
func (c PublisherConfig) WithRoutingKey(key string) PublisherConfig
WithRoutingKey returns a new config with the specified routing key.
type QueueConfig ¶
type QueueConfig struct {
// Name is the queue name.
Name string
// Durable makes the queue survive broker restarts.
Durable bool
// AutoDelete deletes the queue when no consumers.
AutoDelete bool
// Exclusive makes the queue exclusive to this connection.
Exclusive bool
// Args are additional arguments.
Args map[string]any
// DeadLetterExchange for dead letter routing.
DeadLetterExchange string
// DeadLetterRoutingKey for dead letter routing.
DeadLetterRoutingKey string
// MessageTTL is the default message TTL.
MessageTTL time.Duration
// MaxLength is the maximum number of messages.
MaxLength int
// MaxLengthBytes is the maximum queue size in bytes.
MaxLengthBytes int
// Quorum enables quorum queue type for high availability.
Quorum bool
}
QueueConfig holds queue declaration configuration.
func DefaultQueueConfig ¶
func DefaultQueueConfig(name string) QueueConfig
DefaultQueueConfig returns a default queue configuration.
func (QueueConfig) WithAutoDelete ¶
func (c QueueConfig) WithAutoDelete(autoDelete bool) QueueConfig
WithAutoDelete returns a new config with auto-delete setting.
func (QueueConfig) WithDeadLetter ¶
func (c QueueConfig) WithDeadLetter(exchange, routingKey string) QueueConfig
WithDeadLetter returns a new config with dead letter settings.
func (QueueConfig) WithDurable ¶
func (c QueueConfig) WithDurable(durable bool) QueueConfig
WithDurable returns a new config with durable setting.
func (QueueConfig) WithExclusive ¶
func (c QueueConfig) WithExclusive(exclusive bool) QueueConfig
WithExclusive returns a new config with exclusive setting.
func (QueueConfig) WithMaxLength ¶
func (c QueueConfig) WithMaxLength(maxLength int) QueueConfig
WithMaxLength returns a new config with max length.
func (QueueConfig) WithMaxLengthBytes ¶
func (c QueueConfig) WithMaxLengthBytes(maxBytes int) QueueConfig
WithMaxLengthBytes returns a new config with max length in bytes.
func (QueueConfig) WithMessageTTL ¶
func (c QueueConfig) WithMessageTTL(ttl time.Duration) QueueConfig
WithMessageTTL returns a new config with message TTL.
func (QueueConfig) WithQuorum ¶
func (c QueueConfig) WithQuorum() QueueConfig
WithQuorum enables quorum queue type for high availability across a cluster.