Documentation ¶
Overview ¶
Package eventbus provides a flexible event-driven messaging system for the modular framework.
This module enables decoupled communication between application components through an event bus pattern. It supports both synchronous and asynchronous event processing, multiple event bus engines, and configurable event handling strategies.
Features ¶
The eventbus module offers the following capabilities:
- Topic-based event publishing and subscription
- Synchronous and asynchronous event processing
- Multiple engine support (memory, Redis, Kafka)
- Configurable worker pools for async processing
- Event metadata and lifecycle tracking
- Subscription management with unique identifiers
- Event TTL and retention policies
Configuration ¶
The module can be configured through the EventBusConfig structure:
    config := &EventBusConfig{
        Engine:                 "memory",  // or "redis", "kafka"
        MaxEventQueueSize:      1000,      // events per topic queue
        DefaultEventBufferSize: 10,        // subscription channel buffer
        WorkerCount:            5,         // async processing workers
        EventTTL:               time.Hour, // event time-to-live (EventTTL is a time.Duration)
        RetentionDays:          7,         // event history retention
        ExternalBrokerURL:      "",        // for external brokers
        ExternalBrokerUser:     "",        // broker authentication
        ExternalBrokerPassword: "",        // broker password
    }
Service Registration ¶
The module registers itself as a service for dependency injection:
    // Get the event bus service
    eventBus := app.GetService("eventbus.provider").(*EventBusModule)

    // Publish an event
    err := eventBus.Publish(ctx, "user.created", userData)

    // Subscribe to events
    subscription, err := eventBus.Subscribe(ctx, "user.*", userEventHandler)
Usage Examples ¶
Basic event publishing:
    // Publish a simple event
    err := eventBus.Publish(ctx, "order.placed", orderData)

    // Build an event with custom metadata
    event := Event{
        Topic:   "payment.processed",
        Payload: paymentData,
        Metadata: map[string]interface{}{
            "source":  "payment-service",
            "version": "1.2.0",
        },
    }
    // EventBusModule.Publish takes only a topic and a payload, so the
    // Metadata set above is not passed along by this call.
    err = eventBus.Publish(ctx, event.Topic, event.Payload)
Event subscription patterns:
    // Synchronous subscription
    subscription, err := eventBus.Subscribe(ctx, "user.updated", func(ctx context.Context, event Event) error {
        user := event.Payload.(UserData)
        return updateUserCache(user)
    })

    // Asynchronous subscription for heavy processing
    asyncSub, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
        imageData := event.Payload.(ImageData)
        return processImageThumbnails(imageData)
    })

    // Wildcard subscriptions
    allOrdersSub, err := eventBus.Subscribe(ctx, "order.*", orderEventHandler)
Subscription management:
    // Check subscription details
    fmt.Printf("Subscribed to: %s (ID: %s, Async: %v)",
        subscription.Topic(), subscription.ID(), subscription.IsAsync())

    // Cancel specific subscriptions
    err := eventBus.Unsubscribe(ctx, subscription)

    // Or cancel through the subscription itself
    err = subscription.Cancel()
Event Processing Patterns ¶
The module supports different event processing patterns:
**Synchronous Processing**: Events are processed immediately in the same goroutine that published them. Best for lightweight operations and when ordering is important.
**Asynchronous Processing**: Events are queued and processed by worker goroutines. Best for heavy operations, external API calls, or when you don't want to block the publisher.
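The contrast between the two patterns can be sketched with plain goroutines and channels. This is a minimal illustration under stated assumptions, not the module's implementation: `handler`, `publishSync`, `startWorkers`, and `processAsync` are hypothetical names invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for the package's EventHandler type.
type handler func(payload string)

// publishSync calls every handler in the publisher's own goroutine, in order.
// The publisher does not continue until the last handler returns.
func publishSync(handlers []handler, payload string) {
	for _, h := range handlers {
		h(payload)
	}
}

// startWorkers launches n worker goroutines that drain queue and invoke h.
// The returned WaitGroup completes once queue is closed and fully drained.
func startWorkers(n int, queue <-chan string, h handler) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range queue {
				h(p)
			}
		}()
	}
	return &wg
}

// processAsync queues all payloads for a worker pool and returns how many
// payloads the workers handled. Queuing itself does not run any handler.
func processAsync(workers int, payloads []string) int {
	var mu sync.Mutex
	count := 0
	queue := make(chan string, len(payloads))
	wg := startWorkers(workers, queue, func(string) {
		mu.Lock()
		count++
		mu.Unlock()
	})
	for _, p := range payloads {
		queue <- p // buffered: the publisher is not blocked by handlers
	}
	close(queue)
	wg.Wait()
	return count
}

func main() {
	var order []string
	publishSync([]handler{func(p string) { order = append(order, p) }}, "a")
	fmt.Println(order, processAsync(3, []string{"x", "y", "z"}))
}
```

In the synchronous path the handler order is deterministic; in the asynchronous path the order in which workers pick up payloads is not, which is why ordering-sensitive work fits the synchronous pattern better.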
Engine Support ¶
Supported engine types (see the corresponding EventBus implementations in the index):
- **memory**: In-process event bus using Go channels
- **redis**: Distributed event bus using Redis pub/sub
- **kafka**: Enterprise event bus using Apache Kafka
- **kinesis**: Event bus backed by Kinesis
Index ¶
- Constants
- Variables
- func GetRegisteredEngines() []string
- func NewModule() modular.Module
- func RegisterEngine(engineType string, factory EngineFactory)
- type CustomMemoryConfig
- type CustomMemoryEventBus
- func (c *CustomMemoryEventBus) GetMetrics() *EventMetrics
- func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error
- func (c *CustomMemoryEventBus) Start(ctx context.Context) error
- func (c *CustomMemoryEventBus) Stop(ctx context.Context) error
- func (c *CustomMemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (c *CustomMemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (c *CustomMemoryEventBus) SubscriberCount(topic string) int
- func (c *CustomMemoryEventBus) Topics() []string
- func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type DatadogStatsdExporter
- type DeliveryStats
- type EngineConfig
- type EngineFactory
- type EngineRouter
- func (r *EngineRouter) CollectPerEngineStats() map[string]DeliveryStats
- func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64)
- func (r *EngineRouter) GetEngineForTopic(topic string) string
- func (r *EngineRouter) GetEngineNames() []string
- func (r *EngineRouter) Publish(ctx context.Context, event Event) error
- func (r *EngineRouter) SetModuleReference(module *EventBusModule)
- func (r *EngineRouter) Start(ctx context.Context) error
- func (r *EngineRouter) Stop(ctx context.Context) error
- func (r *EngineRouter) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (r *EngineRouter) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (r *EngineRouter) SubscriberCount(topic string) int
- func (r *EngineRouter) Topics() []string
- func (r *EngineRouter) Unsubscribe(ctx context.Context, subscription Subscription) error
- type Event
- type EventBus
- type EventBusConfig
- type EventBusModule
- func (m *EventBusModule) Constructor() modular.ModuleConstructor
- func (m *EventBusModule) Dependencies() []string
- func (m *EventBusModule) EmitEvent(ctx context.Context, event cloudevents.Event) error
- func (m *EventBusModule) GetRegisteredEventTypes() []string
- func (m *EventBusModule) GetRouter() *EngineRouter
- func (m *EventBusModule) Init(app modular.Application) error
- func (m *EventBusModule) Name() string
- func (m *EventBusModule) PerEngineStats() map[string]DeliveryStats
- func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider
- func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error
- func (m *EventBusModule) RegisterConfig(app modular.Application) error
- func (m *EventBusModule) RegisterObservers(subject modular.Subject) error
- func (m *EventBusModule) RequiresServices() []modular.ServiceDependency
- func (m *EventBusModule) Start(ctx context.Context) error
- func (m *EventBusModule) Stats() (delivered uint64, dropped uint64)
- func (m *EventBusModule) Stop(ctx context.Context) error
- func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *EventBusModule) SubscriberCount(topic string) int
- func (m *EventBusModule) Topics() []string
- func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error
- type EventFilter
- type EventHandler
- type EventMetrics
- type KafkaConfig
- type KafkaConsumerGroupHandler
- type KafkaEventBus
- func (k *KafkaEventBus) Publish(ctx context.Context, event Event) error
- func (k *KafkaEventBus) Start(ctx context.Context) error
- func (k *KafkaEventBus) Stop(ctx context.Context) error
- func (k *KafkaEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (k *KafkaEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (k *KafkaEventBus) SubscriberCount(topic string) int
- func (k *KafkaEventBus) Topics() []string
- func (k *KafkaEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type KinesisConfig
- type KinesisEventBus
- func (k *KinesisEventBus) Publish(ctx context.Context, event Event) error
- func (k *KinesisEventBus) Start(ctx context.Context) error
- func (k *KinesisEventBus) Stop(ctx context.Context) error
- func (k *KinesisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (k *KinesisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (k *KinesisEventBus) SubscriberCount(topic string) int
- func (k *KinesisEventBus) Topics() []string
- func (k *KinesisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type MemoryEventBus
- func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error
- func (m *MemoryEventBus) SetModule(module *EventBusModule)
- func (m *MemoryEventBus) Start(ctx context.Context) error
- func (m *MemoryEventBus) Stats() (delivered uint64, dropped uint64)
- func (m *MemoryEventBus) Stop(ctx context.Context) error
- func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *MemoryEventBus) SubscriberCount(topic string) int
- func (m *MemoryEventBus) Topics() []string
- func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type PrometheusCollector
- type RedisConfig
- type RedisEventBus
- func (r *RedisEventBus) Publish(ctx context.Context, event Event) error
- func (r *RedisEventBus) Start(ctx context.Context) error
- func (r *RedisEventBus) Stop(ctx context.Context) error
- func (r *RedisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (r *RedisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (r *RedisEventBus) SubscriberCount(topic string) int
- func (r *RedisEventBus) Topics() []string
- func (r *RedisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type RoutingRule
- type Subscription
- type TopicPrefixFilter
Constants ¶
const (
    // Message events
    EventTypeMessagePublished = "com.modular.eventbus.message.published"
    EventTypeMessageReceived  = "com.modular.eventbus.message.received"
    EventTypeMessageFailed    = "com.modular.eventbus.message.failed"

    // Topic events
    EventTypeTopicCreated = "com.modular.eventbus.topic.created"
    EventTypeTopicDeleted = "com.modular.eventbus.topic.deleted"

    // Subscription events
    EventTypeSubscriptionCreated = "com.modular.eventbus.subscription.created"
    EventTypeSubscriptionRemoved = "com.modular.eventbus.subscription.removed"

    // Bus lifecycle events
    EventTypeBusStarted = "com.modular.eventbus.bus.started"
    EventTypeBusStopped = "com.modular.eventbus.bus.stopped"

    // Configuration events
    EventTypeConfigLoaded = "com.modular.eventbus.config.loaded"
)
Event type constants for eventbus module events. They follow the reverse domain notation used by the CloudEvents specification.
const ModuleName = "eventbus"
ModuleName is the unique identifier for the eventbus module.
const ServiceName = "eventbus.provider"
ServiceName is the name of the service provided by this module. Other modules can use this name to request the event bus service through dependency injection.
Variables ¶
var (
    ErrDuplicateEngineName = errors.New("duplicate engine name")
    ErrUnknownEngineRef    = errors.New("routing rule references unknown engine")
)
Static errors for validation
var (
    ErrUnknownEngineType    = errors.New("unknown engine type")
    ErrEngineNotFound       = errors.New("engine not found")
    ErrSubscriptionNotFound = errors.New("subscription not found in any engine")
)
Static errors for engine registry
var (
    ErrEventBusNotStarted      = errors.New("event bus not started")
    ErrEventBusShutdownTimeout = errors.New("event bus shutdown timed out")
    ErrEventHandlerNil         = errors.New("event handler cannot be nil")
    ErrInvalidSubscriptionType = errors.New("invalid subscription type")
)
EventBus errors
var (
ErrInvalidShardCount = errors.New("invalid shard count")
)
Static errors for Kinesis
var (
    // ErrNoSubjectForEventEmission is returned when trying to emit events without a subject
    ErrNoSubjectForEventEmission = errors.New("no subject available for event emission")
)
Module-specific errors for eventbus module. These errors are defined locally to ensure proper linting compliance.
Functions ¶
func GetRegisteredEngines ¶ added in v0.1.1
func GetRegisteredEngines() []string
GetRegisteredEngines returns a list of all registered engine types.
func NewModule ¶
func NewModule() modular.Module
NewModule creates a new instance of the event bus module. This is the primary constructor for the eventbus module and should be used when registering the module with the application.
Example:
app.RegisterModule(eventbus.NewModule())
func RegisterEngine ¶ added in v0.1.1
func RegisterEngine(engineType string, factory EngineFactory)
RegisterEngine registers a new engine type with its factory function. This allows custom engines to be registered at runtime.
Example:
    eventbus.RegisterEngine("custom", func(config map[string]interface{}) (EventBus, error) {
        return NewCustomEngine(config), nil
    })
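How a factory registry of this kind can work is sketched below. It is an illustration only, assuming a mutex-guarded map keyed by engine type; `engine`, `factory`, `register`, `registered`, and `build` are hypothetical names, and the package's own registry may be structured differently.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// engine is a hypothetical stand-in for the EventBus interface.
type engine interface{ Name() string }

// factory has the same shape as the factory function in the example above.
type factory func(config map[string]interface{}) (engine, error)

var errUnknownEngineType = errors.New("unknown engine type")

var (
	mu       sync.RWMutex
	registry = map[string]factory{}
)

// register stores a factory under an engine type name.
func register(engineType string, f factory) {
	mu.Lock()
	defer mu.Unlock()
	registry[engineType] = f
}

// registered returns all registered type names in sorted order.
func registered() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// build looks up the factory for engineType and invokes it.
func build(engineType string, config map[string]interface{}) (engine, error) {
	mu.RLock()
	f, ok := registry[engineType]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("%w: %s", errUnknownEngineType, engineType)
	}
	return f(config)
}

// namedEngine is a trivial engine used only for the demonstration.
type namedEngine string

func (n namedEngine) Name() string { return string(n) }

func main() {
	register("custom", func(map[string]interface{}) (engine, error) {
		return namedEngine("custom"), nil
	})
	e, err := build("custom", nil)
	fmt.Println(e.Name(), err, registered())

	_, err = build("missing", nil)
	fmt.Println(errors.Is(err, errUnknownEngineType))
}
```

Wrapping the sentinel error with `%w` lets callers test for the unknown-type case with `errors.Is` while still seeing which type name was requested.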
Types ¶
type CustomMemoryConfig ¶ added in v0.1.1
type CustomMemoryConfig struct {
    MaxEventQueueSize      int                      `json:"maxEventQueueSize"`
    DefaultEventBufferSize int                      `json:"defaultEventBufferSize"`
    EnableMetrics          bool                     `json:"enableMetrics"`
    MetricsInterval        time.Duration            `json:"metricsInterval"`
    EventFilters           []map[string]interface{} `json:"eventFilters"`
}
CustomMemoryConfig holds configuration for the custom memory engine
type CustomMemoryEventBus ¶ added in v0.1.1
type CustomMemoryEventBus struct {
// contains filtered or unexported fields
}
CustomMemoryEventBus is an example custom implementation of the EventBus interface. This demonstrates how to create and register custom engines. Unlike the standard memory engine, this one includes additional features like event metrics collection, custom event filtering, and enhanced subscription management.
func (*CustomMemoryEventBus) GetMetrics ¶ added in v0.1.1
func (c *CustomMemoryEventBus) GetMetrics() *EventMetrics
GetMetrics returns current event metrics (additional method not in EventBus interface)
func (*CustomMemoryEventBus) Publish ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic with custom filtering and metrics
func (*CustomMemoryEventBus) Start ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Start(ctx context.Context) error
Start initializes the custom memory event bus
func (*CustomMemoryEventBus) Stop ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Stop(ctx context.Context) error
Stop shuts down the custom memory event bus
func (*CustomMemoryEventBus) Subscribe ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*CustomMemoryEventBus) SubscribeAsync ¶ added in v0.1.1
func (c *CustomMemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*CustomMemoryEventBus) SubscriberCount ¶ added in v0.1.1
func (c *CustomMemoryEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*CustomMemoryEventBus) Topics ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Topics() []string
Topics returns a list of all active topics
func (*CustomMemoryEventBus) Unsubscribe ¶ added in v0.1.1
func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type DatadogStatsdExporter ¶ added in v0.1.1
type DatadogStatsdExporter struct {
// contains filtered or unexported fields
}
func NewDatadogStatsdExporter ¶ added in v0.1.1
func NewDatadogStatsdExporter(eventBus *EventBusModule, prefix, addr string, interval time.Duration, baseTags []string) (*DatadogStatsdExporter, error)
NewDatadogStatsdExporter creates a new exporter. addr example: "127.0.0.1:8125". prefix defaults to "eventbus" if empty. interval must be > 0.
func (*DatadogStatsdExporter) Close ¶ added in v0.1.1
func (e *DatadogStatsdExporter) Close() error
Close closes underlying statsd client.
func (*DatadogStatsdExporter) Run ¶ added in v0.1.1
func (e *DatadogStatsdExporter) Run(ctx context.Context)
Run starts the export loop until context cancellation.
type DeliveryStats ¶ added in v0.1.1
type DeliveryStats struct {
    Delivered uint64 `json:"delivered" yaml:"delivered"`
    Dropped   uint64 `json:"dropped" yaml:"dropped"`
}
DeliveryStats represents basic delivery outcomes for an engine or aggregate. These counters are monotonically increasing from module start. They are intentionally simple (uint64) to keep overhead negligible; consumers wanting rates should compute deltas externally.
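Because the counters only ever increase, a rate is the difference between two snapshots divided by the time between them. The sketch below assumes the caller takes the snapshots itself; `stats` and `ratePerSecond` are hypothetical names that only mirror the two counters.

```go
package main

import "fmt"

// stats mirrors the two monotonically increasing counters of DeliveryStats.
type stats struct {
	Delivered uint64
	Dropped   uint64
}

// ratePerSecond converts two counter snapshots into per-second rates.
// It returns zeros when the elapsed time is not positive.
func ratePerSecond(prev, curr stats, elapsedSeconds float64) (delivered, dropped float64) {
	if elapsedSeconds <= 0 {
		return 0, 0
	}
	return float64(curr.Delivered-prev.Delivered) / elapsedSeconds,
		float64(curr.Dropped-prev.Dropped) / elapsedSeconds
}

func main() {
	prev := stats{Delivered: 1000, Dropped: 10}
	curr := stats{Delivered: 1600, Dropped: 25}
	d, x := ratePerSecond(prev, curr, 30)
	fmt.Printf("delivered/s=%.1f dropped/s=%.1f\n", d, x)
}
```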
type EngineConfig ¶ added in v0.1.1
type EngineConfig struct {
    // Name is the unique identifier for this engine instance.
    // Used for routing and engine selection.
    Name string `json:"name" yaml:"name" validate:"required"`

    // Type specifies the engine implementation to use.
    // Supported values: "memory", "redis", "kafka", "kinesis", "custom"
    Type string `json:"type" yaml:"type" validate:"required,oneof=memory redis kafka kinesis custom"`

    // Config contains engine-specific configuration as a map.
    // The structure depends on the engine type.
    Config map[string]interface{} `json:"config,omitempty" yaml:"config,omitempty"`
}
EngineConfig defines the configuration for an individual event bus engine. Each engine can have its own specific configuration requirements.
type EngineFactory ¶ added in v0.1.1
EngineFactory is a function that creates an EventBus implementation. It receives the engine configuration and returns a configured EventBus instance.
type EngineRouter ¶ added in v0.1.1
type EngineRouter struct {
// contains filtered or unexported fields
}
EngineRouter manages multiple event bus engines and routes events based on configuration.
func NewEngineRouter ¶ added in v0.1.1
func NewEngineRouter(config *EventBusConfig) (*EngineRouter, error)
NewEngineRouter creates a new engine router with the given configuration.
func (*EngineRouter) CollectPerEngineStats ¶ added in v0.1.1
func (r *EngineRouter) CollectPerEngineStats() map[string]DeliveryStats
CollectPerEngineStats returns per-engine delivery statistics for engines that expose them (currently only the in-memory engine). Engines that do not implement statistics are omitted from the returned map. This is useful for fine‑grained monitoring and test verification without exposing internal engine details elsewhere.
func (*EngineRouter) CollectStats ¶ added in v0.1.1
func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64)
CollectStats aggregates delivery statistics from engines that expose them. At present only the in-memory engine exposes Stats(). Engines that don't implement Stats() are simply skipped. This keeps the method safe to call in multi-engine configurations mixing different backend types.
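Skipping engines that lack a Stats() method is the kind of behavior an optional-interface type assertion provides. The following sketch shows that technique with hypothetical types (`statsProvider`, `memoryEngine`, `remoteEngine`, `collectStats`); it is not taken from the router's source.

```go
package main

import "fmt"

// statsProvider is the optional capability an engine may implement.
type statsProvider interface {
	Stats() (delivered uint64, dropped uint64)
}

// memoryEngine exposes statistics.
type memoryEngine struct{ delivered, dropped uint64 }

func (m *memoryEngine) Stats() (uint64, uint64) { return m.delivered, m.dropped }

// remoteEngine does not implement Stats and is therefore skipped.
type remoteEngine struct{}

// collectStats sums counters from every engine that implements statsProvider.
func collectStats(engines map[string]interface{}) (delivered, dropped uint64) {
	for _, e := range engines {
		if sp, ok := e.(statsProvider); ok {
			d, x := sp.Stats()
			delivered += d
			dropped += x
		}
	}
	return delivered, dropped
}

func main() {
	engines := map[string]interface{}{
		"memory": &memoryEngine{delivered: 40, dropped: 2},
		"remote": remoteEngine{},
	}
	fmt.Println(collectStats(engines))
}
```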
func (*EngineRouter) GetEngineForTopic ¶ added in v0.1.1
func (r *EngineRouter) GetEngineForTopic(topic string) string
GetEngineForTopic returns the name of the engine that handles the specified topic. This is useful for debugging and monitoring.
func (*EngineRouter) GetEngineNames ¶ added in v0.1.1
func (r *EngineRouter) GetEngineNames() []string
GetEngineNames returns the names of all configured engines.
func (*EngineRouter) Publish ¶ added in v0.1.1
func (r *EngineRouter) Publish(ctx context.Context, event Event) error
Publish publishes an event to the appropriate engine based on routing rules.
func (*EngineRouter) SetModuleReference ¶ added in v0.1.1
func (r *EngineRouter) SetModuleReference(module *EventBusModule)
SetModuleReference sets the module reference for all memory event buses. This enables memory engines to emit events through the module.
func (*EngineRouter) Start ¶ added in v0.1.1
func (r *EngineRouter) Start(ctx context.Context) error
Start starts all managed engines.
func (*EngineRouter) Stop ¶ added in v0.1.1
func (r *EngineRouter) Stop(ctx context.Context) error
Stop stops all managed engines.
func (*EngineRouter) Subscribe ¶ added in v0.1.1
func (r *EngineRouter) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe subscribes to a topic using the appropriate engine. The subscription is created on the engine that handles the specified topic.
func (*EngineRouter) SubscribeAsync ¶ added in v0.1.1
func (r *EngineRouter) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync subscribes to a topic asynchronously using the appropriate engine.
func (*EngineRouter) SubscriberCount ¶ added in v0.1.1
func (r *EngineRouter) SubscriberCount(topic string) int
SubscriberCount returns the total number of subscribers for a topic across all engines.
func (*EngineRouter) Topics ¶ added in v0.1.1
func (r *EngineRouter) Topics() []string
Topics returns all active topics from all engines.
func (*EngineRouter) Unsubscribe ¶ added in v0.1.1
func (r *EngineRouter) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription from its engine.
type Event ¶
type Event struct {
    // Topic is the channel or subject of the event.
    // Topics are used for routing events to the appropriate subscribers.
    // Topic names can use hierarchical patterns like "user.created" or "order.payment.failed".
    Topic string `json:"topic"`

    // Payload is the data associated with the event.
    // This can be any serializable data structure that represents
    // the event's information. The payload type should be consistent
    // for events within the same topic.
    Payload interface{} `json:"payload"`

    // Metadata contains additional information about the event.
    // This can include source information, correlation IDs, version numbers,
    // or any other contextual data that doesn't belong in the main payload.
    // Optional field that can be nil if no metadata is needed.
    Metadata map[string]interface{} `json:"metadata,omitempty"`

    // CreatedAt is when the event was created.
    // This timestamp is set automatically when the event is published
    // and can be used for event ordering, TTL calculations, and debugging.
    CreatedAt time.Time `json:"createdAt"`

    // ProcessingStarted is when the event processing started.
    // This field is set when an event handler begins processing the event.
    // Used for performance monitoring and timeout detection.
    ProcessingStarted *time.Time `json:"processingStarted,omitempty"`

    // ProcessingCompleted is when the event processing completed.
    // This field is set when an event handler finishes processing the event,
    // whether successfully or with an error. Used for performance monitoring
    // and event lifecycle tracking.
    ProcessingCompleted *time.Time `json:"processingCompleted,omitempty"`
}
Event represents a message in the event bus. Events are the core data structure used for communication between publishers and subscribers. They contain the message data along with metadata for tracking and processing.
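The two optional timestamps make it possible to measure handler duration once both are set. The sketch below uses a trimmed-down, hypothetical `event` type with the same timestamp fields; `track` and `processingDuration` are helper names invented for the example and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// event is a reduced stand-in carrying only the lifecycle fields of Event.
type event struct {
	Topic               string
	CreatedAt           time.Time
	ProcessingStarted   *time.Time
	ProcessingCompleted *time.Time
}

// track runs h and records when processing started and completed.
func track(e *event, h func()) {
	start := time.Now()
	e.ProcessingStarted = &start
	h()
	end := time.Now()
	e.ProcessingCompleted = &end
}

// processingDuration reports how long handling took.
// ok is false while either timestamp is still nil.
func processingDuration(e event) (d time.Duration, ok bool) {
	if e.ProcessingStarted == nil || e.ProcessingCompleted == nil {
		return 0, false
	}
	return e.ProcessingCompleted.Sub(*e.ProcessingStarted), true
}

func main() {
	e := event{Topic: "user.created", CreatedAt: time.Now()}
	_, ok := processingDuration(e)
	fmt.Println("measurable before handling:", ok)

	track(&e, func() { time.Sleep(5 * time.Millisecond) })
	d, ok := processingDuration(e)
	fmt.Println("measurable after handling:", ok, d >= 5*time.Millisecond)
}
```

Pointer fields are what allow "not yet started" to be distinguished from a real timestamp, which a plain time.Time zero value would make ambiguous.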
type EventBus ¶
type EventBus interface {
    // Start initializes the event bus.
    // This method is called during module startup and should prepare
    // the event bus for publishing and subscribing operations.
    // For memory buses, this might initialize internal data structures.
    // For network-based buses, this establishes connections.
    Start(ctx context.Context) error

    // Stop shuts down the event bus.
    // This method is called during module shutdown and should cleanup
    // all resources, close connections, and stop background processes.
    // It should ensure all in-flight events are processed before returning.
    Stop(ctx context.Context) error

    // Publish sends an event to the specified topic.
    // The event will be delivered to all active subscribers of the topic.
    // The method should handle event queuing, topic routing, and delivery
    // according to the engine's semantics.
    Publish(ctx context.Context, event Event) error

    // Subscribe registers a handler for a topic with synchronous processing.
    // Events matching the topic will be delivered immediately to the handler
    // in the same goroutine that published them. The publisher will wait
    // for the handler to complete before continuing.
    Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

    // SubscribeAsync registers a handler for a topic with asynchronous processing.
    // Events matching the topic will be queued for processing by worker goroutines.
    // The publisher can continue immediately without waiting for processing.
    // This is preferred for heavy operations or non-critical event handling.
    SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

    // Unsubscribe removes a subscription.
    // After unsubscribing, the subscription will no longer receive events.
    // This method should be idempotent and not return errors for
    // subscriptions that are already cancelled.
    Unsubscribe(ctx context.Context, subscription Subscription) error

    // Topics returns a list of all active topics.
    // This includes only topics that currently have at least one subscriber.
    // Useful for monitoring, debugging, and administrative interfaces.
    Topics() []string

    // SubscriberCount returns the number of subscribers for a topic.
    // This includes both synchronous and asynchronous subscriptions.
    // Returns 0 if the topic has no subscribers or doesn't exist.
    SubscriberCount(topic string) int
}
EventBus defines the interface for an event bus implementation. This interface abstracts the underlying messaging mechanism, allowing the eventbus module to support multiple backends (memory, Redis, Kafka) through a common API.
All operations are context-aware to support cancellation and timeouts. Implementations should be thread-safe and handle concurrent access properly.
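One way an implementation can meet the thread-safety, idempotent-unsubscribe, and "active topics only" expectations is a mutex-guarded subscriber table. The sketch below is a simplified illustration with hypothetical names (`registry`, `subscribe`, `unsubscribe`, `subscriberCount`, `topics`); it omits contexts and event delivery entirely.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry maps topic -> subscription ID -> handler, guarded by a mutex.
type registry struct {
	mu     sync.RWMutex
	nextID int
	subs   map[string]map[int]func(string)
}

func newRegistry() *registry {
	return &registry{subs: map[string]map[int]func(string){}}
}

// subscribe adds a handler and returns its unique ID.
func (r *registry) subscribe(topic string, h func(string)) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	if r.subs[topic] == nil {
		r.subs[topic] = map[int]func(string){}
	}
	r.subs[topic][r.nextID] = h
	return r.nextID
}

// unsubscribe is idempotent: removing an unknown or already removed
// subscription is a no-op. Topics with no subscribers are dropped.
func (r *registry) unsubscribe(topic string, id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs[topic], id)
	if len(r.subs[topic]) == 0 {
		delete(r.subs, topic)
	}
}

// subscriberCount returns 0 for topics that do not exist.
func (r *registry) subscriberCount(topic string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[topic])
}

// topics lists only topics that currently have at least one subscriber.
func (r *registry) topics() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.subs))
	for t := range r.subs {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRegistry()
	id := r.subscribe("user.created", func(string) {})
	r.subscribe("order.placed", func(string) {})
	fmt.Println(r.topics(), r.subscriberCount("user.created"))

	r.unsubscribe("user.created", id)
	r.unsubscribe("user.created", id) // second call is harmless
	fmt.Println(r.topics(), r.subscriberCount("user.created"))
}
```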
func NewCustomMemoryEventBus ¶ added in v0.1.1
NewCustomMemoryEventBus creates a new custom memory-based event bus
func NewKafkaEventBus ¶ added in v0.1.1
NewKafkaEventBus creates a new Kafka-based event bus
func NewKinesisEventBus ¶ added in v0.1.1
NewKinesisEventBus creates a new Kinesis-based event bus
func NewRedisEventBus ¶ added in v0.1.1
NewRedisEventBus creates a new Redis-based event bus
type EventBusConfig ¶
type EventBusConfig struct {
    // Engine specifies the event bus engine to use for single-engine mode.
    // Supported values: "memory", "redis", "kafka", "kinesis"
    // Default: "memory"
    // Note: This field is used only when Engines is empty (legacy mode)
    Engine string `json:"engine,omitempty" yaml:"engine,omitempty" validate:"omitempty,oneof=memory redis kafka kinesis" env:"ENGINE"`

    // MaxEventQueueSize is the maximum number of events to queue per topic.
    // When this limit is reached, new events may be dropped or publishers
    // may be blocked, depending on the engine implementation.
    // Must be at least 1. Used in single-engine mode.
    MaxEventQueueSize int `json:"maxEventQueueSize,omitempty" yaml:"maxEventQueueSize,omitempty" validate:"omitempty,min=1" env:"MAX_EVENT_QUEUE_SIZE"`

    // DefaultEventBufferSize is the default buffer size for subscription channels.
    // This affects how many events can be buffered for each subscription before
    // blocking. Larger buffers can improve performance but use more memory.
    // Must be at least 1. Used in single-engine mode.
    DefaultEventBufferSize int `` /* 138-byte string literal not displayed */

    // WorkerCount is the number of worker goroutines for async event processing.
    // These workers process events from asynchronous subscriptions. More workers
    // can increase throughput but also increase resource usage.
    // Must be at least 1. Used in single-engine mode.
    WorkerCount int `json:"workerCount,omitempty" yaml:"workerCount,omitempty" validate:"omitempty,min=1" env:"WORKER_COUNT"`

    // DeliveryMode controls how publish behaves when a subscriber queue is full.
    //   drop    - (default) non-blocking send; event is dropped if subscriber channel is full
    //   block   - block indefinitely until space is available in the subscriber channel
    //   timeout - block up to PublishBlockTimeout then drop if still full
    // This applies to the memory engine. Other engines may implement differently.
    DeliveryMode string `json:"deliveryMode,omitempty" yaml:"deliveryMode,omitempty" validate:"omitempty,oneof=drop block timeout" env:"DELIVERY_MODE"`

    // PublishBlockTimeout is used when DeliveryMode == "timeout". Zero means no wait.
    PublishBlockTimeout time.Duration `json:"publishBlockTimeout,omitempty" yaml:"publishBlockTimeout,omitempty" env:"PUBLISH_BLOCK_TIMEOUT"`

    // RotateSubscriberOrder when true rotates the ordering of subscribers per publish
    // to reduce starvation and provide fairer drop distribution. This is now OPT-IN.
    // Historical note: an earlier revision forced this to true during validation which
    // made it impossible for users to explicitly disable the feature (a plain bool
    // cannot distinguish an "unset" zero value from an explicitly configured false).
    // We intentionally removed the auto-enable logic so that leaving the field absent
    // (or false) will NOT enable rotation. Users that want fairness rotation must set
    // rotateSubscriberOrder: true explicitly in configuration. This trades a changed
    // default for honoring explicit operator intent.
    RotateSubscriberOrder bool `json:"rotateSubscriberOrder,omitempty" yaml:"rotateSubscriberOrder,omitempty" env:"ROTATE_SUBSCRIBER_ORDER"`

    // EventTTL is the time to live for events.
    // Events older than this value may be automatically removed from queues
    // or marked as expired. Used for event cleanup and storage management.
    EventTTL time.Duration `json:"eventTTL,omitempty" yaml:"eventTTL,omitempty" env:"EVENT_TTL" default:"3600s"`

    // RetentionDays is how many days to retain event history.
    // This affects event storage and cleanup policies. Longer retention
    // allows for event replay and debugging but requires more storage.
    // Must be at least 1. Used in single-engine mode.
    RetentionDays int `json:"retentionDays,omitempty" yaml:"retentionDays,omitempty" validate:"omitempty,min=1" env:"RETENTION_DAYS"`

    // ExternalBrokerURL is the connection URL for external message brokers.
    // Used when the engine is set to "redis", "kafka", or "kinesis". The format depends
    // on the specific broker type.
    // Examples:
    //   Redis:   "redis://localhost:6379" or "redis://user:pass@host:port/db"
    //   Kafka:   "kafka://localhost:9092" or "kafka://broker1:9092,broker2:9092"
    //   Kinesis: "https://kinesis.us-east-1.amazonaws.com"
    ExternalBrokerURL string `json:"externalBrokerURL,omitempty" yaml:"externalBrokerURL,omitempty" env:"EXTERNAL_BROKER_URL"`

    // ExternalBrokerUser is the username for external broker authentication.
    // Used when the external broker requires authentication.
    // Leave empty if the broker doesn't require authentication.
    ExternalBrokerUser string `json:"externalBrokerUser,omitempty" yaml:"externalBrokerUser,omitempty" env:"EXTERNAL_BROKER_USER"`

    // ExternalBrokerPassword is the password for external broker authentication.
    // Used when the external broker requires authentication.
    // Leave empty if the broker doesn't require authentication.
    // This should be kept secure and may be provided via environment variables.
    ExternalBrokerPassword string `json:"externalBrokerPassword,omitempty" yaml:"externalBrokerPassword,omitempty" env:"EXTERNAL_BROKER_PASSWORD"`

    // Engines defines multiple event bus engines that can be used simultaneously.
    // When this field is populated, it takes precedence over the single-engine fields above.
    Engines []EngineConfig `json:"engines,omitempty" yaml:"engines,omitempty" validate:"dive"`

    // Routing defines how topics are routed to different engines.
    // Rules are evaluated in order, and the first matching rule is used.
    // If no routing rules are specified and multiple engines are configured,
    // all topics will be routed to the first engine.
    Routing []RoutingRule `json:"routing,omitempty" yaml:"routing,omitempty" validate:"dive"`
}
EventBusConfig defines the configuration for the event bus module. This structure supports both single-engine (legacy) and multi-engine configurations.
Example single-engine YAML configuration (legacy, still supported):
    engine: "memory"
    maxEventQueueSize: 1000
    workerCount: 5
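The deliveryMode field of EventBusConfig accepts "drop", "block", or "timeout", and publishBlockTimeout applies only to the "timeout" mode, where zero means no wait. The following is a minimal sketch of what those three modes could look like for a single subscriber channel. The deliver function and its parameters are hypothetical and are not part of the eventbus package; treating an unrecognized mode as "drop" is also an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// deliver is a hypothetical helper, not an eventbus API. It tries to place ev
// on a subscriber channel according to mode and reports whether it was accepted.
func deliver(ch chan<- string, ev string, mode string, timeout time.Duration) bool {
	switch mode {
	case "block":
		ch <- ev // wait until the subscriber has room
		return true
	case "timeout":
		if timeout <= 0 {
			// Zero means no wait, so this behaves like a non-blocking send.
			select {
			case ch <- ev:
				return true
			default:
				return false
			}
		}
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case ch <- ev:
			return true
		case <-timer.C:
			return false
		}
	default: // "drop" (assumed fallback for unknown modes)
		select {
		case ch <- ev:
			return true
		default:
			return false
		}
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(deliver(ch, "a", "drop", 0))                      // true: buffer has room
	fmt.Println(deliver(ch, "b", "drop", 0))                      // false: buffer is full
	fmt.Println(deliver(ch, "c", "timeout", 10*time.Millisecond)) // false: gave up after waiting
}
```

With "block", a slow subscriber applies backpressure to the publisher; with "drop", the publisher never waits but events can be lost.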
Example multi-engine YAML configuration:
    engines:
      - name: "memory"
        type: "memory"
        config:
          workerCount: 5
          maxEventQueueSize: 1000
      - name: "redis"
        type: "redis"
        config:
          url: "redis://localhost:6379"
          db: 0
    routing:
      - topics: ["user.*", "auth.*"]
        engine: "memory"
      - topics: ["*"]
        engine: "redis"
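Routing rules are evaluated in order and the first matching rule wins. The sketch below illustrates that evaluation for the rules in the multi-engine example. The rule type, matches, and engineFor are hypothetical names, and the wildcard semantics ("*" matches everything, "prefix.*" matches any topic starting with "prefix.") are an assumption; the module's actual matcher may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// rule mirrors the Topics and Engine fields of RoutingRule for this sketch.
type rule struct {
	Topics []string
	Engine string
}

// matches is an assumed matcher: "*" matches any topic, "prefix.*" matches
// topics that start with "prefix.", and anything else must match exactly.
func matches(pattern, topic string) bool {
	if pattern == "*" {
		return true
	}
	if strings.HasSuffix(pattern, ".*") {
		return strings.HasPrefix(topic, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == topic
}

// engineFor returns the engine of the first rule with a matching pattern,
// or fallback when no rule matches.
func engineFor(rules []rule, topic, fallback string) string {
	for _, r := range rules {
		for _, p := range r.Topics {
			if matches(p, topic) {
				return r.Engine
			}
		}
	}
	return fallback
}

func main() {
	rules := []rule{
		{Topics: []string{"user.*", "auth.*"}, Engine: "memory"},
		{Topics: []string{"*"}, Engine: "redis"},
	}
	fmt.Println(engineFor(rules, "user.created", "memory")) // memory
	fmt.Println(engineFor(rules, "order.placed", "memory")) // redis
}
```

Because evaluation stops at the first match, a catch-all rule such as ["*"] belongs at the end of the list.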
func (*EventBusConfig) GetDefaultEngine ¶ added in v0.1.1
func (c *EventBusConfig) GetDefaultEngine() string
GetDefaultEngine returns the name of the default engine to use. For single-engine mode, returns "default". For multi-engine mode, returns the name of the first engine.
func (*EventBusConfig) IsMultiEngine ¶ added in v0.1.1
func (c *EventBusConfig) IsMultiEngine() bool
IsMultiEngine returns true if this configuration uses multiple engines.
func (*EventBusConfig) ValidateConfig ¶ added in v0.1.1
func (c *EventBusConfig) ValidateConfig() error
ValidateConfig performs additional validation on the configuration. This is called after basic struct tag validation.
type EventBusModule ¶
type EventBusModule struct {
// contains filtered or unexported fields
}
EventBusModule provides event-driven messaging capabilities for the modular framework. It implements a publish-subscribe pattern with support for multiple event bus engines, asynchronous processing, and flexible subscription management.
The module implements the following interfaces:
- modular.Module: Basic module lifecycle
- modular.Configurable: Configuration management
- modular.ServiceAware: Service dependency management
- modular.Startable: Startup logic
- modular.Stoppable: Shutdown logic
- modular.ObservableModule: Event observation and emission
- EventBus: Event publishing and subscription interface
Event processing is thread-safe and supports concurrent publishers and subscribers.
func (*EventBusModule) Constructor ¶
func (m *EventBusModule) Constructor() modular.ModuleConstructor
Constructor provides a dependency injection constructor for the module. This method is used by the dependency injection system to create the module instance with any required services.
func (*EventBusModule) Dependencies ¶
func (m *EventBusModule) Dependencies() []string
Dependencies returns the names of modules this module depends on. The eventbus module operates independently and has no dependencies.
func (*EventBusModule) EmitEvent ¶ added in v0.1.1
func (m *EventBusModule) EmitEvent(ctx context.Context, event cloudevents.Event) error
EmitEvent implements the ObservableModule interface. This allows the eventbus module to emit events to registered observers.
func (*EventBusModule) GetRegisteredEventTypes ¶ added in v0.1.1
func (m *EventBusModule) GetRegisteredEventTypes() []string
GetRegisteredEventTypes implements the ObservableModule interface. Returns all event types that this eventbus module can emit.
func (*EventBusModule) GetRouter ¶ added in v0.1.1
func (m *EventBusModule) GetRouter() *EngineRouter
GetRouter returns the underlying engine router for advanced operations. This method provides access to engine-specific functionality like checking which engine a topic routes to.
Example:
    router := eventBus.GetRouter()
    engine := router.GetEngineForTopic("user.created")
    fmt.Printf("Topic routes to engine: %s", engine)
func (*EventBusModule) Init ¶
func (m *EventBusModule) Init(app modular.Application) error
Init initializes the eventbus module with the application context. This method is called after all modules have been registered and their configurations loaded. It sets up the event bus engine(s) based on configuration.
The initialization process:
- Retrieves the module's configuration
- Sets up logging
- Validates configuration
- Initializes the engine router with configured engines
- Prepares the event bus for startup
Supported engines:
- "memory": In-process event bus using Go channels
- "redis": Distributed event bus using Redis pub/sub
- "kafka": Enterprise event bus using Apache Kafka
- "kinesis": AWS Kinesis streams
- "custom": Custom engine implementations
func (*EventBusModule) Name ¶
func (m *EventBusModule) Name() string
Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.
func (*EventBusModule) PerEngineStats ¶ added in v0.1.1
func (m *EventBusModule) PerEngineStats() map[string]DeliveryStats
PerEngineStats returns delivery statistics broken down per configured engine; only engines that expose stats are included. It is safe to call before Start and returns an empty map if the router has not been built yet.
func (*EventBusModule) ProvidesServices ¶
func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider
ProvidesServices declares services provided by this module. The eventbus module provides an event bus service that can be injected into other modules for event-driven communication.
Provided services:
- "eventbus.provider": The main event bus service interface
func (*EventBusModule) Publish ¶
func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error
Publish publishes an event to the event bus. Creates an Event struct with the provided topic and payload, then sends it through the event bus for processing by subscribers.
The event will be delivered to all active subscribers of the topic. Topic patterns and wildcards may be supported depending on the engine. With multiple engines, the event is routed to the appropriate engine based on the configured routing rules.
Example:
    err := eventBus.Publish(ctx, "user.created", userData)
    err = eventBus.Publish(ctx, "order.payment.failed", paymentData)
func (*EventBusModule) RegisterConfig ¶
func (m *EventBusModule) RegisterConfig(app modular.Application) error
RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the eventbus module.
Default configuration:
- Engine: "memory"
- MaxEventQueueSize: 1000 events per topic
- DefaultEventBufferSize: 10 events per subscription channel
- WorkerCount: 5 async processing workers
- EventTTL: 3600 seconds (1 hour)
- RetentionDays: 7 days for event history
- ExternalBroker settings: empty (not used for memory engine)
func (*EventBusModule) RegisterObservers ¶ added in v0.1.1
func (m *EventBusModule) RegisterObservers(subject modular.Subject) error
RegisterObservers implements the ObservableModule interface. This allows the eventbus module to register as an observer for events it's interested in.
func (*EventBusModule) RequiresServices ¶
func (m *EventBusModule) RequiresServices() []modular.ServiceDependency
RequiresServices declares services required by this module. The eventbus module operates independently and requires no external services.
func (*EventBusModule) Start ¶
func (m *EventBusModule) Start(ctx context.Context) error
Start performs startup logic for the module. This method starts all configured event bus engines and begins processing events. It's called after all modules have been initialized and are ready to start.
The startup process:
- Checks if already started (idempotent)
- Starts all underlying event bus engines
- Initializes worker pools for async processing
- Prepares topic management and subscription tracking
This method is thread-safe and can be called multiple times safely.
func (*EventBusModule) Stats ¶ added in v0.1.1
func (m *EventBusModule) Stats() (delivered uint64, dropped uint64)
Stats returns aggregated delivery statistics for all underlying engines that support them (currently only the in-memory engine). This is intended for lightweight monitoring/metrics and testing. Returns zeros if the module has not been started yet or no engines expose stats.
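Aggregating over "engines that support them" suggests an optional interface checked with a type assertion. The following sketch sums delivered and dropped counts that way. The statsProvider interface, the two engine types, and aggregate are hypothetical names used only for illustration.

```go
package main

import "fmt"

// statsProvider is a hypothetical optional interface for engines with stats.
type statsProvider interface {
	Stats() (delivered uint64, dropped uint64)
}

// memoryEngine exposes stats in this sketch.
type memoryEngine struct{ delivered, dropped uint64 }

func (m memoryEngine) Stats() (uint64, uint64) { return m.delivered, m.dropped }

// brokerEngine stands for an engine that exposes no stats.
type brokerEngine struct{}

// aggregate sums the stats of every engine that implements statsProvider and
// skips the rest, so it returns zeros when no engine exposes stats.
func aggregate(engines []interface{}) (delivered, dropped uint64) {
	for _, e := range engines {
		if sp, ok := e.(statsProvider); ok {
			d, x := sp.Stats()
			delivered += d
			dropped += x
		}
	}
	return delivered, dropped
}

func main() {
	engines := []interface{}{
		memoryEngine{delivered: 90, dropped: 10},
		brokerEngine{},
		memoryEngine{delivered: 5, dropped: 0},
	}
	fmt.Println(aggregate(engines)) // 95 10
}
```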
func (*EventBusModule) Stop ¶
func (m *EventBusModule) Stop(ctx context.Context) error
Stop performs shutdown logic for the module. This method gracefully shuts down all event bus engines, ensuring all in-flight events are processed and all subscriptions are properly cleaned up.
The shutdown process:
- Checks if already stopped (idempotent)
- Stops accepting new events
- Waits for in-flight events to complete
- Cancels all active subscriptions
- Shuts down worker pools
- Closes all underlying event bus engines
This method is thread-safe and can be called multiple times safely.
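The shutdown steps above can be sketched with a closed queue and a WaitGroup: new events are refused, queued events are drained, and repeated calls are harmless. The pool type and its methods are hypothetical and only illustrate the sequence; the module's engines may shut down differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical worker pool used to illustrate graceful shutdown.
type pool struct {
	mu      sync.Mutex // guards stopped and the closing of queue
	stopped bool
	queue   chan string
	wg      sync.WaitGroup
	handled atomic.Int64
}

func newPool(workers, size int) *pool {
	p := &pool{queue: make(chan string, size)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for range p.queue { // exits once queue is closed and drained
				p.handled.Add(1)
			}
		}()
	}
	return p
}

// Enqueue reports false once the pool has stopped accepting events.
func (p *pool) Enqueue(ev string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stopped {
		return false
	}
	p.queue <- ev
	return true
}

// Stop refuses new events, lets workers drain the queue, and waits for them.
// Calling it again is safe: the queue is closed only once.
func (p *pool) Stop() {
	p.mu.Lock()
	if !p.stopped {
		p.stopped = true
		close(p.queue)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

func main() {
	p := newPool(2, 4)
	for i := 0; i < 5; i++ {
		p.Enqueue(fmt.Sprintf("event-%d", i))
	}
	p.Stop()
	p.Stop() // second call is a no-op
	fmt.Println(p.handled.Load(), p.Enqueue("late")) // 5 false
}
```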
func (*EventBusModule) Subscribe ¶
func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe subscribes to a topic on the event bus with synchronous processing. The provided handler will be called immediately when an event is published to the specified topic. The handler blocks the event delivery until it completes.
With multiple engines, the subscription is created on the engine that handles the specified topic according to the routing configuration.
Use synchronous subscriptions for:
- Lightweight event processing
- When event ordering is important
- Critical event handlers that must complete before continuing
Example:
    subscription, err := eventBus.Subscribe(ctx, "user.login", func(ctx context.Context, event Event) error {
        user := event.Payload.(UserData)
        return updateLastLoginTime(user.ID)
    })
func (*EventBusModule) SubscribeAsync ¶
func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync subscribes to a topic with asynchronous event processing. Matching events are queued and the handler is invoked by worker goroutines, allowing the event publisher to continue without waiting for processing.
With multiple engines, the subscription is created on the engine that handles the specified topic according to the routing configuration.
Use asynchronous subscriptions for:
- Heavy processing operations
- External API calls
- Non-critical event handlers
- When you want to avoid blocking publishers
Example:
    subscription, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
        imageData := event.Payload.(ImageData)
        return generateThumbnails(imageData)
    })
func (*EventBusModule) SubscriberCount ¶
func (m *EventBusModule) SubscriberCount(topic string) int
SubscriberCount returns the number of active subscribers for a topic. This includes both synchronous and asynchronous subscriptions. Returns 0 if the topic has no subscribers.
Example:
    count := eventBus.SubscriberCount("user.created")
    if count == 0 {
        log.Warn("No subscribers for user creation events")
    }
func (*EventBusModule) Topics ¶
func (m *EventBusModule) Topics() []string
Topics returns a list of all active topics that have subscribers. This can be useful for debugging, monitoring, or building administrative interfaces that show current event bus activity.
Example:
    activeTopics := eventBus.Topics()
    for _, topic := range activeTopics {
        count := eventBus.SubscriberCount(topic)
        fmt.Printf("Topic: %s, Subscribers: %d\n", topic, count)
    }
func (*EventBusModule) Unsubscribe ¶
func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe cancels a subscription and stops receiving events. The subscription will be removed from the event bus and no longer receive events for its topic.
This method is idempotent - calling it multiple times on the same subscription is safe and will not cause errors.
Example:
err := eventBus.Unsubscribe(ctx, subscription)
type EventFilter ¶ added in v0.1.1
EventFilter defines a filter that can be applied to events
type EventHandler ¶
EventHandler is a function that handles an event. Event handlers are called when an event matching their subscription topic is published. Handlers should be idempotent when possible and handle errors gracefully.
The context can be used for cancellation, timeouts, and passing request-scoped values. Handlers should respect context cancellation and return promptly when the context is cancelled.
Example handler:
    func userCreatedHandler(ctx context.Context, event Event) error {
        user := event.Payload.(UserData)
        return sendWelcomeEmail(ctx, user.Email)
    }
type EventMetrics ¶ added in v0.1.1
type EventMetrics struct {
    TotalEvents           int64            `json:"totalEvents"`
    EventsPerTopic        map[string]int64 `json:"eventsPerTopic"`
    AverageProcessingTime time.Duration    `json:"averageProcessingTime"`
    LastResetTime         time.Time        `json:"lastResetTime"`
    // contains filtered or unexported fields
}
EventMetrics holds metrics about event processing
type KafkaConfig ¶ added in v0.1.1
type KafkaConfig struct {
    Brokers        []string          `json:"brokers"`
    GroupID        string            `json:"groupId"`
    SecurityConfig map[string]string `json:"security"`
    ProducerConfig map[string]string `json:"producer"`
    ConsumerConfig map[string]string `json:"consumer"`
}
KafkaConfig holds Kafka-specific configuration
type KafkaConsumerGroupHandler ¶ added in v0.1.1
type KafkaConsumerGroupHandler struct {
// contains filtered or unexported fields
}
KafkaConsumerGroupHandler implements sarama.ConsumerGroupHandler
func (*KafkaConsumerGroupHandler) Cleanup ¶ added in v0.1.1
func (h *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is called at the end of a session, once all ConsumeClaim goroutines have exited
func (*KafkaConsumerGroupHandler) ConsumeClaim ¶ added in v0.1.1
func (h *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim processes messages from a Kafka partition
func (*KafkaConsumerGroupHandler) Setup ¶ added in v0.1.1
func (h *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup is called at the beginning of a new session, before ConsumeClaim
type KafkaEventBus ¶ added in v0.1.1
type KafkaEventBus struct {
// contains filtered or unexported fields
}
KafkaEventBus implements EventBus using Apache Kafka
func (*KafkaEventBus) Publish ¶ added in v0.1.1
func (k *KafkaEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic using Kafka
func (*KafkaEventBus) Start ¶ added in v0.1.1
func (k *KafkaEventBus) Start(ctx context.Context) error
Start initializes the Kafka event bus
func (*KafkaEventBus) Stop ¶ added in v0.1.1
func (k *KafkaEventBus) Stop(ctx context.Context) error
Stop shuts down the Kafka event bus
func (*KafkaEventBus) Subscribe ¶ added in v0.1.1
func (k *KafkaEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*KafkaEventBus) SubscribeAsync ¶ added in v0.1.1
func (k *KafkaEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*KafkaEventBus) SubscriberCount ¶ added in v0.1.1
func (k *KafkaEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*KafkaEventBus) Topics ¶ added in v0.1.1
func (k *KafkaEventBus) Topics() []string
Topics returns a list of all active topics
func (*KafkaEventBus) Unsubscribe ¶ added in v0.1.1
func (k *KafkaEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type KinesisConfig ¶ added in v0.1.1
type KinesisConfig struct {
    Region          string `json:"region"`
    StreamName      string `json:"streamName"`
    AccessKeyID     string `json:"accessKeyId"`
    SecretAccessKey string `json:"secretAccessKey"`
    SessionToken    string `json:"sessionToken"`
    ShardCount      int32  `json:"shardCount"`
}
KinesisConfig holds Kinesis-specific configuration
type KinesisEventBus ¶ added in v0.1.1
type KinesisEventBus struct {
// contains filtered or unexported fields
}
KinesisEventBus implements EventBus using AWS Kinesis
func (*KinesisEventBus) Publish ¶ added in v0.1.1
func (k *KinesisEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic using Kinesis
func (*KinesisEventBus) Start ¶ added in v0.1.1
func (k *KinesisEventBus) Start(ctx context.Context) error
Start initializes the Kinesis event bus
func (*KinesisEventBus) Stop ¶ added in v0.1.1
func (k *KinesisEventBus) Stop(ctx context.Context) error
Stop shuts down the Kinesis event bus
func (*KinesisEventBus) Subscribe ¶ added in v0.1.1
func (k *KinesisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*KinesisEventBus) SubscribeAsync ¶ added in v0.1.1
func (k *KinesisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*KinesisEventBus) SubscriberCount ¶ added in v0.1.1
func (k *KinesisEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*KinesisEventBus) Topics ¶ added in v0.1.1
func (k *KinesisEventBus) Topics() []string
Topics returns a list of all active topics
func (*KinesisEventBus) Unsubscribe ¶ added in v0.1.1
func (k *KinesisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type MemoryEventBus ¶
type MemoryEventBus struct {
// contains filtered or unexported fields
}
MemoryEventBus implements EventBus using in-memory channels
func NewMemoryEventBus ¶
func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus
NewMemoryEventBus creates a new in-memory event bus
func (*MemoryEventBus) Publish ¶
func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic
func (*MemoryEventBus) SetModule ¶ added in v0.1.1
func (m *MemoryEventBus) SetModule(module *EventBusModule)
SetModule sets the parent module for event emission
func (*MemoryEventBus) Start ¶
func (m *MemoryEventBus) Start(ctx context.Context) error
Start initializes the event bus
func (*MemoryEventBus) Stats ¶ added in v0.1.1
func (m *MemoryEventBus) Stats() (delivered uint64, dropped uint64)
Stats returns basic delivery stats for monitoring/testing.
func (*MemoryEventBus) Stop ¶
func (m *MemoryEventBus) Stop(ctx context.Context) error
Stop shuts down the event bus
func (*MemoryEventBus) Subscribe ¶
func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*MemoryEventBus) SubscribeAsync ¶
func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*MemoryEventBus) SubscriberCount ¶
func (m *MemoryEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*MemoryEventBus) Topics ¶
func (m *MemoryEventBus) Topics() []string
Topics returns a list of all active topics
func (*MemoryEventBus) Unsubscribe ¶
func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type PrometheusCollector ¶ added in v0.1.1
type PrometheusCollector struct {
// contains filtered or unexported fields
}
func NewPrometheusCollector ¶ added in v0.1.1
func NewPrometheusCollector(eventBus *EventBusModule, namespace string) *PrometheusCollector
NewPrometheusCollector creates a new collector for the given event bus. The namespace is used as the metric prefix; if it is empty, modular_eventbus is used.
func (*PrometheusCollector) Collect ¶ added in v0.1.1
func (c *PrometheusCollector) Collect(ch chan<- prometheus.Metric)
Collect gathers current stats and emits ConstMetrics.
func (*PrometheusCollector) Describe ¶ added in v0.1.1
func (c *PrometheusCollector) Describe(ch chan<- *prometheus.Desc)
Describe sends metric descriptors.
type RedisConfig ¶ added in v0.1.1
type RedisConfig struct {
    URL      string `json:"url"`
    DB       int    `json:"db"`
    Username string `json:"username"`
    Password string `json:"password"`
    PoolSize int    `json:"poolSize"`
}
RedisConfig holds Redis-specific configuration
type RedisEventBus ¶ added in v0.1.1
type RedisEventBus struct {
// contains filtered or unexported fields
}
RedisEventBus implements EventBus using Redis pub/sub
func (*RedisEventBus) Publish ¶ added in v0.1.1
func (r *RedisEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic using Redis pub/sub
func (*RedisEventBus) Start ¶ added in v0.1.1
func (r *RedisEventBus) Start(ctx context.Context) error
Start initializes the Redis event bus
func (*RedisEventBus) Stop ¶ added in v0.1.1
func (r *RedisEventBus) Stop(ctx context.Context) error
Stop shuts down the Redis event bus
func (*RedisEventBus) Subscribe ¶ added in v0.1.1
func (r *RedisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*RedisEventBus) SubscribeAsync ¶ added in v0.1.1
func (r *RedisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*RedisEventBus) SubscriberCount ¶ added in v0.1.1
func (r *RedisEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*RedisEventBus) Topics ¶ added in v0.1.1
func (r *RedisEventBus) Topics() []string
Topics returns a list of all active topics
func (*RedisEventBus) Unsubscribe ¶ added in v0.1.1
func (r *RedisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type RoutingRule ¶ added in v0.1.1
type RoutingRule struct {
    // Topics is a list of topic patterns to match.
    // Supports wildcards like "user.*" or exact matches.
    Topics []string `json:"topics" yaml:"topics" validate:"required,min=1"`

    // Engine is the name of the engine to route matching topics to.
    // Must match the name of a configured engine.
    Engine string `json:"engine" yaml:"engine" validate:"required"`
}
RoutingRule defines how topics are routed to engines.
type Subscription ¶
type Subscription interface {
    // Topic returns the topic being subscribed to.
    // This may include wildcard patterns depending on the engine implementation.
    Topic() string

    // ID returns the unique identifier for this subscription.
    // Each subscription gets a unique ID that can be used for tracking,
    // logging, and debugging purposes.
    ID() string

    // IsAsync returns true if this is an asynchronous subscription.
    // Asynchronous subscriptions process events in background workers,
    // while synchronous subscriptions process events immediately.
    IsAsync() bool

    // Cancel cancels the subscription.
    // After calling Cancel, the subscription will no longer receive events.
    // This is equivalent to calling Unsubscribe on the event bus.
    // The method is idempotent and safe to call multiple times.
    Cancel() error
}
Subscription represents a subscription to a topic. Subscriptions are created when a handler is registered for a topic and provide methods for managing the subscription lifecycle.
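A custom engine would need its own type satisfying this interface. The following sketch shows one possible shape, with sync.Once making Cancel idempotent. The interface is copied locally so the example compiles on its own; the subscription struct and its onCancel hook are hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscription is a local copy of the documented interface.
type Subscription interface {
	Topic() string
	ID() string
	IsAsync() bool
	Cancel() error
}

// subscription is a hypothetical implementation for illustration only.
type subscription struct {
	topic    string
	id       string
	async    bool
	once     sync.Once
	onCancel func() // assumed hook that detaches the handler from the bus
}

func (s *subscription) Topic() string { return s.topic }
func (s *subscription) ID() string    { return s.id }
func (s *subscription) IsAsync() bool { return s.async }

// Cancel runs the detach hook at most once, so repeated calls are safe.
func (s *subscription) Cancel() error {
	s.once.Do(func() {
		if s.onCancel != nil {
			s.onCancel()
		}
	})
	return nil
}

// Compile-time check that *subscription satisfies Subscription.
var _ Subscription = (*subscription)(nil)

func main() {
	removed := 0
	sub := &subscription{topic: "user.*", id: "sub-1", onCancel: func() { removed++ }}
	_ = sub.Cancel()
	_ = sub.Cancel()
	fmt.Println(sub.Topic(), sub.ID(), sub.IsAsync(), removed) // user.* sub-1 false 1
}
```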
type TopicPrefixFilter ¶ added in v0.1.1
type TopicPrefixFilter struct {
    AllowedPrefixes []string
    // contains filtered or unexported fields
}
TopicPrefixFilter filters events based on topic prefix
func (*TopicPrefixFilter) Name ¶ added in v0.1.1
func (f *TopicPrefixFilter) Name() string
func (*TopicPrefixFilter) ShouldProcess ¶ added in v0.1.1
func (f *TopicPrefixFilter) ShouldProcess(event Event) bool
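Name and ShouldProcess carry no description here. As an illustration of prefix-based filtering, the sketch below accepts an event when its topic starts with one of the allowed prefixes. The prefixFilter type is a hypothetical stand-in, and letting every event through when no prefixes are configured is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a local stand-in carrying only the field the filter needs.
type Event struct {
	Topic string
}

// prefixFilter is a hypothetical stand-in for a topic prefix filter.
type prefixFilter struct {
	AllowedPrefixes []string
}

// ShouldProcess reports whether the event topic starts with an allowed prefix.
// Assumption: an empty prefix list allows every event.
func (f *prefixFilter) ShouldProcess(event Event) bool {
	if len(f.AllowedPrefixes) == 0 {
		return true
	}
	for _, p := range f.AllowedPrefixes {
		if strings.HasPrefix(event.Topic, p) {
			return true
		}
	}
	return false
}

func main() {
	f := &prefixFilter{AllowedPrefixes: []string{"user.", "auth."}}
	fmt.Println(f.ShouldProcess(Event{Topic: "user.created"})) // true
	fmt.Println(f.ShouldProcess(Event{Topic: "order.placed"})) // false
}
```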