event

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultNATSURL is the default connection URL for NATS server.
	// It points to a local NATS server running on the standard port.
	DefaultNATSURL = nats.DefaultURL

	// DefaultNATSMaxReconnects is the default maximum number of reconnect
	// attempts.
	// A value of -1 allows unlimited reconnection attempts.
	DefaultNATSMaxReconnects = -1

	// DefaultNATSReconnectWait is the default time to wait between reconnect
	// attempts.
	// This provides a reasonable backoff when the NATS server is temporarily
	// unavailable.
	DefaultNATSReconnectWait = 2 * time.Second
)
View Source
const DefaultAMQPExchange = "cosmos.events"

DefaultAMQPExchange is the default name for the topic exchange used by AMQPBroker when no custom exchange is specified.

View Source
const DefaultMQTTKeepAlive = 30

DefaultMQTTKeepAlive is the default keep-alive interval in seconds used to maintain the MQTT connection.

View Source
const DefaultMQTTQoS = 1

DefaultMQTTQoS is the default quality of service level used for MQTT publish and subscribe operations when not specified.

Variables

View Source
var (
	// ErrBrokerClosed is returned when attempting operations on a closed
	// broker.
	// Once a broker is closed, it cannot be reused and a new instance
	// must be created.
	ErrBrokerClosed = errors.New("broker is closed")
)

Functions

This section is empty.

Types

type AMQPBroker

type AMQPBroker struct {
	// contains filtered or unexported fields
}

AMQPBroker implements the EventBroker interface using RabbitMQ's AMQP protocol for publish/subscribe messaging. It uses a topic exchange to broadcast events to all subscribed consumers, with each subscriber receiving messages in their own exclusive queue.

The broker maintains a single connection with one channel for publishing and creates individual channels for each subscriber, following RabbitMQ best practices for concurrent access.

func NewAMQPBroker

func NewAMQPBroker(url string) (*AMQPBroker, error)

NewAMQPBroker creates a new AMQPBroker by establishing a connection to the RabbitMQ server at the given URL and using the default exchange name. The broker must be closed when no longer needed to release the connection and associated resources.

The URL should be in the format: amqp://username:password@host:port/vhost

func NewAMQPBrokerFrom

func NewAMQPBrokerFrom(
	conn *amqp091.Connection,
	exchange string,
) (*AMQPBroker, error)

NewAMQPBrokerFrom creates a new AMQPBroker using an existing AMQP connection and exchange name. This constructor is useful when you need to share a connection across multiple brokers or have custom connection configuration requirements.

The function creates a dedicated channel for publishing and declares the topic exchange. If the exchange already exists with matching configuration, the declaration is idempotent. The broker takes ownership of managing the connection lifecycle.

func NewAMQPBrokerWith

func NewAMQPBrokerWith(options *AMQPBrokerOptions) (*AMQPBroker, error)

NewAMQPBrokerWith creates a new AMQPBroker using the provided options for connection URL and exchange name. If no exchange name is specified in the options, DefaultAMQPExchange is used. The broker must be closed when no longer needed to release the connection and associated resources.

func (*AMQPBroker) Close

func (b *AMQPBroker) Close() error

Close closes the broker's publish channel and the underlying AMQP connection, releasing all associated resources. This will also cause all active subscriber channels to be closed.

If closing the publish channel fails, the connection is still closed and the channel close error is returned.

func (*AMQPBroker) Publish

func (b *AMQPBroker) Publish(
	ctx context.Context,
	event string,
	payload any,
) error

Publish sends an event with the given name and payload to all subscribers listening for that event. The payload is serialized to JSON before being published to the topic exchange using the event name as the routing key.

Publishing is thread-safe and respects context cancellation. If the context is cancelled before the publish completes, the operation will be aborted and an error returned.

func (*AMQPBroker) Subscribe

func (b *AMQPBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)

Subscribe registers a handler to receive events with the given name. Each subscription creates its own exclusive, auto-delete queue that is bound to the topic exchange with the event name as the routing key. Messages are automatically acknowledged.

The handler receives messages in a separate goroutine and will continue processing until the context is cancelled or the returned unsubscribe function is called. The handler receives an EventPayload function that can unmarshal the JSON message into the desired type.

If subscription setup fails, the returned unsubscribe function will return the setup error when called.

type AMQPBrokerOptions

type AMQPBrokerOptions struct {
	// URL is the AMQP connection string in the format:
	// amqp://username:password@host:port/vhost
	URL string

	// Exchange is the name of the topic exchange to use for events.
	// If empty, DefaultAMQPExchange is used.
	Exchange string
}

AMQPBrokerOptions configures the creation of a new AMQPBroker, allowing customization of the connection URL and exchange name.

type MQTTBroker

type MQTTBroker struct {
	// contains filtered or unexported fields
}

MQTTBroker implements the EventBroker interface using MQTT v5 protocol for publish/subscribe messaging. It uses the Eclipse Paho Go client with automatic reconnection support and always operates with clean sessions for simplicity.

Event names are automatically converted to MQTT topic format where dots become slashes and asterisks become plus signs for single-level wildcard matching. Hash symbols remain unchanged for multi-level wildcards.

The broker supports fan-out messaging where multiple handlers can subscribe to the same topic and all will receive messages.

func NewMQTTBroker

func NewMQTTBroker(url string) (*MQTTBroker, error)

NewMQTTBroker creates a new MQTTBroker by connecting to the MQTT broker at the given URL with default settings. The broker uses clean sessions, QoS 1, and automatic reconnection. It must be closed when no longer needed to release resources.

The URL should be in the format: mqtt://host:port or mqtts://host:port for TLS

func NewMQTTBrokerFrom

func NewMQTTBrokerFrom(
	client *autopaho.ConnectionManager,
	qos byte,
) *MQTTBroker

NewMQTTBrokerFrom creates a new MQTTBroker from an existing autopaho ConnectionManager and QoS level. This constructor is useful for advanced scenarios where the user needs full control over the MQTT connection configuration.

func NewMQTTBrokerWith

func NewMQTTBrokerWith(options *MQTTBrokerOptions) (*MQTTBroker, error)

NewMQTTBrokerWith creates a new MQTTBroker using the provided options for connection URLs, QoS level, and authentication. Multiple URLs enable automatic failover between brokers. The broker uses clean sessions and automatic reconnection.

func (*MQTTBroker) Close

func (b *MQTTBroker) Close() error

Close gracefully disconnects from the MQTT broker and releases all resources. This will terminate all active subscriptions and close the underlying connection.

func (*MQTTBroker) Publish

func (b *MQTTBroker) Publish(
	ctx context.Context,
	event string,
	payload any,
) error

Publish sends an event with the given name and payload to all subscribers listening for that event. The payload is serialized to JSON and the event name is converted to MQTT topic format.

Publishing is thread-safe and respects context cancellation. The operation uses the configured QoS level for delivery guarantees.

func (*MQTTBroker) Subscribe

func (b *MQTTBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)

Subscribe registers a handler to receive events with the given name or pattern. The event name is converted to MQTT topic format where dots become slashes and asterisks become plus signs for single-level wildcard matching. Hash symbols remain unchanged for multi-level wildcard matching.

Multiple handlers can subscribe to the same topic and all will receive messages (fan-out). Each handler is tracked individually so unsubscribing one handler does not affect others.

The returned unsubscribe function removes the specific handler and unsubscribes from the MQTT broker only when the last handler for the topic is removed.

type MQTTBrokerOptions

type MQTTBrokerOptions struct {
	// URLs is a slice of MQTT broker URLs to connect to.
	// Format: mqtt://host:port or mqtts://host:port for TLS.
	// Multiple URLs enable automatic failover between brokers.
	URLs []string

	// QoS is the quality of service level (0, 1, or 2).
	// 0: At most once delivery (fire and forget).
	// 1: At least once delivery (recommended default).
	// 2: Exactly once delivery (highest overhead).
	// Default: 1
	QoS byte

	// Username for MQTT broker authentication (optional).
	Username string

	// Password for MQTT broker authentication (optional).
	Password string

	// KeepAlive is the interval in seconds for keep-alive pings
	// to maintain the connection. Default: 30
	KeepAlive uint16
}

MQTTBrokerOptions configures the creation of a new MQTTBroker, allowing customization of connection URLs, QoS level, and authentication credentials.

type MemoryBroker

type MemoryBroker struct {
	// contains filtered or unexported fields
}

MemoryBroker implements the EventBroker interface using only in-memory data structures with no external dependencies. It provides a lightweight, zero-configuration broker ideal for testing, local development, and single-instance applications. All message delivery happens asynchronously in separate goroutines with panic recovery to ensure one handler's failure doesn't affect others.

func NewMemoryBroker

func NewMemoryBroker() *MemoryBroker

NewMemoryBroker creates a new in-memory event broker with no external dependencies or configuration required. The broker is ready to use immediately and supports concurrent access from multiple goroutines. It must be closed with Close when no longer needed to release resources.

func (*MemoryBroker) Close

func (b *MemoryBroker) Close() error

Close shuts down the broker and removes all subscribed handlers. After Close is called, all operations return ErrBrokerClosed and the broker cannot be reused. In-flight message deliveries may still complete after Close returns.

func (*MemoryBroker) Publish

func (b *MemoryBroker) Publish(
	ctx context.Context,
	event string,
	payload any,
) error

Publish sends an event with the given payload to all matching subscribers. The payload is JSON-encoded and delivered asynchronously to handlers whose subscription patterns match the event name. Handlers are invoked in separate goroutines with panic recovery, ensuring one handler's failure doesn't affect others.

Wildcard matching supports:

  • "*" matches a single token (e.g., "user.*.created" matches "user.123.created")
  • "#" matches zero or more tokens (e.g., "logs.#" matches "logs", "logs.error", "logs.error.database")

Returns an error if JSON encoding fails or if the broker is closed. The context is checked once at the start of the publish operation.

func (*MemoryBroker) Subscribe

func (b *MemoryBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)

Subscribe registers a handler for events matching the given pattern. The pattern supports wildcards:

  • "*" matches a single token (e.g., "user.*.created")
  • "#" matches zero or more tokens (e.g., "logs.#")

Multiple handlers can subscribe to the same pattern and all will receive messages (fan-out). Handlers are invoked asynchronously in separate goroutines.

Returns an unsubscribe function that removes only this specific handler subscription. Returns an error if the broker is closed. The context is used only for the subscription setup, not for the handler lifecycle.

type NATSBroker

type NATSBroker struct {
	// contains filtered or unexported fields
}

NATSBroker implements the EventBroker interface using NATS messaging system. It provides a lightweight, high-performance event broker with built-in fan-out support and wildcard subscriptions. NATS handles message routing natively, making this implementation simpler than brokers that require manual handler tracking.

func NewNATSBroker

func NewNATSBroker(url string) (*NATSBroker, error)

NewNATSBroker creates a new NATS broker connected to the specified URL. It applies sensible defaults for reconnection behavior. This constructor is suitable for simple use cases with a single NATS server.

For clustered deployments or custom authentication, use NewNATSBrokerWith instead.

func NewNATSBrokerFrom

func NewNATSBrokerFrom(conn *nats.Conn) *NATSBroker

NewNATSBrokerFrom creates a new NATS broker from an existing connection. This is useful when you need full control over connection creation or want to share a connection across multiple components. The broker takes ownership of the connection and will close it when Close is called.

func NewNATSBrokerWith

func NewNATSBrokerWith(options *NATSBrokerOptions) (*NATSBroker, error)

NewNATSBrokerWith creates a new NATS broker with custom configuration. It provides full control over connection behavior, authentication, and reliability. Applies sensible defaults for any unspecified options.

Returns an error if connection to the NATS server fails.

func (*NATSBroker) Close

func (b *NATSBroker) Close() error

Close gracefully shuts down the NATS connection. It drains all pending messages before closing, ensuring no messages are lost. After Close is called, the broker cannot be reused.

func (*NATSBroker) Publish

func (b *NATSBroker) Publish(
	ctx context.Context,
	event string,
	payload any,
) error

Publish sends an event with the given payload to all subscribers. The payload is JSON-encoded before transmission. The event name is used as the NATS subject.

Returns an error if JSON encoding fails or if the publish operation fails. The context is used for operation timeout and cancellation.

func (*NATSBroker) Subscribe

func (b *NATSBroker) Subscribe(
	ctx context.Context,
	event string,
	handler contract.EventHandler,
) (contract.EventUnsubscribeFunc, error)

Subscribe registers a handler for events matching the given pattern. The event pattern supports NATS wildcards:

  • "*" matches a single token (e.g., "users.*.created")
  • ">" matches multiple tokens (e.g., "users.>")

The "#" wildcard from other brokers is automatically converted to ">". Multiple subscribers to the same subject all receive messages (fan-out).

Returns an unsubscribe function that removes this specific handler. The context is used only for the subscription setup, not for the handler lifecycle.

type NATSBrokerOptions

type NATSBrokerOptions struct {
	// URLs is a list of NATS server URLs to connect to.
	// Multiple URLs enable automatic failover in clustered deployments.
	// If empty, defaults to DefaultNATSURL.
	URLs []string

	// Name identifies this client connection in NATS server logs and
	// monitoring.
	// Useful for debugging and tracing connection issues.
	Name string

	// MaxReconnects is the maximum number of reconnection attempts.
	// Use -1 for unlimited reconnects (default), 0 to disable reconnection.
	MaxReconnects int

	// ReconnectWait is the time to wait between reconnection attempts.
	// Defaults to DefaultNATSReconnectWait (2 seconds).
	ReconnectWait time.Duration

	// Timeout is the connection timeout for initial connection and
	// operations.
	// If zero, NATS uses its default timeout.
	Timeout time.Duration

	// Username is the username for basic authentication.
	// Used in combination with Password when the NATS server requires auth.
	Username string

	// Password is the password for basic authentication.
	// Used in combination with Username when the NATS server requires auth.
	Password string

	// Token is a bearer token for token-based authentication.
	// Alternative to username/password authentication.
	Token string

	// NKeySeed is the seed for NKey authentication.
	// NKey provides cryptographic authentication without transmitting secrets.
	NKeySeed string

	// CredentialsFile is the path to a credentials file containing JWT and
	// NKey.
	// This is the recommended authentication method for production
	// deployments.
	CredentialsFile string

	// TLSConfig enables TLS encryption for the NATS connection.
	// When set, all communication with the NATS server is encrypted.
	TLSConfig *tls.Config

	// RootCAs is a list of paths to root CA certificate files.
	// Used to verify the NATS server's certificate when using TLS.
	RootCAs []string
}

NATSBrokerOptions configures a NATS broker connection. It provides comprehensive control over connection behavior, authentication, and reliability features. All fields are optional; sensible defaults are applied when using NewNATSBrokerWith.

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

func NewRedisBroker

func NewRedisBroker(options *redis.Options) *RedisBroker

func NewRedisBrokerFrom

func NewRedisBrokerFrom(client *redis.Client) *RedisBroker

func (*RedisBroker) Close

func (b *RedisBroker) Close() error

func (*RedisBroker) Publish

func (b *RedisBroker) Publish(ctx context.Context, event string, payload any) error

func (*RedisBroker) Subscribe

type RedisBrokerOptions

type RedisBrokerOptions = redis.Options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL