events

package
v0.8.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEventNotFound is returned when an event is not found.
	ErrEventNotFound = errors.New("event not found")

	// ErrSnapshotNotFound is returned when a snapshot is not found.
	ErrSnapshotNotFound = errors.New("snapshot not found")

	// ErrInvalidEvent is returned when an event is invalid.
	ErrInvalidEvent = errors.New("invalid event")

	// ErrInvalidSnapshot is returned when a snapshot is invalid.
	ErrInvalidSnapshot = errors.New("invalid snapshot")

	// ErrBrokerNotFound is returned when a broker is not found.
	ErrBrokerNotFound = errors.New("broker not found")

	// ErrBrokerAlreadyRegistered is returned when a broker is already registered.
	ErrBrokerAlreadyRegistered = errors.New("broker already registered")

	// ErrHandlerNotFound is returned when a handler is not found.
	ErrHandlerNotFound = errors.New("handler not found")

	// ErrEventBusNotStarted is returned when the event bus is not started.
	ErrEventBusNotStarted = errors.New("event bus not started")

	// ErrEventStoreNotStarted is returned when the event store is not started.
	ErrEventStoreNotStarted = errors.New("event store not started")
)

Functions

func NewEventBus

func NewEventBus(config EventBusOptions) (core.EventBus, error)

NewEventBus creates a new event bus.

func NewExtension

func NewExtension() forge.Extension

NewExtension creates a new events extension.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new events extension with custom config.

func WrapError

func WrapError(err error, message string) error

WrapError wraps an error with additional context.

Types

type BrokerConfig

type BrokerConfig struct {
	Name     string         `json:"name"     yaml:"name"`
	Type     string         `json:"type"     yaml:"type"` // memory, nats, redis
	Enabled  bool           `json:"enabled"  yaml:"enabled"`
	Priority int            `json:"priority" yaml:"priority"`
	Config   map[string]any `json:"config"   yaml:"config"`
}

BrokerConfig defines configuration for message brokers.

type BusConfig

type BusConfig struct {
	DefaultBroker     string        `json:"default_broker"     yaml:"default_broker"`
	MaxRetries        int           `json:"max_retries"        yaml:"max_retries"`
	RetryDelay        time.Duration `json:"retry_delay"        yaml:"retry_delay"`
	EnableMetrics     bool          `json:"enable_metrics"     yaml:"enable_metrics"`
	EnableTracing     bool          `json:"enable_tracing"     yaml:"enable_tracing"`
	BufferSize        int           `json:"buffer_size"        yaml:"buffer_size"`
	WorkerCount       int           `json:"worker_count"       yaml:"worker_count"`
	ProcessingTimeout time.Duration `json:"processing_timeout" yaml:"processing_timeout"`
}

BusConfig defines configuration for the event bus.

type Config

type Config struct {
	// Event Bus configuration
	Bus BusConfig `json:"bus" yaml:"bus"`

	// Event Store configuration
	Store StoreConfig `json:"store" yaml:"store"`

	// Message Brokers
	Brokers []BrokerConfig `json:"brokers" yaml:"brokers"`

	// Metrics
	Metrics MetricsConfig `json:"metrics" yaml:"metrics"`
}

Config defines the configuration for the events extension.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration.

type DomainEventHandler

type DomainEventHandler = core.DomainEventHandler

DomainEventHandler is a specialized handler for domain events.

func NewDomainEventHandler

func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler

NewDomainEventHandler creates a new domain event handler.

type EventBusConfig

type EventBusConfig struct {
	DefaultBroker     string        `json:"default_broker"     yaml:"default_broker"`
	MaxRetries        int           `json:"max_retries"        yaml:"max_retries"`
	RetryDelay        time.Duration `json:"retry_delay"        yaml:"retry_delay"`
	EnableMetrics     bool          `json:"enable_metrics"     yaml:"enable_metrics"`
	EnableTracing     bool          `json:"enable_tracing"     yaml:"enable_tracing"`
	BufferSize        int           `json:"buffer_size"        yaml:"buffer_size"`
	WorkerCount       int           `json:"worker_count"       yaml:"worker_count"`
	ProcessingTimeout time.Duration `json:"processing_timeout" yaml:"processing_timeout"`
}

EventBusConfig defines configuration for the event bus.

type EventBusImpl

type EventBusImpl struct {
	// contains filtered or unexported fields
}

EventBusImpl implements EventBus.

func (*EventBusImpl) Dependencies

func (eb *EventBusImpl) Dependencies() []string

Dependencies implements core.Service.

func (*EventBusImpl) GetBroker

func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)

GetBroker implements EventBus.

func (*EventBusImpl) GetBrokers

func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker

GetBrokers implements EventBus.

func (*EventBusImpl) GetStats

func (eb *EventBusImpl) GetStats() map[string]any

GetStats implements EventBus.

func (*EventBusImpl) HealthCheck

func (eb *EventBusImpl) HealthCheck(ctx context.Context) error

HealthCheck implements core.EventBus.

func (*EventBusImpl) Name

func (eb *EventBusImpl) Name() string

Name implements core.Service.

func (*EventBusImpl) Publish

func (eb *EventBusImpl) Publish(ctx context.Context, event *core.Event) error

Publish implements EventBus.

func (*EventBusImpl) PublishTo

func (eb *EventBusImpl) PublishTo(ctx context.Context, brokerName string, event *core.Event) error

PublishTo implements EventBus.

func (*EventBusImpl) RegisterBroker

func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error

RegisterBroker implements EventBus.

func (*EventBusImpl) SetDefaultBroker

func (eb *EventBusImpl) SetDefaultBroker(name string) error

SetDefaultBroker implements EventBus.

func (*EventBusImpl) Start

func (eb *EventBusImpl) Start(ctx context.Context) error

OnStart implements core.Service. This method is idempotent - calling it multiple times is safe.

func (*EventBusImpl) Stop

func (eb *EventBusImpl) Stop(ctx context.Context) error

OnStop implements core.Service.

func (*EventBusImpl) Subscribe

func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error

Subscribe implements EventBus.

func (*EventBusImpl) UnregisterBroker

func (eb *EventBusImpl) UnregisterBroker(name string) error

UnregisterBroker implements EventBus.

func (*EventBusImpl) Unsubscribe

func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error

Unsubscribe implements EventBus.

type EventBusOptions

type EventBusOptions struct {
	Store           core.EventStore
	HandlerRegistry *core.HandlerRegistry
	Logger          forge.Logger
	Metrics         forge.Metrics
	Config          EventBusConfig
}

EventBusOptions defines configuration for EventBusImpl.

type EventCriteria

type EventCriteria = core.EventCriteria

EventCriteria defines criteria for querying events.

func NewEventCriteria

func NewEventCriteria() *EventCriteria

NewEventCriteria creates a new event criteria.

type EventHandler

type EventHandler = core.EventHandler

EventHandler defines the interface for handling events.

type EventHandlerFunc

type EventHandlerFunc = core.EventHandlerFunc

EventHandlerFunc is a function adapter for EventHandler.

type EventProjection

type EventProjection = core.EventProjection

EventProjection represents a projection of events.

type EventService

type EventService struct {
	// contains filtered or unexported fields
}

EventService provides event-driven architecture capabilities.

func NewEventService

func NewEventService(config Config, logger forge.Logger, metrics forge.Metrics) *EventService

NewEventService creates a new event service.

func (*EventService) GetEventBus

func (es *EventService) GetEventBus() core.EventBus

GetEventBus returns the event bus.

func (*EventService) GetEventStore

func (es *EventService) GetEventStore() core.EventStore

GetEventStore returns the event store.

func (*EventService) GetHandlerRegistry

func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry

GetHandlerRegistry returns the handler registry.

func (*EventService) HealthCheck

func (es *EventService) HealthCheck(ctx context.Context) error

HealthCheck checks the health of the event service.

func (*EventService) Start

func (es *EventService) Start(ctx context.Context) error

Start starts the event service. This method is idempotent - calling it multiple times is safe.

func (*EventService) Stop

func (es *EventService) Stop(ctx context.Context) error

Stop stops the event service.

type EventStore

type EventStore = core.EventStore

EventStore defines the interface for persisting and retrieving events.

type EventStoreConfig

type EventStoreConfig = core.EventStoreConfig

EventStoreConfig defines configuration for event stores.

func DefaultEventStoreConfig

func DefaultEventStoreConfig() *EventStoreConfig

DefaultEventStoreConfig returns default configuration.

type EventStoreMetrics

type EventStoreMetrics = core.EventStoreMetrics

EventStoreMetrics defines metrics for event stores.

type EventStoreMigration

type EventStoreMigration = core.EventStoreMigration

EventStoreMigration represents a migration for the event store.

type EventStoreMigrator

type EventStoreMigrator = core.EventStoreMigrator

EventStoreMigrator manages event store migrations.

type EventStoreStats

type EventStoreStats = core.EventStoreStats

EventStoreStats represents statistics for an event store.

type EventStoreTransaction

type EventStoreTransaction = core.EventStoreTransaction

EventStoreTransaction represents a transaction in the event store.

type EventStream

type EventStream = core.EventStream

EventStream represents a stream of events.

type EventStreamConfig

type EventStreamConfig = core.EventStreamConfig

EventStreamConfig defines configuration for event streams.

func DefaultEventStreamConfig

func DefaultEventStreamConfig() *EventStreamConfig

DefaultEventStreamConfig returns default stream configuration.

type EventStreamHandler

type EventStreamHandler = core.EventStreamHandler

EventStreamHandler handles events from an event stream.

type EventWorker

type EventWorker struct {
	// contains filtered or unexported fields
}

EventWorker processes events from the queue.

func NewEventWorker

func NewEventWorker(id int, eventQueue <-chan *core.EventEnvelope, processor func(context.Context, *core.EventEnvelope) error, logger forge.Logger, metrics forge.Metrics) *EventWorker

NewEventWorker creates a new event worker.

func (*EventWorker) GetStats

func (ew *EventWorker) GetStats() map[string]any

GetStats returns worker statistics.

func (*EventWorker) Start

func (ew *EventWorker) Start(ctx context.Context)

Start starts the worker.

type Extension

type Extension struct {
	// contains filtered or unexported fields
}

Extension implements forge.Extension for events.

func (*Extension) Dependencies

func (e *Extension) Dependencies() []string

Dependencies returns the extension dependencies.

func (*Extension) Description

func (e *Extension) Description() string

Description returns the extension description.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks the health of the extension.

func (*Extension) Name

func (e *Extension) Name() string

Name returns the extension name.

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the extension with the app.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the extension.

func (*Extension) Version

func (e *Extension) Version() string

Version returns the extension version.

type HandlerMiddleware

type HandlerMiddleware = core.HandlerMiddleware

HandlerMiddleware defines middleware for event handlers.

func LoggingMiddleware

func LoggingMiddleware(l forge.Logger) HandlerMiddleware

LoggingMiddleware creates logging middleware for handlers.

func MetricsMiddleware

func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware

MetricsMiddleware creates metrics middleware for handlers.

func ValidationMiddleware

func ValidationMiddleware() HandlerMiddleware

ValidationMiddleware creates validation middleware for handlers.

type HandlerRegistry

type HandlerRegistry = core.HandlerRegistry

HandlerRegistry manages event handlers.

func NewHandlerRegistry

func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry

NewHandlerRegistry creates a new handler registry.

type MetricsConfig

type MetricsConfig struct {
	Enabled          bool          `json:"enabled"            yaml:"enabled"`
	PublishInterval  time.Duration `json:"publish_interval"   yaml:"publish_interval"`
	EnablePerType    bool          `json:"enable_per_type"    yaml:"enable_per_type"`
	EnablePerHandler bool          `json:"enable_per_handler" yaml:"enable_per_handler"`
}

MetricsConfig defines configuration for event metrics.

type ProjectionManager

type ProjectionManager = core.ProjectionManager

ProjectionManager manages event projections.

type ReflectionEventHandler

type ReflectionEventHandler = core.ReflectionEventHandler

ReflectionEventHandler uses reflection to call methods based on event type.

func NewReflectionEventHandler

func NewReflectionEventHandler(name string, target any) *ReflectionEventHandler

NewReflectionEventHandler creates a new reflection-based event handler.

type RetryPolicy

type RetryPolicy = core.RetryPolicy

RetryPolicy defines retry behavior for event handlers.

func NewRetryPolicy

func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy

NewRetryPolicy creates a new retry policy.

type Snapshot

type Snapshot = core.Snapshot

Snapshot represents a point-in-time snapshot of aggregate state.

func NewSnapshot

func NewSnapshot(aggregateID, snapshotType string, data any, version int) *Snapshot

NewSnapshot creates a new snapshot.

type StoreConfig

type StoreConfig struct {
	Type     string `json:"type"     yaml:"type"`     // memory, postgres, mongodb
	Database string `json:"database" yaml:"database"` // Database connection name (from database extension)
	Table    string `json:"table"    yaml:"table"`
}

StoreConfig defines configuration for the event store.

func (*StoreConfig) ToCoreStoreConfig

func (c *StoreConfig) ToCoreStoreConfig() *core.EventStoreConfig

ToCoreStoreConfig converts to core.EventStoreConfig.

type TransactionalEventStore

type TransactionalEventStore = core.TransactionalEventStore

TransactionalEventStore extends EventStore with transaction support.

type TypedEventHandler

type TypedEventHandler = core.TypedEventHandler

TypedEventHandler is a handler for specific event types.

func NewTypedEventHandler

func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler

NewTypedEventHandler creates a new typed event handler.

type WorkerStats

type WorkerStats struct {
	ID                    int           `json:"id"`
	EventsProcessed       int64         `json:"events_processed"`
	ErrorsEncountered     int64         `json:"errors_encountered"`
	TotalProcessingTime   time.Duration `json:"total_processing_time"`
	AverageProcessingTime time.Duration `json:"average_processing_time"`
	LastEventTime         *time.Time    `json:"last_event_time,omitempty"`
	IsRunning             bool          `json:"is_running"`
}

WorkerStats contains worker statistics.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL