Documentation
¶
Overview ¶
Package eamqp provides RabbitMQ integration for the Ego framework.
Index ¶
- Constants
- func DeclareExchangeAndQueue(ch *Channel, exchange, kind, queue string, routingKeys []string, durable bool) (amqp.Queue, error)
- func DeclarePubSub(ch *Channel, exchange string) (amqp.Queue, error)
- func DeclareWorkQueue(ch *Channel, name string) (amqp.Queue, error)
- func ExtractTraceContext(ctx context.Context, headers amqp.Table) context.Context
- func InjectTraceHeaders(ctx context.Context, headers amqp.Table) amqp.Table
- func ReconnectLoop(ctx context.Context, client *Client, connect func() error) error
- func SimpleConsume(client *Client, queue, consumerTag string, autoAck bool) (<-chan amqp.Delivery, error)
- func SimplePublish(client *Client, exchange, routingKey string, body []byte) error
- func SimpleRPC(client *Client, exchange, routingKey, replyTo string, body []byte, ...) ([]byte, error)
- type Authentication
- type BatchProducer
- type Channel
- func (ch *Channel) Ack(tag uint64, multiple bool) error
- func (ch *Channel) Cancel(consumer string, noWait bool) error
- func (ch *Channel) Close() error
- func (ch *Channel) Confirm(noWait bool) error
- func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, ...) (<-chan amqp.Delivery, error)
- func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, ...) (<-chan amqp.Delivery, error)
- func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
- func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
- func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
- func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
- func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
- func (ch *Channel) Flow(active bool) error
- func (ch *Channel) Get(queue string, autoAck bool) (amqp.Delivery, bool, error)
- func (ch *Channel) GetNextPublishSeqNo() uint64
- func (ch *Channel) IsClosed() bool
- func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error
- func (ch *Channel) NotifyCancel() <-chan string
- func (ch *Channel) NotifyClose() <-chan *amqp.Error
- func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64)
- func (ch *Channel) NotifyFlow() <-chan bool
- func (ch *Channel) NotifyPublish() <-chan amqp.Confirmation
- func (ch *Channel) NotifyReturn() <-chan amqp.Return
- func (ch *Channel) Publish(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
- func (ch *Channel) PublishWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) error
- func (ch *Channel) PublishWithDeferredConfirm(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) (*amqp.DeferredConfirmation, error)
- func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
- func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
- func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
- func (ch *Channel) QueueInspect(name string) (amqp.Queue, error)
- func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)
- func (ch *Channel) QueueUnbind(name, key, exchange string, args amqp.Table) error
- func (ch *Channel) RawChannel() *amqp.Channel
- func (ch *Channel) Recover(requeue bool) error
- func (ch *Channel) Reject(tag uint64, requeue bool) error
- func (ch *Channel) Tx() error
- func (ch *Channel) TxCommit() error
- func (ch *Channel) TxRollback() error
- type ChannelPool
- type Client
- func (c *Client) AcquireChannel(ctx context.Context) (*Channel, func(), error)
- func (c *Client) Close() error
- func (c *Client) Config() *Config
- func (c *Client) GetLogger() Logger
- func (c *Client) GetMetrics() MetricsCollector
- func (c *Client) Health() bool
- func (c *Client) HealthStatus() HealthInfo
- func (c *Client) IsClosed() bool
- func (c *Client) NewChannel() (*Channel, error)
- func (c *Client) NotifyBlocked() <-chan amqp.Blocking
- func (c *Client) NotifyClose() <-chan *Error
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) RawConnection() *amqp.Connection
- func (c *Client) Reconnect() error
- func (c *Client) Stats() PoolStats
- type ClientInterface
- type Config
- type ConnectionPool
- func (p *ConnectionPool) AcquireChannel(ctx context.Context) (*amqp.Channel, func(), error)
- func (p *ConnectionPool) AcquireFromPool(ctx context.Context) (*amqp.Channel, func(), func(), error)
- func (p *ConnectionPool) Close() error
- func (p *ConnectionPool) GetConnection(idx int) *amqp.Connection
- func (p *ConnectionPool) IsClosed() bool
- func (p *ConnectionPool) Len() int
- func (p *ConnectionPool) NotifyBlocked() <-chan amqp.Blocking
- func (p *ConnectionPool) NotifyClose() <-chan *amqp.Error
- func (p *ConnectionPool) Stats() PoolStats
- type Consumer
- func (c *Consumer) Cancel(consumerTag string) error
- func (c *Consumer) Close() error
- func (c *Consumer) Consume(consumerTag string) (<-chan amqp.Delivery, error)
- func (c *Consumer) ConsumeWithHandler(ctx context.Context, consumerTag string, handler MessageHandler) error
- func (c *Consumer) ConsumeWithTimeout(consumerTag string, timeout time.Duration, handler MessageHandler) error
- func (c *Consumer) ConsumeWithWorkers(ctx context.Context, consumerTag string, numWorkers int, ...) error
- type ConsumerOption
- type Container
- type ContainerOption
- type EgoMetricsCollector
- func (m *EgoMetricsCollector) RecordChannelAcquired()
- func (m *EgoMetricsCollector) RecordChannelReturned()
- func (m *EgoMetricsCollector) RecordConnection(active bool)
- func (m *EgoMetricsCollector) RecordConnectionError()
- func (m *EgoMetricsCollector) RecordConsumeLatency(duration time.Duration)
- func (m *EgoMetricsCollector) RecordMessageConfirmed()
- func (m *EgoMetricsCollector) RecordMessageConsumed(size int)
- func (m *EgoMetricsCollector) RecordMessageNacked()
- func (m *EgoMetricsCollector) RecordMessagePublished(size int)
- func (m *EgoMetricsCollector) RecordPublishLatency(duration time.Duration)
- type Error
- type HealthInfo
- type HealthStatus
- type InstanceStats
- type Logger
- type MessageHandler
- type MetricsCollector
- type NoOpMetrics
- func (m *NoOpMetrics) RecordChannelAcquired()
- func (m *NoOpMetrics) RecordChannelReturned()
- func (m *NoOpMetrics) RecordConnection(active bool)
- func (m *NoOpMetrics) RecordConnectionError()
- func (m *NoOpMetrics) RecordConsumeLatency(duration time.Duration)
- func (m *NoOpMetrics) RecordMessageConfirmed()
- func (m *NoOpMetrics) RecordMessageConsumed(size int)
- func (m *NoOpMetrics) RecordMessageNacked()
- func (m *NoOpMetrics) RecordMessagePublished(size int)
- func (m *NoOpMetrics) RecordPublishLatency(duration time.Duration)
- type NopLogger
- type Option
- type Options
- type PoolStats
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) Publish(exchange, routingKey string, msg amqp.Publishing) error
- func (p *Producer) PublishAsync(exchange, routingKey string, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (p *Producer) PublishWithContext(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error
- func (p *Producer) PublishWithContextOptions(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, ...) error
- func (p *Producer) PublishWithOptions(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
- type ProducerOption
- type QueueArgs
- func (a QueueArgs) WithDeadLetterExchange(dlx string) QueueArgs
- func (a QueueArgs) WithDeadLetterRoutingKey(key string) QueueArgs
- func (a QueueArgs) WithDurable(durable bool) QueueArgs (deprecated)
- func (a QueueArgs) WithMaxLength(n int) QueueArgs
- func (a QueueArgs) WithMaxLengthBytes(n int) QueueArgs
- func (a QueueArgs) WithMessageTTL(ttl time.Duration) QueueArgs
- func (a QueueArgs) WithOverflow(behavior string) QueueArgs
- func (a QueueArgs) WithQueueTTL(ttl time.Duration) QueueArgs
- func (a QueueArgs) WithQueueType(qt string) QueueArgs
- func (a QueueArgs) WithSingleActiveConsumer() QueueArgs
- type ReconnectPolicy
- type RetryConsumer
- type TLSConfig
Constants ¶
const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)
Exchange types.
const (
	// Transient messages are lost on broker restart.
	Transient uint8 = 1
	// Persistent messages survive broker restart.
	Persistent uint8 = 2
)
Delivery modes.
const (
	QueueTypeArg                 = "x-queue-type"
	QueueMaxLenArg               = "x-max-length"
	QueueMaxLenBytesArg          = "x-max-length-bytes"
	QueueOverflowArg             = "x-overflow"
	QueueMessageTTLArg           = "x-message-ttl"
	QueueTTLArg                  = "x-expires"
	StreamMaxAgeArg              = "x-max-age"
	StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
	QueueVersionArg              = "x-queue-version"
	ConsumerTimeoutArg           = "x-consumer-timeout"
	SingleActiveConsumerArg      = "x-single-active-consumer"
	QueueExclusiveArg            = "x-exclusive"
)
Queue argument keys.
const (
	QueueTypeClassic = "classic"
	QueueTypeQuorum  = "quorum"
	QueueTypeStream  = "stream"
)
Queue type values.
const (
	QueueOverflowDropHead         = "drop-head"
	QueueOverflowRejectPublish    = "reject-publish"
	QueueOverflowRejectPublishDLX = "reject-publish-dlx"
)
Overflow behavior values.
const (
	NeverExpire       = ""
	ImmediatelyExpire = "0"
)
Expiration constants.
const ComponentName = "eamqp"
ComponentName is kept as a short component type label for AMQP metrics.
const PackageName = "component.eamqp"
PackageName is the component identifier used by Ego logging and metrics.
Variables ¶
This section is empty.
Functions ¶
func DeclareExchangeAndQueue ¶
func DeclareExchangeAndQueue(ch *Channel, exchange, kind, queue string, routingKeys []string, durable bool) (amqp.Queue, error)
DeclareExchangeAndQueue declares an exchange and queue with binding.
func DeclarePubSub ¶
DeclarePubSub declares a fanout exchange with a unique queue.
func DeclareWorkQueue ¶
DeclareWorkQueue declares a durable work queue.
func ExtractTraceContext ¶
ExtractTraceContext extracts trace context from AMQP headers.
func InjectTraceHeaders ¶
InjectTraceHeaders injects the current trace context into AMQP headers.
func ReconnectLoop ¶
ReconnectLoop runs a reconnection loop, calling connect on each attempt. It blocks until the context is cancelled or reconnection is disabled.
func SimpleConsume ¶
func SimpleConsume(client *Client, queue, consumerTag string, autoAck bool) (<-chan amqp.Delivery, error)
SimpleConsume returns a delivery channel for simple consumption.
func SimplePublish ¶
SimplePublish publishes a single message.
func SimpleRPC ¶
func SimpleRPC(client *Client, exchange, routingKey, replyTo string, body []byte, timeout time.Duration) ([]byte, error)
SimpleRPC performs a simple RPC-style request/response. It is a lightweight helper: the reply is acknowledged before the response body is returned, so callers that need acknowledgement control should build on Channel.Consume directly or use a higher-level RPC abstraction.
Types ¶
type Authentication ¶
type Authentication struct {
// TLS configuration
TLS *TLSConfig
// SASL mechanism: "plain", "external", or empty for no SASL.
SASLMechanism string `json:"saslMechanism" toml:"saslMechanism"`
}
Authentication holds the TLS configuration and the SASL mechanism selection.
type BatchProducer ¶
type BatchProducer struct {
// contains filtered or unexported fields
}
BatchProducer provides batch publishing support.
func NewBatchProducer ¶
func NewBatchProducer(ch *Channel, exchange, routingKey string, maxSize int, opts ...ProducerOption) (*BatchProducer, error)
NewBatchProducer creates a batch producer.
func (*BatchProducer) Add ¶
func (bp *BatchProducer) Add(msg amqp.Publishing)
Add adds a message to the batch. It does not block or fail when the batch reaches maxSize; callers should use ShouldFlush after Add and call Flush when it returns true.
func (*BatchProducer) Close ¶
func (bp *BatchProducer) Close() error
Close closes the batch producer.
func (*BatchProducer) Flush ¶
func (bp *BatchProducer) Flush() error
Flush publishes all batched messages.
func (*BatchProducer) ShouldFlush ¶
func (bp *BatchProducer) ShouldFlush() bool
ShouldFlush returns true if the batch should be flushed.
func (*BatchProducer) Size ¶
func (bp *BatchProducer) Size() int
Size returns the current batch size.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel wraps amqp.Channel with ego integration.
func (*Channel) Consume ¶
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Consume starts consuming messages from a queue.
func (*Channel) ConsumeWithContext ¶
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
ConsumeWithContext starts consuming messages with context.
func (*Channel) ExchangeBind ¶
func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
ExchangeBind binds an exchange to another exchange.
func (*Channel) ExchangeDeclare ¶
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDeclare declares an exchange.
func (*Channel) ExchangeDeclarePassive ¶
func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDeclarePassive declares an exchange (passive = check existence only).
func (*Channel) ExchangeDelete ¶
ExchangeDelete deletes an exchange.
func (*Channel) ExchangeUnbind ¶
func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args amqp.Table) error
ExchangeUnbind unbinds an exchange from another exchange.
func (*Channel) GetNextPublishSeqNo ¶
GetNextPublishSeqNo returns the sequence number for the next publish.
func (*Channel) NotifyCancel ¶
NotifyCancel returns a channel that receives consumer cancel notifications.
func (*Channel) NotifyClose ¶
NotifyClose returns a channel that receives close notifications. The returned channel must be consumed until it is closed, matching amqp091-go's asynchronous notification contract.
func (*Channel) NotifyConfirm ¶
NotifyConfirm returns ack and nack channels for publisher confirms.
func (*Channel) NotifyFlow ¶
NotifyFlow returns a channel that receives flow control notifications.
func (*Channel) NotifyPublish ¶
func (ch *Channel) NotifyPublish() <-chan amqp.Confirmation
NotifyPublish returns a channel that receives publish confirmations.
func (*Channel) NotifyReturn ¶
NotifyReturn returns a channel that receives undeliverable messages.
func (*Channel) Publish ¶
func (ch *Channel) Publish(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
Publish publishes a message.
func (*Channel) PublishWithContext ¶
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
PublishWithContext publishes a message with context. Context currently carries trace propagation into AMQP headers; amqp091-go v1.9.0 does not guarantee publish timeout or cancellation from this context. Use publisher confirms when the caller must know whether the broker accepted the message.
func (*Channel) PublishWithDeferredConfirm ¶
func (ch *Channel) PublishWithDeferredConfirm(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
PublishWithDeferredConfirm publishes a message and returns a deferred confirmation.
func (*Channel) PublishWithDeferredConfirmWithContext ¶
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
PublishWithDeferredConfirmWithContext publishes and returns a deferred confirm. Context currently carries trace propagation into AMQP headers; amqp091-go v1.9.0 also accepts the context for future cancellation semantics.
func (*Channel) QueueDeclare ¶
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclare declares a queue.
func (*Channel) QueueDeclarePassive ¶
func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclarePassive declares a queue (passive = check existence only).
func (*Channel) QueueDelete ¶
QueueDelete deletes a queue.
func (*Channel) QueueInspect ¶
QueueInspect passively inspects a queue by name.
func (*Channel) QueuePurge ¶
QueuePurge purges all messages from a queue.
func (*Channel) QueueUnbind ¶
QueueUnbind unbinds a queue from an exchange.
func (*Channel) RawChannel ¶
RawChannel returns the underlying AMQP channel. The wrapper treats raw access as stateful and will discard pooled channels on Close.
func (*Channel) TxRollback ¶
TxRollback rolls back a transaction.
type ChannelPool ¶
type ChannelPool struct {
// contains filtered or unexported fields
}
ChannelPool manages a pool of AMQP channels for efficient reuse.
func (*ChannelPool) Close ¶
func (p *ChannelPool) Close() error
Close closes all channels in the pool.
func (*ChannelPool) IsClosed ¶
func (p *ChannelPool) IsClosed() bool
IsClosed returns true if the pool is closed.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main AMQP client.
func LoadInstance ¶
LoadInstance returns a built client by component name.
func (*Client) AcquireChannel ¶
AcquireChannel acquires a channel from the pool (pool mode only).
func (*Client) GetMetrics ¶
func (c *Client) GetMetrics() MetricsCollector
GetMetrics returns the metrics collector.
func (*Client) Health ¶
Health returns true when the client has at least one usable AMQP connection.
func (*Client) HealthStatus ¶
func (c *Client) HealthStatus() HealthInfo
HealthStatus returns an in-memory health snapshot without opening a channel.
func (*Client) NewChannel ¶
NewChannel creates a new channel.
func (*Client) NotifyBlocked ¶
NotifyBlocked returns RabbitMQ connection blocked/unblocked notifications.
func (*Client) NotifyClose ¶
NotifyClose returns a channel that receives close notifications. The returned channel must be consumed until it is closed, matching amqp091-go's asynchronous notification contract.
func (*Client) RawConnection ¶
func (c *Client) RawConnection() *amqp.Connection
RawConnection returns the underlying AMQP connection.
type ClientInterface ¶
type ClientInterface interface {
GetLogger() Logger
GetMetrics() MetricsCollector
}
ClientInterface defines the methods needed by Channel.
type Config ¶
type Config struct {
// Addr is the AMQP URI(s). Multiple URIs separated by comma enable
// basic load balancing. Use amqps:// for TLS.
// Environment variable: EGO_CONFIG_AMQP_ADDR
Addr string `json:"addr" toml:"addr"`
// Vhost overrides the virtual host from Addr. Empty keeps the URI vhost.
Vhost string `json:"vhost" toml:"vhost"`
// TLS configuration.
TLSConfig *tls.Config `json:"-" toml:"-"`
// File-based TLS settings (certificate, key, CA certificate, and server name). Alternative to TLSConfig.
TLSCertFile string `json:"tlsCertFile" toml:"tlsCertFile"`
TLSKeyFile string `json:"tlsKeyFile" toml:"tlsKeyFile"`
TLSCACert string `json:"tlsCaCert" toml:"tlsCaCert"`
TLSServerName string `json:"tlsServerName" toml:"tlsServerName"`
// Auth overrides credentials from Addr.
Username string `json:"username" toml:"username"`
Password string `json:"password" toml:"password"`
// Tuning parameters.
Heartbeat time.Duration `json:"heartbeat" toml:"heartbeat"`
ChannelMax uint16 `json:"channelMax" toml:"channelMax"`
FrameSize int `json:"frameSize" toml:"frameSize"`
Locale string `json:"locale" toml:"locale"`
// Connection pool (0 = single connection, N = pool size).
PoolSize int `json:"poolSize" toml:"poolSize"`
PoolMaxIdle int `json:"poolMaxIdle" toml:"poolMaxIdle"`
PoolMaxLife time.Duration `json:"poolMaxLife" toml:"poolMaxLife"`
// Channel pool (0 = single channel, N = pool size per connection).
ChannelPoolSize int `json:"channelPoolSize" toml:"channelPoolSize"`
ChannelPoolMaxIdle int `json:"channelPoolMaxIdle" toml:"channelPoolMaxIdle"`
ChannelPoolMaxLife time.Duration `json:"channelPoolMaxLife" toml:"channelPoolMaxLife"`
// Manual reconnect policy. These fields configure helper policies only;
// the component does not run background reconnect or topology recovery.
ReconnectInterval time.Duration `json:"reconnectInterval" toml:"reconnectInterval"`
ReconnectMaxAttempts int `json:"reconnectMaxAttempts" toml:"reconnectMaxAttempts"`
// Observability.
EnableAccessInterceptor bool `json:"enableAccessInterceptor" toml:"enableAccessInterceptor"`
EnableMetricInterceptor bool `json:"enableMetricInterceptor" toml:"enableMetricInterceptor"`
EnableTraceInterceptor bool `json:"enableTraceInterceptor" toml:"enableTraceInterceptor"`
// Debug.
ClientName string `json:"clientName" toml:"clientName"`
// OnFail controls startup failure behavior: "panic" or "error".
OnFail string `json:"onFail" toml:"onFail"`
}
Config holds all configuration for the AMQP client.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a config with sensible defaults.
func (Config) ReconnectPolicy ¶
func (c Config) ReconnectPolicy() ReconnectPolicy
ReconnectPolicy returns the explicit reconnect helper policy derived from Config.
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages multiple AMQP connections for high availability.
func (*ConnectionPool) AcquireChannel ¶
AcquireChannel gets a channel from the next connection (round-robin).
func (*ConnectionPool) AcquireFromPool ¶
func (p *ConnectionPool) AcquireFromPool(ctx context.Context) (*amqp.Channel, func(), func(), error)
AcquireFromPool gets a channel from the pool of the next connection.
func (*ConnectionPool) GetConnection ¶
func (p *ConnectionPool) GetConnection(idx int) *amqp.Connection
GetConnection returns the connection at the given index.
func (*ConnectionPool) IsClosed ¶
func (p *ConnectionPool) IsClosed() bool
IsClosed returns true if the pool is closed.
func (*ConnectionPool) Len ¶
func (p *ConnectionPool) Len() int
Len returns the number of connections in the pool.
func (*ConnectionPool) NotifyBlocked ¶
func (p *ConnectionPool) NotifyBlocked() <-chan amqp.Blocking
NotifyBlocked returns a channel that receives RabbitMQ blocked notifications.
func (*ConnectionPool) NotifyClose ¶
func (p *ConnectionPool) NotifyClose() <-chan *amqp.Error
NotifyClose returns a channel that receives connection close notifications.
func (*ConnectionPool) Stats ¶
func (p *ConnectionPool) Stats() PoolStats
Stats returns connection pool statistics.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides high-level message consumption.
func NewConsumer ¶
func NewConsumer(ch *Channel, queue string, opts ...ConsumerOption) *Consumer
NewConsumer creates a new consumer.
func (*Consumer) ConsumeWithHandler ¶
func (c *Consumer) ConsumeWithHandler(ctx context.Context, consumerTag string, handler MessageHandler) error
ConsumeWithHandler starts consuming and processes messages with a handler.
func (*Consumer) ConsumeWithTimeout ¶
func (c *Consumer) ConsumeWithTimeout(consumerTag string, timeout time.Duration, handler MessageHandler) error
ConsumeWithTimeout starts consuming with a per-message timeout.
func (*Consumer) ConsumeWithWorkers ¶
func (c *Consumer) ConsumeWithWorkers(ctx context.Context, consumerTag string, numWorkers int, handler MessageHandler) error
ConsumeWithWorkers starts consuming with a worker pool.
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption configures a consumer.
func WithConsumerArgs ¶
func WithConsumerArgs(args amqp.Table) ConsumerOption
WithConsumerArgs sets consumer arguments.
func WithConsumerAutoAck ¶
func WithConsumerAutoAck() ConsumerOption
WithConsumerAutoAck sets auto-acknowledge mode.
func WithConsumerExclusive ¶
func WithConsumerExclusive() ConsumerOption
WithConsumerExclusive sets exclusive consumer.
type Container ¶
type Container struct {
// contains filtered or unexported fields
}
Container wraps the Config and ego logging for configuration-driven initialization.
func DefaultContainer ¶
func DefaultContainer() *Container
DefaultContainer returns a Container with default configuration.
func Load ¶
Load loads configuration from ego's config manager and returns a Container. The key parameter corresponds to the config section name (e.g., "eamqp").
func (*Container) Build ¶
func (c *Container) Build(options ...ContainerOption) *Client
Build constructs the AMQP Client from the loaded configuration. It applies all configured interceptors and handles connection errors according to OnFail.
type ContainerOption ¶
type ContainerOption func(c *Container)
ContainerOption configures the Container.
func WithOnFail ¶
func WithOnFail(onFail string) ContainerOption
WithOnFail sets the failure behavior: "panic" (default) or "error".
type EgoMetricsCollector ¶
type EgoMetricsCollector struct {
// contains filtered or unexported fields
}
EgoMetricsCollector records AMQP client metrics using Ego's shared metrics.
func NewEgoMetrics ¶
func NewEgoMetrics(name, peer string) *EgoMetricsCollector
NewEgoMetrics creates a MetricsCollector backed by Ego emetric.
func (*EgoMetricsCollector) RecordChannelAcquired ¶
func (m *EgoMetricsCollector) RecordChannelAcquired()
func (*EgoMetricsCollector) RecordChannelReturned ¶
func (m *EgoMetricsCollector) RecordChannelReturned()
func (*EgoMetricsCollector) RecordConnection ¶
func (m *EgoMetricsCollector) RecordConnection(active bool)
func (*EgoMetricsCollector) RecordConnectionError ¶
func (m *EgoMetricsCollector) RecordConnectionError()
func (*EgoMetricsCollector) RecordConsumeLatency ¶
func (m *EgoMetricsCollector) RecordConsumeLatency(duration time.Duration)
func (*EgoMetricsCollector) RecordMessageConfirmed ¶
func (m *EgoMetricsCollector) RecordMessageConfirmed()
func (*EgoMetricsCollector) RecordMessageConsumed ¶
func (m *EgoMetricsCollector) RecordMessageConsumed(size int)
func (*EgoMetricsCollector) RecordMessageNacked ¶
func (m *EgoMetricsCollector) RecordMessageNacked()
func (*EgoMetricsCollector) RecordMessagePublished ¶
func (m *EgoMetricsCollector) RecordMessagePublished(size int)
func (*EgoMetricsCollector) RecordPublishLatency ¶
func (m *EgoMetricsCollector) RecordPublishLatency(duration time.Duration)
type Error ¶
type Error struct {
Component string // "connection", "channel", "publish", "consume"
Op string // Operation name
// contains filtered or unexported fields
}
Error wraps amqp.Error with component context.
func (*Error) IsRetryable ¶
IsRetryable returns true if the error is temporary and can be retried.
type HealthInfo ¶
type HealthInfo struct {
Status HealthStatus
Reason string
ConnectionsActive int
ConnectionsTotal int
}
HealthInfo contains a lightweight health snapshot for readiness checks.
type HealthStatus ¶
type HealthStatus string
HealthStatus describes the lightweight client health state.
const (
	// HealthStatusUp means all known AMQP connections are open.
	HealthStatusUp HealthStatus = "up"
	// HealthStatusDegraded means at least one pooled connection is open, but not all are healthy.
	HealthStatusDegraded HealthStatus = "degraded"
	// HealthStatusDown means the client has no usable AMQP connection.
	HealthStatusDown HealthStatus = "down"
	// HealthStatusClosed means the client or connection pool was explicitly closed.
	HealthStatusClosed HealthStatus = "closed"
)
type InstanceStats ¶
type InstanceStats struct {
Health HealthInfo `json:"health"`
Pool PoolStats `json:"pool"`
}
InstanceStats is the governor/debug snapshot for a built eamqp client.
type Logger ¶
type Logger interface {
Debug(msg string, keyvals ...any)
Info(msg string, keyvals ...any)
Warn(msg string, keyvals ...any)
Error(msg string, keyvals ...any)
}
Logger is the interface for structured logging.
func NewEgoLogger ¶
NewEgoLogger adapts an Ego logger component to the eamqp Logger interface.
type MessageHandler ¶
MessageHandler is a function that handles a delivery.
type MetricsCollector ¶
type MetricsCollector interface {
RecordConnection(active bool)
RecordConnectionError()
RecordChannelAcquired()
RecordChannelReturned()
RecordMessagePublished(size int)
RecordMessageConfirmed()
RecordMessageNacked()
RecordMessageConsumed(size int)
RecordPublishLatency(duration time.Duration)
RecordConsumeLatency(duration time.Duration)
}
MetricsCollector collects AMQP metrics.
type NoOpMetrics ¶
type NoOpMetrics struct{}
NoOpMetrics is a no-op metrics collector.
func (*NoOpMetrics) RecordChannelAcquired ¶
func (m *NoOpMetrics) RecordChannelAcquired()
func (*NoOpMetrics) RecordChannelReturned ¶
func (m *NoOpMetrics) RecordChannelReturned()
func (*NoOpMetrics) RecordConnection ¶
func (m *NoOpMetrics) RecordConnection(active bool)
func (*NoOpMetrics) RecordConnectionError ¶
func (m *NoOpMetrics) RecordConnectionError()
func (*NoOpMetrics) RecordConsumeLatency ¶
func (m *NoOpMetrics) RecordConsumeLatency(duration time.Duration)
func (*NoOpMetrics) RecordMessageConfirmed ¶
func (m *NoOpMetrics) RecordMessageConfirmed()
func (*NoOpMetrics) RecordMessageConsumed ¶
func (m *NoOpMetrics) RecordMessageConsumed(size int)
func (*NoOpMetrics) RecordMessageNacked ¶
func (m *NoOpMetrics) RecordMessageNacked()
func (*NoOpMetrics) RecordMessagePublished ¶
func (m *NoOpMetrics) RecordMessagePublished(size int)
func (*NoOpMetrics) RecordPublishLatency ¶
func (m *NoOpMetrics) RecordPublishLatency(duration time.Duration)
type Option ¶
type Option func(*Options)
Option configures the client.
func WithMetrics ¶
func WithMetrics(m MetricsCollector) Option
WithMetrics sets a custom metrics collector.
type Options ¶
type Options struct {
// Dial is a custom dial function. If set, it is used instead of net.Dial.
// The addr parameter is the host:port from the URI.
Dial func(network, addr string) (net.Conn, error)
// Auth specifies SASL authentication mechanisms.
// If set, this overrides the default PLAIN auth.
Auth []amqp.Authentication
// ConnectionName sets the RabbitMQ connection name for management UI.
ConnectionName string
// ChannelOptions is called after each raw channel is opened.
// In pooled mode it runs when the raw channel is created, not on every checkout.
ChannelOptions func(ch *amqp.Channel) error
// OnReconnect is reserved for a future lifecycle supervisor.
// Client.Reconnect is explicit and does not invoke this callback today.
OnReconnect func(attempt int)
// OnDisconnect is reserved for a future lifecycle supervisor.
// Use NotifyClose today to observe connection close events.
OnDisconnect func(err error)
// OnChannelError is reserved for a future lifecycle supervisor.
// Use Channel.NotifyClose today to observe channel close events.
OnChannelError func(channelID uint16, err error)
// Logger overrides the component logger.
Logger Logger
// Metrics overrides the component metrics collector.
Metrics MetricsCollector
}
Options holds optional configuration for the AMQP client.
type PoolStats ¶
type PoolStats struct {
ConnectionsActive int
ConnectionsTotal int
ChannelsActive int
ChannelsAcquired int64
ChannelsReturned int64
Reconnects int64
}
PoolStats holds connection/channel pool statistics.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides high-level publishing with confirms support.
func NewProducer ¶
func NewProducer(ch *Channel, opts ...ProducerOption) (*Producer, error)
NewProducer creates a new producer.
func (*Producer) Publish ¶
func (p *Producer) Publish(exchange, routingKey string, msg amqp.Publishing) error
Publish publishes a message.
func (*Producer) PublishAsync ¶
func (p *Producer) PublishAsync(exchange, routingKey string, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
PublishAsync publishes without waiting for a publisher confirm.
func (*Producer) PublishWithContext ¶
func (p *Producer) PublishWithContext(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error
PublishWithContext publishes with context.
func (*Producer) PublishWithContextOptions ¶
func (p *Producer) PublishWithContextOptions(ctx context.Context, exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
PublishWithContextOptions publishes with context and full options.
func (*Producer) PublishWithOptions ¶
func (p *Producer) PublishWithOptions(exchange, routingKey string, mandatory, immediate bool, msg amqp.Publishing) error
PublishWithOptions publishes a message with full control.
type ProducerOption ¶
type ProducerOption func(*Producer)
ProducerOption configures a producer.
func WithConfirm ¶
func WithConfirm(timeout time.Duration) ProducerOption
WithConfirm enables publisher confirms with the given timeout.
type QueueArgs ¶
QueueArgs holds optional arguments for QueueDeclare.
func NewQueueArgs ¶
func NewQueueArgs() QueueArgs
NewQueueArgs creates a new QueueArgs with common defaults.
func (QueueArgs) WithDeadLetterExchange ¶
WithDeadLetterExchange sets the dead letter exchange.
func (QueueArgs) WithDeadLetterRoutingKey ¶
WithDeadLetterRoutingKey sets the dead letter routing key.
func (QueueArgs) WithDurable ¶
WithDurable is deprecated.
func (QueueArgs) WithMaxLength ¶
WithMaxLength sets the maximum number of messages.
func (QueueArgs) WithMaxLengthBytes ¶
WithMaxLengthBytes sets the maximum total body size.
func (QueueArgs) WithMessageTTL ¶
WithMessageTTL sets the per-message TTL in milliseconds.
func (QueueArgs) WithOverflow ¶
WithOverflow sets the overflow behavior.
func (QueueArgs) WithQueueTTL ¶
WithQueueTTL sets the queue TTL (auto-delete after idle time).
func (QueueArgs) WithQueueType ¶
WithQueueType sets the queue type (classic, quorum, stream).
func (QueueArgs) WithSingleActiveConsumer ¶
WithSingleActiveConsumer enables single active consumer.
type ReconnectPolicy ¶
type ReconnectPolicy struct {
Enabled bool
Initial time.Duration // Initial backoff interval
Max time.Duration // Maximum backoff interval
Multiplier float64 // Backoff multiplier
MaxAttempts int // 0 = infinite
}
ReconnectPolicy defines the reconnection strategy.
func DefaultReconnectPolicy ¶
func DefaultReconnectPolicy() ReconnectPolicy
DefaultReconnectPolicy returns the default reconnection policy.
type RetryConsumer ¶
type RetryConsumer struct {
// contains filtered or unexported fields
}
RetryConsumer provides consumption with automatic retry.
func NewRetryConsumer ¶
func NewRetryConsumer(ch *Channel, queue string, maxRetries int, retryDelay time.Duration, opts ...ConsumerOption) (*RetryConsumer, error)
NewRetryConsumer creates a retry consumer.
func (*RetryConsumer) ConsumeWithRetry ¶
func (rc *RetryConsumer) ConsumeWithRetry(ctx context.Context, consumerTag string, handler MessageHandler) error
ConsumeWithRetry starts consuming with automatic retry.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
batch-producer
command
batch-producer demonstrates the high-level BatchProducer helper with publisher confirms enabled.
|
batch-producer demonstrates the high-level BatchProducer helper with publisher confirms enabled. |
|
connection-pool
command
connection-pool demonstrates high-availability connection pooling with round-robin load distribution across multiple connections.
|
connection-pool demonstrates high-availability connection pooling with round-robin load distribution across multiple connections. |
|
consumer
command
consumer example demonstrates consuming with manual acks.
|
consumer example demonstrates consuming with manual acks. |
|
dead-letter
command
dead-letter demonstrates Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ) patterns for handling failed message processing.
|
dead-letter demonstrates Dead Letter Exchange (DLX) and Dead Letter Queue (DLQ) patterns for handling failed message processing. |
|
producer
command
producer example demonstrates publishing with confirms.
|
producer example demonstrates publishing with confirms. |
|
producer-confirm
command
producer-confirm demonstrates the high-level Producer API with publisher confirms.
|
producer-confirm demonstrates the high-level Producer API with publisher confirms. |
|
pubsub
command
pubsub example demonstrates a simple publish/subscribe pattern with RabbitMQ.
|
pubsub example demonstrates a simple publish/subscribe pattern with RabbitMQ. |
|
pubsub-fanout
command
pubsub-fanout demonstrates the fanout exchange pattern where each subscriber gets its own copy of every message.
|
pubsub-fanout demonstrates the fanout exchange pattern where each subscriber gets its own copy of every message. |
|
qos
command
qos demonstrates Quality of Service (QoS) settings for consumer-side message prefetching.
|
qos demonstrates Quality of Service (QoS) settings for consumer-side message prefetching. |
|
reconnect
command
reconnect demonstrates connection close notifications and the explicit reconnect boundary.
|
reconnect demonstrates connection close notifications and the explicit reconnect boundary. |
|
retry-consumer-listener
command
listener demonstrates automatic message retry with configurable max attempts using a worker pool for concurrent processing.
|
listener demonstrates automatic message retry with configurable max attempts using a worker pool for concurrent processing. |
|
retry-consumer-sender
command
sender publishes test messages to the retry queue.
|
sender publishes test messages to the retry queue. |
|
rpc
command
rpc demonstrates RPC-style request/response over RabbitMQ using the Direct Reply-To pattern.
|
rpc demonstrates RPC-style request/response over RabbitMQ using the Direct Reply-To pattern. |
|
transaction
command
transaction demonstrates AMQP channel transactions.
|
transaction demonstrates AMQP channel transactions. |
|
workqueue-publisher
command
publisher sends tasks to the workqueue.
|
publisher sends tasks to the workqueue. |
|
workqueue-worker
command
worker is one of the workqueue examples.
|
worker is one of the workqueue examples. |