Documentation
¶
Overview ¶
Package grabbit provides a simplified and idiomatic wrapper around the RabbitMQ Go client, making it easier to consume messages using common AMQP patterns.
Key features include: - Easy configuration of exchanges, queues, and bindings. - Middleware support for reusable message processing logic. - Context integration for graceful shutdowns. - Customizable error handling. - Support for advanced connection settings, including TLS. - Broker state management and metrics tracking.
Index ¶
- func WithBindingArgs(args amqp.Table) func(*BindingOptions)
- func WithBindingHeaders(headers amqp.Table) func(*BindingOptions)
- func WithBindingNoWait(noWait bool) func(*BindingOptions)
- func WithConsumerArgs(args amqp.Table) func(*ConsumerOptions)
- func WithConsumerAutoAck(autoAck bool) func(*ConsumerOptions)
- func WithConsumerExclusive(exclusive bool) func(*ConsumerOptions)
- func WithConsumerNoLocal(noLocal bool) func(*ConsumerOptions)
- func WithConsumerNoWait(noWait bool) func(*ConsumerOptions)
- func WithConsumerTag(tag string) func(*ConsumerOptions)
- func WithExchangeArgs(args amqp.Table) func(*ExchangeOptions)
- func WithExchangeAutoDelete(autoDelete bool) func(*ExchangeOptions)
- func WithExchangeDurable(durable bool) func(*ExchangeOptions)
- func WithExchangeInternal(internal bool) func(*ExchangeOptions)
- func WithExchangeNoWait(noWait bool) func(*ExchangeOptions)
- func WithQoSPrefetchSize(prefetchSize int) func(*QoSOptions)
- func WithQueueArgs(args amqp.Table) func(*QueueOptions)
- func WithQueueAutoDelete(autoDelete bool) func(*QueueOptions)
- func WithQueueDeadLetterExchange(exchange string) func(*QueueOptions)
- func WithQueueDurable(durable bool) func(*QueueOptions)
- func WithQueueExclusive(exclusive bool) func(*QueueOptions)
- func WithQueueMaxRetries(max int) func(*QueueOptions)
- func WithQueueNoWait(noWait bool) func(*QueueOptions)
- type BackoffConfig
- type BindingOptions
- type Broker
- func (b *Broker) Consumer(name string, handler HandlerFunc) *Consumer
- func (b *Broker) GetStatus() BrokerStatus
- func (b *Broker) SetBackoffConfig(config BackoffConfig)
- func (b *Broker) SetConfig(config amqp.Config)
- func (b *Broker) SetErrorHandler(handler ErrorHandler)
- func (b *Broker) Shutdown() error
- func (b *Broker) Start(url string) error
- func (b *Broker) Use(middleware ...MiddlewareFunc)
- type BrokerStatus
- type Consumer
- func (c *Consumer) Binding(routingKey string, opts ...func(*BindingOptions)) *Consumer
- func (c *Consumer) ConsumerOptions(opts ...func(*ConsumerOptions)) *Consumer
- func (c *Consumer) Exchange(name string, exchangeType ExchangeType, opts ...func(*ExchangeOptions)) *Consumer
- func (c *Consumer) QoS(prefetchCount int, opts ...func(*QoSOptions)) *Consumer
- func (c *Consumer) Queue(name string, opts ...func(*QueueOptions)) *Consumer
- func (c *Consumer) Use(middleware ...MiddlewareFunc) *Consumer
- type ConsumerOptions
- type Context
- func (c *Context) Ack(multiple bool) error
- func (c *Context) Body() []byte
- func (c *Context) Get(key string) (interface{}, bool)
- func (c *Context) Header(key string) interface{}
- func (c *Context) Nack(multiple, requeue bool) error
- func (c *Context) Reject(requeue bool) error
- func (c *Context) Set(key string, value interface{})
- type ErrorHandler
- type ExchangeOptions
- type ExchangeType
- type HandlerFunc
- type MiddlewareFunc
- type QoSOptions
- type QueueOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithBindingArgs ¶
func WithBindingArgs(args amqp.Table) func(*BindingOptions)
WithBindingArgs sets the Args option for BindingOptions.
func WithBindingHeaders ¶
func WithBindingHeaders(headers amqp.Table) func(*BindingOptions)
WithBindingHeaders sets the Headers option for BindingOptions.
func WithBindingNoWait ¶
func WithBindingNoWait(noWait bool) func(*BindingOptions)
WithBindingNoWait sets the NoWait option for BindingOptions.
func WithConsumerArgs ¶
func WithConsumerArgs(args amqp.Table) func(*ConsumerOptions)
WithConsumerArgs sets the Args option for ConsumerOptions.
func WithConsumerAutoAck ¶
func WithConsumerAutoAck(autoAck bool) func(*ConsumerOptions)
WithConsumerAutoAck sets the AutoAck option for ConsumerOptions.
func WithConsumerExclusive ¶
func WithConsumerExclusive(exclusive bool) func(*ConsumerOptions)
WithConsumerExclusive sets the Exclusive option for ConsumerOptions.
func WithConsumerNoLocal ¶
func WithConsumerNoLocal(noLocal bool) func(*ConsumerOptions)
WithConsumerNoLocal sets the NoLocal option for ConsumerOptions.
func WithConsumerNoWait ¶
func WithConsumerNoWait(noWait bool) func(*ConsumerOptions)
WithConsumerNoWait sets the NoWait option for ConsumerOptions.
func WithConsumerTag ¶
func WithConsumerTag(tag string) func(*ConsumerOptions)
WithConsumerTag sets the ConsumerTag option for ConsumerOptions.
func WithExchangeArgs ¶
func WithExchangeArgs(args amqp.Table) func(*ExchangeOptions)
WithExchangeArgs sets the Args option for ExchangeOptions.
func WithExchangeAutoDelete ¶
func WithExchangeAutoDelete(autoDelete bool) func(*ExchangeOptions)
WithExchangeAutoDelete sets the AutoDelete option for ExchangeOptions.
func WithExchangeDurable ¶
func WithExchangeDurable(durable bool) func(*ExchangeOptions)
WithExchangeDurable sets the Durable option for ExchangeOptions.
func WithExchangeInternal ¶
func WithExchangeInternal(internal bool) func(*ExchangeOptions)
WithExchangeInternal sets the Internal option for ExchangeOptions.
func WithExchangeNoWait ¶
func WithExchangeNoWait(noWait bool) func(*ExchangeOptions)
WithExchangeNoWait sets the NoWait option for ExchangeOptions.
func WithQoSPrefetchSize ¶
func WithQoSPrefetchSize(prefetchSize int) func(*QoSOptions)
WithQoSPrefetchSize sets the PrefetchSize option for QoSOptions.
func WithQueueArgs ¶
func WithQueueArgs(args amqp.Table) func(*QueueOptions)
WithQueueArgs sets the Args option for QueueOptions.
func WithQueueAutoDelete ¶
func WithQueueAutoDelete(autoDelete bool) func(*QueueOptions)
WithQueueAutoDelete sets the AutoDelete option for QueueOptions.
func WithQueueDeadLetterExchange ¶
func WithQueueDeadLetterExchange(exchange string) func(*QueueOptions)
WithQueueDeadLetterExchange sets the dead-letter exchange for the queue.
func WithQueueDurable ¶
func WithQueueDurable(durable bool) func(*QueueOptions)
WithQueueDurable sets the Durable option for QueueOptions.
func WithQueueExclusive ¶
func WithQueueExclusive(exclusive bool) func(*QueueOptions)
WithQueueExclusive sets the Exclusive option for QueueOptions.
func WithQueueMaxRetries ¶
func WithQueueMaxRetries(max int) func(*QueueOptions)
WithQueueMaxRetries sets the maximum number of retries for the queue.
func WithQueueNoWait ¶
func WithQueueNoWait(noWait bool) func(*QueueOptions)
WithQueueNoWait sets the NoWait option for QueueOptions.
Types ¶
type BackoffConfig ¶
type BackoffConfig struct {
// InitialInterval is the initial wait time before reconnecting.
InitialInterval time.Duration
// MaxInterval is the maximum wait time between reconnections.
MaxInterval time.Duration
// Multiplier is the multiplier for exponential backoff.
Multiplier float64
}
BackoffConfig defines the configuration for exponential backoff.
type BindingOptions ¶
type BindingOptions struct {
// RoutingKey is the routing key for binding.
RoutingKey string
// Headers are the headers for a headers exchange.
Headers amqp.Table
// NoWait indicates that the server should not wait for the binding.
NoWait bool
// Args are additional arguments for the binding.
Args amqp.Table
}
BindingOptions defines the options for binding a queue to an exchange.
type Broker ¶
type Broker struct {
StatusChan chan BrokerStatus
// contains filtered or unexported fields
}
Broker manages connections and consumers for RabbitMQ.
func NewBroker ¶
NewBroker creates a new Broker instance with the provided application context. The application context is used to listen for cancellation signals and access global configurations.
func (*Broker) Consumer ¶
func (b *Broker) Consumer(name string, handler HandlerFunc) *Consumer
Consumer creates a new Consumer with the specified name and handler, and adds it to the Broker.
func (*Broker) GetStatus ¶
func (b *Broker) GetStatus() BrokerStatus
GetStatus returns the current status of the broker.
func (*Broker) SetBackoffConfig ¶
func (b *Broker) SetBackoffConfig(config BackoffConfig)
SetBackoffConfig sets the backoff configuration for reconnections.
func (*Broker) SetConfig ¶
SetConfig sets the AMQP configuration for the broker. This allows users to customize connection settings, including TLS.
func (*Broker) SetErrorHandler ¶
func (b *Broker) SetErrorHandler(handler ErrorHandler)
SetErrorHandler sets the error handler for the broker. Users can provide their own error handler to process errors from the broker and consumers.
func (*Broker) Shutdown ¶
Shutdown gracefully shuts down the broker and all its consumers. It cancels the broker's context, triggering cancellation signals.
func (*Broker) Start ¶
Start establishes the connection to the RabbitMQ server and starts all consumers. It will attempt to reconnect and restart consumers upon connection loss. It listens for cancellation signals from the application's context.
func (*Broker) Use ¶
func (b *Broker) Use(middleware ...MiddlewareFunc)
Use adds middleware(s) to the Broker. Middleware functions will be applied to all consumers managed by the broker.
type BrokerStatus ¶
type BrokerStatus int
BrokerStatus represents the connection status of the broker.
const ( // StatusDisconnected indicates that the broker is disconnected. StatusDisconnected BrokerStatus = iota // StatusConnecting indicates that the broker is attempting to connect. StatusConnecting // StatusConnected indicates that the broker is connected. StatusConnected )
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a message consumer with its configurations.
func (*Consumer) Binding ¶
func (c *Consumer) Binding(routingKey string, opts ...func(*BindingOptions)) *Consumer
Binding configures the binding options for the consumer.
func (*Consumer) ConsumerOptions ¶
func (c *Consumer) ConsumerOptions(opts ...func(*ConsumerOptions)) *Consumer
ConsumerOptions sets the consumer options.
func (*Consumer) Exchange ¶
func (c *Consumer) Exchange(name string, exchangeType ExchangeType, opts ...func(*ExchangeOptions)) *Consumer
Exchange configures the exchange for the consumer.
func (*Consumer) QoS ¶
func (c *Consumer) QoS(prefetchCount int, opts ...func(*QoSOptions)) *Consumer
QoS sets the QoS options for the consumer.
func (*Consumer) Queue ¶
func (c *Consumer) Queue(name string, opts ...func(*QueueOptions)) *Consumer
Queue configures the queue for the consumer.
func (*Consumer) Use ¶
func (c *Consumer) Use(middleware ...MiddlewareFunc) *Consumer
Use adds middleware(s) to the Consumer. Middleware functions will be applied to the consumer's handler.
type ConsumerOptions ¶
type ConsumerOptions struct {
// ConsumerTag is the identifier for the consumer.
ConsumerTag string
// AutoAck indicates whether messages are automatically acknowledged.
AutoAck bool
// Exclusive indicates whether the consumer has exclusive access to the queue.
Exclusive bool
// NoLocal indicates that the server should not deliver messages published on the same channel.
NoLocal bool
// NoWait indicates that the server should not wait for the consumer to be registered.
NoWait bool
// Args are additional arguments for the consumer.
Args amqp.Table
}
ConsumerOptions defines the options for a consumer.
type Context ¶
type Context struct {
context.Context
Delivery amqp.Delivery
// contains filtered or unexported fields
}
Context provides methods to interact with the incoming message.
func (*Context) Ack ¶
Ack acknowledges the message, indicating successful processing. If multiple is true, all messages up to this delivery tag are acknowledged.
func (*Context) Nack ¶
Nack negatively acknowledges the message, indicating unsuccessful processing. If requeue is true, the message will be requeued. If multiple is true, multiple messages are negatively acknowledged.
type ErrorHandler ¶
type ErrorHandler func(error)
ErrorHandler is a function type for handling errors.
type ExchangeOptions ¶
type ExchangeOptions struct {
// Name is the name of the exchange.
Name string
// Type is the type of the exchange (e.g., direct, fanout, topic, headers).
Type ExchangeType
// Durable indicates whether the exchange survives broker restarts.
Durable bool
// AutoDelete indicates whether the exchange is deleted when unused.
AutoDelete bool
// Internal indicates whether the exchange is internal (used by the broker).
Internal bool
// NoWait indicates that the server should not wait for the exchange declaration.
NoWait bool
// Args are additional arguments for the exchange declaration.
Args amqp.Table
}
ExchangeOptions defines the configuration options for an exchange.
type ExchangeType ¶
type ExchangeType string
ExchangeType represents the type of an AMQP exchange.
const ( // DirectExchange routes messages to queues based on the routing key. DirectExchange ExchangeType = "direct" // FanoutExchange routes messages to all bound queues, ignoring routing keys. FanoutExchange ExchangeType = "fanout" // TopicExchange routes messages to queues based on pattern matching. TopicExchange ExchangeType = "topic" // HeadersExchange routes messages based on matching message headers. HeadersExchange ExchangeType = "headers" )
type HandlerFunc ¶
HandlerFunc defines the handler function type for processing messages.
type MiddlewareFunc ¶
type MiddlewareFunc func(HandlerFunc) HandlerFunc
MiddlewareFunc defines the middleware function type.
type QoSOptions ¶
type QoSOptions struct {
// PrefetchCount is the number of messages to prefetch.
PrefetchCount int
// PrefetchSize is the number of bytes to prefetch.
PrefetchSize int
}
QoSOptions defines the Quality of Service options for a consumer.
type QueueOptions ¶
type QueueOptions struct {
// Name is the name of the queue.
Name string
// Durable indicates whether the queue survives broker restarts.
Durable bool
// AutoDelete indicates whether the queue is deleted when unused.
AutoDelete bool
// Exclusive indicates whether the queue is exclusive to the connection.
Exclusive bool
// NoWait indicates that the server should not wait for the queue declaration.
NoWait bool
// Args are additional arguments for the queue declaration.
Args amqp.Table
}
QueueOptions defines the configuration options for a queue.