eventbus

package module
v0.1.2
Published: Sep 6, 2025 License: MIT Imports: 19 Imported by: 1

README

EventBus Module

The EventBus Module provides a publish-subscribe messaging system for Modular applications with support for multiple concurrent engines, topic-based routing, and flexible configuration. It enables decoupled communication between components through a powerful event-driven architecture.

Features

Core Capabilities
  • Multi-Engine Support: Run multiple event bus engines simultaneously (Memory, Redis, Kafka, Kinesis, Custom)
  • Topic-Based Routing: Route events to different engines based on topic patterns
  • Synchronous & Asynchronous Processing: Support for both immediate and background event processing
  • Wildcard Topics: Subscribe to topic patterns like user.* or analytics.*
  • Event History & TTL: Configurable event retention and cleanup policies
  • Worker Pool Management: Configurable worker pools for async event processing
Supported Engines
  • Memory: In-process event bus using Go channels (default)
  • Redis: Distributed messaging using Redis pub/sub
  • Kafka: Enterprise messaging using Apache Kafka
  • Kinesis: AWS-native streaming using Amazon Kinesis
  • Custom: Support for custom engine implementations
Advanced Features
  • Custom Engine Registration: Register your own engine types
  • Configuration-Based Routing: Route topics to engines via configuration
  • Engine-Specific Configuration: Each engine can have its own settings
  • Metrics & Monitoring: Built-in metrics collection (custom engines)
  • Tenant Isolation: Support for multi-tenant applications
  • Graceful Shutdown: Proper cleanup of all engines and subscriptions
  • Delivery Stats API: Lightweight counters for delivered vs dropped events (memory engine) aggregated per-engine and module-wide
  • Metrics Exporters: Prometheus collector and Datadog StatsD exporter for delivery statistics

Installation

import (
    "github.com/GoCodeAlone/modular"
    "github.com/GoCodeAlone/modular/modules/eventbus"
)

// Register the eventbus module with your Modular application
app.RegisterModule(eventbus.NewModule())

Configuration

Single Engine Configuration (Legacy)
eventbus:
  engine: memory              # Event bus engine (memory, redis, kafka, kinesis)
  maxEventQueueSize: 1000     # Maximum events to queue per topic
  defaultEventBufferSize: 10  # Default buffer size for subscription channels
  workerCount: 5              # Worker goroutines for async event processing
  eventTTL: 3600s             # TTL for events (duration)
  retentionDays: 7            # Days to retain event history
  externalBrokerURL: ""       # URL for external message broker
  externalBrokerUser: ""      # Username for authentication
  externalBrokerPassword: ""  # Password for authentication
Multi-Engine Configuration
eventbus:
  engines:
    - name: "memory-fast"
      type: "memory"
      config:
        maxEventQueueSize: 500
        defaultEventBufferSize: 10
        workerCount: 3
        retentionDays: 1
    - name: "redis-durable"
      type: "redis"
      config:
        url: "redis://localhost:6379"
        db: 0
        poolSize: 10
    - name: "kafka-analytics"
      type: "kafka"
      config:
        brokers: ["localhost:9092"]
        groupId: "eventbus-analytics"
    - name: "kinesis-stream"
      type: "kinesis"
      config:
        region: "us-east-1"
        streamName: "events-stream"
        shardCount: 2
    - name: "custom-engine"
      type: "custom"
      config:
        enableMetrics: true
        metricsInterval: "30s"
  routing:
    - topics: ["user.*", "auth.*"]
      engine: "memory-fast"
    - topics: ["analytics.*", "metrics.*"]
      engine: "kafka-analytics"  
    - topics: ["stream.*"]
      engine: "kinesis-stream"
    - topics: ["*"]  # Fallback for all other topics
      engine: "redis-durable"
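
The routing rules above can be read as an ordered list of topic patterns. The sketch below shows one way such routing could be evaluated; it is not the module's actual router. The names `rule`, `matchTopic`, `engineFor`, and `exampleRules` are hypothetical, and both the first-match-wins ordering and the wildcard semantics (`*` alone matches everything, `prefix.*` matches topics under that prefix) are assumptions inferred from the example configuration.

```go
package main

import (
	"fmt"
	"strings"
)

// rule mirrors one entry of the `routing` list (hypothetical type).
type rule struct {
	topics []string
	engine string
}

// matchTopic reports whether topic matches pattern under the assumed
// wildcard semantics: "*" matches everything, "user.*" matches any topic
// starting with "user.", and anything else must match exactly.
func matchTopic(pattern, topic string) bool {
	if pattern == "*" {
		return true
	}
	if strings.HasSuffix(pattern, ".*") {
		return strings.HasPrefix(topic, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == topic
}

// engineFor returns the engine of the first rule with a matching pattern,
// or "" when no rule matches (assumed first-match-wins ordering).
func engineFor(rules []rule, topic string) string {
	for _, r := range rules {
		for _, p := range r.topics {
			if matchTopic(p, topic) {
				return r.engine
			}
		}
	}
	return ""
}

// exampleRules reproduces the routing section of the configuration above.
func exampleRules() []rule {
	return []rule{
		{topics: []string{"user.*", "auth.*"}, engine: "memory-fast"},
		{topics: []string{"analytics.*", "metrics.*"}, engine: "kafka-analytics"},
		{topics: []string{"stream.*"}, engine: "kinesis-stream"},
		{topics: []string{"*"}, engine: "redis-durable"},
	}
}

func main() {
	rules := exampleRules()
	for _, t := range []string{"user.login", "analytics.click", "custom.event"} {
		fmt.Printf("%s -> %s\n", t, engineFor(rules, t))
	}
}
```

Under these assumptions the catch-all `*` rule must come last, otherwise it would shadow every rule after it.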

Delivery Modes & Backpressure (Memory Engine)

The in-process memory engine supports configurable delivery semantics to balance throughput, fairness, and reliability when subscriber channels become congested.

Configuration fields (set in the per-engine config map of a memory engine):

```yaml
eventbus:
  engines:
    - name: "memory-fast"
      type: "memory"
      config:
        # Existing settings...
        workerCount: 5
        maxEventQueueSize: 1000
        defaultEventBufferSize: 32

        # New delivery / fairness controls
        deliveryMode: drop            # drop | block | timeout (default: drop)
        publishBlockTimeout: 250ms    # only used when deliveryMode: timeout
        rotateSubscriberOrder: true   # fairness rotation (default: true)
```

Modes:

  • drop (default): Non-blocking send. If a subscriber's channel buffer is full, the event is dropped for that subscriber (other subscribers are still attempted). Highest throughput, with possible per-subscriber loss under bursty load.
  • block: The publisher goroutine blocks until each subscriber accepts the event (or the context is cancelled). Provides the strongest delivery at the cost of publisher backpressure; a slow subscriber stalls publishers.
  • timeout: Like block but each subscriber send has an upper bound (publishBlockTimeout). If the timeout elapses the event is dropped for that subscriber and publishing proceeds. Reduces head-of-line blocking risk while greatly lowering starvation compared to pure drop mode.
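
The three modes map naturally onto Go `select` statements. The following is a minimal sketch of that idea, not the memory engine's implementation: `deliveryMode`, `deliver`, and `demoModes` are hypothetical names, and a plain `chan string` stands in for a subscriber's buffered channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// deliveryMode names the three modes from the configuration (hypothetical type).
type deliveryMode string

const (
	modeDrop    deliveryMode = "drop"
	modeBlock   deliveryMode = "block"
	modeTimeout deliveryMode = "timeout"
)

// deliver tries to hand ev to one subscriber channel and reports whether
// the subscriber accepted it.
func deliver(ctx context.Context, ch chan<- string, ev string, mode deliveryMode, timeout time.Duration) bool {
	switch mode {
	case modeDrop:
		// Non-blocking send: give up immediately if the buffer is full.
		select {
		case ch <- ev:
			return true
		default:
			return false
		}
	case modeBlock:
		// Wait for the subscriber, or for the context to be cancelled.
		select {
		case ch <- ev:
			return true
		case <-ctx.Done():
			return false
		}
	case modeTimeout:
		// Like block, but bounded by a per-send timeout.
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case ch <- ev:
			return true
		case <-timer.C:
			return false
		case <-ctx.Done():
			return false
		}
	}
	return false // unknown mode
}

// demoModes sends to a one-slot channel that is never drained.
func demoModes() []bool {
	ch := make(chan string, 1)
	ctx := context.Background()
	cancelled, cancel := context.WithCancel(ctx)
	cancel()
	return []bool{
		deliver(ctx, ch, "a", modeDrop, 0),                      // buffer has room
		deliver(ctx, ch, "b", modeDrop, 0),                      // buffer full, dropped
		deliver(ctx, ch, "c", modeTimeout, 10*time.Millisecond), // times out
		deliver(cancelled, ch, "d", modeBlock, 0),               // context already cancelled
	}
}

func main() {
	fmt.Println(demoModes()) // prints [true false false false]
}
```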

Fairness:

  • When rotateSubscriberOrder is true (default) the memory engine performs a deterministic rotation of the subscriber slice based on a monotonically increasing publish counter. This gives each subscription a chance to be first periodically, preventing chronic starvation when buffers are near capacity.
  • When false, iteration order is the static registration order (legacy behavior) and early subscribers can dominate under sustained pressure. A light random shuffle is applied per publish as a best-effort mitigation.
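
As an illustration of the rotation idea, the sketch below offsets the starting subscriber by a publish counter. It assumes the rotation is a simple `counter mod n` shift; the engine's exact scheme is not specified here, and `rotate` is a hypothetical helper.

```go
package main

import "fmt"

// rotate returns a copy of subs that starts at index publishCount mod len(subs),
// so that each subscriber is periodically first in the delivery order.
func rotate(subs []string, publishCount uint64) []string {
	n := len(subs)
	if n == 0 {
		return nil
	}
	start := int(publishCount % uint64(n))
	out := make([]string, 0, n)
	out = append(out, subs[start:]...)
	return append(out, subs[:start]...)
}

func main() {
	subs := []string{"a", "b", "c"}
	for i := uint64(0); i < 4; i++ {
		fmt.Println(i, rotate(subs, i))
	}
}
```

With three subscribers, publish counts 0, 1, 2, 3 start delivery at a, b, c, and a again.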

Observability:

  • The memory engine maintains internal delivered and dropped counters (exposed via a Stats() method).
  • Module-level helpers expose aggregate (eventBus.Stats()) and per-engine (eventBus.PerEngineStats()) delivery counts suitable for exporting to metrics backends (Prometheus, Datadog, etc.). Example:
delivered, dropped := eventBus.Stats()
perEngine := eventBus.PerEngineStats() // map[engineName]DeliveryStats
for name, s := range perEngine {
  fmt.Printf("engine=%s delivered=%d dropped=%d\n", name, s.Delivered, s.Dropped)
}

Test Stability Note: Async subscriptions are processed via a worker pool so their delivered count may lag momentarily after publishers finish. When writing tests that compare sync vs async distribution, allow a short settling period (poll until async count stops increasing) and use wide fairness bounds (e.g. async within 25%–300% of sync) to avoid flakiness while still detecting pathological starvation.
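
A settling helper along those lines could look like the sketch below. `waitForSettle`, `withinFairnessBounds`, and `demoSettle` are hypothetical test helpers, not part of the module; in a real test the `read` callback would return the async delivered count.

```go
package main

import (
	"fmt"
	"time"
)

// waitForSettle polls read until it returns the same value for stableReads
// consecutive polls, or until maxPolls polls have been made. It returns the
// last observed value and whether the counter settled.
func waitForSettle(read func() uint64, interval time.Duration, stableReads, maxPolls int) (uint64, bool) {
	last := read()
	stable := 0
	for i := 0; i < maxPolls; i++ {
		time.Sleep(interval)
		cur := read()
		if cur == last {
			stable++
			if stable >= stableReads {
				return cur, true
			}
			continue
		}
		stable = 0
		last = cur
	}
	return last, false
}

// withinFairnessBounds checks that asyncCount lies within 25%-300% of syncCount.
func withinFairnessBounds(syncCount, asyncCount uint64) bool {
	return asyncCount*4 >= syncCount && asyncCount <= syncCount*3
}

// demoSettle simulates an async counter that grows for a few polls and then stops.
func demoSettle() (uint64, bool) {
	values := []uint64{10, 40, 90, 100}
	i := 0
	read := func() uint64 {
		v := values[i]
		if i < len(values)-1 {
			i++
		}
		return v
	}
	return waitForSettle(read, time.Millisecond, 3, 50)
}

func main() {
	v, ok := demoSettle()
	fmt.Println(v, ok, withinFairnessBounds(100, v))
}
```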

Backward Compatibility:

  • If you do not set any of the new fields, behavior remains equivalent to previous versions (drop mode with fairness rotation enabled by default, which improves starvation resilience without changing loss semantics).

Tuning Guidance:

  • Start with drop in high-throughput low-criticality paths where occasional loss is acceptable.
  • Use timeout with a modest publishBlockTimeout (e.g. 5-50ms) for balanced fairness and latency in mixed-speed subscriber sets.
  • Reserve block for critical fan-out where all subscribers must process every event and you are comfortable applying backpressure to publishers.

Example (balanced):

eventbus:
  engines:
    - name: "memory-balanced"
      type: "memory"
      config:
        workerCount: 8
        defaultEventBufferSize: 64
        deliveryMode: timeout
        publishBlockTimeout: 25ms
        rotateSubscriberOrder: true
Metrics Export (Prometheus & Datadog)

Delivery statistics (delivered vs dropped) can be exported via the built-in Prometheus Collector or a Datadog StatsD exporter.

Prometheus

Register the collector with your Prometheus registry (global or custom):

import (
  "github.com/GoCodeAlone/modular/modules/eventbus"
  prom "github.com/prometheus/client_golang/prometheus"
  promhttp "github.com/prometheus/client_golang/prometheus/promhttp"
  "net/http"
)

// After module start and obtaining eventBus reference
collector := eventbus.NewPrometheusCollector(eventBus, "modular_eventbus")
prom.MustRegister(collector)

http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":2112", nil)

Emitted metrics (Counter):

  • modular_eventbus_delivered_total{engine="_all"} – Aggregate delivered (processed) events
  • modular_eventbus_dropped_total{engine="_all"} – Aggregate dropped (not processed) events
  • Per-engine variants with engine="<engineName>"

Example PromQL:

rate(modular_eventbus_delivered_total{engine!="_all"}[5m])
rate(modular_eventbus_dropped_total{engine="_all"}[5m])
Datadog (DogStatsD)

Start the exporter in a background goroutine. It periodically snapshots stats and emits gauges.

import (
  "time"
  "github.com/GoCodeAlone/modular/modules/eventbus"
)

// Arguments follow the documented signature: prefix, addr, interval, baseTags.
exporter, err := eventbus.NewDatadogStatsdExporter(
  eventBus,
  "modular.eventbus", // metric prefix (defaults to "eventbus" if empty)
  "127.0.0.1:8125",   // DogStatsD agent address
  5*time.Second,      // export interval, must be > 0
  nil,                // base tags
)
if err != nil { panic(err) }
go exporter.Run(ctx) // runs until ctx is cancelled; call exporter.Close() on shutdown

Emitted gauges (namespace-prefixed):

  • delivered_total / dropped_total (tags: engine:<name> plus aggregate engine:_all)
  • go.goroutines (optional) for exporter process health

Datadog query examples:

avg:modular.eventbus.delivered_total{engine:_all}.as_count()
top(avg:modular.eventbus.dropped_total{*} by {engine}, 5, 'mean', 'desc')
Semantics

delivered counts events whose handlers executed (success or failure). dropped counts events that could not be enqueued or processed (channel full, timeout, worker pool saturation). These sets are disjoint per subscription, so delivered + dropped approximates total published events actually observed by subscribers.

Shutdown

Always call exporter.Close() (Datadog) during module/application shutdown to flush final metrics.

Extensibility

You can build custom exporters by polling eventBus.PerEngineStats() periodically and forwarding the numbers to your metrics system of choice.
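
A polling exporter of that kind might be structured as in the sketch below. It is self-contained for illustration: `DeliveryStats` is redeclared locally with the same two fields, while `statsSource`, `delta`, `runExporter`, and `fakeSource` are hypothetical names. In an application, the event bus module would play the role of `statsSource`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// DeliveryStats mirrors the two counters of the module's DeliveryStats type.
type DeliveryStats struct {
	Delivered uint64
	Dropped   uint64
}

// statsSource is a hypothetical interface for anything exposing PerEngineStats.
type statsSource interface {
	PerEngineStats() map[string]DeliveryStats
}

// delta returns the per-engine increase between two snapshots. The counters
// only grow, so the unsigned subtraction cannot underflow.
func delta(prev, cur map[string]DeliveryStats) map[string]DeliveryStats {
	out := make(map[string]DeliveryStats, len(cur))
	for name, c := range cur {
		p := prev[name] // zero value for engines not seen before
		out[name] = DeliveryStats{
			Delivered: c.Delivered - p.Delivered,
			Dropped:   c.Dropped - p.Dropped,
		}
	}
	return out
}

// runExporter polls src every interval and passes deltas to emit until ctx is cancelled.
func runExporter(ctx context.Context, src statsSource, interval time.Duration, emit func(engine string, d DeliveryStats)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	prev := src.PerEngineStats()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cur := src.PerEngineStats()
			for name, d := range delta(prev, cur) {
				emit(name, d)
			}
			prev = cur
		}
	}
}

// fakeSource stands in for the event bus and grows its counters on every poll.
type fakeSource struct{ n uint64 }

func (f *fakeSource) PerEngineStats() map[string]DeliveryStats {
	f.n += 10
	return map[string]DeliveryStats{"memory-fast": {Delivered: f.n, Dropped: f.n / 10}}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()
	runExporter(ctx, &fakeSource{}, 10*time.Millisecond, func(engine string, d DeliveryStats) {
		fmt.Printf("engine=%s delivered+=%d dropped+=%d\n", engine, d.Delivered, d.Dropped)
	})
}
```

Emitting deltas rather than raw totals is one design option; backends that expect monotonically increasing counters can be fed the snapshot values directly.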


Usage

Basic Event Publishing and Subscription

```go
// Get the eventbus service
var eventBus *eventbus.EventBusModule
err := app.GetService("eventbus.provider", &eventBus)
if err != nil {
    return fmt.Errorf("failed to get eventbus service: %w", err)
}

// Publish an event
err = eventBus.Publish(ctx, "user.created", userData)
if err != nil {
    return fmt.Errorf("failed to publish event: %w", err)
}

// Subscribe to events
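subscription, err := eventBus.Subscribe(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // Use the comma-ok form so an unexpected payload type returns an error instead of panicking
    user, ok := event.Payload.(UserData)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    fmt.Printf("User created: %s\n", user.Name)
    return nil
})
if err != nil {
    return fmt.Errorf("failed to subscribe: %w", err)
}
```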
Multi-Engine Routing
// Events are automatically routed based on configured rules
eventBus.Publish(ctx, "user.login", userData)      // -> memory-fast engine
eventBus.Publish(ctx, "analytics.click", clickData) // -> kafka-analytics engine  
eventBus.Publish(ctx, "custom.event", customData)  // -> redis-durable engine (fallback)

// Check which engine handles a specific topic
router := eventBus.GetRouter()
engine := router.GetEngineForTopic("user.created")
fmt.Printf("Topic 'user.created' routes to engine: %s\n", engine)
Custom Engine Registration
// Register a custom engine type
eventbus.RegisterEngine("myengine", func(config map[string]interface{}) (eventbus.EventBus, error) {
    return NewMyCustomEngine(config), nil
})

# Use in configuration (YAML)
engines:
  - name: "my-custom"
    type: "myengine" 
    config:
      customSetting: "value"
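
For readers unfamiliar with the factory-registration pattern, here is a minimal self-contained sketch of how a type-name-to-factory registry can work. It does not reproduce the module's registry: `Engine`, `Factory`, `registry`, `myEngine`, and `newMyEngine` are hypothetical stand-ins for the EventBus interface, EngineFactory, and a custom engine.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Engine is a stand-in for the EventBus interface (hypothetical).
type Engine interface{ Name() string }

// Factory builds an Engine from its configuration map.
type Factory func(config map[string]interface{}) (Engine, error)

// registry maps engine type names to factories and is safe for concurrent use.
type registry struct {
	mu        sync.RWMutex
	factories map[string]Factory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]Factory)}
}

// Register adds or replaces the factory for engineType.
func (r *registry) Register(engineType string, f Factory) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[engineType] = f
}

// Create looks up the factory for engineType and invokes it with config.
func (r *registry) Create(engineType string, config map[string]interface{}) (Engine, error) {
	r.mu.RLock()
	f, ok := r.factories[engineType]
	r.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown engine type: %q", engineType)
	}
	return f(config)
}

// Types lists registered engine types in sorted order.
func (r *registry) Types() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.factories))
	for name := range r.factories {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// myEngine is a toy engine configured by a single string setting.
type myEngine struct{ setting string }

func (e *myEngine) Name() string { return "myengine(" + e.setting + ")" }

// newMyEngine validates the config map before constructing the engine.
func newMyEngine(config map[string]interface{}) (Engine, error) {
	s, ok := config["customSetting"].(string)
	if !ok {
		return nil, fmt.Errorf("customSetting must be a string")
	}
	return &myEngine{setting: s}, nil
}

func main() {
	r := newRegistry()
	r.Register("myengine", newMyEngine)
	e, err := r.Create("myengine", map[string]interface{}{"customSetting": "value"})
	fmt.Println(e.Name(), err, r.Types())
}
```

Validating the configuration map inside the factory, as `newMyEngine` does, lets a misconfigured engine fail at creation time rather than at first publish.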

Examples

Multi-Engine Application

See examples/multi-engine-eventbus/ for a complete application demonstrating:

  • Multiple concurrent engines
  • Topic-based routing
  • Different processing patterns
  • Engine-specific configuration
  • Real-world event types
cd examples/multi-engine-eventbus
go run main.go

Sample output:

🚀 Started Multi-Engine EventBus Demo in development environment
📊 Multi-Engine EventBus Configuration:
  - memory-fast: Handles user.* and auth.* topics
  - memory-reliable: Handles analytics.*, metrics.*, and fallback topics

🔵 [MEMORY-FAST] User registered: user123 (action: register)
📈 [MEMORY-RELIABLE] Page view: /dashboard (session: sess123)
⚙️  [MEMORY-RELIABLE] System info: database - Connection established

Engine Implementations

Memory Engine (Built-in)
  • Fast in-process messaging using Go channels
  • Configurable worker pools and buffer sizes
  • Event history and TTL support
  • Perfect for single-process applications
Redis Engine
  • Distributed messaging using Redis pub/sub
  • Supports Redis authentication and connection pooling
  • Wildcard subscriptions via Redis pattern matching
  • Good for distributed applications with moderate throughput
Kafka Engine
  • Enterprise messaging using Apache Kafka
  • Consumer group support for load balancing
  • SASL authentication and SSL/TLS support
  • Ideal for high-throughput, durable messaging
Kinesis Engine
  • AWS-native streaming using Amazon Kinesis
  • Multiple shard support for scalability
  • Automatic stream management
  • Perfect for AWS-based applications with analytics needs
Custom Engine
  • Example implementation with metrics and filtering
  • Demonstrates custom engine development patterns
  • Includes event metrics collection and topic filtering
  • Template for building specialized engines

Testing

The module includes comprehensive BDD tests covering:

  • Single and multi-engine configurations
  • Topic routing and engine selection
  • Custom engine registration
  • Synchronous and asynchronous processing
  • Error handling and recovery
  • Tenant isolation scenarios
cd modules/eventbus
go test ./... -v

Migration from Single-Engine

Existing single-engine configurations continue to work unchanged. To migrate to multi-engine:

# Before (single-engine)
eventbus:
  engine: memory
  workerCount: 5

# After (multi-engine with same behavior) 
eventbus:
  engines:
    - name: "default"
      type: "memory"
      config:
        workerCount: 5

Performance Considerations

Engine Selection Guidelines
  • Memory: Best for high-performance, low-latency scenarios
  • Redis: Good for distributed applications with moderate throughput
  • Kafka: Ideal for high-throughput, durable messaging
  • Kinesis: Best for AWS-native applications with streaming analytics
  • Custom: Use for specialized requirements
Configuration Tuning
# High-throughput configuration
eventbus:
  engines:
    - name: "high-perf"
      type: "memory" 
      config:
        maxEventQueueSize: 10000
        defaultEventBufferSize: 100
        workerCount: 20

Contributing

When contributing to the eventbus module:

  1. Add tests for new engine implementations
  2. Update BDD scenarios for new features
  3. Document configuration options thoroughly
  4. Ensure backward compatibility
  5. Add examples demonstrating new capabilities

License

This module is part of the Modular framework and follows the same license terms.

Documentation

Overview

Package eventbus provides a flexible event-driven messaging system for the modular framework.

This module enables decoupled communication between application components through an event bus pattern. It supports both synchronous and asynchronous event processing, multiple event bus engines, and configurable event handling strategies.

Features

The eventbus module offers the following capabilities:

  • Topic-based event publishing and subscription
  • Synchronous and asynchronous event processing
  • Multiple engine support (memory, Redis, Kafka, Kinesis, custom)
  • Configurable worker pools for async processing
  • Event metadata and lifecycle tracking
  • Subscription management with unique identifiers
  • Event TTL and retention policies

Configuration

The module can be configured through the EventBusConfig structure:

config := &EventBusConfig{
    Engine:                 "memory",    // or "redis", "kafka"
    MaxEventQueueSize:      1000,        // events per topic queue
    DefaultEventBufferSize: 10,          // subscription channel buffer
    WorkerCount:            5,           // async processing workers
    EventTTL:               3600,        // event time-to-live in seconds
    RetentionDays:          7,           // event history retention
    ExternalBrokerURL:      "",          // for external brokers
    ExternalBrokerUser:     "",          // broker authentication
    ExternalBrokerPassword: "",          // broker password
}

Service Registration

The module registers itself as a service for dependency injection:

// Get the event bus service
var eventBus *EventBusModule
if err := app.GetService("eventbus.provider", &eventBus); err != nil {
    return fmt.Errorf("failed to get eventbus service: %w", err)
}

// Publish an event
err := eventBus.Publish(ctx, "user.created", userData)

// Subscribe to events
subscription, err := eventBus.Subscribe(ctx, "user.*", userEventHandler)

Usage Examples

Basic event publishing:

// Publish a simple event
err := eventBus.Publish(ctx, "order.placed", orderData)

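// Publish with custom metadata
// Note: Publish takes only a topic and a payload, so the Metadata set on
// this local Event value is not passed along by the call below.
event := Event{
    Topic:   "payment.processed",
    Payload: paymentData,
    Metadata: map[string]interface{}{
        "source": "payment-service",
        "version": "1.2.0",
    },
}
err = eventBus.Publish(ctx, event.Topic, event.Payload)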

Event subscription patterns:

// Synchronous subscription
subscription, err := eventBus.Subscribe(ctx, "user.updated", func(ctx context.Context, event Event) error {
    user := event.Payload.(UserData)
    return updateUserCache(user)
})

// Asynchronous subscription for heavy processing
asyncSub, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
    imageData := event.Payload.(ImageData)
    return processImageThumbnails(imageData)
})

// Wildcard subscriptions
allOrdersSub, err := eventBus.Subscribe(ctx, "order.*", orderEventHandler)

Subscription management:

// Check subscription details
fmt.Printf("Subscribed to: %s (ID: %s, Async: %v)",
    subscription.Topic(), subscription.ID(), subscription.IsAsync())

// Cancel specific subscriptions
err := eventBus.Unsubscribe(ctx, subscription)

// Or cancel through the subscription itself
err = subscription.Cancel()

Event Processing Patterns

The module supports different event processing patterns:

**Synchronous Processing**: Events are processed immediately in the same goroutine that published them. Best for lightweight operations and when ordering is important.

**Asynchronous Processing**: Events are queued and processed by worker goroutines. Best for heavy operations, external API calls, or when you don't want to block the publisher.
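
The difference between the two patterns can be sketched with a toy bus. This is an illustration only, not the module's implementation: `miniBus`, `handler`, and `demoProcessing` are hypothetical names, and the worker pool is a fixed set of goroutines draining one queue.

```go
package main

import (
	"fmt"
	"sync"
)

type handler func(payload string)

// miniBus is a toy bus with inline (sync) and queued (async) handlers.
type miniBus struct {
	syncHandlers  []handler
	asyncHandlers []handler
	queue         chan func()
	wg            sync.WaitGroup
}

// newMiniBus starts the given number of worker goroutines draining the queue.
func newMiniBus(workers, queueSize int) *miniBus {
	b := &miniBus{queue: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for job := range b.queue {
				job()
			}
		}()
	}
	return b
}

// publish runs sync handlers in the caller's goroutine and enqueues async ones.
func (b *miniBus) publish(payload string) {
	for _, h := range b.syncHandlers {
		h(payload) // the publisher waits for this handler
	}
	for _, h := range b.asyncHandlers {
		h := h
		b.queue <- func() { h(payload) } // a worker runs this later
	}
}

// close stops accepting work and waits for the workers to drain the queue.
func (b *miniBus) close() {
	close(b.queue)
	b.wg.Wait()
}

// demoProcessing publishes five events and counts handler invocations.
func demoProcessing() (int, int) {
	var mu sync.Mutex
	syncSeen, asyncSeen := 0, 0
	b := newMiniBus(2, 8)
	b.syncHandlers = append(b.syncHandlers, func(string) { syncSeen++ })
	b.asyncHandlers = append(b.asyncHandlers, func(string) {
		mu.Lock()
		asyncSeen++
		mu.Unlock()
	})
	for i := 0; i < 5; i++ {
		b.publish(fmt.Sprintf("event-%d", i))
	}
	b.close() // async work is only guaranteed complete after the workers finish
	return syncSeen, asyncSeen
}

func main() {
	s, a := demoProcessing()
	fmt.Println(s, a) // prints 5 5
}
```

Note that the async counter needs a mutex because several workers may run the handler concurrently, while the sync handler always runs on the publishing goroutine.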

Engine Support

Currently supported engines:

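  • **memory**: In-process event bus using Go channels
  • **redis**: Distributed event bus using Redis pub/sub
  • **kafka**: Enterprise event bus using Apache Kafka
  • **kinesis**: AWS-native streaming using Amazon Kinesis
  • **custom**: Custom engine implementations registered via RegisterEngine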

Constants

const (
	// Message events
	EventTypeMessagePublished = "com.modular.eventbus.message.published"
	EventTypeMessageReceived  = "com.modular.eventbus.message.received"
	EventTypeMessageFailed    = "com.modular.eventbus.message.failed"

	// Topic events
	EventTypeTopicCreated = "com.modular.eventbus.topic.created"
	EventTypeTopicDeleted = "com.modular.eventbus.topic.deleted"

	// Subscription events
	EventTypeSubscriptionCreated = "com.modular.eventbus.subscription.created"
	EventTypeSubscriptionRemoved = "com.modular.eventbus.subscription.removed"

	// Bus lifecycle events
	EventTypeBusStarted = "com.modular.eventbus.bus.started"
	EventTypeBusStopped = "com.modular.eventbus.bus.stopped"

	// Configuration events
	EventTypeConfigLoaded = "com.modular.eventbus.config.loaded"
)

Event type constants for eventbus module events. Following CloudEvents specification reverse domain notation.

const ModuleName = "eventbus"

ModuleName is the unique identifier for the eventbus module.

const ServiceName = "eventbus.provider"

ServiceName is the name of the service provided by this module. Other modules can use this name to request the event bus service through dependency injection.

Variables

var (
	ErrDuplicateEngineName = errors.New("duplicate engine name")
	ErrUnknownEngineRef    = errors.New("routing rule references unknown engine")
)

Static errors for validation

var (
	ErrUnknownEngineType    = errors.New("unknown engine type")
	ErrEngineNotFound       = errors.New("engine not found")
	ErrSubscriptionNotFound = errors.New("subscription not found in any engine")
)

Static errors for engine registry
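
Sentinel errors like these are normally checked with `errors.Is`, which also sees through wrapping. The sketch below redeclares `ErrEngineNotFound` locally so that it runs standalone; `findEngine` and `isNotFound` are hypothetical helpers, and whether the package wraps its sentinels with extra context is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEngineNotFound is redeclared here so the sketch is self-contained;
// real code would refer to eventbus.ErrEngineNotFound.
var ErrEngineNotFound = errors.New("engine not found")

// findEngine is a hypothetical lookup that wraps the sentinel with context.
func findEngine(engines map[string]bool, name string) error {
	if !engines[name] {
		return fmt.Errorf("lookup %q: %w", name, ErrEngineNotFound)
	}
	return nil
}

// isNotFound matches both the bare sentinel and any error wrapping it.
func isNotFound(err error) bool {
	return errors.Is(err, ErrEngineNotFound)
}

func main() {
	engines := map[string]bool{"memory-fast": true}
	err := findEngine(engines, "kafka-analytics")
	fmt.Println(err, isNotFound(err))
}
```

Comparing with `==` would miss the wrapped error returned by `findEngine`, which is why `errors.Is` is the safer check.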

var (
	ErrEventBusNotStarted      = errors.New("event bus not started")
	ErrEventBusShutdownTimeout = errors.New("event bus shutdown timed out")
	ErrEventHandlerNil         = errors.New("event handler cannot be nil")
	ErrInvalidSubscriptionType = errors.New("invalid subscription type")
)

EventBus errors

var (
	ErrInvalidShardCount = errors.New("invalid shard count")
)

Static errors for Kinesis

var (
	// ErrNoSubjectForEventEmission is returned when trying to emit events without a subject
	ErrNoSubjectForEventEmission = errors.New("no subject available for event emission")
)

Module-specific errors for eventbus module. These errors are defined locally to ensure proper linting compliance.

Functions

func GetRegisteredEngines added in v0.1.1

func GetRegisteredEngines() []string

GetRegisteredEngines returns a list of all registered engine types.

func NewModule

func NewModule() modular.Module

NewModule creates a new instance of the event bus module. This is the primary constructor for the eventbus module and should be used when registering the module with the application.

Example:

app.RegisterModule(eventbus.NewModule())

func RegisterEngine added in v0.1.1

func RegisterEngine(engineType string, factory EngineFactory)

RegisterEngine registers a new engine type with its factory function. This allows custom engines to be registered at runtime.

Example:

eventbus.RegisterEngine("custom", func(config map[string]interface{}) (EventBus, error) {
    return NewCustomEngine(config), nil
})

Types

type CustomMemoryConfig added in v0.1.1

type CustomMemoryConfig struct {
	MaxEventQueueSize      int                      `json:"maxEventQueueSize"`
	DefaultEventBufferSize int                      `json:"defaultEventBufferSize"`
	EnableMetrics          bool                     `json:"enableMetrics"`
	MetricsInterval        time.Duration            `json:"metricsInterval"`
	EventFilters           []map[string]interface{} `json:"eventFilters"`
}

CustomMemoryConfig holds configuration for the custom memory engine

type CustomMemoryEventBus added in v0.1.1

type CustomMemoryEventBus struct {
	// contains filtered or unexported fields
}

CustomMemoryEventBus is an example custom implementation of the EventBus interface. This demonstrates how to create and register custom engines. Unlike the standard memory engine, this one includes additional features like event metrics collection, custom event filtering, and enhanced subscription management.

func (*CustomMemoryEventBus) GetMetrics added in v0.1.1

func (c *CustomMemoryEventBus) GetMetrics() *EventMetrics

GetMetrics returns current event metrics (additional method not in EventBus interface)

func (*CustomMemoryEventBus) Publish added in v0.1.1

func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic with custom filtering and metrics

func (*CustomMemoryEventBus) Start added in v0.1.1

func (c *CustomMemoryEventBus) Start(ctx context.Context) error

Start initializes the custom memory event bus

func (*CustomMemoryEventBus) Stop added in v0.1.1

func (c *CustomMemoryEventBus) Stop(ctx context.Context) error

Stop shuts down the custom memory event bus

func (*CustomMemoryEventBus) Subscribe added in v0.1.1

func (c *CustomMemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*CustomMemoryEventBus) SubscribeAsync added in v0.1.1

func (c *CustomMemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*CustomMemoryEventBus) SubscriberCount added in v0.1.1

func (c *CustomMemoryEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*CustomMemoryEventBus) Topics added in v0.1.1

func (c *CustomMemoryEventBus) Topics() []string

Topics returns a list of all active topics

func (*CustomMemoryEventBus) Unsubscribe added in v0.1.1

func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription

type DatadogStatsdExporter added in v0.1.1

type DatadogStatsdExporter struct {
	// contains filtered or unexported fields
}

func NewDatadogStatsdExporter added in v0.1.1

func NewDatadogStatsdExporter(eventBus *EventBusModule, prefix, addr string, interval time.Duration, baseTags []string) (*DatadogStatsdExporter, error)

NewDatadogStatsdExporter creates a new exporter. addr example: "127.0.0.1:8125". prefix defaults to "eventbus" if empty. interval must be > 0.

func (*DatadogStatsdExporter) Close added in v0.1.1

func (e *DatadogStatsdExporter) Close() error

Close closes underlying statsd client.

func (*DatadogStatsdExporter) Run added in v0.1.1

Run starts the export loop until context cancellation.

type DeliveryStats added in v0.1.1

type DeliveryStats struct {
	Delivered uint64 `json:"delivered" yaml:"delivered"`
	Dropped   uint64 `json:"dropped" yaml:"dropped"`
}

DeliveryStats represents basic delivery outcomes for an engine or aggregate. These counters are monotonically increasing from module start. They are intentionally simple (uint64) to keep overhead negligible; consumers wanting rates should compute deltas externally.

type EngineConfig added in v0.1.1

type EngineConfig struct {
	// Name is the unique identifier for this engine instance.
	// Used for routing and engine selection.
	Name string `json:"name" yaml:"name" validate:"required"`

	// Type specifies the engine implementation to use.
	// Supported values: "memory", "redis", "kafka", "kinesis", "custom"
	Type string `json:"type" yaml:"type" validate:"required,oneof=memory redis kafka kinesis custom"`

	// Config contains engine-specific configuration as a map.
	// The structure depends on the engine type.
	Config map[string]interface{} `json:"config,omitempty" yaml:"config,omitempty"`
}

EngineConfig defines the configuration for an individual event bus engine. Each engine can have its own specific configuration requirements.

type EngineFactory added in v0.1.1

type EngineFactory func(config map[string]interface{}) (EventBus, error)

EngineFactory is a function that creates an EventBus implementation. It receives the engine configuration and returns a configured EventBus instance.

type EngineRouter added in v0.1.1

type EngineRouter struct {
	// contains filtered or unexported fields
}

EngineRouter manages multiple event bus engines and routes events based on configuration.

func NewEngineRouter added in v0.1.1

func NewEngineRouter(config *EventBusConfig) (*EngineRouter, error)

NewEngineRouter creates a new engine router with the given configuration.

func (*EngineRouter) CollectPerEngineStats added in v0.1.1

func (r *EngineRouter) CollectPerEngineStats() map[string]DeliveryStats

CollectPerEngineStats returns per-engine delivery statistics for engines that expose them (currently only the in-memory engine). Engines that do not implement statistics are omitted from the returned map. This is useful for fine‑grained monitoring and test verification without exposing internal engine details elsewhere.

func (*EngineRouter) CollectStats added in v0.1.1

func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64)

CollectStats aggregates delivery statistics from engines that expose them. At present only the in-memory engine exposes Stats(). Engines that don't implement Stats() are simply skipped. This keeps the method safe to call in multi-engine configurations mixing different backend types.
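
Skipping engines that lack a Stats() method is the usual optional-interface pattern in Go. The sketch below shows the idea with stand-in types; `statsProvider`, `memoryEngine`, `remoteEngine`, and `collectStats` are hypothetical names and do not reproduce the router's internals.

```go
package main

import "fmt"

// statsProvider is the optional capability an engine may implement.
type statsProvider interface {
	Stats() (delivered, dropped uint64)
}

// memoryEngine exposes counters through Stats.
type memoryEngine struct{ delivered, dropped uint64 }

func (m *memoryEngine) Stats() (uint64, uint64) { return m.delivered, m.dropped }

// remoteEngine has no Stats method and is skipped during aggregation.
type remoteEngine struct{}

// collectStats sums counters from every engine that implements statsProvider.
func collectStats(engines map[string]interface{}) (delivered, dropped uint64) {
	for _, e := range engines {
		if sp, ok := e.(statsProvider); ok {
			d, x := sp.Stats()
			delivered += d
			dropped += x
		}
	}
	return delivered, dropped
}

func main() {
	engines := map[string]interface{}{
		"memory-fast":   &memoryEngine{delivered: 10, dropped: 1},
		"memory-slow":   &memoryEngine{delivered: 5, dropped: 2},
		"redis-durable": &remoteEngine{},
	}
	fmt.Println(collectStats(engines)) // prints 15 3
}
```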

func (*EngineRouter) GetEngineForTopic added in v0.1.1

func (r *EngineRouter) GetEngineForTopic(topic string) string

GetEngineForTopic returns the name of the engine that handles the specified topic. This is useful for debugging and monitoring.

func (*EngineRouter) GetEngineNames added in v0.1.1

func (r *EngineRouter) GetEngineNames() []string

GetEngineNames returns the names of all configured engines.

func (*EngineRouter) Publish added in v0.1.1

func (r *EngineRouter) Publish(ctx context.Context, event Event) error

Publish publishes an event to the appropriate engine based on routing rules.

func (*EngineRouter) SetModuleReference added in v0.1.1

func (r *EngineRouter) SetModuleReference(module *EventBusModule)

SetModuleReference sets the module reference for all memory event buses. This enables memory engines to emit events through the module.

func (*EngineRouter) Start added in v0.1.1

func (r *EngineRouter) Start(ctx context.Context) error

Start starts all managed engines.

func (*EngineRouter) Stop added in v0.1.1

func (r *EngineRouter) Stop(ctx context.Context) error

Stop stops all managed engines.

func (*EngineRouter) Subscribe added in v0.1.1

func (r *EngineRouter) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe subscribes to a topic using the appropriate engine. The subscription is created on the engine that handles the specified topic.

func (*EngineRouter) SubscribeAsync added in v0.1.1

func (r *EngineRouter) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync subscribes to a topic asynchronously using the appropriate engine.

func (*EngineRouter) SubscriberCount added in v0.1.1

func (r *EngineRouter) SubscriberCount(topic string) int

SubscriberCount returns the total number of subscribers for a topic across all engines.

func (*EngineRouter) Topics added in v0.1.1

func (r *EngineRouter) Topics() []string

Topics returns all active topics from all engines.

func (*EngineRouter) Unsubscribe added in v0.1.1

func (r *EngineRouter) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription from its engine.

type Event

type Event struct {
	// Topic is the channel or subject of the event.
	// Topics are used for routing events to the appropriate subscribers.
	// Topic names can use hierarchical patterns like "user.created" or "order.payment.failed".
	Topic string `json:"topic"`

	// Payload is the data associated with the event.
	// This can be any serializable data structure that represents
	// the event's information. The payload type should be consistent
	// for events within the same topic.
	Payload interface{} `json:"payload"`

	// Metadata contains additional information about the event.
	// This can include source information, correlation IDs, version numbers,
	// or any other contextual data that doesn't belong in the main payload.
	// Optional field that can be nil if no metadata is needed.
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// CreatedAt is when the event was created.
	// This timestamp is set automatically when the event is published
	// and can be used for event ordering, TTL calculations, and debugging.
	CreatedAt time.Time `json:"createdAt"`

	// ProcessingStarted is when the event processing started.
	// This field is set when an event handler begins processing the event.
	// Used for performance monitoring and timeout detection.
	ProcessingStarted *time.Time `json:"processingStarted,omitempty"`

	// ProcessingCompleted is when the event processing completed.
	// This field is set when an event handler finishes processing the event,
	// whether successfully or with an error. Used for performance monitoring
	// and event lifecycle tracking.
	ProcessingCompleted *time.Time `json:"processingCompleted,omitempty"`
}

Event represents a message in the event bus. Events are the core data structure used for communication between publishers and subscribers. They contain the message data along with metadata for tracking and processing.
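As an illustration of how the JSON tags above shape the wire format, the following sketch marshals an event whose optional fields are unset. The Event type is a local copy so the example compiles on its own, and encodeEvent and sampleEvent are hypothetical helpers, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event is a local copy of the struct documented above so that this
// sketch compiles on its own; real code would use eventbus.Event.
type Event struct {
	Topic               string                 `json:"topic"`
	Payload             interface{}            `json:"payload"`
	Metadata            map[string]interface{} `json:"metadata,omitempty"`
	CreatedAt           time.Time              `json:"createdAt"`
	ProcessingStarted   *time.Time             `json:"processingStarted,omitempty"`
	ProcessingCompleted *time.Time             `json:"processingCompleted,omitempty"`
}

// encodeEvent (hypothetical helper) marshals an event using its JSON tags.
func encodeEvent(e Event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// sampleEvent builds an event with made-up data and a fixed timestamp.
func sampleEvent() Event {
	return Event{
		Topic:     "user.created",
		Payload:   map[string]interface{}{"id": 42},
		CreatedAt: time.Date(2025, time.September, 6, 0, 0, 0, 0, time.UTC),
	}
}

func main() {
	out, err := encodeEvent(sampleEvent())
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// Metadata and both processing timestamps are omitted because they are nil.
	fmt.Println(out)
}
```

Because Metadata, ProcessingStarted, and ProcessingCompleted carry omitempty, they disappear from the output until they are populated.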

type EventBus

type EventBus interface {
	// Start initializes the event bus.
	// This method is called during module startup and should prepare
	// the event bus for publishing and subscribing operations.
	// For memory buses, this might initialize internal data structures.
	// For network-based buses, this establishes connections.
	Start(ctx context.Context) error

	// Stop shuts down the event bus.
	// This method is called during module shutdown and should cleanup
	// all resources, close connections, and stop background processes.
	// It should ensure all in-flight events are processed before returning.
	Stop(ctx context.Context) error

	// Publish sends an event to the specified topic.
	// The event will be delivered to all active subscribers of the topic.
	// The method should handle event queuing, topic routing, and delivery
	// according to the engine's semantics.
	Publish(ctx context.Context, event Event) error

	// Subscribe registers a handler for a topic with synchronous processing.
	// Events matching the topic will be delivered immediately to the handler
	// in the same goroutine that published them. The publisher will wait
	// for the handler to complete before continuing.
	Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// SubscribeAsync registers a handler for a topic with asynchronous processing.
	// Events matching the topic will be queued for processing by worker goroutines.
	// The publisher can continue immediately without waiting for processing.
	// This is preferred for heavy operations or non-critical event handling.
	SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// Unsubscribe removes a subscription.
	// After unsubscribing, the subscription will no longer receive events.
	// This method should be idempotent and not return errors for
	// subscriptions that are already cancelled.
	Unsubscribe(ctx context.Context, subscription Subscription) error

	// Topics returns a list of all active topics.
	// This includes only topics that currently have at least one subscriber.
	// Useful for monitoring, debugging, and administrative interfaces.
	Topics() []string

	// SubscriberCount returns the number of subscribers for a topic.
	// This includes both synchronous and asynchronous subscriptions.
	// Returns 0 if the topic has no subscribers or doesn't exist.
	SubscriberCount(topic string) int
}

EventBus defines the interface for an event bus implementation. This interface abstracts the underlying messaging mechanism, allowing the eventbus module to support multiple backends (memory, Redis, Kafka, Kinesis) through a common API.

All operations are context-aware to support cancellation and timeouts. Implementations should be thread-safe and handle concurrent access properly.

func NewCustomMemoryEventBus added in v0.1.1

func NewCustomMemoryEventBus(config map[string]interface{}) (EventBus, error)

NewCustomMemoryEventBus creates a new custom memory-based event bus.

func NewKafkaEventBus added in v0.1.1

func NewKafkaEventBus(config map[string]interface{}) (EventBus, error)

NewKafkaEventBus creates a new Kafka-based event bus.

func NewKinesisEventBus added in v0.1.1

func NewKinesisEventBus(config map[string]interface{}) (EventBus, error)

NewKinesisEventBus creates a new Kinesis-based event bus.

func NewRedisEventBus added in v0.1.1

func NewRedisEventBus(config map[string]interface{}) (EventBus, error)

NewRedisEventBus creates a new Redis-based event bus.

type EventBusConfig

type EventBusConfig struct {

	// Engine specifies the event bus engine to use for single-engine mode.
	// Supported values: "memory", "redis", "kafka", "kinesis"
	// Default: "memory"
	// Note: This field is used only when Engines is empty (legacy mode)
	Engine string `json:"engine,omitempty" yaml:"engine,omitempty" validate:"omitempty,oneof=memory redis kafka kinesis" env:"ENGINE"`

	// MaxEventQueueSize is the maximum number of events to queue per topic.
	// When this limit is reached, new events may be dropped or publishers
	// may be blocked, depending on the engine implementation.
	// Must be at least 1. Used in single-engine mode.
	MaxEventQueueSize int `json:"maxEventQueueSize,omitempty" yaml:"maxEventQueueSize,omitempty" validate:"omitempty,min=1" env:"MAX_EVENT_QUEUE_SIZE"`

	// DefaultEventBufferSize is the default buffer size for subscription channels.
	// This affects how many events can be buffered for each subscription before
	// blocking. Larger buffers can improve performance but use more memory.
	// Must be at least 1. Used in single-engine mode.
	DefaultEventBufferSize int `` /* 138-byte string literal not displayed */

	// WorkerCount is the number of worker goroutines for async event processing.
	// These workers process events from asynchronous subscriptions. More workers
	// can increase throughput but also increase resource usage.
	// Must be at least 1. Used in single-engine mode.
	WorkerCount int `json:"workerCount,omitempty" yaml:"workerCount,omitempty" validate:"omitempty,min=1" env:"WORKER_COUNT"`

	// DeliveryMode controls how publish behaves when a subscriber queue is full.
	//   drop     - (default) non-blocking send; event is dropped if subscriber channel is full
	//   block    - block indefinitely until space is available in the subscriber channel
	//   timeout  - block up to PublishBlockTimeout then drop if still full
	// This applies to the memory engine. Other engines may implement differently.
	DeliveryMode string `json:"deliveryMode,omitempty" yaml:"deliveryMode,omitempty" validate:"omitempty,oneof=drop block timeout" env:"DELIVERY_MODE"`

	// PublishBlockTimeout is used when DeliveryMode == "timeout". Zero means no wait.
	PublishBlockTimeout time.Duration `json:"publishBlockTimeout,omitempty" yaml:"publishBlockTimeout,omitempty" env:"PUBLISH_BLOCK_TIMEOUT"`

	// RotateSubscriberOrder when true rotates the ordering of subscribers per publish
	// to reduce starvation and provide fairer drop distribution. This is now OPT-IN.
	// Historical note: an earlier revision forced this to true during validation which
	// made it impossible for users to explicitly disable the feature (a plain bool
	// cannot distinguish an "unset" zero value from an explicitly configured false).
	// We intentionally removed the auto-enable logic so that leaving the field absent
	// (or false) will NOT enable rotation. Users that want fairness rotation must set
	// rotateSubscriberOrder: true explicitly in configuration. This trades a changed
	// default for honoring explicit operator intent.
	RotateSubscriberOrder bool `json:"rotateSubscriberOrder,omitempty" yaml:"rotateSubscriberOrder,omitempty" env:"ROTATE_SUBSCRIBER_ORDER"`

	// EventTTL is the time to live for events.
	// Events older than this value may be automatically removed from queues
	// or marked as expired. Used for event cleanup and storage management.
	EventTTL time.Duration `json:"eventTTL,omitempty" yaml:"eventTTL,omitempty" env:"EVENT_TTL" default:"3600s"`

	// RetentionDays is how many days to retain event history.
	// This affects event storage and cleanup policies. Longer retention
	// allows for event replay and debugging but requires more storage.
	// Must be at least 1. Used in single-engine mode.
	RetentionDays int `json:"retentionDays,omitempty" yaml:"retentionDays,omitempty" validate:"omitempty,min=1" env:"RETENTION_DAYS"`

	// ExternalBrokerURL is the connection URL for external message brokers.
	// Used when the engine is set to "redis", "kafka", or "kinesis". The format depends
	// on the specific broker type.
	// Examples:
	//   Redis: "redis://localhost:6379" or "redis://user:pass@host:port/db"
	//   Kafka: "kafka://localhost:9092" or "kafka://broker1:9092,broker2:9092"
	//   Kinesis: "https://kinesis.us-east-1.amazonaws.com"
	ExternalBrokerURL string `json:"externalBrokerURL,omitempty" yaml:"externalBrokerURL,omitempty" env:"EXTERNAL_BROKER_URL"`

	// ExternalBrokerUser is the username for external broker authentication.
	// Used when the external broker requires authentication.
	// Leave empty if the broker doesn't require authentication.
	ExternalBrokerUser string `json:"externalBrokerUser,omitempty" yaml:"externalBrokerUser,omitempty" env:"EXTERNAL_BROKER_USER"`

	// ExternalBrokerPassword is the password for external broker authentication.
	// Used when the external broker requires authentication.
	// Leave empty if the broker doesn't require authentication.
	// This should be kept secure and may be provided via environment variables.
	ExternalBrokerPassword string `json:"externalBrokerPassword,omitempty" yaml:"externalBrokerPassword,omitempty" env:"EXTERNAL_BROKER_PASSWORD"`

	// Engines defines multiple event bus engines that can be used simultaneously.
	// When this field is populated, it takes precedence over the single-engine fields above.
	Engines []EngineConfig `json:"engines,omitempty" yaml:"engines,omitempty" validate:"dive"`

	// Routing defines how topics are routed to different engines.
	// Rules are evaluated in order, and the first matching rule is used.
	// If no routing rules are specified and multiple engines are configured,
	// all topics will be routed to the first engine.
	Routing []RoutingRule `json:"routing,omitempty" yaml:"routing,omitempty" validate:"dive"`
}

EventBusConfig defines the configuration for the event bus module. This structure supports both single-engine (legacy) and multi-engine configurations.

Example single-engine YAML configuration (legacy, still supported):

engine: "memory"
maxEventQueueSize: 1000
workerCount: 5

Example multi-engine YAML configuration:

engines:
  - name: "memory"
    type: "memory"
    config:
      workerCount: 5
      maxEventQueueSize: 1000
  - name: "redis"
    type: "redis"
    config:
      url: "redis://localhost:6379"
      db: 0
routing:
  - topics: ["user.*", "auth.*"]
    engine: "memory"
  - topics: ["*"]
    engine: "redis"

func (*EventBusConfig) GetDefaultEngine added in v0.1.1

func (c *EventBusConfig) GetDefaultEngine() string

GetDefaultEngine returns the name of the default engine to use. For single-engine mode, returns "default". For multi-engine mode, returns the name of the first engine.

func (*EventBusConfig) IsMultiEngine added in v0.1.1

func (c *EventBusConfig) IsMultiEngine() bool

IsMultiEngine returns true if this configuration uses multiple engines.
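The two helpers above can be read as follows. This sketch assumes that a non-empty Engines list is what selects multi-engine mode, which matches the note on the Engine field but is not confirmed by the method documentation; the types are trimmed local stand-ins and the function names are hypothetical.

```go
package main

import "fmt"

// EngineConfig and EventBusConfig are trimmed stand-ins for the real types.
type EngineConfig struct {
	Name string
	Type string
}

type EventBusConfig struct {
	Engine  string
	Engines []EngineConfig
}

// isMultiEngine assumes a populated Engines list selects multi-engine mode.
func isMultiEngine(c EventBusConfig) bool {
	return len(c.Engines) > 0
}

// defaultEngine mirrors the documented result: "default" in single-engine
// mode, otherwise the name of the first configured engine.
func defaultEngine(c EventBusConfig) string {
	if !isMultiEngine(c) {
		return "default"
	}
	return c.Engines[0].Name
}

func main() {
	legacy := EventBusConfig{Engine: "memory"}
	multi := EventBusConfig{Engines: []EngineConfig{
		{Name: "memory", Type: "memory"},
		{Name: "redis", Type: "redis"},
	}}
	fmt.Println(defaultEngine(legacy), defaultEngine(multi))
}
```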

func (*EventBusConfig) ValidateConfig added in v0.1.1

func (c *EventBusConfig) ValidateConfig() error

ValidateConfig performs additional validation on the configuration. This is called after basic struct tag validation.

type EventBusModule

type EventBusModule struct {
	// contains filtered or unexported fields
}

EventBusModule provides event-driven messaging capabilities for the modular framework. It implements a publish-subscribe pattern with support for multiple event bus engines, asynchronous processing, and flexible subscription management.

The module implements the following interfaces:

  • modular.Module: Basic module lifecycle
  • modular.Configurable: Configuration management
  • modular.ServiceAware: Service dependency management
  • modular.Startable: Startup logic
  • modular.Stoppable: Shutdown logic
  • modular.ObservableModule: Event observation and emission
  • EventBus: Event publishing and subscription interface

Event processing is thread-safe and supports concurrent publishers and subscribers.

func (*EventBusModule) Constructor

func (m *EventBusModule) Constructor() modular.ModuleConstructor

Constructor provides a dependency injection constructor for the module. This method is used by the dependency injection system to create the module instance with any required services.

func (*EventBusModule) Dependencies

func (m *EventBusModule) Dependencies() []string

Dependencies returns the names of modules this module depends on. The eventbus module operates independently and has no dependencies.

func (*EventBusModule) EmitEvent added in v0.1.1

func (m *EventBusModule) EmitEvent(ctx context.Context, event cloudevents.Event) error

EmitEvent implements the ObservableModule interface. This allows the eventbus module to emit events to registered observers.

func (*EventBusModule) GetRegisteredEventTypes added in v0.1.1

func (m *EventBusModule) GetRegisteredEventTypes() []string

GetRegisteredEventTypes implements the ObservableModule interface. Returns all event types that this eventbus module can emit.

func (*EventBusModule) GetRouter added in v0.1.1

func (m *EventBusModule) GetRouter() *EngineRouter

GetRouter returns the underlying engine router for advanced operations. This method provides access to engine-specific functionality like checking which engine a topic routes to.

Example:

router := eventBus.GetRouter()
engine := router.GetEngineForTopic("user.created")
fmt.Printf("Topic routes to engine: %s\n", engine)

func (*EventBusModule) Init

func (m *EventBusModule) Init(app modular.Application) error

Init initializes the eventbus module with the application context. This method is called after all modules have been registered and their configurations loaded. It sets up the event bus engine(s) based on configuration.

The initialization process:

  1. Retrieves the module's configuration
  2. Sets up logging
  3. Validates configuration
  4. Initializes the engine router with configured engines
  5. Prepares the event bus for startup

Supported engines:

  • "memory": In-process event bus using Go channels
  • "redis": Distributed event bus using Redis pub/sub
  • "kafka": Enterprise event bus using Apache Kafka
  • "kinesis": AWS Kinesis streams
  • "custom": Custom engine implementations

func (*EventBusModule) Name

func (m *EventBusModule) Name() string

Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.

func (*EventBusModule) PerEngineStats added in v0.1.1

func (m *EventBusModule) PerEngineStats() map[string]DeliveryStats

PerEngineStats returns delivery statistics broken down per configured engine (only engines that expose stats are included). It is safe to call before Start and returns an empty map if the router has not been built yet.

func (*EventBusModule) ProvidesServices

func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider

ProvidesServices declares services provided by this module. The eventbus module provides an event bus service that can be injected into other modules for event-driven communication.

Provided services:

  • "eventbus.provider": The main event bus service interface

func (*EventBusModule) Publish

func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error

Publish publishes an event to the event bus. Creates an Event struct with the provided topic and payload, then sends it through the event bus for processing by subscribers.

The event will be delivered to all active subscribers of the topic. Topic patterns and wildcards may be supported depending on the engine. With multiple engines, the event is routed to the appropriate engine based on the configured routing rules.

Example:

err := eventBus.Publish(ctx, "user.created", userData)
err = eventBus.Publish(ctx, "order.payment.failed", paymentData)

func (*EventBusModule) RegisterConfig

func (m *EventBusModule) RegisterConfig(app modular.Application) error

RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the eventbus module.

Default configuration:

  • Engine: "memory"
  • MaxEventQueueSize: 1000 events per topic
  • DefaultEventBufferSize: 10 events per subscription channel
  • WorkerCount: 5 async processing workers
  • EventTTL: 3600 seconds (1 hour)
  • RetentionDays: 7 days for event history
  • ExternalBroker settings: empty (not used for memory engine)

func (*EventBusModule) RegisterObservers added in v0.1.1

func (m *EventBusModule) RegisterObservers(subject modular.Subject) error

RegisterObservers implements the ObservableModule interface. This allows the eventbus module to register as an observer for events it's interested in.

func (*EventBusModule) RequiresServices

func (m *EventBusModule) RequiresServices() []modular.ServiceDependency

RequiresServices declares services required by this module. The eventbus module operates independently and requires no external services.

func (*EventBusModule) Start

func (m *EventBusModule) Start(ctx context.Context) error

Start performs startup logic for the module. This method starts all configured event bus engines and begins processing events. It's called after all modules have been initialized and are ready to start.

The startup process:

  1. Checks if already started (idempotent)
  2. Starts all underlying event bus engines
  3. Initializes worker pools for async processing
  4. Prepares topic management and subscription tracking

This method is thread-safe and can be called multiple times safely.

func (*EventBusModule) Stats added in v0.1.1

func (m *EventBusModule) Stats() (delivered uint64, dropped uint64)

Stats returns aggregated delivery statistics for all underlying engines that support them (currently only the in-memory engine). This is intended for lightweight monitoring/metrics and testing. Returns zeros if the module has not been started yet or no engines expose stats.
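A typical use of these two counters is a drop ratio for dashboards or alerts. The dropRatio helper below is hypothetical, and the counter values in main are made up; in real code they would come from a call to Stats.

```go
package main

import "fmt"

// dropRatio (hypothetical helper) returns dropped / (delivered + dropped),
// or 0 when no events have been counted yet.
func dropRatio(delivered, dropped uint64) float64 {
	total := delivered + dropped
	if total == 0 {
		return 0
	}
	return float64(dropped) / float64(total)
}

func main() {
	// Made-up counters standing in for the values returned by Stats.
	delivered, dropped := uint64(950), uint64(50)
	fmt.Printf("drop ratio: %.2f\n", dropRatio(delivered, dropped))
}
```

Guarding the zero case matters because Stats returns zeros before the module has started.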

func (*EventBusModule) Stop

func (m *EventBusModule) Stop(ctx context.Context) error

Stop performs shutdown logic for the module. This method gracefully shuts down all event bus engines, ensuring all in-flight events are processed and all subscriptions are properly cleaned up.

The shutdown process:

  1. Checks if already stopped (idempotent)
  2. Stops accepting new events
  3. Waits for in-flight events to complete
  4. Cancels all active subscriptions
  5. Shuts down worker pools
  6. Closes all underlying event bus engines

This method is thread-safe and can be called multiple times safely.

func (*EventBusModule) Subscribe

func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe subscribes to a topic on the event bus with synchronous processing. The provided handler is called as soon as an event is published to the specified topic, and delivery of that event blocks until the handler returns.

With multiple engines, the subscription is created on the engine that handles the specified topic according to the routing configuration.

Use synchronous subscriptions for:

  • Lightweight event processing
  • When event ordering is important
  • Critical event handlers that must complete before continuing

Example:

subscription, err := eventBus.Subscribe(ctx, "user.login", func(ctx context.Context, event Event) error {
    user := event.Payload.(UserData)
    return updateLastLoginTime(user.ID)
})

func (*EventBusModule) SubscribeAsync

func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync subscribes to a topic with asynchronous event processing. Events for the provided handler are queued and processed by worker goroutines, so the publisher can continue without waiting for the handler to finish.

With multiple engines, the subscription is created on the engine that handles the specified topic according to the routing configuration.

Use asynchronous subscriptions for:

  • Heavy processing operations
  • External API calls
  • Non-critical event handlers
  • When you want to avoid blocking publishers

Example:

subscription, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
    imageData := event.Payload.(ImageData)
    return generateThumbnails(imageData)
})

func (*EventBusModule) SubscriberCount

func (m *EventBusModule) SubscriberCount(topic string) int

SubscriberCount returns the number of active subscribers for a topic. This includes both synchronous and asynchronous subscriptions. Returns 0 if the topic has no subscribers.

Example:

count := eventBus.SubscriberCount("user.created")
if count == 0 {
    log.Warn("No subscribers for user creation events")
}

func (*EventBusModule) Topics

func (m *EventBusModule) Topics() []string

Topics returns a list of all active topics that have subscribers. This can be useful for debugging, monitoring, or building administrative interfaces that show current event bus activity.

Example:

activeTopics := eventBus.Topics()
for _, topic := range activeTopics {
    count := eventBus.SubscriberCount(topic)
    fmt.Printf("Topic: %s, Subscribers: %d\n", topic, count)
}

func (*EventBusModule) Unsubscribe

func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe cancels a subscription and stops receiving events. The subscription will be removed from the event bus and no longer receive events for its topic.

This method is idempotent - calling it multiple times on the same subscription is safe and will not cause errors.

Example:

err := eventBus.Unsubscribe(ctx, subscription)

type EventFilter added in v0.1.1

type EventFilter interface {
	ShouldProcess(event Event) bool
	Name() string
}

EventFilter defines a filter that can be applied to events
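A custom filter only needs the two methods shown above. The following sketch uses a hypothetical metadataKeyFilter that accepts events carrying a given metadata key; the Event type is a trimmed local stand-in.

```go
package main

import "fmt"

// Event is a trimmed local stand-in for the package's Event type.
type Event struct {
	Topic    string
	Metadata map[string]interface{}
}

// EventFilter repeats the interface documented above.
type EventFilter interface {
	ShouldProcess(event Event) bool
	Name() string
}

// metadataKeyFilter is hypothetical: it accepts events whose metadata
// contains the configured key.
type metadataKeyFilter struct{ key string }

func (f metadataKeyFilter) ShouldProcess(event Event) bool {
	_, ok := event.Metadata[f.key] // reading from a nil map is safe
	return ok
}

func (f metadataKeyFilter) Name() string { return "metadata-key:" + f.key }

// Compile-time check that the filter satisfies the interface.
var _ EventFilter = metadataKeyFilter{}

func main() {
	f := metadataKeyFilter{key: "tenantId"}
	tagged := Event{Topic: "user.created", Metadata: map[string]interface{}{"tenantId": "t1"}}
	untagged := Event{Topic: "user.created"}
	fmt.Println(f.Name(), f.ShouldProcess(tagged), f.ShouldProcess(untagged))
}
```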

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

EventHandler is a function that handles an event. Event handlers are called when an event matching their subscription topic is published. Handlers should be idempotent when possible and handle errors gracefully.

The context can be used for cancellation, timeouts, and passing request-scoped values. Handlers should respect context cancellation and return promptly when the context is cancelled.

Example handler:

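func userCreatedHandler(ctx context.Context, event Event) error {
    // Check the payload type so an unexpected event cannot panic the handler.
    user, ok := event.Payload.(UserData)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    return sendWelcomeEmail(ctx, user.Email)
}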

type EventMetrics added in v0.1.1

type EventMetrics struct {
	TotalEvents           int64            `json:"totalEvents"`
	EventsPerTopic        map[string]int64 `json:"eventsPerTopic"`
	AverageProcessingTime time.Duration    `json:"averageProcessingTime"`
	LastResetTime         time.Time        `json:"lastResetTime"`
	// contains filtered or unexported fields
}

EventMetrics holds metrics about event processing

type KafkaConfig added in v0.1.1

type KafkaConfig struct {
	Brokers        []string          `json:"brokers"`
	GroupID        string            `json:"groupId"`
	SecurityConfig map[string]string `json:"security"`
	ProducerConfig map[string]string `json:"producer"`
	ConsumerConfig map[string]string `json:"consumer"`
}

KafkaConfig holds Kafka-specific configuration

type KafkaConsumerGroupHandler added in v0.1.1

type KafkaConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

KafkaConsumerGroupHandler implements sarama.ConsumerGroupHandler

func (*KafkaConsumerGroupHandler) Cleanup added in v0.1.1

Cleanup is called at the end of a session, once all ConsumeClaim goroutines have exited

func (*KafkaConsumerGroupHandler) ConsumeClaim added in v0.1.1

ConsumeClaim processes messages from a Kafka partition

func (*KafkaConsumerGroupHandler) Setup added in v0.1.1

Setup is called at the beginning of a new session, before ConsumeClaim

type KafkaEventBus added in v0.1.1

type KafkaEventBus struct {
	// contains filtered or unexported fields
}

KafkaEventBus implements EventBus using Apache Kafka

func (*KafkaEventBus) Publish added in v0.1.1

func (k *KafkaEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic using Kafka

func (*KafkaEventBus) Start added in v0.1.1

func (k *KafkaEventBus) Start(ctx context.Context) error

Start initializes the Kafka event bus

func (*KafkaEventBus) Stop added in v0.1.1

func (k *KafkaEventBus) Stop(ctx context.Context) error

Stop shuts down the Kafka event bus

func (*KafkaEventBus) Subscribe added in v0.1.1

func (k *KafkaEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*KafkaEventBus) SubscribeAsync added in v0.1.1

func (k *KafkaEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*KafkaEventBus) SubscriberCount added in v0.1.1

func (k *KafkaEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*KafkaEventBus) Topics added in v0.1.1

func (k *KafkaEventBus) Topics() []string

Topics returns a list of all active topics

func (*KafkaEventBus) Unsubscribe added in v0.1.1

func (k *KafkaEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription

type KinesisConfig added in v0.1.1

type KinesisConfig struct {
	Region          string `json:"region"`
	StreamName      string `json:"streamName"`
	AccessKeyID     string `json:"accessKeyId"`
	SecretAccessKey string `json:"secretAccessKey"`
	SessionToken    string `json:"sessionToken"`
	ShardCount      int32  `json:"shardCount"`
}

KinesisConfig holds Kinesis-specific configuration

type KinesisEventBus added in v0.1.1

type KinesisEventBus struct {
	// contains filtered or unexported fields
}

KinesisEventBus implements EventBus using AWS Kinesis

func (*KinesisEventBus) Publish added in v0.1.1

func (k *KinesisEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic using Kinesis

func (*KinesisEventBus) Start added in v0.1.1

func (k *KinesisEventBus) Start(ctx context.Context) error

Start initializes the Kinesis event bus

func (*KinesisEventBus) Stop added in v0.1.1

func (k *KinesisEventBus) Stop(ctx context.Context) error

Stop shuts down the Kinesis event bus

func (*KinesisEventBus) Subscribe added in v0.1.1

func (k *KinesisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*KinesisEventBus) SubscribeAsync added in v0.1.1

func (k *KinesisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*KinesisEventBus) SubscriberCount added in v0.1.1

func (k *KinesisEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*KinesisEventBus) Topics added in v0.1.1

func (k *KinesisEventBus) Topics() []string

Topics returns a list of all active topics

func (*KinesisEventBus) Unsubscribe added in v0.1.1

func (k *KinesisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription

type MemoryEventBus

type MemoryEventBus struct {
	// contains filtered or unexported fields
}

MemoryEventBus implements EventBus using in-memory channels

func NewMemoryEventBus

func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus

NewMemoryEventBus creates a new in-memory event bus

func (*MemoryEventBus) Publish

func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic

func (*MemoryEventBus) SetModule added in v0.1.1

func (m *MemoryEventBus) SetModule(module *EventBusModule)

SetModule sets the parent module for event emission

func (*MemoryEventBus) Start

func (m *MemoryEventBus) Start(ctx context.Context) error

Start initializes the event bus

func (*MemoryEventBus) Stats added in v0.1.1

func (m *MemoryEventBus) Stats() (delivered uint64, dropped uint64)

Stats returns basic delivery stats for monitoring/testing.

func (*MemoryEventBus) Stop

func (m *MemoryEventBus) Stop(ctx context.Context) error

Stop shuts down the event bus

func (*MemoryEventBus) Subscribe

func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*MemoryEventBus) SubscribeAsync

func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*MemoryEventBus) SubscriberCount

func (m *MemoryEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*MemoryEventBus) Topics

func (m *MemoryEventBus) Topics() []string

Topics returns a list of all active topics

func (*MemoryEventBus) Unsubscribe

func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription

type PrometheusCollector added in v0.1.1

type PrometheusCollector struct {
	// contains filtered or unexported fields
}

func NewPrometheusCollector added in v0.1.1

func NewPrometheusCollector(eventBus *EventBusModule, namespace string) *PrometheusCollector

NewPrometheusCollector creates a new collector for the given event bus. The namespace is used as the metric prefix; if it is empty, modular_eventbus is used.

func (*PrometheusCollector) Collect added in v0.1.1

func (c *PrometheusCollector) Collect(ch chan<- prometheus.Metric)

Collect gathers current stats and emits ConstMetrics.

func (*PrometheusCollector) Describe added in v0.1.1

func (c *PrometheusCollector) Describe(ch chan<- *prometheus.Desc)

Describe sends metric descriptors.

type RedisConfig added in v0.1.1

type RedisConfig struct {
	URL      string `json:"url"`
	DB       int    `json:"db"`
	Username string `json:"username"`
	Password string `json:"password"`
	PoolSize int    `json:"poolSize"`
}

RedisConfig holds Redis-specific configuration

type RedisEventBus added in v0.1.1

type RedisEventBus struct {
	// contains filtered or unexported fields
}

RedisEventBus implements EventBus using Redis pub/sub

func (*RedisEventBus) Publish added in v0.1.1

func (r *RedisEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic using Redis pub/sub

func (*RedisEventBus) Start added in v0.1.1

func (r *RedisEventBus) Start(ctx context.Context) error

Start initializes the Redis event bus

func (*RedisEventBus) Stop added in v0.1.1

func (r *RedisEventBus) Stop(ctx context.Context) error

Stop shuts down the Redis event bus

func (*RedisEventBus) Subscribe added in v0.1.1

func (r *RedisEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*RedisEventBus) SubscribeAsync added in v0.1.1

func (r *RedisEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*RedisEventBus) SubscriberCount added in v0.1.1

func (r *RedisEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*RedisEventBus) Topics added in v0.1.1

func (r *RedisEventBus) Topics() []string

Topics returns a list of all active topics

func (*RedisEventBus) Unsubscribe added in v0.1.1

func (r *RedisEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription.

type RoutingRule added in v0.1.1

type RoutingRule struct {
	// Topics is a list of topic patterns to match.
	// Supports wildcards like "user.*" or exact matches.
	Topics []string `json:"topics" yaml:"topics" validate:"required,min=1"`

	// Engine is the name of the engine to route matching topics to.
	// Must match the name of a configured engine.
	Engine string `json:"engine" yaml:"engine" validate:"required"`
}

RoutingRule defines how topics are routed to engines.
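The sketch below shows one way topic patterns could be resolved to an engine name. It is not the module's routing code. It assumes that a pattern ending in `.*` matches any topic that starts with the part before the `*`, that the first matching rule wins, and that unmatched topics fall back to a caller-supplied engine. The names `routingRule`, `matchTopic`, and `resolveEngine` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// routingRule is a local stand-in for the documented RoutingRule fields.
type routingRule struct {
	Topics []string
	Engine string
}

// matchTopic reports whether topic matches pattern.
// Assumption: "user.*" matches every topic beginning with "user.";
// any other pattern must equal the topic exactly.
func matchTopic(pattern, topic string) bool {
	if strings.HasSuffix(pattern, ".*") {
		prefix := strings.TrimSuffix(pattern, "*") // keeps the trailing dot
		return strings.HasPrefix(topic, prefix)
	}
	return pattern == topic
}

// resolveEngine returns the engine of the first rule with a matching
// pattern, or fallback when no rule matches (both are assumptions).
func resolveEngine(rules []routingRule, topic, fallback string) string {
	for _, rule := range rules {
		for _, pattern := range rule.Topics {
			if matchTopic(pattern, topic) {
				return rule.Engine
			}
		}
	}
	return fallback
}

func main() {
	// Engine names here are illustrative labels, not required values.
	rules := []routingRule{
		{Topics: []string{"user.*", "auth.login"}, Engine: "memory"},
		{Topics: []string{"analytics.*"}, Engine: "kafka"},
	}
	for _, topic := range []string{"user.created", "analytics.pageview", "billing.paid"} {
		fmt.Printf("%s -> %s\n", topic, resolveEngine(rules, topic, "default"))
	}
}
```

Under these assumptions, `user` on its own (without a trailing segment) would not match `user.*`, because the prefix retains the dot.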

type Subscription

type Subscription interface {
	// Topic returns the topic being subscribed to.
	// This may include wildcard patterns depending on the engine implementation.
	Topic() string

	// ID returns the unique identifier for this subscription.
	// Each subscription gets a unique ID that can be used for tracking,
	// logging, and debugging purposes.
	ID() string

	// IsAsync returns true if this is an asynchronous subscription.
	// Asynchronous subscriptions process events in background workers,
	// while synchronous subscriptions process events immediately.
	IsAsync() bool

	// Cancel cancels the subscription.
	// After calling Cancel, the subscription will no longer receive events.
	// This is equivalent to calling Unsubscribe on the event bus.
	// The method is idempotent and safe to call multiple times.
	Cancel() error
}

Subscription represents a subscription to a topic. Subscriptions are created when a handler is registered for a topic and provide methods for managing the subscription lifecycle.
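To illustrate the idempotent `Cancel` contract described in the interface comments, here is a minimal sketch of a type that satisfies the same method set. The names `subscription`, `memSubscription`, and `onCancel` are hypothetical, and using `sync.Once` is one possible approach rather than a description of how the module's engines implement cancellation.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription repeats the documented Subscription method set locally
// so the sketch compiles without the eventbus module.
type subscription interface {
	Topic() string
	ID() string
	IsAsync() bool
	Cancel() error
}

// memSubscription is a hypothetical implementation for illustration.
type memSubscription struct {
	id       string
	topic    string
	async    bool
	onCancel func() error // hypothetical hook that detaches the handler

	once sync.Once
	err  error
}

func (s *memSubscription) Topic() string { return s.topic }
func (s *memSubscription) ID() string    { return s.id }
func (s *memSubscription) IsAsync() bool { return s.async }

// Cancel runs the detach hook at most once. Later calls return the
// result of the first call, which makes Cancel safe to call repeatedly.
func (s *memSubscription) Cancel() error {
	s.once.Do(func() {
		if s.onCancel != nil {
			s.err = s.onCancel()
		}
	})
	return s.err
}

func main() {
	detached := 0
	var sub subscription = &memSubscription{
		id:       "sub-1",
		topic:    "user.*",
		onCancel: func() error { detached++; return nil },
	}

	_ = sub.Cancel()
	_ = sub.Cancel() // second call does not run the hook again

	fmt.Println(sub.ID(), sub.Topic(), sub.IsAsync(), detached)
}
```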

type TopicPrefixFilter added in v0.1.1

type TopicPrefixFilter struct {
	AllowedPrefixes []string
	// contains filtered or unexported fields
}

TopicPrefixFilter filters events based on topic prefix.

func (*TopicPrefixFilter) Name added in v0.1.1

func (f *TopicPrefixFilter) Name() string

func (*TopicPrefixFilter) ShouldProcess added in v0.1.1

func (f *TopicPrefixFilter) ShouldProcess(event Event) bool
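Neither method carries a doc comment here, so the following is only a guess at the intended behavior: a filter that accepts an event when its topic begins with one of `AllowedPrefixes`. The `event` type with a `Topic` field, the value returned by `Name`, and the choice to allow everything when the prefix list is empty are all assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical stand-in; only the topic matters here.
type event struct {
	Topic string
}

// topicPrefixFilter is a local stand-in for the documented exported field.
type topicPrefixFilter struct {
	AllowedPrefixes []string
}

// Name returns an identifier for the filter (the value is an assumption).
func (f *topicPrefixFilter) Name() string { return "topicPrefix" }

// ShouldProcess reports whether the event's topic starts with any
// allowed prefix. Assumption: an empty prefix list allows every event.
func (f *topicPrefixFilter) ShouldProcess(e event) bool {
	if len(f.AllowedPrefixes) == 0 {
		return true
	}
	for _, prefix := range f.AllowedPrefixes {
		if strings.HasPrefix(e.Topic, prefix) {
			return true
		}
	}
	return false
}

func main() {
	f := &topicPrefixFilter{AllowedPrefixes: []string{"user.", "order."}}
	for _, topic := range []string{"user.created", "order.paid", "analytics.view"} {
		fmt.Printf("%s: %v\n", topic, f.ShouldProcess(event{Topic: topic}))
	}
}
```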
