messaging

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package messaging provides a unified interface for message queue operations. It abstracts the underlying messaging implementation to allow for easy testing and future extensibility.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPClient

type AMQPClient interface {
	Client

	// PublishToExchange publishes a message to a specific exchange with routing key.
	PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error

	// ConsumeFromQueue consumes messages from a queue with specific options.
	ConsumeFromQueue(ctx context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)

	// DeclareQueue declares a queue with the given parameters.
	DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error

	// DeclareExchange declares an exchange with the given parameters.
	DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error

	// BindQueue binds a queue to an exchange with a routing key.
	BindQueue(queue, exchange, routingKey string, noWait bool) error
}

AMQPClient extends the basic Client interface with AMQP-specific functionality. This allows for more advanced AMQP features while maintaining the simple interface.

type AMQPClientImpl

type AMQPClientImpl struct {
	// contains filtered or unexported fields
}

AMQPClientImpl provides an AMQP implementation of the messaging client interface. It includes automatic reconnection, retry logic, and AMQP-specific features.

func NewAMQPClient

func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl

NewAMQPClient creates a new AMQP client instance. It automatically attempts to connect to the broker and handles reconnections.

func (*AMQPClientImpl) BindQueue

func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error

BindQueue binds a queue to an exchange with a routing key.

func (*AMQPClientImpl) Close

func (c *AMQPClientImpl) Close() error

Close gracefully shuts down the AMQP client.

func (*AMQPClientImpl) Consume

func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)

Consume starts consuming messages from the specified destination (queue name).

func (*AMQPClientImpl) ConsumeFromQueue

func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)

ConsumeFromQueue consumes messages from a queue with specific options.

func (*AMQPClientImpl) DeclareExchange

func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error

DeclareExchange declares an exchange with the given parameters.

func (*AMQPClientImpl) DeclareQueue

func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error

DeclareQueue declares a queue with the given parameters.

func (*AMQPClientImpl) IsReady

func (c *AMQPClientImpl) IsReady() bool

IsReady returns true if the client is connected and ready to send/receive messages.

func (*AMQPClientImpl) Publish

func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error

Publish sends a message to the specified destination (queue name). Uses default exchange ("") and destination as routing key.

func (*AMQPClientImpl) PublishToExchange

func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error

PublishToExchange publishes a message to a specific exchange with routing key.

type BindingDeclaration

type BindingDeclaration struct {
	Queue      string         // Queue name
	Exchange   string         // Exchange name
	RoutingKey string         // Routing key pattern
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

BindingDeclaration defines a queue-to-exchange binding

type Client

type Client interface {
	// Publish sends a message to the specified destination.
	// destination can be a queue name, exchange, or topic depending on the implementation.
	// Returns an error if the publish operation fails.
	Publish(ctx context.Context, destination string, data []byte) error

	// Consume starts consuming messages from the specified destination.
	// Returns a channel that delivers messages and an error if consumption setup fails.
	// Messages should be acknowledged by the consumer.
	Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)

	// Close gracefully shuts down the messaging client.
	// It should clean up all connections and resources.
	Close() error

	// IsReady returns true if the client is connected and ready to send/receive messages.
	IsReady() bool
}

Client defines the interface for messaging operations. It provides a simple API for publishing and consuming messages while hiding the complexity of connection management, retries, and protocol-specific details.

type ClientFactory added in v0.9.0

type ClientFactory func(string, logger.Logger) AMQPClient

ClientFactory creates AMQP clients from URLs

type ConsumeOptions

type ConsumeOptions struct {
	Queue     string // Queue name to consume from
	Consumer  string // Consumer tag
	AutoAck   bool   // Auto-acknowledge messages
	Exclusive bool   // Exclusive consumer
	NoLocal   bool   // No-local flag
	NoWait    bool   // No-wait flag
}

ConsumeOptions contains options for consuming messages with AMQP-specific features.

type ConsumerDeclaration

type ConsumerDeclaration struct {
	Queue       string         // Queue to consume from
	Consumer    string         // Consumer tag
	AutoAck     bool           // Automatically acknowledge messages
	Exclusive   bool           // Exclusive consumer
	NoLocal     bool           // Do not deliver to the connection that published
	NoWait      bool           // Do not wait for server confirmation
	EventType   string         // Event type identifier
	Description string         // Human-readable description
	Handler     MessageHandler // Message handler (optional for documentation-only declarations)
}

ConsumerDeclaration defines what a module consumes and how to handle messages

type DeclarationStats added in v0.9.0

type DeclarationStats struct {
	Exchanges  int
	Queues     int
	Bindings   int
	Publishers int
	Consumers  int
}

Stats returns statistics about the declarations.

type Declarations added in v0.9.0

type Declarations struct {
	Exchanges  map[string]*ExchangeDeclaration
	Queues     map[string]*QueueDeclaration
	Bindings   []*BindingDeclaration
	Publishers []*PublisherDeclaration
	Consumers  []*ConsumerDeclaration
}

Declarations stores messaging infrastructure declarations made by modules at startup. This is a pure data structure (no client dependencies) that can be validated once and replayed to multiple per-tenant registries.

func NewDeclarations added in v0.9.0

func NewDeclarations() *Declarations

NewDeclarations creates a new empty declarations store.

func (*Declarations) Clone added in v0.9.0

func (d *Declarations) Clone() *Declarations

Clone creates a deep copy of the declarations. This is useful for creating per-tenant copies during replay.

func (*Declarations) RegisterBinding added in v0.9.0

func (d *Declarations) RegisterBinding(b *BindingDeclaration)

RegisterBinding adds a binding declaration to the store.

func (*Declarations) RegisterConsumer added in v0.9.0

func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)

RegisterConsumer adds a consumer declaration to the store.

func (*Declarations) RegisterExchange added in v0.9.0

func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)

RegisterExchange adds an exchange declaration to the store.

func (*Declarations) RegisterPublisher added in v0.9.0

func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)

RegisterPublisher adds a publisher declaration to the store.

func (*Declarations) RegisterQueue added in v0.9.0

func (d *Declarations) RegisterQueue(q *QueueDeclaration)

RegisterQueue adds a queue declaration to the store.

func (*Declarations) ReplayToRegistry added in v0.9.0

func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error

ReplayToRegistry applies all declarations to a runtime registry. The order is important: exchanges first, then queues, then bindings, then publishers/consumers.

func (*Declarations) Stats added in v0.9.0

func (d *Declarations) Stats() DeclarationStats

Stats returns counts of each declaration type.

func (*Declarations) Validate added in v0.9.0

func (d *Declarations) Validate() error

Validate checks the integrity of all declarations. It ensures that references between declarations are valid.

type ExchangeDeclaration

type ExchangeDeclaration struct {
	Name       string         // Exchange name
	Type       string         // Exchange type (direct, topic, fanout, headers)
	Durable    bool           // Survive server restart
	AutoDelete bool           // Delete when no longer used
	Internal   bool           // Internal exchange
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

ExchangeDeclaration defines an exchange to be declared

type Manager added in v0.9.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages AMQP clients by string keys with different lifecycle strategies. Publishers are cached with idle eviction (can be recreated easily). Consumers are long-lived (must stay alive to receive messages). The manager is key-agnostic - it doesn't know about tenants, just manages named clients.

func NewMessagingManager added in v0.9.0

func NewMessagingManager(resourceSource TenantMessagingResourceSource, log logger.Logger, opts ManagerOptions, clientFactory ClientFactory) *Manager

NewMessagingManager creates a new messaging manager

func (*Manager) Close added in v0.9.0

func (m *Manager) Close() error

Close closes all clients and stops cleanup

func (*Manager) EnsureConsumers added in v0.9.0

func (m *Manager) EnsureConsumers(ctx context.Context, key string, decls *Declarations) error

EnsureConsumers creates and starts consumers for the given key using the provided declarations. This should be called once per key to set up long-lived consumers. Subsequent calls for the same key are idempotent.

func (*Manager) GetPublisher added in v0.9.0

func (m *Manager) GetPublisher(ctx context.Context, key string) (AMQPClient, error)

GetPublisher returns a publisher client for the given key. Publishers are cached with LRU eviction and lazy initialization.

func (*Manager) StartCleanup added in v0.9.0

func (m *Manager) StartCleanup(interval time.Duration)

StartCleanup starts the background cleanup routine for idle publishers

func (*Manager) Stats added in v0.9.0

func (m *Manager) Stats() map[string]any

Stats returns statistics about the messaging manager

func (*Manager) StopCleanup added in v0.9.0

func (m *Manager) StopCleanup()

StopCleanup stops the background cleanup routine

type ManagerOptions added in v0.9.0

type ManagerOptions struct {
	MaxPublishers int           // Maximum number of publisher clients to keep cached
	IdleTTL       time.Duration // Time after which idle publishers are evicted
}

ManagerOptions configures the MessagingManager

type MessageHandler

type MessageHandler interface {
	// Handle processes a message and returns an error if processing fails.
	// If an error is returned, the message will be negatively acknowledged (nack).
	// If no error is returned, the message will be acknowledged (ack).
	// The delivery is passed by pointer for performance reasons.
	Handle(ctx context.Context, delivery *amqp.Delivery) error

	// EventType returns the event type this handler can process.
	// This is used for routing messages to the correct handler.
	EventType() string
}

MessageHandler defines the interface for processing consumed messages. Handlers should implement this interface to process specific message types.

type PublishOptions

type PublishOptions struct {
	Exchange   string         // AMQP exchange name
	RoutingKey string         // AMQP routing key
	Headers    map[string]any // Message headers
	Mandatory  bool           // AMQP mandatory flag
	Immediate  bool           // AMQP immediate flag
}

PublishOptions contains options for publishing messages with AMQP-specific features.

type PublisherDeclaration

type PublisherDeclaration struct {
	Exchange    string         // Target exchange
	RoutingKey  string         // Default routing key
	EventType   string         // Event type identifier
	Description string         // Human-readable description
	Mandatory   bool           // Message must be routed to a queue
	Immediate   bool           // Message must be delivered immediately
	Headers     map[string]any // Default headers
}

PublisherDeclaration defines what a module publishes

type QueueDeclaration

type QueueDeclaration struct {
	Name       string         // Queue name
	Durable    bool           // Survive server restart
	AutoDelete bool           // Delete when no consumers
	Exclusive  bool           // Only accessible by declaring connection
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

QueueDeclaration defines a queue to be declared

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages messaging infrastructure declarations across modules. It ensures queues, exchanges, and bindings are properly declared before use. It also manages consumer lifecycle and handles message routing to handlers.

func NewRegistry

func NewRegistry(client AMQPClient, log logger.Logger) *Registry

NewRegistry creates a new messaging registry

func (*Registry) DeclareInfrastructure

func (r *Registry) DeclareInfrastructure(ctx context.Context) error

DeclareInfrastructure declares all registered messaging infrastructure

func (*Registry) GetBindings added in v0.8.0

func (r *Registry) GetBindings() []*BindingDeclaration

GetBindings returns all registered bindings (for testing/monitoring)

func (*Registry) GetConsumers

func (r *Registry) GetConsumers() []*ConsumerDeclaration

GetConsumers returns all registered consumers (for documentation/monitoring)

func (*Registry) GetExchanges added in v0.8.0

func (r *Registry) GetExchanges() map[string]*ExchangeDeclaration

GetExchanges returns all registered exchanges (for testing/monitoring)

func (*Registry) GetPublishers

func (r *Registry) GetPublishers() []*PublisherDeclaration

GetPublishers returns all registered publishers (for documentation/monitoring)

func (*Registry) GetQueues added in v0.8.0

func (r *Registry) GetQueues() map[string]*QueueDeclaration

GetQueues returns all registered queues (for testing/monitoring)

func (*Registry) RegisterBinding

func (r *Registry) RegisterBinding(declaration *BindingDeclaration)

RegisterBinding registers a binding for declaration

func (*Registry) RegisterConsumer

func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)

RegisterConsumer registers a consumer declaration

func (*Registry) RegisterExchange

func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)

RegisterExchange registers an exchange for declaration

func (*Registry) RegisterPublisher

func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)

RegisterPublisher registers a publisher declaration

func (*Registry) RegisterQueue

func (r *Registry) RegisterQueue(declaration *QueueDeclaration)

RegisterQueue registers a queue for declaration

func (*Registry) StartConsumers

func (r *Registry) StartConsumers(ctx context.Context) error

StartConsumers starts all registered consumers with handlers. This should be called after DeclareInfrastructure and before starting the main application.

func (*Registry) StopConsumers

func (r *Registry) StopConsumers()

StopConsumers gracefully stops all running consumers.

func (*Registry) ValidateConsumer

func (r *Registry) ValidateConsumer(queue string) bool

ValidateConsumer checks if a consumer is registered for the given queue

func (*Registry) ValidatePublisher

func (r *Registry) ValidatePublisher(exchange, routingKey string) bool

ValidatePublisher checks if a publisher is registered for the given exchange/routing key

type RegistryInterface added in v0.8.0

type RegistryInterface interface {
	// Registration methods
	RegisterExchange(declaration *ExchangeDeclaration)
	RegisterQueue(declaration *QueueDeclaration)
	RegisterBinding(declaration *BindingDeclaration)
	RegisterPublisher(declaration *PublisherDeclaration)
	RegisterConsumer(declaration *ConsumerDeclaration)

	// Infrastructure lifecycle
	DeclareInfrastructure(ctx context.Context) error
	StartConsumers(ctx context.Context) error
	StopConsumers()

	// Getter methods for testing/monitoring
	GetExchanges() map[string]*ExchangeDeclaration
	GetQueues() map[string]*QueueDeclaration
	GetBindings() []*BindingDeclaration
	GetPublishers() []*PublisherDeclaration
	GetConsumers() []*ConsumerDeclaration

	// Validation methods
	ValidatePublisher(exchange, routingKey string) bool
	ValidateConsumer(queue string) bool
}

RegistryInterface defines the contract for messaging infrastructure management. This interface allows for easy mocking and testing of messaging infrastructure.

type TenantMessagingResourceSource added in v0.9.0

type TenantMessagingResourceSource interface {
	// AMQPURL returns the AMQP URL for the given key.
	// For single-tenant apps, key will be "". For multi-tenant, key will be the tenant ID.
	AMQPURL(ctx context.Context, key string) (string, error)
}

TenantMessagingResourceSource provides per-key AMQP configurations. This interface abstracts where tenant-specific messaging configs come from.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL