rabbitmq

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2025 License: MIT Imports: 13 Imported by: 0

README

Go RabbitMQ

Home  /

 

A modern, production-ready Go package for RabbitMQ operations with environment-first configuration, ULID message IDs, and comprehensive production features.

 

Go Reference Go Tests Go Report Card GitHub Tag License

 

Table of Contents

🔝 back to top

 

Key Features

  • Environment-First: Configure via environment variables for cloud-native deployments
  • ULID Message IDs: 6x faster generation, database-optimized, lexicographically sortable
  • Auto-Reconnection: Intelligent retry with configurable backoff
  • Production-Ready: Graceful shutdown, timeouts, dead letter queues, HA support
  • High Performance: Zero-allocation logging, optimized for throughput
  • Fully Tested: Comprehensive test coverage with CI/CD pipeline

🔝 back to top

 

Quick Start

🔝 back to top

 

Installation
go get github.com/cloudresty/go-rabbitmq

🔝 back to top

 

Basic Usage
package main

import (
    "context"
    "github.com/cloudresty/go-rabbitmq"
)

func main() {
    // Publisher - uses RABBITMQ_* environment variables
    publisher, err := rabbitmq.NewPublisher()
    if err != nil {
        panic(err)
    }
    defer publisher.Close()

    // Publish a message with auto-generated ULID
    err = publisher.Publish(context.Background(), rabbitmq.PublishConfig{
        Exchange:   "events",
        RoutingKey: "user.created",
        Message:    []byte(`{"user_id": "123", "name": "John"}`),
    })

    // Consumer - uses RABBITMQ_* environment variables
    consumer, err := rabbitmq.NewConsumer()
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // Consume messages
    err = consumer.Consume(context.Background(), rabbitmq.ConsumeConfig{
        Queue: "user-events",
        Handler: func(ctx context.Context, message []byte) error {
            // Process message
            return nil
        },
    })
}

🔝 back to top

 

Environment Configuration

Set environment variables for your deployment:

export RABBITMQ_HOST=localhost
export RABBITMQ_USERNAME=guest
export RABBITMQ_PASSWORD=guest
export RABBITMQ_CONNECTION_NAME=my-service

🔝 back to top

 

Documentation

Document Description
API Reference Complete function reference and usage patterns
Environment Configuration Environment variables and deployment configurations
Production Features Auto-reconnection, graceful shutdown, HA queues, dead letters
ULID Message IDs High-performance, database-optimized message identifiers
Examples Comprehensive examples and usage patterns

🔝 back to top

 

Why This Package?

This package is designed for modern cloud-native applications that require robust, high-performance messaging solutions. It leverages the power of RabbitMQ while providing a developer-friendly API that integrates seamlessly with environment-based configurations.

🔝 back to top

 

Environment-First Design

Perfect for modern cloud deployments with Docker, Kubernetes, and CI/CD pipelines. No more hardcoded connection strings.

🔝 back to top

 

ULID Message IDs

Get 6x faster message ID generation with better database performance compared to UUIDs. Natural time-ordering and collision resistance.

🔝 back to top

 

Production-Ready

Built-in support for high availability, graceful shutdown, automatic reconnection, and comprehensive timeout controls.

🔝 back to top

 

Performance Optimized

Zero-allocation logging, efficient ULID generation, and optimized for high-throughput scenarios.

🔝 back to top

 

Production Usage

// Use custom environment prefix for multi-service deployments
publisher, err := rabbitmq.NewPublisherWithPrefix("PAYMENTS_")

// Declare production-ready queues with dead letter support
err = publisher.DeclareQuorumQueue("payments")

// Graceful shutdown with signal handling
shutdownManager := rabbitmq.NewShutdownManager(config)
shutdownManager.SetupSignalHandler()
shutdownManager.Register(publisher, consumer)
shutdownManager.Wait() // Blocks until SIGINT/SIGTERM

🔝 back to top

 

Requirements

  • Go 1.24+ (recommended)
  • RabbitMQ 4.0+ (recommended)

🔝 back to top

 

Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for your changes
  4. Ensure all tests pass
  5. Submit a pull request

🔝 back to top

 

Security

If you discover a security vulnerability, please report it via email to security@cloudresty.com.

🔝 back to top

 

License

This project is licensed under the MIT License - see the LICENSE.txt file for details.

🔝 back to top

 


 

An open source project brought to you by the Cloudresty team.

Website  |  LinkedIn  |  BlueSky  |  GitHub

 

Documentation

Index

Constants

View Source
const (
	ContentTypeJSON = "application/json"
	ContentTypeText = "text/plain"
)

ContentType constants

Variables

This section is empty.

Functions

func DeclareStandardTopology

func DeclareStandardTopology(conn *Connection, exchangeName, queueName string) error

DeclareStandardTopology sets up a standard topology with work queues

func SetupTopology

func SetupTopology(conn *Connection, exchanges []ExchangeConfig, queues []QueueConfig, bindings []BindingConfig) error

SetupTopology creates exchanges, queues, and bindings

Types

type BindingConfig

type BindingConfig struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Arguments    map[string]any
}

BindingConfig holds configuration for binding a queue to an exchange

type Closable

type Closable interface {
	Close() error
}

Closable interface for components that can be gracefully closed

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a RabbitMQ connection with retry logic

func NewConnection

func NewConnection(config ConnectionConfig) (*Connection, error)

NewConnection creates a new RabbitMQ connection

func (*Connection) Channel

func (c *Connection) Channel() *amqp.Channel

Channel returns the AMQP channel

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection and channel

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

IsConnected checks if the connection is still active

func (*Connection) IsReconnecting

func (c *Connection) IsReconnecting() bool

IsReconnecting returns true if the connection is currently attempting to reconnect

func (*Connection) NotifyClose

func (c *Connection) NotifyClose() <-chan *amqp.Error

NotifyClose returns a channel that will receive close notifications

type ConnectionConfig

type ConnectionConfig struct {
	URL                  string
	RetryAttempts        int
	RetryDelay           time.Duration
	Heartbeat            time.Duration
	ConnectionName       string
	AutoReconnect        bool          // Enable automatic reconnection
	ReconnectDelay       time.Duration // Delay between reconnection attempts
	MaxReconnectAttempts int           // Max reconnection attempts (0 = unlimited)
	DialTimeout          time.Duration // Timeout for establishing connection
	ChannelTimeout       time.Duration // Timeout for channel operations
}

ConnectionConfig holds configuration for RabbitMQ connection

func DefaultConnectionConfig

func DefaultConnectionConfig(url string) ConnectionConfig

DefaultConnectionConfig returns a default connection configuration

type ConsumeConfig

type ConsumeConfig struct {
	Queue     string
	Consumer  string
	Handler   MessageHandler
	AutoAck   *bool // Optional override for auto-acknowledgment
	Exclusive *bool // Optional override for exclusive consumption
	Arguments map[string]interface{}
}

ConsumeConfig holds configuration for a single consume operation

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles consuming messages from RabbitMQ

func NewConsumer

func NewConsumer() (*Consumer, error)

NewConsumer creates a new consumer using environment configuration Set RABBITMQ_* environment variables or defaults will be used

func NewConsumerWithConfig

func NewConsumerWithConfig(config ConsumerConfig) (*Consumer, error)

NewConsumerWithConfig creates a new consumer with custom configuration

func NewConsumerWithPrefix added in v1.1.0

func NewConsumerWithPrefix(prefix string) (*Consumer, error)

NewConsumerWithPrefix creates a new consumer using environment configuration with custom prefix Example: NewConsumerWithPrefix("MYAPP_") looks for MYAPP_RABBITMQ_HOST, etc.

func (*Consumer) BindQueue

func (c *Consumer) BindQueue(queueName, routingKey, exchangeName string, noWait bool, args map[string]any) error

BindQueue binds a queue to an exchange

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer connection with graceful shutdown timeout

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, config ConsumeConfig) error

Consume starts consuming messages from a queue

func (*Consumer) ConsumeWithDeliveryHandler

func (c *Consumer) ConsumeWithDeliveryHandler(ctx context.Context, config ConsumeConfig, handler DeliveryHandler) error

ConsumeWithDeliveryHandler starts consuming with a raw delivery handler

func (*Consumer) DeclareHAQueue

func (c *Consumer) DeclareHAQueue(name string) (amqp.Queue, error)

DeclareHAQueue declares a production-ready HA classic queue

func (*Consumer) DeclareQueue

func (c *Consumer) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool, args map[string]any) (amqp.Queue, error)

DeclareQueue declares a queue with the given parameters

func (*Consumer) DeclareQueueWithConfig

func (c *Consumer) DeclareQueueWithConfig(config QueueConfig) (amqp.Queue, error)

DeclareQueueWithConfig declares a queue using QueueConfig

func (*Consumer) DeclareQuorumQueue

func (c *Consumer) DeclareQuorumQueue(name string) (amqp.Queue, error)

DeclareQuorumQueue declares a production-ready quorum queue

func (*Consumer) GetConnection

func (c *Consumer) GetConnection() *Connection

GetConnection returns the underlying connection for advanced operations

func (*Consumer) IsConnected

func (c *Consumer) IsConnected() bool

IsConnected checks if the consumer is connected

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop stops the consumer

type ConsumerConfig

type ConsumerConfig struct {
	ConnectionConfig
	AutoAck         bool
	Exclusive       bool
	NoLocal         bool
	NoWait          bool
	PrefetchCount   int
	PrefetchSize    int
	PrefetchGlobal  bool
	MessageTimeout  time.Duration // Timeout for processing individual messages
	ShutdownTimeout time.Duration // Timeout for graceful consumer shutdown
}

ConsumerConfig holds configuration for the consumer

type DeliveryHandler

type DeliveryHandler func(ctx context.Context, delivery amqp.Delivery) error

DeliveryHandler is a function type for handling raw AMQP deliveries

type DeliveryInfo

type DeliveryInfo struct {
	MessageCount uint32
	Exchange     string
	RoutingKey   string
	Redelivered  bool
	DeliveryTag  uint64
	// Message metadata from AMQP properties
	MessageID     string
	CorrelationID string
	ReplyTo       string
	Type          string
	AppID         string
	UserID        string
	Timestamp     time.Time
	ContentType   string
	Priority      uint8
	Headers       map[string]any
}

DeliveryInfo contains information about a delivered message

func ExtractDeliveryInfo

func ExtractDeliveryInfo(delivery *amqp.Delivery) DeliveryInfo

ExtractDeliveryInfo extracts delivery information from an AMQP delivery

type EnvConfig added in v1.1.0

type EnvConfig struct {
	// Connection basics
	Username string `env:"RABBITMQ_USERNAME,default=guest"`
	Password string `env:"RABBITMQ_PASSWORD,default=guest"`
	Host     string `env:"RABBITMQ_HOST,default=localhost"`
	Port     int    `env:"RABBITMQ_PORT,default=5672"`
	VHost    string `env:"RABBITMQ_VHOST,default=/"`

	// Protocol and security
	Protocol    string `env:"RABBITMQ_PROTOCOL,default=amqp"` // amqp or amqps
	TLSEnabled  bool   `env:"RABBITMQ_TLS_ENABLED,default=false"`
	TLSInsecure bool   `env:"RABBITMQ_TLS_INSECURE,default=false"` // Skip cert verification

	// HTTP Management API
	HTTPProtocol string `env:"RABBITMQ_HTTP_PROTOCOL,default=http"` // http or https
	HTTPPort     int    `env:"RABBITMQ_HTTP_PORT,default=15672"`

	// Connection behavior
	ConnectionName string        `env:"RABBITMQ_CONNECTION_NAME,default=go-rabbitmq"`
	Heartbeat      time.Duration `env:"RABBITMQ_HEARTBEAT,default=10s"`
	RetryAttempts  int           `env:"RABBITMQ_RETRY_ATTEMPTS,default=5"`
	RetryDelay     time.Duration `env:"RABBITMQ_RETRY_DELAY,default=2s"`

	// Timeouts
	DialTimeout    time.Duration `env:"RABBITMQ_DIAL_TIMEOUT,default=30s"`
	ChannelTimeout time.Duration `env:"RABBITMQ_CHANNEL_TIMEOUT,default=10s"`

	// Auto-reconnection
	AutoReconnect        bool          `env:"RABBITMQ_AUTO_RECONNECT,default=true"`
	ReconnectDelay       time.Duration `env:"RABBITMQ_RECONNECT_DELAY,default=5s"`
	MaxReconnectAttempts int           `env:"RABBITMQ_MAX_RECONNECT_ATTEMPTS,default=0"` // 0 = unlimited

	// Publisher settings
	PublisherConfirmationTimeout time.Duration `env:"RABBITMQ_PUBLISHER_CONFIRMATION_TIMEOUT,default=5s"`
	PublisherShutdownTimeout     time.Duration `env:"RABBITMQ_PUBLISHER_SHUTDOWN_TIMEOUT,default=15s"`
	PublisherPersistent          bool          `env:"RABBITMQ_PUBLISHER_PERSISTENT,default=true"`

	// Consumer settings
	ConsumerPrefetchCount   int           `env:"RABBITMQ_CONSUMER_PREFETCH_COUNT,default=1"`
	ConsumerAutoAck         bool          `env:"RABBITMQ_CONSUMER_AUTO_ACK,default=false"`
	ConsumerMessageTimeout  time.Duration `env:"RABBITMQ_CONSUMER_MESSAGE_TIMEOUT,default=5m"`
	ConsumerShutdownTimeout time.Duration `env:"RABBITMQ_CONSUMER_SHUTDOWN_TIMEOUT,default=30s"`
}

EnvConfig holds all RabbitMQ configuration that can be loaded from environment variables

func LoadFromEnv added in v1.1.0

func LoadFromEnv() (*EnvConfig, error)

LoadFromEnv loads configuration from environment variables using default RABBITMQ_ prefix

func LoadFromEnvWithPrefix added in v1.1.0

func LoadFromEnvWithPrefix(prefix string) (*EnvConfig, error)

LoadFromEnvWithPrefix loads configuration from environment variables with custom prefix

func (*EnvConfig) BuildAMQPURL added in v1.1.0

func (e *EnvConfig) BuildAMQPURL() string

BuildAMQPURL constructs the AMQP connection URL from environment configuration

func (*EnvConfig) BuildHTTPURL added in v1.1.0

func (e *EnvConfig) BuildHTTPURL() string

BuildHTTPURL constructs the HTTP management API URL from environment configuration

func (*EnvConfig) ToConnectionConfig added in v1.1.0

func (e *EnvConfig) ToConnectionConfig() ConnectionConfig

ToConnectionConfig converts EnvConfig to ConnectionConfig

func (*EnvConfig) ToConsumerConfig added in v1.1.0

func (e *EnvConfig) ToConsumerConfig() ConsumerConfig

ToConsumerConfig converts EnvConfig to ConsumerConfig

func (*EnvConfig) ToPublisherConfig added in v1.1.0

func (e *EnvConfig) ToPublisherConfig() PublisherConfig

ToPublisherConfig converts EnvConfig to PublisherConfig

type Error

type Error struct {
	Type    string
	Message string
	Cause   error
}

Error types for better error handling

func NewConnectionError

func NewConnectionError(message string, cause error) *Error

NewConnectionError creates a new connection error

func NewConsumeError

func NewConsumeError(message string, cause error) *Error

NewConsumeError creates a new consume error

func NewPublishError

func NewPublishError(message string, cause error) *Error

NewPublishError creates a new publish error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ExchangeConfig

type ExchangeConfig struct {
	Name       string
	Type       ExchangeType
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Arguments  map[string]any
}

ExchangeConfig holds configuration for declaring an exchange

type ExchangeType

type ExchangeType string

ExchangeType represents different types of exchanges

const (
	ExchangeTypeDirect  ExchangeType = "direct"
	ExchangeTypeFanout  ExchangeType = "fanout"
	ExchangeTypeTopic   ExchangeType = "topic"
	ExchangeTypeHeaders ExchangeType = "headers"
)

type InFlightTracker

type InFlightTracker struct {
	// contains filtered or unexported fields
}

InFlightTracker tracks in-flight operations for graceful shutdown

func NewInFlightTracker

func NewInFlightTracker() *InFlightTracker

NewInFlightTracker creates a new in-flight operations tracker

func (*InFlightTracker) Close

func (ift *InFlightTracker) Close() error

Close prevents new operations and waits for existing ones to complete

func (*InFlightTracker) CloseWithTimeout

func (ift *InFlightTracker) CloseWithTimeout(timeout time.Duration) error

CloseWithTimeout waits for in-flight operations with a timeout

func (*InFlightTracker) Done

func (ift *InFlightTracker) Done()

Done marks the completion of an operation

func (*InFlightTracker) IsClosed

func (ift *InFlightTracker) IsClosed() bool

IsClosed returns true if the tracker is closed

func (*InFlightTracker) Start

func (ift *InFlightTracker) Start() bool

Start marks the beginning of an operation

type Message

type Message struct {
	Body        []byte
	ContentType string
	Headers     map[string]any
	Exchange    string
	RoutingKey  string
	Persistent  bool
	// Message identification and tracing
	MessageID     string // Unique message identifier (auto-generated if empty)
	CorrelationID string // Correlation ID for request-response patterns
	ReplyTo       string // Reply queue for RPC patterns
	// Message metadata
	Type   string // Message type/schema identifier
	AppID  string // Application ID that originated the message
	UserID string // User ID (if authenticated)
	// Timing and expiration
	Timestamp  int64  // Unix timestamp when message was created
	Expiration string // Message expiration (in milliseconds as string)
	// Message priority (0-255, higher = more priority)
	Priority uint8
}

Message represents a message with metadata

func NewJSONMessage

func NewJSONMessage(body []byte) *Message

NewJSONMessage creates a new Message for JSON content

func NewMessage

func NewMessage(body []byte) *Message

NewMessage creates a new Message with auto-generated ID and timestamp

func NewMessageWithID

func NewMessageWithID(body []byte, messageID string) *Message

NewMessageWithID creates a new Message with a specific ID

func NewTextMessage

func NewTextMessage(body []byte) *Message

NewTextMessage creates a new Message for plain text content

func (*Message) ToPublishing

func (m *Message) ToPublishing() amqp.Publishing

ToPublishing converts a Message to amqp.Publishing

func (*Message) WithAppID

func (m *Message) WithAppID(appID string) *Message

WithAppID sets the application ID

func (*Message) WithCorrelationID

func (m *Message) WithCorrelationID(correlationID string) *Message

WithCorrelationID sets the correlation ID for request-response patterns

func (*Message) WithExpiration

func (m *Message) WithExpiration(expiration time.Duration) *Message

WithExpiration sets message expiration in duration

func (*Message) WithHeader

func (m *Message) WithHeader(key string, value any) *Message

WithHeader adds a custom header to the message

func (*Message) WithHeaders

func (m *Message) WithHeaders(headers map[string]any) *Message

WithHeaders adds multiple custom headers to the message

func (*Message) WithPriority

func (m *Message) WithPriority(priority uint8) *Message

WithPriority sets message priority (0-255, higher = more priority)

func (*Message) WithReplyTo

func (m *Message) WithReplyTo(replyTo string) *Message

WithReplyTo sets the reply queue for RPC patterns

func (*Message) WithType

func (m *Message) WithType(messageType string) *Message

WithType sets the message type/schema identifier

func (*Message) WithUserID

func (m *Message) WithUserID(userID string) *Message

WithUserID sets the user ID (if authenticated)

type MessageHandler

type MessageHandler func(ctx context.Context, message []byte) error

MessageHandler is a function type for handling consumed messages

type PublishConfig

type PublishConfig struct {
	Exchange    string
	RoutingKey  string
	Message     []byte
	ContentType string
	Headers     map[string]interface{}
	Persistent  *bool // Optional override for message persistence
	// Message identification and tracing (for backward compatibility)
	MessageID     string
	CorrelationID string
	ReplyTo       string
	Type          string
	AppID         string
	UserID        string
	Expiration    string
	Priority      uint8
}

PublishConfig holds configuration for a single publish operation

type PublishMessageConfig

type PublishMessageConfig struct {
	Exchange   string
	RoutingKey string
	Message    *Message
}

PublishMessageConfig holds configuration for publishing Message objects

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher handles publishing messages to RabbitMQ

func NewPublisher

func NewPublisher() (*Publisher, error)

NewPublisher creates a new publisher using environment configuration Set RABBITMQ_* environment variables or defaults will be used

func NewPublisherWithConfig

func NewPublisherWithConfig(config PublisherConfig) (*Publisher, error)

NewPublisherWithConfig creates a new publisher with custom configuration

func NewPublisherWithPrefix added in v1.1.0

func NewPublisherWithPrefix(prefix string) (*Publisher, error)

NewPublisherWithPrefix creates a new publisher using environment configuration with custom prefix Example: NewPublisherWithPrefix("MYAPP_") looks for MYAPP_RABBITMQ_HOST, etc.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher connection with graceful shutdown timeout

func (*Publisher) DeclareExchange

func (p *Publisher) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool, args map[string]interface{}) error

DeclareExchange declares an exchange with the given parameters

func (*Publisher) DeclareHAQueue

func (p *Publisher) DeclareHAQueue(name string) error

DeclareHAQueue declares a production-ready HA classic queue

func (*Publisher) DeclareQueue

func (p *Publisher) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool, args map[string]interface{}) error

DeclareQueue declares a queue with the given parameters

func (*Publisher) DeclareQueueWithConfig

func (p *Publisher) DeclareQueueWithConfig(config QueueConfig) error

DeclareQueueWithConfig declares a queue using QueueConfig

func (*Publisher) DeclareQuorumQueue

func (p *Publisher) DeclareQuorumQueue(name string) error

DeclareQuorumQueue declares a production-ready quorum queue

func (*Publisher) GetConnection

func (p *Publisher) GetConnection() *Connection

GetConnection returns the underlying connection for advanced operations

func (*Publisher) IsConnected

func (p *Publisher) IsConnected() bool

IsConnected checks if the publisher is connected

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, config PublishConfig) error

Publish publishes a message to RabbitMQ

func (*Publisher) PublishMessage

func (p *Publisher) PublishMessage(ctx context.Context, config PublishMessageConfig) error

PublishMessage publishes a Message object to RabbitMQ

func (*Publisher) PublishWithConfirmation

func (p *Publisher) PublishWithConfirmation(ctx context.Context, config PublishConfig) error

PublishWithConfirmation publishes a message with confirmation

type PublisherConfig

type PublisherConfig struct {
	ConnectionConfig
	DefaultExchange     string
	DefaultRoutingKey   string
	Persistent          bool
	Mandatory           bool
	Immediate           bool
	ConfirmationTimeout time.Duration // Timeout for publisher confirmations
	ShutdownTimeout     time.Duration // Timeout for graceful publisher shutdown
}

PublisherConfig holds configuration for the publisher

type QueueConfig

type QueueConfig struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Arguments  map[string]any
	// Cluster-aware settings
	QueueType            QueueType // Queue type: classic, quorum, stream
	HighAvailability     bool      // Enable HA for classic queues
	ReplicationFactor    int       // Replication factor for quorum queues (default: 3)
	MaxLength            int       // Maximum queue length (0 = unlimited)
	MaxLengthBytes       int       // Maximum queue size in bytes (0 = unlimited)
	MessageTTL           int       // Message TTL in milliseconds (0 = no TTL)
	DeadLetterExchange   string    // Dead letter exchange name
	DeadLetterRoutingKey string    // Dead letter routing key
	// Dead Letter Infrastructure (NEW)
	AutoCreateDLX bool   // Automatically create dead letter exchange and queue (default: true)
	DLXSuffix     string // Suffix for DLX name (default: ".dlx")
	DLQSuffix     string // Suffix for DLQ name (default: ".dlq")
	DLQMaxLength  int    // Max length for dead letter queue (0 = unlimited)
	DLQMessageTTL int    // TTL for messages in DLQ in milliseconds (0 = no TTL, default: 7 days)
}

QueueConfig holds configuration for declaring a queue

func DefaultClassicQueueConfig

func DefaultClassicQueueConfig(name string) QueueConfig

DefaultClassicQueueConfig returns a basic durable classic queue configuration

func DefaultHAQueueConfig

func DefaultHAQueueConfig(name string) QueueConfig

DefaultHAQueueConfig returns a production-ready HA classic queue configuration

func DefaultQuorumQueueConfig

func DefaultQuorumQueueConfig(name string) QueueConfig

DefaultQuorumQueueConfig returns a production-ready quorum queue configuration

func (*QueueConfig) GetDLQConfig

func (q *QueueConfig) GetDLQConfig() QueueConfig

GetDLQConfig returns a QueueConfig for the dead letter queue

func (*QueueConfig) GetDLQName

func (q *QueueConfig) GetDLQName() string

GetDLQName returns the dead letter queue name for this queue config

func (*QueueConfig) GetDLXConfig

func (q *QueueConfig) GetDLXConfig() ExchangeConfig

GetDLXConfig returns an ExchangeConfig for the dead letter exchange

func (*QueueConfig) GetDLXName

func (q *QueueConfig) GetDLXName() string

GetDLXName returns the dead letter exchange name for this queue config

func (*QueueConfig) ToArguments

func (q *QueueConfig) ToArguments() map[string]any

ToArguments converts the QueueConfig to RabbitMQ queue arguments

func (*QueueConfig) WithCustomDeadLetter

func (q *QueueConfig) WithCustomDeadLetter(dlxName, routingKey string) *QueueConfig

WithCustomDeadLetter configures a custom dead letter exchange (disables auto-creation)

func (*QueueConfig) WithDeadLetter

func (q *QueueConfig) WithDeadLetter(dlxSuffix, dlqSuffix string, dlqTTLDays int) *QueueConfig

WithDeadLetter enables and configures dead letter infrastructure

func (*QueueConfig) WithoutDeadLetter

func (q *QueueConfig) WithoutDeadLetter() *QueueConfig

WithoutDeadLetter disables automatic dead letter infrastructure creation

type QueueType

type QueueType string

QueueType represents the type of queue

const (
	QueueTypeClassic QueueType = "classic"
	QueueTypeQuorum  QueueType = "quorum"
	QueueTypeStream  QueueType = "stream"
)

type ShutdownConfig

type ShutdownConfig struct {
	Timeout           time.Duration // Overall shutdown timeout
	SignalTimeout     time.Duration // Additional time to wait after receiving signal
	GracefulDrainTime time.Duration // Time to allow for in-flight operations to complete
}

ShutdownConfig holds configuration for the shutdown manager

func DefaultShutdownConfig

func DefaultShutdownConfig() ShutdownConfig

DefaultShutdownConfig returns sensible default shutdown configuration

type ShutdownManager

type ShutdownManager struct {
	// contains filtered or unexported fields
}

ShutdownManager provides coordinated graceful shutdown for RabbitMQ components

func NewShutdownManager

func NewShutdownManager(config ShutdownConfig) *ShutdownManager

NewShutdownManager creates a new shutdown manager

func (*ShutdownManager) IsShutdown

func (sm *ShutdownManager) IsShutdown() bool

IsShutdown returns true if shutdown has been initiated

func (*ShutdownManager) Register

func (sm *ShutdownManager) Register(component Closable)

Register adds a component to be managed during shutdown

func (*ShutdownManager) SetupSignalHandler

func (sm *ShutdownManager) SetupSignalHandler() <-chan os.Signal

SetupSignalHandler sets up signal handling for graceful shutdown

func (*ShutdownManager) Shutdown

func (sm *ShutdownManager) Shutdown()

Shutdown performs graceful shutdown of all registered components

func (*ShutdownManager) Wait

func (sm *ShutdownManager) Wait()

Wait blocks until shutdown is complete

func (*ShutdownManager) WaitWithContext

func (sm *ShutdownManager) WaitWithContext(ctx context.Context) error

WaitWithContext blocks until shutdown is complete or context is cancelled

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL