Documentation ¶
Overview ¶
Package pubsub provides a wrapper around the amqp091-go library for working with RabbitMQ. It handles automatic reconnection on connection loss, channel reinitialization, resource declaration (exchange, queue, binding), and consumer management with Ack/Nack support.
The architecture is built around a Session facade that owns the AMQP connection, channel, and three background goroutines:
- handleReconnect — monitors TCP connection loss and reconnects;
- handleReOpenChannel — recreates the AMQP channel after a channel error;
- handleConsumers — starts and restarts consumers when the channel is ready.
Coordination between goroutines is implemented via signal channels (connReadyCh, chanReadyCh, consumerUpdateCh) and atomic flags (isReady, isClosed).
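The coordination described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the readiness type and its set and wait methods are hypothetical names, and the choice to close the channel on "ready" and replace it on "not ready" is an assumption about how such a signal channel might behave.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// readiness is a hypothetical stand-in for one signal channel plus one atomic flag.
type readiness struct {
	mu      sync.Mutex
	readyCh chan struct{} // closed while the resource is ready
	isReady int32         // 1 = ready, 0 = not ready (accessed atomically)
}

func newReadiness() *readiness {
	return &readiness{readyCh: make(chan struct{})}
}

// set flips the flag. Becoming ready closes the channel, which wakes every
// waiter at once; becoming not ready installs a fresh open channel.
func (r *readiness) set(ready bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ready == (atomic.LoadInt32(&r.isReady) == 1) {
		return // no state change
	}
	if ready {
		atomic.StoreInt32(&r.isReady, 1)
		close(r.readyCh)
		return
	}
	atomic.StoreInt32(&r.isReady, 0)
	r.readyCh = make(chan struct{})
}

// wait returns the channel that is closed once the resource is ready.
func (r *readiness) wait() <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.readyCh
}

func main() {
	r := newReadiness()
	finished := make(chan struct{})
	go func() {
		<-r.wait() // blocks until set(true) is called
		close(finished)
	}()
	r.set(true)
	<-finished
	fmt.Println("ready:", atomic.LoadInt32(&r.isReady) == 1)
}
```

Closing a channel rather than sending on it lets any number of goroutines observe the transition without knowing how many are waiting.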
Index ¶
- Variables
- type Bind
- type Consumer
- type Delivery
- type EventHandler
- type Exchange
- type Logger
- type Option
- type Queue
- type Session
- func (session *Session) AddConsumer(c *Consumer) error
- func (session *Session) ChannelClose() error
- func (session *Session) Close() error
- func (session *Session) ExchangeDeclare(ex Exchange) error
- func (session *Session) Publish(message []byte) error
- func (session *Session) PublishTo(exchange, key string, message []byte) error
- func (session *Session) PublishToWithContext(ctx context.Context, exchange, key string, message []byte) error
- func (session *Session) PublishWithContext(ctx context.Context, message []byte) error
- func (session *Session) QueueBind(b Bind) error
- func (session *Session) QueueDeclare(q Queue) error
- func (session *Session) RemoveConsumer(tag string) error
- func (session *Session) Stream(c *Consumer) (<-chan amqp.Delivery, error)
- func (session *Session) Subscribe(handler func(Delivery) error) error
- func (session *Session) SubscribeTo(queue string, handler func(Delivery) error) error
- func (session *Session) SubscribeToWithTag(queue string, handler func(Delivery) error) (string, error)
- func (session *Session) SubscribeWithTag(handler func(Delivery) error) (string, error)
- func (session *Session) UnsafePublish(message []byte, exchange, key string) error
- func (session *Session) UnsafePublishTo(exchange, key string, message []byte) error
- func (session *Session) UnsafePublishToWithContext(ctx context.Context, exchange, key string, message []byte) error
- func (session *Session) WaitReady(ctx context.Context) error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotConnected indicates the session is not connected to the RabbitMQ server.
	// Returned when attempting to publish or create a stream without a ready channel.
	ErrNotConnected = errors.New("not connected to a server")

	// ErrAlreadyClosed indicates a duplicate call to Close() on an already closed session.
	ErrAlreadyClosed = errors.New("already closed: not connected to the server")

	// ErrShutdown indicates the session is in the process of shutting down.
	// Returned by publish methods when the done channel is closed.
	ErrShutdown = errors.New("session is shutting down")

	// ErrNotSetDefaultQueue indicates no default queue has been set.
	// Returned by Subscribe() if WithDeclare was not called with IsUsageDefault.
	ErrNotSetDefaultQueue = errors.New("default queue is not set")

	// ErrNotSetDefaultExchange indicates no default exchange or routing key has been set.
	// Returned by Publish() if WithDeclare was not called with IsUsageDefault.
	ErrNotSetDefaultExchange = errors.New("default exchange or key is not set")

	// ErrConsumerAlreadyExists indicates a consumer with the given name is already registered.
	// Returned by AddConsumer() to prevent silent overwrites.
	ErrConsumerAlreadyExists = errors.New("consumer with this name already exists")

	// ErrInvalidConsumer indicates invalid consumer configuration.
	ErrInvalidConsumer = errors.New("invalid consumer configuration")

	// ErrInvalidExchange indicates invalid exchange declaration parameters.
	ErrInvalidExchange = errors.New("invalid exchange declaration")

	// ErrInvalidQueue indicates invalid queue declaration parameters.
	ErrInvalidQueue = errors.New("invalid queue declaration")

	// ErrInvalidBind indicates invalid queue binding parameters.
	ErrInvalidBind = errors.New("invalid queue binding")

	// ErrInvalidDelay indicates an invalid delay option value.
	ErrInvalidDelay = errors.New("invalid delay option value")
)
Standard errors returned by Session methods.
Functions ¶
This section is empty.
Types ¶
type Bind ¶
Bind describes a binding of a queue to an exchange via a routing key.
Fields:
- QueueName — queue name.
- ExchangeName — exchange name.
- Key — routing key used to route messages to the queue.
- IsUsageDefault — if true, this routing key will be used by Publish/PublishWithContext by default.
type Consumer ¶
type Consumer struct {
Name string
QueueName string
AutoAck bool
Exclusive bool
Handler EventHandler
// contains filtered or unexported fields
}
Consumer describes a message consumer from an AMQP queue.
Fields:
- Name — unique consumer tag. If empty, one is generated automatically.
- QueueName — name of the queue the consumer reads messages from.
- AutoAck — if true, the server considers the message delivered immediately (no Ack needed).
- Exclusive — if true, only one consumer can read from the queue.
- Handler — function called for each message.
- run — atomic flag: 1 = consumer goroutine is running, 0 = stopped. Used by handleConsumers to determine if a restart is needed.
type Delivery ¶ added in v0.10.0
Delivery represents a received message in a simplified form. It hides the internal details of amqp.Delivery, exposing only the fields useful to the handler: exchange, routing key, and message body.
type EventHandler ¶
EventHandler is a function that handles an incoming message. If it returns nil, the message is acknowledged (Ack). If it returns an error, the message is rejected (Nack) and requeued.
type Exchange ¶
type Exchange struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
IsUsageDefault bool
}
Exchange describes an AMQP exchange to be declared on the server.
Fields:
- Name — exchange name (e.g., "events").
- Kind — exchange type: "direct", "fanout", "topic", "headers".
- Durable — if true, the exchange survives server restarts.
- AutoDelete — if true, the exchange is deleted when all queues unbind from it.
- Internal — if true, the exchange is only accessible for routing between exchanges.
- IsUsageDefault — if true, this exchange will be used by Publish/PublishWithContext by default.
type Logger ¶ added in v1.0.0
Logger defines the interface for replacing the session's logger. Any implementation providing a Printf method can be passed via the WithLogger option.
type Option ¶
Option is a functional option applied to Session during New(). If an option returns an error, it is logged but session creation continues.
func WithDeclare ¶
WithDeclare creates an option that registers an exchange, queue, and binding in the session. Resources will be declared on the server on every reconnection and channel reinitialization.
If IsUsageDefault is set on the exchange, queue, or bind, that resource is used as the default for the Publish and Subscribe methods.
func WithLogger ¶ added in v1.0.0
WithLogger creates an option to replace the session's default logger. If nil is passed, the logger is not changed (defaultLogger remains).
func WithReInitDelay ¶ added in v1.0.0
WithReInitDelay overrides the delay between channel re-initialization attempts. The value must be > 0.
func WithReconnectDelay ¶ added in v1.0.0
WithReconnectDelay overrides the delay between reconnect attempts. The value must be > 0.
func WithResendDelay ¶ added in v1.0.0
WithResendDelay overrides the delay between publish retry attempts. The value must be > 0.
type Queue ¶
Queue describes an AMQP queue for declaration on the server.
Fields:
- Name — queue name. Must be non-empty.
- Durable — if true, the queue survives server restarts.
- AutoDelete — if true, the queue is deleted when all consumers disconnect.
- Exclusive — if true, the queue is only accessible by the current connection and deleted when it closes.
- IsUsageDefault — if true, this queue will be used by the Subscribe method by default.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session encapsulates the AMQP connection, channel, resource declarations, and consumer management. It provides automatic reconnection and reinitialization on failures.
Lifecycle:
- New() creates a session and launches three background goroutines.
- handleReconnect establishes the TCP connection to RabbitMQ.
- handleReOpenChannel opens an AMQP channel, declares resources, enables confirm mode.
- handleConsumers starts consumers after the channel is ready.
- Close() stops all goroutines, waits for message processing to finish, closes the channel and connection.
Concurrent access:
- mu (sync.RWMutex) protects: queues, exchanges, binds, defaultExchange, defaultQueue, defaultBind, consumers, connection, notifyConnClose, notifyChanClose, notifyConfirm, connReadyCh, chanReadyCh.
- publishMu (sync.Mutex) serializes PublishWithContext calls to ensure correct correspondence between publishes and confirms.
- channel (atomic.Pointer) — atomic pointer to the current AMQP channel.
- isReady, isClosed (int32) — atomic state flags.
func New ¶
New creates a new session and immediately begins connection attempts to the server.
Three background goroutines are started automatically:
- handleReconnect — establishes and restores the TCP connection.
- handleReOpenChannel — opens the AMQP channel and declares resources.
- handleConsumers — starts consumers when the channel is ready.
Options (opts) are applied sequentially. If an option returns an error (e.g., WithDeclare on an already-connected channel fails to declare a resource), the error is logged but session creation continues — the resource will be re-declared on the next reconnection.
The returned session must be closed with Close() to release resources.
func (*Session) AddConsumer ¶ added in v0.7.0
AddConsumer registers a new consumer in the session.
If c.Name is empty, a unique name of the format "consumer-<hex>" is generated. If a consumer with the same name already exists, returns ErrConsumerAlreadyExists.
After registration, the consumer will be automatically started by the handleConsumers goroutine when the AMQP channel is ready. On reconnection, the consumer is restarted automatically.
func (*Session) ChannelClose ¶
ChannelClose forcibly closes the current AMQP channel. This will trigger a channel error, and handleReOpenChannel will automatically create a new one.
Useful for testing channel error recovery behavior.
func (*Session) Close ¶
Close gracefully shuts down the session.
Sequence:
- Atomically sets isClosed = 1 (CAS prevents duplicate calls).
- Closes the done channel — all goroutines receive the termination signal.
- Resets isReady = 0 — new publishes will be rejected.
- Closes chanReadyCh (setChanReady(true)) — unblocks consumer goroutines waiting for channel readiness so they can see the closed done and exit.
- Waits for all consumer goroutines to finish (consumerWG.Wait()).
- Closes the AMQP channel.
- Closes the AMQP connection (if not already closed by the server).
Close returns ErrAlreadyClosed on duplicate calls. Otherwise it returns the first non-nil error from closing the channel or the connection, or nil if both close cleanly.
func (*Session) ExchangeDeclare ¶ added in v0.9.0
ExchangeDeclare registers an exchange in the session and declares it on the server if the channel is already initialized.
If an exchange with the same name is already registered, no duplicate is added. If ex.IsUsageDefault == true, this exchange becomes the default for Publish/PublishWithContext. Exchange name and kind must be non-empty.
func (*Session) Publish ¶ added in v0.7.0
Publish sends a message to the default exchange and routing key, waiting for a server confirmation.
Blocks until:
- The server confirms receipt (Ack) — returns nil.
- The session is closed — returns ErrShutdown.
Uses context.Background(), so it cannot be cancelled externally. For cancellable publishing, use PublishWithContext.
Requires defaultExchange and defaultBind to be set (via WithDeclare with IsUsageDefault=true), otherwise returns ErrNotSetDefaultExchange.
func (*Session) PublishTo ¶ added in v0.7.0
PublishTo sends a message to the specified exchange with the specified routing key, waiting for a server confirmation.
Similar to Publish but allows explicit exchange and routing key specification. Uses context.Background().
func (*Session) PublishToWithContext ¶ added in v1.0.0
func (session *Session) PublishToWithContext(ctx context.Context, exchange, key string, message []byte) error
PublishToWithContext sends a message to the specified exchange with the specified routing key with context cancellation support.
func (*Session) PublishWithContext ¶ added in v1.0.0
PublishWithContext sends a message to the default exchange and routing key with context cancellation support.
Blocks until confirmation, context cancellation, or session shutdown. Returns ctx.Err() on context cancellation.
func (*Session) QueueBind ¶ added in v0.9.0
QueueBind registers a binding of a queue to an exchange and performs the binding on the server if the channel is already initialized.
A binding is considered a duplicate if QueueName, ExchangeName, and Key all match. If b.IsUsageDefault == true, this binding's routing key will be used by Publish/PublishWithContext by default. QueueName and ExchangeName must be non-empty.
func (*Session) QueueDeclare ¶
QueueDeclare registers a queue in the session and declares it on the server if the channel is already initialized.
If a queue with the same name is already registered, no duplicate is added. If q.IsUsageDefault == true, this queue becomes the default queue for Subscribe. Queue name must be non-empty.
func (*Session) RemoveConsumer ¶ added in v1.0.0
RemoveConsumer stops and removes a consumer by its name (tag).
Removes the consumer from the map and cancels it on the AMQP server via ch.Cancel(). After cancellation, the AMQP server closes the delivery channel, and the consumer goroutine terminates automatically.
If no consumer with the given tag is found, returns nil (not an error).
func (*Session) Stream ¶
Stream creates an AMQP consumer for the specified consumer and returns a delivery channel. This is a low-level method — for most cases, prefer Subscribe or SubscribeTo.
Returns ErrNotConnected if the session is not ready.
The delivery channel is closed automatically when the AMQP channel closes or the consumer is cancelled via ch.Cancel().
func (*Session) Subscribe ¶
Subscribe registers a handler for the default queue. Requires defaultQueue to be set (via WithDeclare with IsUsageDefault=true).
The consumer is automatically restarted on reconnection.
func (*Session) SubscribeTo ¶ added in v0.7.0
SubscribeTo registers a handler for the specified queue. Creates a Consumer with an auto-generated name and registers it.
The consumer is automatically restarted on reconnection.
func (*Session) SubscribeToWithTag ¶ added in v1.0.0
func (session *Session) SubscribeToWithTag(queue string, handler func(Delivery) error) (string, error)
SubscribeToWithTag registers a handler for the specified queue and returns the assigned consumer tag.
func (*Session) SubscribeWithTag ¶ added in v1.0.0
SubscribeWithTag registers a handler for the default queue and returns the assigned consumer tag.
func (*Session) UnsafePublish ¶ added in v0.7.0
UnsafePublish sends a message without waiting for a server confirmation. Returns an error if the session is not ready.
WARNING: This method does NOT guarantee delivery. The server may not receive the message due to network issues or buffer overflow.
WARNING: This method does NOT acquire publishMu and is NOT safe for concurrent use with Publish, PublishTo, PublishWithContext, or PublishToWithContext. Use it only when no Publish* method can be called concurrently.
func (*Session) UnsafePublishTo ¶ added in v1.0.0
UnsafePublishTo publishes a message without waiting for a server confirmation. Parameters are ordered as exchange, key, message to match PublishTo.
func (*Session) UnsafePublishToWithContext ¶ added in v1.0.0
func (session *Session) UnsafePublishToWithContext(ctx context.Context, exchange, key string, message []byte) error
UnsafePublishToWithContext publishes a message without waiting for a server confirmation and supports context cancellation.