rabbitmq

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: MIT Imports: 6 Imported by: 0

README

Go RabbitMQ Auto-Reconnect Project

go-github release (latest SemVer)

This project demonstrates a simple implementation of a RabbitMQ consumer and producer in Go with automatic reconnection capabilities.

Explores the hypothesis of using a single connection for many channels, both for producers and consumers, in a RabbitMQ setup. This architectural choice aims to optimize resource usage and improve performance in scenarios with a large number of channels.

Requirements

  • go 1.21
Installation
  1. use the module:
   go get github.com/afaf-tech/go-rabbitmq 
Consumer
# How to use the consumer
Producer
# How to use the producer

Backlog

  • Proper Logging
  • Producer function

License

This project is licensed under the MIT License - see the LICENSE.md file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// URI is the connection string for RabbitMQ. It should follow the AMQP URI format, e.g.,
	// "amqp://guest:guest@localhost:5672/". It is used to establish the connection to the RabbitMQ broker.
	URI string

	// RetryDuration specifies the time duration to wait before retrying a failed connection attempt.
	// This is useful to implement a backoff strategy in case of temporary network issues.
	RetryDuration time.Duration

	// AMQPConfig holds AMQP-specific configuration options. If nil, default AMQP configurations
	// are used. This can include settings like heartbeat intervals and other advanced AMQP features.
	AMQPConfig *amqp.Config

	// MaxChannels defines the maximum number of channels that can be opened to the RabbitMQ server.
	// If the value is 0 or negative, the default is used (which may be 10 channels).
	MaxChannels int
}

Config holds the configuration options for establishing a RabbitMQ connection and managing consumer channels.

type Connection

type Connection struct {
	// Connection is the underlying AMQP connection provided by the AMQP client library.
	// It provides the basic connection functionalities such as opening channels, closing the connection, etc.
	*amqp.Connection
	// contains filtered or unexported fields
}

Connection wraps the actual AMQP connection and provides reconnection logic, ensuring that the connection and channels remain active or are re-established when necessary.

func NewConnection

func NewConnection(config *Config) *Connection

NewConnection creates a new Connection object and initializes it with the provided configuration

func (*Connection) Close

func (c *Connection) Close() error

Close closes the RabbitMQ connection and all open channels

func (*Connection) Connect

func (c *Connection) Connect() error

Connect establishes the RabbitMQ connection with automatic retry on failure

func (*Connection) IsClosed

func (c *Connection) IsClosed() bool

IsClosed checks if the RabbitMQ connection is closed

func (*Connection) OpenChannel

func (c *Connection) OpenChannel() (*amqp.Channel, error)

OpenChannel opens a new channel with retry logic, ensuring it doesn't exceed MaxChannels

func (*Connection) Reconnect

func (c *Connection) Reconnect() error

Reconnect attempts to reconnect if the connection was lost

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents an AMQP consumer that consumes messages from a RabbitMQ queue. It manages the connection, channel, and queue configurations for the consumer.

func NewConsumer

func NewConsumer(conn *Connection, name, queue string, options QueueOptions) (*Consumer, error)

NewConsumer initializes a new Consumer instance and declares the queue with the specified options

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, handler func(ctx context.Context, msg *Message))

type Message added in v0.1.1

type Message struct {
	// Body is the content of the message received from the queue.
	// It contains the actual data sent by the producer.
	Body []byte

	// Ack is a function that, when called, acknowledges the message.
	// It informs RabbitMQ that the message has been successfully processed.
	Ack func()

	// Nack is a function that, when called, negatively acknowledges the message.
	// It informs RabbitMQ that the message was not processed successfully, and depending on
	// the RabbitMQ configuration, it may be re-delivered or discarded.
	Nack func()

	// Reject is a function that, when called, rejects the message.
	// It informs RabbitMQ that the message was not processed and does not want it to be re-delivered.
	Reject func()

	// QueueName is the name of the RabbitMQ queue from which the message was consumed.
	// It helps to identify the source of the message.
	QueueName string

	// ConsumerID is the unique identifier of the consumer that received the message.
	// It can be used for logging or to distinguish between multiple consumers.
	ConsumerID string
}

Message represents a message received by a consumer from a RabbitMQ queue. It includes the message body and methods for acknowledging, negatively acknowledging, or rejecting the message, as well as metadata about the message's origin.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents an AMQP producer that sends messages to a specified RabbitMQ queue.

func NewProducer

func NewProducer(conn *Connection, name, queue string) (*Producer, error)

NewProducer initializes a new Producer instance and declares the queue with the specified options

func (*Producer) Close

func (p *Producer) Close()

Close gracefully shuts down the producer

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, message []byte, options PublishOptions) error

Publish sends a message to the queue with retry logic and context support

type PublishOptions

type PublishOptions struct {
	// Exchange is the name of the exchange to which the message will be published.
	// The exchange determines how the message will be routed to queues.
	Exchange string

	// RoutingKey is the routing key used by the exchange to decide where to route the message.
	// The value depends on the type of exchange (e.g., direct, topic).
	RoutingKey string

	// ContentType specifies the content type of the message. It helps the consumer interpret the message body.
	// For example, "text/plain" or "application/json".
	ContentType string

	// Body is the actual message content, represented as a byte slice.
	// This is the payload of the message being sent to the RabbitMQ exchange.
	Body []byte

	// Mandatory indicates whether the message is mandatory.
	// If true, RabbitMQ will return the message to the producer if it cannot be routed to a queue.
	Mandatory bool

	// Immediate indicates whether the message is immediate.
	// If true, RabbitMQ will try to deliver the message to a consumer immediately, if possible.
	Immediate bool

	// Headers is an optional map of headers that can be included with the message.
	// These headers can carry metadata or additional information to help the consumer process the message.
	Headers amqp.Table
}

PublishOptions represents the configuration options for publishing a message to RabbitMQ.

type QueueOptions

type QueueOptions struct {
	// Durable specifies whether the queue should survive broker restarts. If true, the queue will
	// be durable, meaning it will persist even if RabbitMQ crashes or restarts.
	Durable bool

	// AutoAck enables or disables automatic message acknowledgment. If true, messages are automatically
	// acknowledged by the broker once they are received by the consumer.
	AutoAck bool

	// AutoDelete determines whether the queue should be automatically deleted when no consumers are
	// connected. If true, the queue will be deleted when the last consumer disconnects.
	AutoDelete bool

	// Exclusive makes the queue private to the connection that created it. If true, the queue can only
	// be used by the connection that declared it and will be deleted once that connection closes.
	Exclusive bool

	// NoWait prevents the server from sending a response to the queue declaration. If true, the declaration
	// will not wait for an acknowledgment from the server and no error will be returned if the queue already exists.
	NoWait bool

	// NoLocal prevents the delivery of messages to the connection that published them. If true, messages
	// will not be delivered to the connection that created the queue.
	NoLocal bool

	// Args allows additional arguments to be passed when declaring the queue. This can be used for advanced
	// RabbitMQ configurations, such as setting arguments for policies or defining queue TTLs (Time-To-Live).
	Args amqp.Table
}

QueueOptions represents the configuration options for declaring a queue in RabbitMQ.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL