Documentation
¶
Overview ¶
Package messaging provides a unified interface for message queue operations. It abstracts the underlying messaging implementation to allow for easy testing and future extensibility.
Index ¶
- type AMQPClient
- type AMQPClientImpl
- func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
- func (c *AMQPClientImpl) Close() error
- func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
- func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
- func (c *AMQPClientImpl) IsReady() bool
- func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error
- func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
- type BindingDeclaration
- type Client
- type ClientFactory
- type ConsumeOptions
- type ConsumerDeclaration
- type DeclarationStats
- type Declarations
- func (d *Declarations) Clone() *Declarations
- func (d *Declarations) RegisterBinding(b *BindingDeclaration)
- func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)
- func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)
- func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)
- func (d *Declarations) RegisterQueue(q *QueueDeclaration)
- func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error
- func (d *Declarations) Stats() DeclarationStats
- func (d *Declarations) Validate() error
- type ExchangeDeclaration
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) EnsureConsumers(ctx context.Context, key string, decls *Declarations) error
- func (m *Manager) GetPublisher(ctx context.Context, key string) (AMQPClient, error)
- func (m *Manager) StartCleanup(interval time.Duration)
- func (m *Manager) Stats() map[string]any
- func (m *Manager) StopCleanup()
- type ManagerOptions
- type MessageHandler
- type PublishOptions
- type PublisherDeclaration
- type QueueDeclaration
- type Registry
- func (r *Registry) DeclareInfrastructure(ctx context.Context) error
- func (r *Registry) GetBindings() []*BindingDeclaration
- func (r *Registry) GetConsumers() []*ConsumerDeclaration
- func (r *Registry) GetExchanges() map[string]*ExchangeDeclaration
- func (r *Registry) GetPublishers() []*PublisherDeclaration
- func (r *Registry) GetQueues() map[string]*QueueDeclaration
- func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
- func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
- func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
- func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
- func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
- func (r *Registry) StartConsumers(ctx context.Context) error
- func (r *Registry) StopConsumers()
- func (r *Registry) ValidateConsumer(queue string) bool
- func (r *Registry) ValidatePublisher(exchange, routingKey string) bool
- type RegistryInterface
- type TenantMessagingResourceSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPClient ¶
type AMQPClient interface { Client // PublishToExchange publishes a message to a specific exchange with routing key. PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error // ConsumeFromQueue consumes messages from a queue with specific options. ConsumeFromQueue(ctx context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error) // DeclareQueue declares a queue with the given parameters. DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error // DeclareExchange declares an exchange with the given parameters. DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error // BindQueue binds a queue to an exchange with a routing key. BindQueue(queue, exchange, routingKey string, noWait bool) error }
AMQPClient extends the basic Client interface with AMQP-specific functionality. This allows for more advanced AMQP features while maintaining the simple interface.
type AMQPClientImpl ¶
type AMQPClientImpl struct {
// contains filtered or unexported fields
}
AMQPClientImpl provides an AMQP implementation of the messaging client interface. It includes automatic reconnection, retry logic, and AMQP-specific features.
func NewAMQPClient ¶
func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl
NewAMQPClient creates a new AMQP client instance. It automatically attempts to connect to the broker and handles reconnections.
func (*AMQPClientImpl) BindQueue ¶
func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
BindQueue binds a queue to an exchange with a routing key.
func (*AMQPClientImpl) Close ¶
func (c *AMQPClientImpl) Close() error
Close gracefully shuts down the AMQP client.
func (*AMQPClientImpl) Consume ¶
func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
Consume starts consuming messages from the specified destination (queue name).
func (*AMQPClientImpl) ConsumeFromQueue ¶
func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
ConsumeFromQueue consumes messages from a queue with specific options.
func (*AMQPClientImpl) DeclareExchange ¶
func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
DeclareExchange declares an exchange with the given parameters.
func (*AMQPClientImpl) DeclareQueue ¶
func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
DeclareQueue declares a queue with the given parameters.
func (*AMQPClientImpl) IsReady ¶
func (c *AMQPClientImpl) IsReady() bool
IsReady returns true if the client is connected and ready to send/receive messages.
func (*AMQPClientImpl) Publish ¶
Publish sends a message to the specified destination (queue name). Uses default exchange ("") and destination as routing key.
func (*AMQPClientImpl) PublishToExchange ¶
func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
PublishToExchange publishes a message to a specific exchange with routing key.
type BindingDeclaration ¶
type BindingDeclaration struct { Queue string // Queue name Exchange string // Exchange name RoutingKey string // Routing key pattern NoWait bool // Do not wait for server confirmation Args map[string]any // Additional arguments }
BindingDeclaration defines a queue-to-exchange binding
type Client ¶
type Client interface { // Publish sends a message to the specified destination. // destination can be a queue name, exchange, or topic depending on the implementation. // Returns an error if the publish operation fails. Publish(ctx context.Context, destination string, data []byte) error // Consume starts consuming messages from the specified destination. // Returns a channel that delivers messages and an error if consumption setup fails. // Messages should be acknowledged by the consumer. Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error) // Close gracefully shuts down the messaging client. // It should clean up all connections and resources. Close() error // IsReady returns true if the client is connected and ready to send/receive messages. IsReady() bool }
Client defines the interface for messaging operations. It provides a simple API for publishing and consuming messages while hiding the complexity of connection management, retries, and protocol-specific details.
type ClientFactory ¶ added in v0.9.0
type ClientFactory func(string, logger.Logger) AMQPClient
ClientFactory creates AMQP clients from URLs
type ConsumeOptions ¶
type ConsumeOptions struct { Queue string // Queue name to consume from Consumer string // Consumer tag AutoAck bool // Auto-acknowledge messages Exclusive bool // Exclusive consumer NoLocal bool // No-local flag NoWait bool // No-wait flag }
ConsumeOptions contains options for consuming messages with AMQP-specific features.
type ConsumerDeclaration ¶
type ConsumerDeclaration struct { Queue string // Queue to consume from Consumer string // Consumer tag AutoAck bool // Automatically acknowledge messages Exclusive bool // Exclusive consumer NoLocal bool // Do not deliver to the connection that published NoWait bool // Do not wait for server confirmation EventType string // Event type identifier Description string // Human-readable description Handler MessageHandler // Message handler (optional for documentation-only declarations) }
ConsumerDeclaration defines what a module consumes and how to handle messages
type DeclarationStats ¶ added in v0.9.0
Stats returns statistics about the declarations.
type Declarations ¶ added in v0.9.0
type Declarations struct { Exchanges map[string]*ExchangeDeclaration Queues map[string]*QueueDeclaration Bindings []*BindingDeclaration Publishers []*PublisherDeclaration Consumers []*ConsumerDeclaration }
Declarations stores messaging infrastructure declarations made by modules at startup. This is a pure data structure (no client dependencies) that can be validated once and replayed to multiple per-tenant registries.
func NewDeclarations ¶ added in v0.9.0
func NewDeclarations() *Declarations
NewDeclarations creates a new empty declarations store.
func (*Declarations) Clone ¶ added in v0.9.0
func (d *Declarations) Clone() *Declarations
Clone creates a deep copy of the declarations. This is useful for creating per-tenant copies during replay.
func (*Declarations) RegisterBinding ¶ added in v0.9.0
func (d *Declarations) RegisterBinding(b *BindingDeclaration)
RegisterBinding adds a binding declaration to the store.
func (*Declarations) RegisterConsumer ¶ added in v0.9.0
func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)
RegisterConsumer adds a consumer declaration to the store.
func (*Declarations) RegisterExchange ¶ added in v0.9.0
func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)
RegisterExchange adds an exchange declaration to the store.
func (*Declarations) RegisterPublisher ¶ added in v0.9.0
func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)
RegisterPublisher adds a publisher declaration to the store.
func (*Declarations) RegisterQueue ¶ added in v0.9.0
func (d *Declarations) RegisterQueue(q *QueueDeclaration)
RegisterQueue adds a queue declaration to the store.
func (*Declarations) ReplayToRegistry ¶ added in v0.9.0
func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error
ReplayToRegistry applies all declarations to a runtime registry. The order is important: exchanges first, then queues, then bindings, then publishers/consumers.
func (*Declarations) Stats ¶ added in v0.9.0
func (d *Declarations) Stats() DeclarationStats
Stats returns counts of each declaration type.
func (*Declarations) Validate ¶ added in v0.9.0
func (d *Declarations) Validate() error
Validate checks the integrity of all declarations. It ensures that references between declarations are valid.
type ExchangeDeclaration ¶
type ExchangeDeclaration struct { Name string // Exchange name Type string // Exchange type (direct, topic, fanout, headers) Durable bool // Survive server restart AutoDelete bool // Delete when no longer used Internal bool // Internal exchange NoWait bool // Do not wait for server confirmation Args map[string]any // Additional arguments }
ExchangeDeclaration defines an exchange to be declared
type Manager ¶ added in v0.9.0
type Manager struct {
// contains filtered or unexported fields
}
Manager manages AMQP clients by string keys with different lifecycle strategies. Publishers are cached with idle eviction (can be recreated easily). Consumers are long-lived (must stay alive to receive messages). The manager is key-agnostic - it doesn't know about tenants, just manages named clients.
func NewMessagingManager ¶ added in v0.9.0
func NewMessagingManager(resourceSource TenantMessagingResourceSource, log logger.Logger, opts ManagerOptions, clientFactory ClientFactory) *Manager
NewMessagingManager creates a new messaging manager
func (*Manager) EnsureConsumers ¶ added in v0.9.0
EnsureConsumers creates and starts consumers for the given key using the provided declarations. This should be called once per key to set up long-lived consumers. Subsequent calls for the same key are idempotent.
func (*Manager) GetPublisher ¶ added in v0.9.0
GetPublisher returns a publisher client for the given key. Publishers are cached with LRU eviction and lazy initialization.
func (*Manager) StartCleanup ¶ added in v0.9.0
StartCleanup starts the background cleanup routine for idle publishers
func (*Manager) StopCleanup ¶ added in v0.9.0
func (m *Manager) StopCleanup()
StopCleanup stops the background cleanup routine
type ManagerOptions ¶ added in v0.9.0
type ManagerOptions struct { MaxPublishers int // Maximum number of publisher clients to keep cached IdleTTL time.Duration // Time after which idle publishers are evicted }
ManagerOptions configures the MessagingManager
type MessageHandler ¶
type MessageHandler interface { // Handle processes a message and returns an error if processing fails. // If an error is returned, the message will be negatively acknowledged (nack). // If no error is returned, the message will be acknowledged (ack). // The delivery is passed by pointer for performance reasons. Handle(ctx context.Context, delivery *amqp.Delivery) error // EventType returns the event type this handler can process. // This is used for routing messages to the correct handler. EventType() string }
MessageHandler defines the interface for processing consumed messages. Handlers should implement this interface to process specific message types.
type PublishOptions ¶
type PublishOptions struct { Exchange string // AMQP exchange name RoutingKey string // AMQP routing key Headers map[string]any // Message headers Mandatory bool // AMQP mandatory flag Immediate bool // AMQP immediate flag }
PublishOptions contains options for publishing messages with AMQP-specific features.
type PublisherDeclaration ¶
type PublisherDeclaration struct { Exchange string // Target exchange RoutingKey string // Default routing key EventType string // Event type identifier Description string // Human-readable description Mandatory bool // Message must be routed to a queue Immediate bool // Message must be delivered immediately Headers map[string]any // Default headers }
PublisherDeclaration defines what a module publishes
type QueueDeclaration ¶
type QueueDeclaration struct { Name string // Queue name Durable bool // Survive server restart AutoDelete bool // Delete when no consumers Exclusive bool // Only accessible by declaring connection NoWait bool // Do not wait for server confirmation Args map[string]any // Additional arguments }
QueueDeclaration defines a queue to be declared
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages messaging infrastructure declarations across modules. It ensures queues, exchanges, and bindings are properly declared before use. It also manages consumer lifecycle and handles message routing to handlers.
func NewRegistry ¶
func NewRegistry(client AMQPClient, log logger.Logger) *Registry
NewRegistry creates a new messaging registry
func (*Registry) DeclareInfrastructure ¶
DeclareInfrastructure declares all registered messaging infrastructure
func (*Registry) GetBindings ¶ added in v0.8.0
func (r *Registry) GetBindings() []*BindingDeclaration
GetBindings returns all registered bindings (for testing/monitoring)
func (*Registry) GetConsumers ¶
func (r *Registry) GetConsumers() []*ConsumerDeclaration
GetConsumers returns all registered consumers (for documentation/monitoring)
func (*Registry) GetExchanges ¶ added in v0.8.0
func (r *Registry) GetExchanges() map[string]*ExchangeDeclaration
GetExchanges returns all registered exchanges (for testing/monitoring)
func (*Registry) GetPublishers ¶
func (r *Registry) GetPublishers() []*PublisherDeclaration
GetPublishers returns all registered publishers (for documentation/monitoring)
func (*Registry) GetQueues ¶ added in v0.8.0
func (r *Registry) GetQueues() map[string]*QueueDeclaration
GetQueues returns all registered queues (for testing/monitoring)
func (*Registry) RegisterBinding ¶
func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
RegisterBinding registers a binding for declaration
func (*Registry) RegisterConsumer ¶
func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
RegisterConsumer registers a consumer declaration
func (*Registry) RegisterExchange ¶
func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
RegisterExchange registers an exchange for declaration
func (*Registry) RegisterPublisher ¶
func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
RegisterPublisher registers a publisher declaration
func (*Registry) RegisterQueue ¶
func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
RegisterQueue registers a queue for declaration
func (*Registry) StartConsumers ¶
StartConsumers starts all registered consumers with handlers. This should be called after DeclareInfrastructure and before starting the main application.
func (*Registry) StopConsumers ¶
func (r *Registry) StopConsumers()
StopConsumers gracefully stops all running consumers.
func (*Registry) ValidateConsumer ¶
ValidateConsumer checks if a consumer is registered for the given queue
func (*Registry) ValidatePublisher ¶
ValidatePublisher checks if a publisher is registered for the given exchange/routing key
type RegistryInterface ¶ added in v0.8.0
type RegistryInterface interface { // Registration methods RegisterExchange(declaration *ExchangeDeclaration) RegisterQueue(declaration *QueueDeclaration) RegisterBinding(declaration *BindingDeclaration) RegisterPublisher(declaration *PublisherDeclaration) RegisterConsumer(declaration *ConsumerDeclaration) // Infrastructure lifecycle DeclareInfrastructure(ctx context.Context) error StartConsumers(ctx context.Context) error StopConsumers() // Getter methods for testing/monitoring GetExchanges() map[string]*ExchangeDeclaration GetQueues() map[string]*QueueDeclaration GetBindings() []*BindingDeclaration GetPublishers() []*PublisherDeclaration GetConsumers() []*ConsumerDeclaration // Validation methods ValidatePublisher(exchange, routingKey string) bool ValidateConsumer(queue string) bool }
RegistryInterface defines the contract for messaging infrastructure management. This interface allows for easy mocking and testing of messaging infrastructure.
type TenantMessagingResourceSource ¶ added in v0.9.0
type TenantMessagingResourceSource interface { // AMQPURL returns the AMQP URL for the given key. // For single-tenant apps, key will be "". For multi-tenant, key will be the tenant ID. AMQPURL(ctx context.Context, key string) (string, error) }
TenantMessagingResourceSource provides per-key AMQP configurations. This interface abstracts where tenant-specific messaging configs come from.