Documentation
Types ¶
type Config ¶
type Config struct {
	// URI is the connection string for RabbitMQ. It should follow the AMQP URI format, e.g.,
	// "amqp://guest:guest@localhost:5672/". It is used to establish the connection to the RabbitMQ broker.
	URI string

	// RetryDuration specifies the time duration to wait before retrying a failed connection attempt.
	// This is useful to implement a backoff strategy in case of temporary network issues.
	RetryDuration time.Duration

	// AMQPConfig holds AMQP-specific configuration options. If nil, default AMQP configurations
	// are used. This can include settings like heartbeat intervals and other advanced AMQP features.
	AMQPConfig *amqp.Config

	// MaxChannels defines the maximum number of channels that can be opened to the RabbitMQ server.
	// If the value is 0 or negative, the default is used (which may be 10 channels).
	MaxChannels int
}
Config holds the configuration options for establishing a RabbitMQ connection and managing consumer channels.
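The field comments above imply two checks a caller might make before connecting: that the URI is a well-formed AMQP URI, and what channel limit applies when MaxChannels is left at zero. The following is a minimal sketch using only the standard library. The local Config type mirrors the documented fields except AMQPConfig, the helper names validateURI and effectiveMaxChannels are hypothetical, and the default of 10 is taken from the "may be 10 channels" remark rather than from confirmed behavior.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"time"
)

// Config mirrors the documented fields that need no AMQP client library.
// AMQPConfig is omitted so the sketch compiles with the standard library only.
type Config struct {
	URI           string
	RetryDuration time.Duration
	MaxChannels   int
}

// assumedDefaultMaxChannels comes from the "may be 10 channels" remark; it is an assumption.
const assumedDefaultMaxChannels = 10

// effectiveMaxChannels returns the channel limit after applying the assumed default.
func effectiveMaxChannels(cfg Config) int {
	if cfg.MaxChannels <= 0 {
		return assumedDefaultMaxChannels
	}
	return cfg.MaxChannels
}

// validateURI checks that the URI parses, uses the amqp or amqps scheme, and names a host.
func validateURI(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return errors.New("unsupported scheme: " + u.Scheme)
	}
	if u.Host == "" {
		return errors.New("missing host")
	}
	return nil
}

func main() {
	cfg := Config{
		URI:           "amqp://guest:guest@localhost:5672/",
		RetryDuration: 5 * time.Second,
	}
	fmt.Println("URI error:", validateURI(cfg.URI))
	fmt.Println("channel limit:", effectiveMaxChannels(cfg))
}
```

Validating the URI up front keeps a malformed connection string from being retried indefinitely by the reconnection logic.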
type Connection ¶
type Connection struct {
	// Connection is the underlying AMQP connection provided by the AMQP client library.
	// It provides the basic connection functionalities such as opening channels, closing the connection, etc.
	*amqp.Connection
	// contains filtered or unexported fields
}
Connection wraps the actual AMQP connection and provides reconnection logic, ensuring that the connection and channels remain active or are re-established when necessary.
func NewConnection ¶
func NewConnection(config *Config) *Connection
NewConnection creates a new Connection object and initializes it with the provided configuration.
func (*Connection) Close ¶
func (c *Connection) Close() error
Close closes the RabbitMQ connection and all open channels.
func (*Connection) Connect ¶
func (c *Connection) Connect() error
Connect establishes the RabbitMQ connection, retrying automatically on failure.
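The retry behavior described for Connect can be pictured as a loop that waits a fixed duration between attempts. This is a sketch under assumptions, not the package's implementation: connectWithRetry, its dial callback, and the maxAttempts cap are hypothetical names, and the real Connect may retry without an upper bound.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a stand-in error for a failed dial.
var errRefused = errors.New("connection refused")

// connectWithRetry calls dial until it succeeds or maxAttempts is reached,
// sleeping for retry between attempts. It returns the number of attempts made.
func connectWithRetry(dial func() error, retry time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = dial(); lastErr == nil {
			return attempt, nil
		}
		// Wait only if another attempt will follow.
		if attempt < maxAttempts {
			time.Sleep(retry)
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	// flakyDial stands in for a real AMQP dial: it fails twice, then succeeds.
	flakyDial := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}
	attempts, err := connectWithRetry(flakyDial, 10*time.Millisecond, 5)
	fmt.Println("attempts:", attempts, "error:", err)
}
```

The wait passed here plays the role of Config.RetryDuration. A fixed wait is the simplest policy; an exponential backoff would multiply the wait after each failure.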
func (*Connection) IsClosed ¶
func (c *Connection) IsClosed() bool
IsClosed reports whether the RabbitMQ connection is closed.
func (*Connection) OpenChannel ¶
func (c *Connection) OpenChannel() (*amqp.Channel, error)
OpenChannel opens a new channel with retry logic, ensuring the number of open channels does not exceed MaxChannels.
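One way to enforce a cap such as MaxChannels is a mutex-guarded counter that is incremented when a channel is opened and decremented when it is closed. The sketch below illustrates that idea only; channelLimiter and errTooManyChannels are hypothetical, and the package's actual bookkeeping and error values are not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTooManyChannels is a hypothetical error; the package's real error is not documented.
var errTooManyChannels = errors.New("channel limit reached")

// channelLimiter tracks how many channels are open against a fixed maximum.
type channelLimiter struct {
	mu   sync.Mutex
	open int
	max  int
}

// acquire reserves a slot for a new channel, or fails if the limit is reached.
func (l *channelLimiter) acquire() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.open >= l.max {
		return errTooManyChannels
	}
	l.open++
	return nil
}

// release frees a slot when a channel is closed.
func (l *channelLimiter) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.open > 0 {
		l.open--
	}
}

func main() {
	limiter := &channelLimiter{max: 2}
	for i := 1; i <= 3; i++ {
		fmt.Printf("open #%d: %v\n", i, limiter.acquire())
	}
	limiter.release()
	fmt.Println("after release:", limiter.acquire())
}
```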
func (*Connection) Reconnect ¶
func (c *Connection) Reconnect() error
Reconnect attempts to re-establish the connection if it was lost.
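IsClosed and Reconnect combine naturally into a guard that callers can run before using the connection. The sketch below depends only on the two documented method signatures, expressed as a small interface; ensureConnected and fakeConn are hypothetical names, and fakeConn is a test double standing in for *Connection.

```go
package main

import "fmt"

// reconnector lists the two documented methods this sketch relies on.
type reconnector interface {
	IsClosed() bool
	Reconnect() error
}

// ensureConnected reconnects only when the connection reports itself closed.
func ensureConnected(c reconnector) error {
	if !c.IsClosed() {
		return nil
	}
	return c.Reconnect()
}

// fakeConn is a test double standing in for *Connection.
type fakeConn struct {
	closed     bool
	reconnects int
}

func (f *fakeConn) IsClosed() bool { return f.closed }

func (f *fakeConn) Reconnect() error {
	f.reconnects++
	f.closed = false
	return nil
}

func main() {
	conn := &fakeConn{closed: true}
	fmt.Println(ensureConnected(conn), conn.reconnects) // first call reconnects
	fmt.Println(ensureConnected(conn), conn.reconnects) // second call does nothing
}
```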
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents an AMQP consumer that consumes messages from a RabbitMQ queue. It manages the connection, channel, and queue configurations for the consumer.
func NewConsumer ¶
func NewConsumer(conn *Connection, name, queue string, options QueueOptions) (*Consumer, error)
NewConsumer initializes a new Consumer instance and declares the queue with the specified options.
type Message ¶ added in v0.1.1
type Message struct {
	// Body is the content of the message received from the queue.
	// It contains the actual data sent by the producer.
	Body []byte

	// Ack is a function that, when called, acknowledges the message.
	// It informs RabbitMQ that the message has been successfully processed.
	Ack func()

	// Nack is a function that, when called, negatively acknowledges the message.
	// It informs RabbitMQ that the message was not processed successfully, and depending on
	// the RabbitMQ configuration, it may be re-delivered or discarded.
	Nack func()

	// Reject is a function that, when called, rejects the message.
	// It informs RabbitMQ that the message was not processed and does not want it to be re-delivered.
	Reject func()

	// QueueName is the name of the RabbitMQ queue from which the message was consumed.
	// It helps to identify the source of the message.
	QueueName string

	// ConsumerID is the unique identifier of the consumer that received the message.
	// It can be used for logging or to distinguish between multiple consumers.
	ConsumerID string
}
Message represents a message received by a consumer from a RabbitMQ queue. It includes the message body and methods for acknowledging, negatively acknowledging, or rejecting the message, as well as metadata about the message's origin.
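A handler typically picks one of the three callbacks based on what went wrong. The sketch below shows one plausible policy: reject payloads that cannot be decoded, nack payloads whose processing failed, and ack the rest. The Message type is a local mirror of the documented struct; the order payload, handle, and newRecordingMessage are hypothetical helpers written for this illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message mirrors the documented struct so the sketch needs no AMQP library.
type Message struct {
	Body       []byte
	Ack        func()
	Nack       func()
	Reject     func()
	QueueName  string
	ConsumerID string
}

// order is a hypothetical payload type.
type order struct {
	ID int `json:"id"`
}

// errProcess is a stand-in for a processing failure.
var errProcess = errors.New("processing failed")

// handle rejects malformed payloads, nacks failed processing, and acks success.
func handle(msg Message, process func(order) error) {
	var o order
	if err := json.Unmarshal(msg.Body, &o); err != nil {
		msg.Reject() // retrying cannot fix a payload that does not decode
		return
	}
	if err := process(o); err != nil {
		msg.Nack() // the broker configuration decides whether to redeliver
		return
	}
	msg.Ack()
}

// newRecordingMessage builds a Message whose callbacks record which one was called.
func newRecordingMessage(body string, outcome *string) Message {
	return Message{
		Body:       []byte(body),
		Ack:        func() { *outcome = "ack" },
		Nack:       func() { *outcome = "nack" },
		Reject:     func() { *outcome = "reject" },
		QueueName:  "orders",
		ConsumerID: "consumer-1",
	}
}

func main() {
	var outcome string
	handle(newRecordingMessage(`{"id": 1}`, &outcome), func(order) error { return nil })
	fmt.Println("valid payload:", outcome)
	handle(newRecordingMessage(`not json`, &outcome), func(order) error { return nil })
	fmt.Println("malformed payload:", outcome)
	handle(newRecordingMessage(`{"id": 2}`, &outcome), func(order) error { return errProcess })
	fmt.Println("failed processing:", outcome)
}
```

Calling exactly one callback per message matters when AutoAck is false, since an unacknowledged message stays outstanding on the broker.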
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer represents an AMQP producer that sends messages to a specified RabbitMQ queue.
func NewProducer ¶
func NewProducer(conn *Connection, name, queue string) (*Producer, error)
NewProducer initializes a new Producer instance and declares the named queue. Unlike NewConsumer, it does not accept QueueOptions.
type PublishOptions ¶
type PublishOptions struct {
	// Exchange is the name of the exchange to which the message will be published.
	// The exchange determines how the message will be routed to queues.
	Exchange string

	// RoutingKey is the routing key used by the exchange to decide where to route the message.
	// The value depends on the type of exchange (e.g., direct, topic).
	RoutingKey string

	// ContentType specifies the content type of the message. It helps the consumer interpret the message body.
	// For example, "text/plain" or "application/json".
	ContentType string

	// Body is the actual message content, represented as a byte slice.
	// This is the payload of the message being sent to the RabbitMQ exchange.
	Body []byte

	// Mandatory indicates whether the message is mandatory.
	// If true, RabbitMQ will return the message to the producer if it cannot be routed to a queue.
	Mandatory bool

	// Immediate indicates whether the message is immediate.
	// If true, RabbitMQ will try to deliver the message to a consumer immediately, if possible.
	Immediate bool

	// Headers is an optional map of headers that can be included with the message.
	// These headers can carry metadata or additional information to help the consumer process the message.
	Headers amqp.Table
}
PublishOptions represents the configuration options for publishing a message to RabbitMQ.
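As an illustration of how these fields fit together, the sketch below fills in options for a JSON payload. PublishOptions and Table are local stand-ins for the documented type and for amqp.Table (a map[string]interface{}); jsonPublishOptions and the "x-source" header are hypothetical. How the package consumes these options is not shown in this documentation, so the sketch stops at building the value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Table stands in for amqp.Table, which is a map[string]interface{}.
type Table map[string]interface{}

// PublishOptions mirrors the documented struct.
type PublishOptions struct {
	Exchange    string
	RoutingKey  string
	ContentType string
	Body        []byte
	Mandatory   bool
	Immediate   bool
	Headers     Table
}

// jsonPublishOptions builds options for a JSON payload routed by key on the given exchange.
func jsonPublishOptions(exchange, routingKey string, payload interface{}) (PublishOptions, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return PublishOptions{}, err
	}
	return PublishOptions{
		Exchange:    exchange,
		RoutingKey:  routingKey,
		ContentType: "application/json",
		Body:        body,
		Mandatory:   true,                          // ask the broker to return unroutable messages
		Headers:     Table{"x-source": "example"}, // hypothetical metadata header
	}, nil
}

func main() {
	// An empty exchange name selects the AMQP default exchange,
	// which routes by queue name.
	opts, err := jsonPublishOptions("", "orders", map[string]int{"id": 7})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(opts.ContentType, string(opts.Body))
}
```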
type QueueOptions ¶
type QueueOptions struct {
	// Durable specifies whether the queue should survive broker restarts. If true, the queue will
	// be durable, meaning it will persist even if RabbitMQ crashes or restarts.
	Durable bool

	// AutoAck enables or disables automatic message acknowledgment. If true, messages are automatically
	// acknowledged by the broker once they are received by the consumer.
	AutoAck bool

	// AutoDelete determines whether the queue should be automatically deleted when no consumers are
	// connected. If true, the queue will be deleted when the last consumer disconnects.
	AutoDelete bool

	// Exclusive makes the queue private to the connection that created it. If true, the queue can only
	// be used by the connection that declared it and will be deleted once that connection closes.
	Exclusive bool

	// NoWait prevents the server from sending a response to the queue declaration. If true, the declaration
	// will not wait for an acknowledgment from the server and no error will be returned if the queue already exists.
	NoWait bool

	// NoLocal prevents the delivery of messages to the connection that published them. If true, messages
	// will not be delivered to the connection that created the queue.
	NoLocal bool

	// Args allows additional arguments to be passed when declaring the queue. This can be used for advanced
	// RabbitMQ configurations, such as setting arguments for policies or defining queue TTLs (Time-To-Live).
	Args amqp.Table
}
QueueOptions represents the configuration options for declaring a queue in RabbitMQ.
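The Args field is where per-queue settings such as a message TTL go. The sketch below builds options for a durable queue with manual acknowledgment and a TTL, using RabbitMQ's "x-message-ttl" argument, which is expressed in milliseconds. QueueOptions and Table are local stand-ins for the documented types; durableQueueWithTTL and exampleTTL are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Table stands in for amqp.Table, which is a map[string]interface{}.
type Table map[string]interface{}

// QueueOptions mirrors the documented struct.
type QueueOptions struct {
	Durable    bool
	AutoAck    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	NoLocal    bool
	Args       Table
}

// exampleTTL is an arbitrary value chosen for the illustration.
const exampleTTL = 30 * time.Second

// durableQueueWithTTL returns options for a durable queue with manual
// acknowledgment whose messages expire after ttl.
func durableQueueWithTTL(ttl time.Duration) QueueOptions {
	return QueueOptions{
		Durable: true,
		AutoAck: false, // the consumer must call Ack, Nack, or Reject itself
		Args: Table{
			// RabbitMQ expects the message TTL in milliseconds.
			"x-message-ttl": ttl.Milliseconds(),
		},
	}
}

func main() {
	opts := durableQueueWithTTL(exampleTTL)
	fmt.Println("durable:", opts.Durable, "ttl (ms):", opts.Args["x-message-ttl"])
}
```

A value built this way would be passed as the options argument of NewConsumer. RabbitMQ refuses to redeclare an existing queue with different arguments, so producers and consumers sharing a queue need matching settings.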