Documentation
¶
Index ¶
- Variables
- func NewEventBus(config EventBusOptions) (core.EventBus, error)
- func NewExtension() forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- func WrapError(err error, message string) error
- type BrokerConfig
- type BusConfig
- type Config
- type DomainEventHandler
- type EventBusConfig
- type EventBusImpl
- func (eb *EventBusImpl) Dependencies() []string
- func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)
- func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker
- func (eb *EventBusImpl) GetStats() map[string]any
- func (eb *EventBusImpl) HealthCheck(ctx context.Context) error
- func (eb *EventBusImpl) Name() string
- func (eb *EventBusImpl) Publish(ctx context.Context, event *core.Event) error
- func (eb *EventBusImpl) PublishTo(ctx context.Context, brokerName string, event *core.Event) error
- func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error
- func (eb *EventBusImpl) SetDefaultBroker(name string) error
- func (eb *EventBusImpl) Start(ctx context.Context) error
- func (eb *EventBusImpl) Stop(ctx context.Context) error
- func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error
- func (eb *EventBusImpl) UnregisterBroker(name string) error
- func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error
- type EventBusOptions
- type EventCriteria
- type EventHandler
- type EventHandlerFunc
- type EventProjection
- type EventService
- func (es *EventService) GetEventBus() core.EventBus
- func (es *EventService) GetEventStore() core.EventStore
- func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry
- func (es *EventService) HealthCheck(ctx context.Context) error
- func (es *EventService) Start(ctx context.Context) error
- func (es *EventService) Stop(ctx context.Context) error
- type EventStore
- type EventStoreConfig
- type EventStoreMetrics
- type EventStoreMigration
- type EventStoreMigrator
- type EventStoreStats
- type EventStoreTransaction
- type EventStream
- type EventStreamConfig
- type EventStreamHandler
- type EventWorker
- type Extension
- func (e *Extension) Dependencies() []string
- func (e *Extension) Description() string
- func (e *Extension) Health(ctx context.Context) error
- func (e *Extension) Name() string
- func (e *Extension) Register(app forge.App) error
- func (e *Extension) Start(ctx context.Context) error
- func (e *Extension) Stop(ctx context.Context) error
- func (e *Extension) Version() string
- type HandlerMiddleware
- type HandlerRegistry
- type MetricsConfig
- type ProjectionManager
- type ReflectionEventHandler
- type RetryPolicy
- type Snapshot
- type StoreConfig
- type TransactionalEventStore
- type TypedEventHandler
- type WorkerStats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEventNotFound is returned when an event is not found. ErrEventNotFound = errors.New("event not found") // ErrSnapshotNotFound is returned when a snapshot is not found. ErrSnapshotNotFound = errors.New("snapshot not found") // ErrInvalidEvent is returned when an event is invalid. ErrInvalidEvent = errors.New("invalid event") // ErrInvalidSnapshot is returned when a snapshot is invalid. ErrInvalidSnapshot = errors.New("invalid snapshot") // ErrBrokerNotFound is returned when a broker is not found. ErrBrokerNotFound = errors.New("broker not found") // ErrBrokerAlreadyRegistered is returned when a broker is already registered. ErrBrokerAlreadyRegistered = errors.New("broker already registered") // ErrHandlerNotFound is returned when a handler is not found. ErrHandlerNotFound = errors.New("handler not found") // ErrEventBusNotStarted is returned when the event bus is not started. ErrEventBusNotStarted = errors.New("event bus not started") // ErrEventStoreNotStarted is returned when the event store is not started. ErrEventStoreNotStarted = errors.New("event store not started") )
Functions ¶
func NewEventBus ¶
func NewEventBus(config EventBusOptions) (core.EventBus, error)
NewEventBus creates a new event bus.
func NewExtension ¶
NewExtension creates a new events extension.
func NewExtensionWithConfig ¶
NewExtensionWithConfig creates a new events extension with custom config.
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
Name string `json:"name" yaml:"name"`
Type string `json:"type" yaml:"type"` // memory, nats, redis
Enabled bool `json:"enabled" yaml:"enabled"`
Priority int `json:"priority" yaml:"priority"`
Config map[string]any `json:"config" yaml:"config"`
}
BrokerConfig defines configuration for message brokers.
type BusConfig ¶
type BusConfig struct {
DefaultBroker string `json:"default_broker" yaml:"default_broker"`
MaxRetries int `json:"max_retries" yaml:"max_retries"`
RetryDelay time.Duration `json:"retry_delay" yaml:"retry_delay"`
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing"`
BufferSize int `json:"buffer_size" yaml:"buffer_size"`
WorkerCount int `json:"worker_count" yaml:"worker_count"`
ProcessingTimeout time.Duration `json:"processing_timeout" yaml:"processing_timeout"`
}
BusConfig defines configuration for the event bus.
type Config ¶
type Config struct {
// Event Bus configuration
Bus BusConfig `json:"bus" yaml:"bus"`
// Event Store configuration
Store StoreConfig `json:"store" yaml:"store"`
// Message Brokers
Brokers []BrokerConfig `json:"brokers" yaml:"brokers"`
// Metrics
Metrics MetricsConfig `json:"metrics" yaml:"metrics"`
}
Config defines the configuration for the events extension.
type DomainEventHandler ¶
type DomainEventHandler = core.DomainEventHandler
DomainEventHandler is a specialized handler for domain events.
func NewDomainEventHandler ¶
func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler
NewDomainEventHandler creates a new domain event handler.
type EventBusConfig ¶
type EventBusConfig struct {
DefaultBroker string `json:"default_broker" yaml:"default_broker"`
MaxRetries int `json:"max_retries" yaml:"max_retries"`
RetryDelay time.Duration `json:"retry_delay" yaml:"retry_delay"`
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing"`
BufferSize int `json:"buffer_size" yaml:"buffer_size"`
WorkerCount int `json:"worker_count" yaml:"worker_count"`
ProcessingTimeout time.Duration `json:"processing_timeout" yaml:"processing_timeout"`
}
EventBusConfig defines configuration for the event bus.
type EventBusImpl ¶
type EventBusImpl struct {
// contains filtered or unexported fields
}
EventBusImpl implements EventBus.
func (*EventBusImpl) Dependencies ¶
func (eb *EventBusImpl) Dependencies() []string
Dependencies implements core.Service.
func (*EventBusImpl) GetBroker ¶
func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)
GetBroker implements EventBus.
func (*EventBusImpl) GetBrokers ¶
func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker
GetBrokers implements EventBus.
func (*EventBusImpl) GetStats ¶
func (eb *EventBusImpl) GetStats() map[string]any
GetStats implements EventBus.
func (*EventBusImpl) HealthCheck ¶
func (eb *EventBusImpl) HealthCheck(ctx context.Context) error
HealthCheck implements core.EventBus.
func (*EventBusImpl) RegisterBroker ¶
func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error
RegisterBroker implements EventBus.
func (*EventBusImpl) SetDefaultBroker ¶
func (eb *EventBusImpl) SetDefaultBroker(name string) error
SetDefaultBroker implements EventBus.
func (*EventBusImpl) Start ¶
func (eb *EventBusImpl) Start(ctx context.Context) error
OnStart implements core.Service. This method is idempotent - calling it multiple times is safe.
func (*EventBusImpl) Stop ¶
func (eb *EventBusImpl) Stop(ctx context.Context) error
OnStop implements core.Service.
func (*EventBusImpl) Subscribe ¶
func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error
Subscribe implements EventBus.
func (*EventBusImpl) UnregisterBroker ¶
func (eb *EventBusImpl) UnregisterBroker(name string) error
UnregisterBroker implements EventBus.
func (*EventBusImpl) Unsubscribe ¶
func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error
Unsubscribe implements EventBus.
type EventBusOptions ¶
type EventBusOptions struct {
Store core.EventStore
HandlerRegistry *core.HandlerRegistry
Logger forge.Logger
Metrics forge.Metrics
Config EventBusConfig
}
EventBusOptions defines configuration for EventBusImpl.
type EventCriteria ¶
type EventCriteria = core.EventCriteria
EventCriteria defines criteria for querying events.
func NewEventCriteria ¶
func NewEventCriteria() *EventCriteria
NewEventCriteria creates a new event criteria.
type EventHandler ¶
type EventHandler = core.EventHandler
EventHandler defines the interface for handling events.
type EventHandlerFunc ¶
type EventHandlerFunc = core.EventHandlerFunc
EventHandlerFunc is a function adapter for EventHandler.
type EventProjection ¶
type EventProjection = core.EventProjection
EventProjection represents a projection of events.
type EventService ¶
type EventService struct {
// contains filtered or unexported fields
}
EventService provides event-driven architecture capabilities.
func NewEventService ¶
NewEventService creates a new event service.
func (*EventService) GetEventBus ¶
func (es *EventService) GetEventBus() core.EventBus
GetEventBus returns the event bus.
func (*EventService) GetEventStore ¶
func (es *EventService) GetEventStore() core.EventStore
GetEventStore returns the event store.
func (*EventService) GetHandlerRegistry ¶
func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry
GetHandlerRegistry returns the handler registry.
func (*EventService) HealthCheck ¶
func (es *EventService) HealthCheck(ctx context.Context) error
HealthCheck checks the health of the event service.
type EventStore ¶
type EventStore = core.EventStore
EventStore defines the interface for persisting and retrieving events.
type EventStoreConfig ¶
type EventStoreConfig = core.EventStoreConfig
EventStoreConfig defines configuration for event stores.
func DefaultEventStoreConfig ¶
func DefaultEventStoreConfig() *EventStoreConfig
DefaultEventStoreConfig returns default configuration.
type EventStoreMetrics ¶
type EventStoreMetrics = core.EventStoreMetrics
EventStoreMetrics defines metrics for event stores.
type EventStoreMigration ¶
type EventStoreMigration = core.EventStoreMigration
EventStoreMigration represents a migration for the event store.
type EventStoreMigrator ¶
type EventStoreMigrator = core.EventStoreMigrator
EventStoreMigrator manages event store migrations.
type EventStoreStats ¶
type EventStoreStats = core.EventStoreStats
EventStoreStats represents statistics for an event store.
type EventStoreTransaction ¶
type EventStoreTransaction = core.EventStoreTransaction
EventStoreTransaction represents a transaction in the event store.
type EventStreamConfig ¶
type EventStreamConfig = core.EventStreamConfig
EventStreamConfig defines configuration for event streams.
func DefaultEventStreamConfig ¶
func DefaultEventStreamConfig() *EventStreamConfig
DefaultEventStreamConfig returns default stream configuration.
type EventStreamHandler ¶
type EventStreamHandler = core.EventStreamHandler
EventStreamHandler handles events from an event stream.
type EventWorker ¶
type EventWorker struct {
// contains filtered or unexported fields
}
EventWorker processes events from the queue.
func NewEventWorker ¶
func NewEventWorker(id int, eventQueue <-chan *core.EventEnvelope, processor func(context.Context, *core.EventEnvelope) error, logger forge.Logger, metrics forge.Metrics) *EventWorker
NewEventWorker creates a new event worker.
func (*EventWorker) GetStats ¶
func (ew *EventWorker) GetStats() map[string]any
GetStats returns worker statistics.
func (*EventWorker) Start ¶
func (ew *EventWorker) Start(ctx context.Context)
Start starts the worker.
type Extension ¶
type Extension struct {
// contains filtered or unexported fields
}
Extension implements forge.Extension for events.
func (*Extension) Dependencies ¶
Dependencies returns the extension dependencies.
func (*Extension) Description ¶
Description returns the extension description.
type HandlerMiddleware ¶
type HandlerMiddleware = core.HandlerMiddleware
HandlerMiddleware defines middleware for event handlers.
func LoggingMiddleware ¶
func LoggingMiddleware(l forge.Logger) HandlerMiddleware
LoggingMiddleware creates logging middleware for handlers.
func MetricsMiddleware ¶
func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware
MetricsMiddleware creates metrics middleware for handlers.
func ValidationMiddleware ¶
func ValidationMiddleware() HandlerMiddleware
ValidationMiddleware creates validation middleware for handlers.
type HandlerRegistry ¶
type HandlerRegistry = core.HandlerRegistry
HandlerRegistry manages event handlers.
func NewHandlerRegistry ¶
func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry
NewHandlerRegistry creates a new handler registry.
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
PublishInterval time.Duration `json:"publish_interval" yaml:"publish_interval"`
EnablePerType bool `json:"enable_per_type" yaml:"enable_per_type"`
EnablePerHandler bool `json:"enable_per_handler" yaml:"enable_per_handler"`
}
MetricsConfig defines configuration for event metrics.
type ProjectionManager ¶
type ProjectionManager = core.ProjectionManager
ProjectionManager manages event projections.
type ReflectionEventHandler ¶
type ReflectionEventHandler = core.ReflectionEventHandler
ReflectionEventHandler uses reflection to call methods based on event type.
func NewReflectionEventHandler ¶
func NewReflectionEventHandler(name string, target any) *ReflectionEventHandler
NewReflectionEventHandler creates a new reflection-based event handler.
type RetryPolicy ¶
type RetryPolicy = core.RetryPolicy
RetryPolicy defines retry behavior for event handlers.
func NewRetryPolicy ¶
func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy
NewRetryPolicy creates a new retry policy.
type StoreConfig ¶
type StoreConfig struct {
Type string `json:"type" yaml:"type"` // memory, postgres, mongodb
Database string `json:"database" yaml:"database"` // Database connection name (from database extension)
Table string `json:"table" yaml:"table"`
}
StoreConfig defines configuration for the event store.
func (*StoreConfig) ToCoreStoreConfig ¶
func (c *StoreConfig) ToCoreStoreConfig() *core.EventStoreConfig
ToCoreStoreConfig converts to core.EventStoreConfig.
type TransactionalEventStore ¶
type TransactionalEventStore = core.TransactionalEventStore
TransactionalEventStore extends EventStore with transaction support.
type TypedEventHandler ¶
type TypedEventHandler = core.TypedEventHandler
TypedEventHandler is a handler for specific event types.
func NewTypedEventHandler ¶
func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler
NewTypedEventHandler creates a new typed event handler.
type WorkerStats ¶
type WorkerStats struct {
ID int `json:"id"`
EventsProcessed int64 `json:"events_processed"`
ErrorsEncountered int64 `json:"errors_encountered"`
TotalProcessingTime time.Duration `json:"total_processing_time"`
AverageProcessingTime time.Duration `json:"average_processing_time"`
LastEventTime *time.Time `json:"last_event_time,omitempty"`
IsRunning bool `json:"is_running"`
}
WorkerStats contains worker statistics.