Versions in this module Expand all Collapse all v0 v0.1.1 Apr 20, 2026 v0.1.0 Apr 19, 2026 Changes in this version + const ComponentName + const ConsumerTimeoutArg + const ExchangeDirect + const ExchangeFanout + const ExchangeHeaders + const ExchangeTopic + const ImmediatelyExpire + const NeverExpire + const PackageName + const Persistent + const QueueExclusiveArg + const QueueMaxLenArg + const QueueMaxLenBytesArg + const QueueMessageTTLArg + const QueueOverflowArg + const QueueOverflowDropHead + const QueueOverflowRejectPublish + const QueueOverflowRejectPublishDLX + const QueueTTLArg + const QueueTypeArg + const QueueTypeClassic + const QueueTypeQuorum + const QueueTypeStream + const QueueVersionArg + const SingleActiveConsumerArg + const StreamMaxAgeArg + const StreamMaxSegmentSizeBytesArg + const Transient + func DeclareExchangeAndQueue(ch *Channel, exchange, kind, queue string, routingKeys []string, durable bool) (amqp.Queue, error) + func DeclarePubSub(ch *Channel, exchange string) (amqp.Queue, error) + func DeclareWorkQueue(ch *Channel, name string) (amqp.Queue, error) + func ExtractTraceContext(ctx context.Context, headers amqp.Table) context.Context + func InjectTraceHeaders(ctx context.Context, headers amqp.Table) amqp.Table + func ReconnectLoop(ctx context.Context, client *Client, connect func() error) error + func SimpleConsume(client *Client, queue, consumerTag string, autoAck bool) (<-chan amqp.Delivery, error) + func SimplePublish(client *Client, exchange, routingKey string, body []byte) error + func SimpleRPC(client *Client, exchange, routingKey, replyTo string, body []byte, ...) ([]byte, error) + type Authentication struct + SASLMechanism string + TLS *TLSConfig + type BatchProducer struct + func NewBatchProducer(ch *Channel, exchange, routingKey string, maxSize int, opts ...ProducerOption) (*BatchProducer, error) + func (bp *BatchProducer) Add(msg amqp.Publishing) + func (bp *BatchProducer) Close() error + func (bp *BatchProducer) Flush() error + func (bp *BatchProducer) ShouldFlush() bool + func (bp *BatchProducer) Size() int + type Channel struct + func (ch *Channel) Ack(tag uint64, multiple bool) error + func (ch *Channel) Cancel(consumer string, noWait bool) error + func (ch *Channel) Close() error + func (ch *Channel) Confirm(noWait bool) error + func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, ...) (<-chan amqp.Delivery, error) + func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, ...) (<-chan amqp.Delivery, error) + func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error + func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error + func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error + func (ch *Channel) Flow(active bool) error + func (ch *Channel) Get(queue string, autoAck bool) (amqp.Delivery, bool, error) + func (ch *Channel) GetNextPublishSeqNo() uint64 + func (ch *Channel) IsClosed() bool + func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error + func (ch *Channel) NotifyCancel() <-chan string + func (ch *Channel) NotifyClose() <-chan *amqp.Error + func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) + func (ch *Channel) NotifyFlow() <-chan bool + func (ch *Channel) NotifyPublish() <-chan amqp.Confirmation + func (ch *Channel) NotifyReturn() <-chan amqp.Return + func (ch *Channel) Publish(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error + func (ch *Channel) PublishWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) error + func (ch *Channel) PublishWithDeferredConfirm(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error) + func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) (*amqp.DeferredConfirmation, error) + func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error + func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error + func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) + func (ch *Channel) QueueInspect(name string) (amqp.Queue, error) + func (ch *Channel) QueuePurge(name string, noWait bool) (int, error) + func (ch *Channel) QueueUnbind(name, key, exchange string, args amqp.Table) error + func (ch *Channel) RawChannel() *amqp.Channel + func (ch *Channel) Recover(requeue bool) error + func (ch *Channel) Reject(tag uint64, requeue bool) error + func (ch *Channel) Tx() error + func (ch *Channel) TxCommit() error + func (ch *Channel) TxRollback() error + type ChannelPool struct + func (p *ChannelPool) Acquire(ctx context.Context) (*amqp.Channel, func(), func(), error) + func (p *ChannelPool) Close() error + func (p *ChannelPool) IsClosed() bool + func (p *ChannelPool) Stats() PoolStats + type Client struct + func LoadInstance(name string) *Client + func MustNew(config Config, opts ...Option) *Client + func New(config Config, opts ...Option) (*Client, error) + func (c *Client) AcquireChannel(ctx context.Context) (*Channel, func(), error) + func (c *Client) Close() error + func (c *Client) Config() *Config + func (c *Client) GetLogger() Logger + func (c *Client) GetMetrics() MetricsCollector + func (c *Client) Health() bool + func (c *Client) HealthStatus() HealthInfo + func (c *Client) IsClosed() bool + func (c *Client) NewChannel() (*Channel, error) + func (c *Client) NotifyBlocked() <-chan amqp.Blocking + func (c *Client) NotifyClose() <-chan *Error + func (c *Client) Ping(ctx context.Context) error + func (c *Client) RawConnection() *amqp.Connection + func (c *Client) Reconnect() error + func (c *Client) Stats() PoolStats + type ClientInterface interface + GetLogger func() Logger + GetMetrics func() MetricsCollector + type Config struct + Addr string + ChannelMax uint16 + ChannelPoolMaxIdle int + ChannelPoolMaxLife time.Duration + ChannelPoolSize int + ClientName string + EnableAccessInterceptor bool + EnableMetricInterceptor bool + EnableTraceInterceptor bool + FrameSize int + Heartbeat time.Duration + Locale string + OnFail string + Password string + PoolMaxIdle int + PoolMaxLife time.Duration + PoolSize int + ReconnectInterval time.Duration + ReconnectMaxAttempts int + TLSCACert string + TLSCertFile string + TLSConfig *tls.Config + TLSKeyFile string + TLSServerName string + Username string + Vhost string + func DefaultConfig() Config + func (c *Config) Validate() error + func (c Config) ReconnectPolicy() ReconnectPolicy + type ConnectionPool struct + func (p *ConnectionPool) AcquireChannel(ctx context.Context) (*amqp.Channel, func(), error) + func (p *ConnectionPool) AcquireFromPool(ctx context.Context) (*amqp.Channel, func(), func(), error) + func (p *ConnectionPool) Close() error + func (p *ConnectionPool) GetConnection(idx int) *amqp.Connection + func (p *ConnectionPool) IsClosed() bool + func (p *ConnectionPool) Len() int + func (p *ConnectionPool) NotifyBlocked() <-chan amqp.Blocking + func (p *ConnectionPool) NotifyClose() <-chan *amqp.Error + func (p *ConnectionPool) Stats() PoolStats + type Consumer struct + func NewConsumer(ch *Channel, queue string, opts ...ConsumerOption) *Consumer + func (c *Consumer) Cancel(consumerTag string) error + func (c *Consumer) Close() error + func (c *Consumer) Consume(consumerTag string) (<-chan amqp.Delivery, error) + func (c *Consumer) ConsumeWithHandler(ctx context.Context, consumerTag string, handler MessageHandler) error + func (c *Consumer) ConsumeWithTimeout(consumerTag string, timeout time.Duration, handler MessageHandler) error + func (c *Consumer) ConsumeWithWorkers(ctx context.Context, consumerTag string, numWorkers int, ...) error + type ConsumerOption func(*consumerOptions) + func WithConsumerArgs(args amqp.Table) ConsumerOption + func WithConsumerAutoAck() ConsumerOption + func WithConsumerExclusive() ConsumerOption + type Container struct + func DefaultContainer() *Container + func Load(key string) *Container + func (c *Container) Build(options ...ContainerOption) *Client + func (c *Container) BuildE(options ...ContainerOption) (*Client, error) + type ContainerOption func(c *Container) + func WithOnFail(onFail string) ContainerOption + type EgoMetricsCollector struct + func NewEgoMetrics(name, peer string) *EgoMetricsCollector + func (m *EgoMetricsCollector) RecordChannelAcquired() + func (m *EgoMetricsCollector) RecordChannelReturned() + func (m *EgoMetricsCollector) RecordConnection(active bool) + func (m *EgoMetricsCollector) RecordConnectionError() + func (m *EgoMetricsCollector) RecordConsumeLatency(duration time.Duration) + func (m *EgoMetricsCollector) RecordMessageConfirmed() + func (m *EgoMetricsCollector) RecordMessageConsumed(size int) + func (m *EgoMetricsCollector) RecordMessageNacked() + func (m *EgoMetricsCollector) RecordMessagePublished(size int) + func (m *EgoMetricsCollector) RecordPublishLatency(duration time.Duration) + type Error struct + Component string + Op string + func (e *Error) Error() string + func (e *Error) IsRetryable() bool + func (e *Error) Unwrap() error + type HealthInfo struct + ConnectionsActive int + ConnectionsTotal int + Reason string + Status HealthStatus + type HealthStatus string + const HealthStatusClosed + const HealthStatusDegraded + const HealthStatusDown + const HealthStatusUp + type InstanceStats struct + Health HealthInfo + Pool PoolStats + type Logger interface + Debug func(msg string, keyvals ...any) + Error func(msg string, keyvals ...any) + Info func(msg string, keyvals ...any) + Warn func(msg string, keyvals ...any) + func NewEgoLogger(component *elog.Component) Logger + type MessageHandler func(delivery amqp.Delivery) error + type MetricsCollector interface + RecordChannelAcquired func() + RecordChannelReturned func() + RecordConnection func(active bool) + RecordConnectionError func() + RecordConsumeLatency func(duration time.Duration) + RecordMessageConfirmed func() + RecordMessageConsumed func(size int) + RecordMessageNacked func() + RecordMessagePublished func(size int) + RecordPublishLatency func(duration time.Duration) + type NoOpMetrics struct + func (m *NoOpMetrics) RecordChannelAcquired() + func (m *NoOpMetrics) RecordChannelReturned() + func (m *NoOpMetrics) RecordConnection(active bool) + func (m *NoOpMetrics) RecordConnectionError() + func (m *NoOpMetrics) RecordConsumeLatency(duration time.Duration) + func (m *NoOpMetrics) RecordMessageConfirmed() + func (m *NoOpMetrics) RecordMessageConsumed(size int) + func (m *NoOpMetrics) RecordMessageNacked() + func (m *NoOpMetrics) RecordMessagePublished(size int) + func (m *NoOpMetrics) RecordPublishLatency(duration time.Duration) + type NopLogger struct + func (NopLogger) Debug(msg string, keyvals ...any) + func (NopLogger) Error(msg string, keyvals ...any) + func (NopLogger) Info(msg string, keyvals ...any) + func (NopLogger) Warn(msg string, keyvals ...any) + type Option func(*Options) + func WithLogger(log Logger) Option + func WithMetrics(m MetricsCollector) Option + func WithOptions(opts *Options) Option + type Options struct + Auth []amqp.Authentication + ChannelOptions func(ch *amqp.Channel) error + ConnectionName string + Dial func(network, addr string) (net.Conn, error) + Logger Logger + Metrics MetricsCollector + OnChannelError func(channelID uint16, err error) + OnDisconnect func(err error) + OnReconnect func(attempt int) + type PoolStats struct + ChannelsAcquired int64 + ChannelsActive int + ChannelsReturned int64 + ConnectionsActive int + ConnectionsTotal int + Reconnects int64 + type Producer struct + func NewProducer(ch *Channel, opts ...ProducerOption) (*Producer, error) + func (p *Producer) Close() error + func (p *Producer) Publish(exchange, routingKey string, msg amqp.Publishing) error + func (p *Producer) PublishAsync(exchange, routingKey string, msg amqp.Publishing) (*amqp.DeferredConfirmation, error) + func (p *Producer) PublishWithContext(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error + func (p *Producer) PublishWithContextOptions(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) error + func (p *Producer) PublishWithOptions(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error + type ProducerOption func(*Producer) + func WithConfirm(timeout time.Duration) ProducerOption + type QueueArgs map[string]any + func NewQueueArgs() QueueArgs + func (a QueueArgs) WithDeadLetterExchange(dlx string) QueueArgs + func (a QueueArgs) WithDeadLetterRoutingKey(key string) QueueArgs + func (a QueueArgs) WithDurable(durable bool) QueueArgs + func (a QueueArgs) WithMaxLength(n int) QueueArgs + func (a QueueArgs) WithMaxLengthBytes(n int) QueueArgs + func (a QueueArgs) WithMessageTTL(ttl time.Duration) QueueArgs + func (a QueueArgs) WithOverflow(behavior string) QueueArgs + func (a QueueArgs) WithQueueTTL(ttl time.Duration) QueueArgs + func (a QueueArgs) WithQueueType(qt string) QueueArgs + func (a QueueArgs) WithSingleActiveConsumer() QueueArgs + type ReconnectPolicy struct + Enabled bool + Initial time.Duration + Max time.Duration + MaxAttempts int + Multiplier float64 + func DefaultReconnectPolicy() ReconnectPolicy + func (p ReconnectPolicy) Backoff(attempt int) time.Duration + type RetryConsumer struct + func NewRetryConsumer(ch *Channel, queue string, maxRetries int, retryDelay time.Duration, ...) (*RetryConsumer, error) + func (rc *RetryConsumer) ConsumeWithRetry(ctx context.Context, consumerTag string, handler MessageHandler) error + type TLSConfig struct + CACert string + CertFile string + Insecure bool + KeyFile string + func (c *TLSConfig) LoadTLSConfig() (*tls.Config, error)