Documentation ¶
Constants ¶
const (
	// DefaultNATSURL is the default connection URL for NATS server.
	// It points to a local NATS server running on the standard port.
	DefaultNATSURL = nats.DefaultURL

	// DefaultNATSMaxReconnects is the default maximum number of reconnect
	// attempts.
	// A value of -1 allows unlimited reconnection attempts.
	DefaultNATSMaxReconnects = -1

	// DefaultNATSReconnectWait is the default time to wait between reconnect
	// attempts.
	// This provides a reasonable backoff when the NATS server is temporarily
	// unavailable.
	DefaultNATSReconnectWait = 2 * time.Second
)
const DefaultAMQPExchange = "cosmos.events"
DefaultAMQPExchange is the default name for the topic exchange used by AMQPBroker when no custom exchange is specified.
const DefaultMQTTKeepAlive = 30
DefaultMQTTKeepAlive is the default keep-alive interval in seconds used to maintain the MQTT connection.
const DefaultMQTTQoS = 1
DefaultMQTTQoS is the default quality of service level used for MQTT publish and subscribe operations when not specified.
Variables ¶
var (
	// ErrBrokerClosed is returned when attempting operations on a closed
	// broker.
	// Once a broker is closed, it cannot be reused and a new instance
	// must be created.
	ErrBrokerClosed = errors.New("broker is closed")
)
Functions ¶
This section is empty.
Types ¶
type AMQPBroker ¶
type AMQPBroker struct {
// contains filtered or unexported fields
}
AMQPBroker implements the EventBroker interface using RabbitMQ's AMQP protocol for publish/subscribe messaging. It uses a topic exchange to broadcast events to all subscribed consumers, with each subscriber receiving messages in their own exclusive queue.
The broker maintains a single connection with one channel for publishing and creates individual channels for each subscriber, following RabbitMQ best practices for concurrent access.
func NewAMQPBroker ¶
func NewAMQPBroker(url string) (*AMQPBroker, error)
NewAMQPBroker creates a new AMQPBroker by establishing a connection to the RabbitMQ server at the given URL and using the default exchange name. The broker must be closed when no longer needed to release the connection and associated resources.
The URL should be in the format: amqp://username:password@host:port/vhost
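As a rough illustration of that URL format, the sketch below assembles a connection string with the standard library so that special characters in the credentials are escaped. The helper name `amqpURL` and the sample values are hypothetical and not part of this package; the sketch also assumes a vhost name that contains no slashes.

```go
package main

import (
	"fmt"
	"net/url"
)

// amqpURL is a hypothetical helper that builds a URL of the form
// amqp://username:password@host:port/vhost.
// Assumption: vhost is a plain name without slashes.
func amqpURL(user, pass, hostPort, vhost string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, pass), // escapes reserved characters
		Host:   hostPort,
		Path:   "/" + vhost,
	}
	return u.String()
}

func main() {
	// The resulting string is what would be passed to NewAMQPBroker.
	fmt.Println(amqpURL("guest", "guest", "localhost:5672", "events"))
}
```

Building the URL this way avoids hand-escaping passwords that contain characters such as "@" or "/".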
func NewAMQPBrokerFrom ¶
func NewAMQPBrokerFrom(
	conn *amqp091.Connection,
	exchange string,
) (*AMQPBroker, error)
NewAMQPBrokerFrom creates a new AMQPBroker using an existing AMQP connection and exchange name. This constructor is useful when you need to share a connection across multiple brokers or have custom connection configuration requirements.
The function creates a dedicated channel for publishing and declares the topic exchange. If the exchange already exists with matching configuration, the declaration is idempotent. The broker takes ownership of the connection: calling Close on the broker also closes the connection, so any other broker or component sharing it stops working at that point.
func NewAMQPBrokerWith ¶
func NewAMQPBrokerWith(options *AMQPBrokerOptions) (*AMQPBroker, error)
NewAMQPBrokerWith creates a new AMQPBroker using the provided options for connection URL and exchange name. If no exchange name is specified in the options, DefaultAMQPExchange is used. The broker must be closed when no longer needed to release the connection and associated resources.
func (*AMQPBroker) Close ¶
func (b *AMQPBroker) Close() error
Close closes the broker's publish channel and the underlying AMQP connection, releasing all associated resources. This will also cause all active subscriber channels to be closed.
If closing the publish channel fails, the connection is still closed and the channel close error is returned.
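The close ordering described above can be sketched with plain `io.Closer` values. This is only an illustration of the documented behavior, not the package's code: `closeBoth`, `fakeCloser`, and `errChannel` are hypothetical names, and returning the connection error when the channel closes cleanly is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errChannel simulates a failure while closing the publish channel.
var errChannel = errors.New("channel close failed")

// fakeCloser is a stand-in for a channel or connection.
type fakeCloser struct {
	err    error
	closed bool
}

func (f *fakeCloser) Close() error {
	f.closed = true
	return f.err
}

// closeBoth closes ch first and conn second. conn is closed even when
// ch.Close fails, and the channel error takes precedence.
// Assumption: if only conn.Close fails, that error is returned.
func closeBoth(ch, conn io.Closer) error {
	chErr := ch.Close()
	connErr := conn.Close()
	if chErr != nil {
		return chErr
	}
	return connErr
}

func main() {
	ch := &fakeCloser{err: errChannel}
	conn := &fakeCloser{}
	err := closeBoth(ch, conn)
	fmt.Println(err, conn.closed)
}
```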
func (*AMQPBroker) Publish ¶
Publish sends an event with the given name and payload to all subscribers listening for that event. The payload is serialized to JSON before being published to the topic exchange using the event name as the routing key.
Publishing is thread-safe and respects context cancellation. If the context is cancelled before the publish completes, the operation will be aborted and an error returned.
func (*AMQPBroker) Subscribe ¶
func (b *AMQPBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)
Subscribe registers a handler to receive events with the given name. Each subscription creates its own exclusive, auto-delete queue that is bound to the topic exchange with the event name as the routing key. Messages are automatically acknowledged.
The handler receives messages in a separate goroutine and will continue processing until the context is cancelled or the returned unsubscribe function is called. The handler receives an EventPayload function that can unmarshal the JSON message into the desired type.
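To make the payload mechanism concrete, here is a minimal sketch of a function that decodes a raw JSON message into a caller-supplied type. The exact definitions of `contract.EventHandler` and EventPayload are not shown in this documentation, so `payloadFunc`, `newPayload`, `userCreated`, and `decodeUser` are hypothetical stand-ins and the real signatures may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payloadFunc is a hypothetical stand-in for the EventPayload function:
// it decodes the raw JSON message into dst.
type payloadFunc func(dst any) error

// newPayload wraps raw JSON bytes so a handler can decode them into
// whatever type it expects.
func newPayload(raw []byte) payloadFunc {
	return func(dst any) error {
		return json.Unmarshal(raw, dst)
	}
}

// userCreated is an example event type chosen for this sketch.
type userCreated struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// decodeUser shows what a handler body might do with the payload function.
func decodeUser(raw []byte) (userCreated, error) {
	var u userCreated
	err := newPayload(raw)(&u)
	return u, err
}

func main() {
	u, err := decodeUser([]byte(`{"id":7,"name":"ada"}`))
	fmt.Println(u.ID, u.Name, err)
}
```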
If subscription setup fails, the returned unsubscribe function will return the setup error when called.
type AMQPBrokerOptions ¶
type AMQPBrokerOptions struct {
// URL is the AMQP connection string in the format:
// amqp://username:password@host:port/vhost
URL string
// Exchange is the name of the topic exchange to use for events.
// If empty, DefaultAMQPExchange is used.
Exchange string
}
AMQPBrokerOptions configures the creation of a new AMQPBroker, allowing customization of the connection URL and exchange name.
type MQTTBroker ¶
type MQTTBroker struct {
// contains filtered or unexported fields
}
MQTTBroker implements the EventBroker interface using the MQTT v5 protocol for publish/subscribe messaging. It uses the Eclipse Paho Go client with automatic reconnection support and always operates with clean sessions for simplicity.
Event names are automatically converted to MQTT topic format where dots become slashes and asterisks become plus signs for single-level wildcard matching. Hash symbols remain unchanged for multi-level wildcards.
The broker supports fan-out messaging where multiple handlers can subscribe to the same topic and all will receive messages.
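The event-name conversion described above amounts to a character mapping. The following sketch shows one way it could be written; `toMQTTTopic` is a hypothetical name and not necessarily how the package implements the conversion.

```go
package main

import (
	"fmt"
	"strings"
)

// toMQTTTopic is a hypothetical helper illustrating the documented mapping:
// "." becomes "/", "*" becomes "+", and "#" is left unchanged.
func toMQTTTopic(event string) string {
	return strings.NewReplacer(".", "/", "*", "+").Replace(event)
}

func main() {
	for _, event := range []string{"user.created", "user.*.created", "logs.#"} {
		fmt.Printf("%s -> %s\n", event, toMQTTTopic(event))
	}
}
```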
func NewMQTTBroker ¶
func NewMQTTBroker(url string) (*MQTTBroker, error)
NewMQTTBroker creates a new MQTTBroker by connecting to the MQTT broker at the given URL with default settings. The broker uses clean sessions, QoS 1, and automatic reconnection. It must be closed when no longer needed to release resources.
The URL should be in the format: mqtt://host:port or mqtts://host:port for TLS
func NewMQTTBrokerFrom ¶
func NewMQTTBrokerFrom(
	client *autopaho.ConnectionManager,
	qos byte,
) *MQTTBroker
NewMQTTBrokerFrom creates a new MQTTBroker from an existing autopaho ConnectionManager and QoS level. This constructor is useful for advanced scenarios where the user needs full control over the MQTT connection configuration.
func NewMQTTBrokerWith ¶
func NewMQTTBrokerWith(options *MQTTBrokerOptions) (*MQTTBroker, error)
NewMQTTBrokerWith creates a new MQTTBroker using the provided options for connection URLs, QoS level, and authentication. Multiple URLs enable automatic failover between brokers. The broker uses clean sessions and automatic reconnection.
func (*MQTTBroker) Close ¶
func (b *MQTTBroker) Close() error
Close gracefully disconnects from the MQTT broker and releases all resources. This will terminate all active subscriptions and close the underlying connection.
func (*MQTTBroker) Publish ¶
Publish sends an event with the given name and payload to all subscribers listening for that event. The payload is serialized to JSON and the event name is converted to MQTT topic format.
Publishing is thread-safe and respects context cancellation. The operation uses the configured QoS level for delivery guarantees.
func (*MQTTBroker) Subscribe ¶
func (b *MQTTBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)
Subscribe registers a handler to receive events with the given name or pattern. The event name is converted to MQTT topic format where dots become slashes and asterisks become plus signs for single-level wildcard matching. Hash symbols remain unchanged for multi-level wildcard matching.
Multiple handlers can subscribe to the same topic and all will receive messages (fan-out). Each handler is tracked individually so unsubscribing one handler does not affect others.
The returned unsubscribe function removes the specific handler and unsubscribes from the MQTT broker only when the last handler for the topic is removed.
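The per-handler tracking described above can be pictured as a small registry keyed by topic. The sketch below is an assumption about the general technique, not the broker's actual data structure: `registry`, `add`, and `remove` are hypothetical names, and the `first` and `last` results mark the points where a broker-level subscribe or unsubscribe would be sent.

```go
package main

import (
	"fmt"
	"sync"
)

// registry tracks handlers per topic so that removing one handler does not
// affect the others.
type registry struct {
	mu       sync.Mutex
	nextID   int
	handlers map[string]map[int]func([]byte)
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]map[int]func([]byte))}
}

// add registers h for topic. first is true when h is the first handler for
// the topic, which is when a broker-level subscribe would be needed.
func (r *registry) add(topic string, h func([]byte)) (id int, first bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.handlers[topic] == nil {
		r.handlers[topic] = make(map[int]func([]byte))
		first = true
	}
	r.nextID++
	r.handlers[topic][r.nextID] = h
	return r.nextID, first
}

// remove deletes one handler. last is true when no handlers remain for the
// topic, which is when a broker-level unsubscribe would be sent.
func (r *registry) remove(topic string, id int) (last bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	hs := r.handlers[topic]
	if _, ok := hs[id]; !ok {
		return false
	}
	delete(hs, id)
	if len(hs) == 0 {
		delete(r.handlers, topic)
		return true
	}
	return false
}

func main() {
	r := newRegistry()
	a, _ := r.add("user/created", func([]byte) {})
	b, _ := r.add("user/created", func([]byte) {})
	fmt.Println(r.remove("user/created", a), r.remove("user/created", b))
}
```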
type MQTTBrokerOptions ¶
type MQTTBrokerOptions struct {
// URLs is a slice of MQTT broker URLs to connect to.
// Format: mqtt://host:port or mqtts://host:port for TLS.
// Multiple URLs enable automatic failover between brokers.
URLs []string
// QoS is the quality of service level (0, 1, or 2).
// 0: At most once delivery (fire and forget).
// 1: At least once delivery (recommended default).
// 2: Exactly once delivery (highest overhead).
// Default: 1
QoS byte
// Username for MQTT broker authentication (optional).
Username string
// Password for MQTT broker authentication (optional).
Password string
// KeepAlive is the interval in seconds for keep-alive pings
// to maintain the connection. Default: 30
KeepAlive uint16
}
MQTTBrokerOptions configures the creation of a new MQTTBroker, allowing customization of connection URLs, QoS level, authentication credentials, and the keep-alive interval.
type MemoryBroker ¶
type MemoryBroker struct {
// contains filtered or unexported fields
}
MemoryBroker implements the EventBroker interface using only in-memory data structures with no external dependencies. It provides a lightweight, zero-configuration broker ideal for testing, local development, and single-instance applications. All message delivery happens asynchronously in separate goroutines with panic recovery to ensure one handler's failure doesn't affect others.
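The delivery model described above, one goroutine per handler with panic recovery, could look roughly like the following. This is a sketch under stated assumptions rather than the broker's implementation: `deliver` is a hypothetical name, and it waits for all handlers and counts recovered panics only so that the result is observable, whereas the real broker delivers without waiting.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver invokes every handler in its own goroutine and recovers from
// panics so that one failing handler cannot affect the others. It returns
// the number of handlers that panicked.
func deliver(handlers []func(msg string), msg string) (panics int) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	for _, h := range handlers {
		wg.Add(1)
		go func(h func(string)) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panics++
					mu.Unlock()
				}
			}()
			h(msg)
		}(h)
	}
	// Waiting is for demonstration only; see the lead-in.
	wg.Wait()
	return panics
}

func main() {
	n := deliver([]func(string){
		func(m string) { fmt.Println("handled:", m) },
		func(string) { panic("boom") },
	}, "user.created")
	fmt.Println("recovered panics:", n)
}
```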
func NewMemoryBroker ¶
func NewMemoryBroker() *MemoryBroker
NewMemoryBroker creates a new in-memory event broker with no external dependencies or configuration required. The broker is ready to use immediately and supports concurrent access from multiple goroutines. It must be closed with Close when no longer needed to release resources.
func (*MemoryBroker) Close ¶
func (b *MemoryBroker) Close() error
Close shuts down the broker and removes all subscribed handlers. After Close is called, all operations return ErrBrokerClosed and the broker cannot be reused. In-flight message deliveries may still complete after Close returns.
func (*MemoryBroker) Publish ¶
Publish sends an event with the given payload to all matching subscribers. The payload is JSON-encoded and delivered asynchronously to handlers whose subscription patterns match the event name. Handlers are invoked in separate goroutines with panic recovery, ensuring one handler's failure doesn't affect others.
Wildcard matching supports:
- "*" matches a single token (e.g., "user.*.created" matches "user.123.created")
- "#" matches zero or more tokens (e.g., "logs.#" matches "logs", "logs.error", "logs.error.database")
Returns an error if JSON encoding fails or if the broker is closed. The context is checked once at the start of the publish operation.
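The wildcard rules listed above can be expressed as a token-by-token match. The sketch below is one possible implementation of those rules, assuming tokens are separated by dots; `matchPattern` and `matchTokens` are hypothetical names and the broker's own matcher may be written differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTokens reports whether the event tokens e satisfy the pattern tokens p.
func matchTokens(p, e []string) bool {
	if len(p) == 0 {
		return len(e) == 0
	}
	switch p[0] {
	case "#":
		// "#" may consume zero or more tokens: try every possible split.
		for i := 0; i <= len(e); i++ {
			if matchTokens(p[1:], e[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one token.
		return len(e) > 0 && matchTokens(p[1:], e[1:])
	default:
		// A literal token must match exactly.
		return len(e) > 0 && p[0] == e[0] && matchTokens(p[1:], e[1:])
	}
}

// matchPattern splits both strings on "." and compares them token by token.
func matchPattern(pattern, event string) bool {
	return matchTokens(strings.Split(pattern, "."), strings.Split(event, "."))
}

func main() {
	fmt.Println(matchPattern("user.*.created", "user.123.created"))
	fmt.Println(matchPattern("logs.#", "logs"))
	fmt.Println(matchPattern("logs.#", "logs.error.database"))
	fmt.Println(matchPattern("user.*.created", "user.created"))
}
```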
func (*MemoryBroker) Subscribe ¶
func (b *MemoryBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)
Subscribe registers a handler for events matching the given pattern. The pattern supports wildcards:
- "*" matches a single token (e.g., "user.*.created")
- "#" matches zero or more tokens (e.g., "logs.#")
Multiple handlers can subscribe to the same pattern and all will receive messages (fan-out). Handlers are invoked asynchronously in separate goroutines.
Returns an unsubscribe function that removes only this specific handler subscription. Returns an error if the broker is closed. The context is used only for the subscription setup, not for the handler lifecycle.
type NATSBroker ¶
type NATSBroker struct {
// contains filtered or unexported fields
}
NATSBroker implements the EventBroker interface using the NATS messaging system. It provides a lightweight, high-performance event broker with built-in fan-out support and wildcard subscriptions. NATS handles message routing natively, making this implementation simpler than brokers that require manual handler tracking.
func NewNATSBroker ¶
func NewNATSBroker(url string) (*NATSBroker, error)
NewNATSBroker creates a new NATS broker connected to the specified URL. It applies sensible defaults for reconnection behavior. This constructor is suitable for simple use cases with a single NATS server.
For clustered deployments or custom authentication, use NewNATSBrokerWith instead.
func NewNATSBrokerFrom ¶
func NewNATSBrokerFrom(conn *nats.Conn) *NATSBroker
NewNATSBrokerFrom creates a new NATS broker from an existing connection. This is useful when you need full control over connection creation or want to share a connection across multiple components. The broker takes ownership of the connection and will close it when Close is called.
func NewNATSBrokerWith ¶
func NewNATSBrokerWith(options *NATSBrokerOptions) (*NATSBroker, error)
NewNATSBrokerWith creates a new NATS broker with custom configuration. It provides full control over connection behavior, authentication, and reliability, and applies sensible defaults for any unspecified options.
Returns an error if connection to the NATS server fails.
func (*NATSBroker) Close ¶
func (b *NATSBroker) Close() error
Close gracefully shuts down the NATS connection. It drains pending messages before closing so that messages already buffered are not dropped. After Close is called, the broker cannot be reused.
func (*NATSBroker) Publish ¶
Publish sends an event with the given payload to all subscribers. The payload is JSON-encoded before transmission. The event name is used as the NATS subject.
Returns an error if JSON encoding fails or if the publish operation fails. The context is used for operation timeout and cancellation.
func (*NATSBroker) Subscribe ¶
func (b *NATSBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)
Subscribe registers a handler for events matching the given pattern. The event pattern supports NATS wildcards:
- "*" matches a single token (e.g., "users.*.created")
- ">" matches one or more trailing tokens (e.g., "users.>")
The "#" wildcard from other brokers is automatically converted to ">". Multiple subscribers to the same subject all receive messages (fan-out).
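The "#" to ">" conversion mentioned above can be sketched as a token-level rewrite. `toNATSSubject` is a hypothetical name used only for illustration. Note that in NATS the ">" wildcard matches one or more tokens, so a converted "logs.#" would not match the bare subject "logs", unlike the zero-or-more semantics documented for MemoryBroker.

```go
package main

import (
	"fmt"
	"strings"
)

// toNATSSubject is a hypothetical helper that rewrites the "#" wildcard
// token to the NATS ">" wildcard and leaves all other tokens untouched.
func toNATSSubject(event string) string {
	tokens := strings.Split(event, ".")
	for i, tok := range tokens {
		if tok == "#" {
			tokens[i] = ">"
		}
	}
	return strings.Join(tokens, ".")
}

func main() {
	for _, event := range []string{"users.#", "users.*.created", "users.created"} {
		fmt.Printf("%s -> %s\n", event, toNATSSubject(event))
	}
}
```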
Returns an unsubscribe function that removes this specific handler. The context is used only for the subscription setup, not for the handler lifecycle.
type NATSBrokerOptions ¶
type NATSBrokerOptions struct {
// URLs is a list of NATS server URLs to connect to.
// Multiple URLs enable automatic failover in clustered deployments.
// If empty, defaults to DefaultNATSURL.
URLs []string
// Name identifies this client connection in NATS server logs and
// monitoring.
// Useful for debugging and tracing connection issues.
Name string
// MaxReconnects is the maximum number of reconnection attempts.
// Use -1 for unlimited reconnects (default), 0 to disable reconnection.
MaxReconnects int
// ReconnectWait is the time to wait between reconnection attempts.
// Defaults to DefaultNATSReconnectWait (2 seconds).
ReconnectWait time.Duration
// Timeout is the connection timeout for initial connection and
// operations.
// If zero, NATS uses its default timeout.
Timeout time.Duration
// Username is the username for basic authentication.
// Used in combination with Password when the NATS server requires auth.
Username string
// Password is the password for basic authentication.
// Used in combination with Username when the NATS server requires auth.
Password string
// Token is a bearer token for token-based authentication.
// Alternative to username/password authentication.
Token string
// NKeySeed is the seed for NKey authentication.
// NKey provides cryptographic authentication without transmitting secrets.
NKeySeed string
// CredentialsFile is the path to a credentials file containing JWT and
// NKey.
// This is the recommended authentication method for production
// deployments.
CredentialsFile string
// TLSConfig enables TLS encryption for the NATS connection.
// When set, all communication with the NATS server is encrypted.
TLSConfig *tls.Config
// RootCAs is a list of paths to root CA certificate files.
// Used to verify the NATS server's certificate when using TLS.
RootCAs []string
}
NATSBrokerOptions configures a NATS broker connection. It provides comprehensive control over connection behavior, authentication, and reliability features. All fields are optional; sensible defaults are applied when using NewNATSBrokerWith.
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
func NewRedisBroker ¶
func NewRedisBroker(options *redis.Options) *RedisBroker
func NewRedisBrokerFrom ¶
func NewRedisBrokerFrom(client *redis.Client) *RedisBroker
func (*RedisBroker) Close ¶
func (b *RedisBroker) Close() error
func (*RedisBroker) Subscribe ¶
func (b *RedisBroker) Subscribe(ctx context.Context, event string, handler contract.EventHandler) (contract.EventUnsubscribeFunc, error)