Documentation
¶
Index ¶
- Variables
- func NewEventBus(config EventBusOptions) (core.EventBus, error)
- func NewExtension() forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- func WrapError(err error, message string) error
- type BrokerConfig
- type BusConfig
- type Config
- type DomainEventHandler
- type EventBusConfig
- type EventBusImpl
- func (eb *EventBusImpl) Dependencies() []string
- func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)
- func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker
- func (eb *EventBusImpl) GetStats() map[string]interface{}
- func (eb *EventBusImpl) HealthCheck(ctx context.Context) error
- func (eb *EventBusImpl) Name() string
- func (eb *EventBusImpl) Publish(ctx context.Context, event *core.Event) error
- func (eb *EventBusImpl) PublishTo(ctx context.Context, brokerName string, event *core.Event) error
- func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error
- func (eb *EventBusImpl) SetDefaultBroker(name string) error
- func (eb *EventBusImpl) Start(ctx context.Context) error
- func (eb *EventBusImpl) Stop(ctx context.Context) error
- func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error
- func (eb *EventBusImpl) UnregisterBroker(name string) error
- func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error
- type EventBusOptions
- type EventCriteria
- type EventHandler
- type EventHandlerFunc
- type EventProjection
- type EventService
- func (es *EventService) GetEventBus() core.EventBus
- func (es *EventService) GetEventStore() core.EventStore
- func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry
- func (es *EventService) HealthCheck(ctx context.Context) error
- func (es *EventService) Start(ctx context.Context) error
- func (es *EventService) Stop(ctx context.Context) error
- type EventStore
- type EventStoreConfig
- type EventStoreMetrics
- type EventStoreMigration
- type EventStoreMigrator
- type EventStoreStats
- type EventStoreTransaction
- type EventStream
- type EventStreamConfig
- type EventStreamHandler
- type EventWorker
- type Extension
- func (e *Extension) Dependencies() []string
- func (e *Extension) Description() string
- func (e *Extension) Health(ctx context.Context) error
- func (e *Extension) Name() string
- func (e *Extension) Register(app forge.App) error
- func (e *Extension) Start(ctx context.Context) error
- func (e *Extension) Stop(ctx context.Context) error
- func (e *Extension) Version() string
- type HandlerMiddleware
- type HandlerRegistry
- type MetricsConfig
- type ProjectionManager
- type ReflectionEventHandler
- type RetryPolicy
- type Snapshot
- type StoreConfig
- type TransactionalEventStore
- type TypedEventHandler
- type WorkerStats
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrEventNotFound is returned when an event is not found
	ErrEventNotFound = errors.New("event not found")
	// ErrSnapshotNotFound is returned when a snapshot is not found
	ErrSnapshotNotFound = errors.New("snapshot not found")
	// ErrInvalidEvent is returned when an event is invalid
	ErrInvalidEvent = errors.New("invalid event")
	// ErrInvalidSnapshot is returned when a snapshot is invalid
	ErrInvalidSnapshot = errors.New("invalid snapshot")
	// ErrBrokerNotFound is returned when a broker is not found
	ErrBrokerNotFound = errors.New("broker not found")
	// ErrBrokerAlreadyRegistered is returned when a broker is already registered
	ErrBrokerAlreadyRegistered = errors.New("broker already registered")
	// ErrHandlerNotFound is returned when a handler is not found
	ErrHandlerNotFound = errors.New("handler not found")
	// ErrEventBusNotStarted is returned when the event bus is not started
	ErrEventBusNotStarted = errors.New("event bus not started")
	// ErrEventStoreNotStarted is returned when the event store is not started
	ErrEventStoreNotStarted = errors.New("event store not started")
)
Functions ¶
func NewEventBus ¶
func NewEventBus(config EventBusOptions) (core.EventBus, error)
NewEventBus creates a new event bus
func NewExtensionWithConfig ¶
func NewExtensionWithConfig(config Config) forge.Extension
NewExtensionWithConfig creates a new events extension with custom config
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
Name string `yaml:"name" json:"name"`
Type string `yaml:"type" json:"type"` // memory, nats, redis
Enabled bool `yaml:"enabled" json:"enabled"`
Priority int `yaml:"priority" json:"priority"`
Config map[string]interface{} `yaml:"config" json:"config"`
}
BrokerConfig defines configuration for message brokers
type BusConfig ¶
type BusConfig struct {
DefaultBroker string `yaml:"default_broker" json:"default_broker"`
MaxRetries int `yaml:"max_retries" json:"max_retries"`
RetryDelay time.Duration `yaml:"retry_delay" json:"retry_delay"`
EnableMetrics bool `yaml:"enable_metrics" json:"enable_metrics"`
EnableTracing bool `yaml:"enable_tracing" json:"enable_tracing"`
BufferSize int `yaml:"buffer_size" json:"buffer_size"`
WorkerCount int `yaml:"worker_count" json:"worker_count"`
ProcessingTimeout time.Duration `yaml:"processing_timeout" json:"processing_timeout"`
}
BusConfig defines configuration for the event bus
type Config ¶
type Config struct {
// Event Bus configuration
Bus BusConfig `yaml:"bus" json:"bus"`
// Event Store configuration
Store StoreConfig `yaml:"store" json:"store"`
// Message Brokers
Brokers []BrokerConfig `yaml:"brokers" json:"brokers"`
// Metrics
Metrics MetricsConfig `yaml:"metrics" json:"metrics"`
}
Config defines the configuration for the events extension
type DomainEventHandler ¶
type DomainEventHandler = core.DomainEventHandler
DomainEventHandler is a specialized handler for domain events
func NewDomainEventHandler ¶
func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler
NewDomainEventHandler creates a new domain event handler
type EventBusConfig ¶
type EventBusConfig struct {
DefaultBroker string `yaml:"default_broker" json:"default_broker"`
MaxRetries int `yaml:"max_retries" json:"max_retries"`
RetryDelay time.Duration `yaml:"retry_delay" json:"retry_delay"`
EnableMetrics bool `yaml:"enable_metrics" json:"enable_metrics"`
EnableTracing bool `yaml:"enable_tracing" json:"enable_tracing"`
BufferSize int `yaml:"buffer_size" json:"buffer_size"`
WorkerCount int `yaml:"worker_count" json:"worker_count"`
ProcessingTimeout time.Duration `yaml:"processing_timeout" json:"processing_timeout"`
}
EventBusConfig defines configuration for the event bus
type EventBusImpl ¶
type EventBusImpl struct {
// contains filtered or unexported fields
}
EventBusImpl implements EventBus
func (*EventBusImpl) Dependencies ¶
func (eb *EventBusImpl) Dependencies() []string
Dependencies implements core.Service
func (*EventBusImpl) GetBroker ¶
func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)
GetBroker implements EventBus
func (*EventBusImpl) GetBrokers ¶
func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker
GetBrokers implements EventBus
func (*EventBusImpl) GetStats ¶
func (eb *EventBusImpl) GetStats() map[string]interface{}
GetStats implements EventBus
func (*EventBusImpl) HealthCheck ¶
func (eb *EventBusImpl) HealthCheck(ctx context.Context) error
HealthCheck implements core.EventBus
func (*EventBusImpl) RegisterBroker ¶
func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error
RegisterBroker implements EventBus
func (*EventBusImpl) SetDefaultBroker ¶
func (eb *EventBusImpl) SetDefaultBroker(name string) error
SetDefaultBroker implements EventBus
func (*EventBusImpl) Start ¶
func (eb *EventBusImpl) Start(ctx context.Context) error
Start implements core.Service
func (*EventBusImpl) Stop ¶
func (eb *EventBusImpl) Stop(ctx context.Context) error
Stop implements core.Service
func (*EventBusImpl) Subscribe ¶
func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error
Subscribe implements EventBus
func (*EventBusImpl) UnregisterBroker ¶
func (eb *EventBusImpl) UnregisterBroker(name string) error
UnregisterBroker implements EventBus
func (*EventBusImpl) Unsubscribe ¶
func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error
Unsubscribe implements EventBus
type EventBusOptions ¶
type EventBusOptions struct {
Store core.EventStore
HandlerRegistry *core.HandlerRegistry
Logger forge.Logger
Metrics forge.Metrics
Config EventBusConfig
}
EventBusOptions defines configuration for EventBusImpl
type EventCriteria ¶
type EventCriteria = core.EventCriteria
EventCriteria defines criteria for querying events
func NewEventCriteria ¶
func NewEventCriteria() *EventCriteria
NewEventCriteria creates a new event criteria
type EventHandler ¶
type EventHandler = core.EventHandler
EventHandler defines the interface for handling events
type EventHandlerFunc ¶
type EventHandlerFunc = core.EventHandlerFunc
EventHandlerFunc is a function adapter for EventHandler
type EventProjection ¶
type EventProjection = core.EventProjection
EventProjection represents a projection of events
type EventService ¶
type EventService struct {
// contains filtered or unexported fields
}
EventService provides event-driven architecture capabilities
func NewEventService ¶
NewEventService creates a new event service
func (*EventService) GetEventBus ¶
func (es *EventService) GetEventBus() core.EventBus
GetEventBus returns the event bus
func (*EventService) GetEventStore ¶
func (es *EventService) GetEventStore() core.EventStore
GetEventStore returns the event store
func (*EventService) GetHandlerRegistry ¶
func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry
GetHandlerRegistry returns the handler registry
func (*EventService) HealthCheck ¶
func (es *EventService) HealthCheck(ctx context.Context) error
HealthCheck checks the health of the event service
type EventStore ¶
type EventStore = core.EventStore
EventStore defines the interface for persisting and retrieving events
type EventStoreConfig ¶
type EventStoreConfig = core.EventStoreConfig
EventStoreConfig defines configuration for event stores
func DefaultEventStoreConfig ¶
func DefaultEventStoreConfig() *EventStoreConfig
DefaultEventStoreConfig returns default configuration
type EventStoreMetrics ¶
type EventStoreMetrics = core.EventStoreMetrics
EventStoreMetrics defines metrics for event stores
type EventStoreMigration ¶
type EventStoreMigration = core.EventStoreMigration
EventStoreMigration represents a migration for the event store
type EventStoreMigrator ¶
type EventStoreMigrator = core.EventStoreMigrator
EventStoreMigrator manages event store migrations
type EventStoreStats ¶
type EventStoreStats = core.EventStoreStats
EventStoreStats represents statistics for an event store
type EventStoreTransaction ¶
type EventStoreTransaction = core.EventStoreTransaction
EventStoreTransaction represents a transaction in the event store
type EventStreamConfig ¶
type EventStreamConfig = core.EventStreamConfig
EventStreamConfig defines configuration for event streams
func DefaultEventStreamConfig ¶
func DefaultEventStreamConfig() *EventStreamConfig
DefaultEventStreamConfig returns default stream configuration
type EventStreamHandler ¶
type EventStreamHandler = core.EventStreamHandler
EventStreamHandler handles events from an event stream
type EventWorker ¶
type EventWorker struct {
// contains filtered or unexported fields
}
EventWorker processes events from the queue
func NewEventWorker ¶
func NewEventWorker(id int, eventQueue <-chan *core.EventEnvelope, processor func(context.Context, *core.EventEnvelope) error, logger forge.Logger, metrics forge.Metrics) *EventWorker
NewEventWorker creates a new event worker
func (*EventWorker) GetStats ¶
func (ew *EventWorker) GetStats() map[string]interface{}
GetStats returns worker statistics
func (*EventWorker) Start ¶
func (ew *EventWorker) Start(ctx context.Context)
Start starts the worker
type Extension ¶
type Extension struct {
// contains filtered or unexported fields
}
Extension implements forge.Extension for events
func (*Extension) Dependencies ¶
func (e *Extension) Dependencies() []string
Dependencies returns the extension dependencies
func (*Extension) Description ¶
func (e *Extension) Description() string
Description returns the extension description
type HandlerMiddleware ¶
type HandlerMiddleware = core.HandlerMiddleware
HandlerMiddleware defines middleware for event handlers
func LoggingMiddleware ¶
func LoggingMiddleware(l forge.Logger) HandlerMiddleware
LoggingMiddleware creates logging middleware for handlers
func MetricsMiddleware ¶
func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware
MetricsMiddleware creates metrics middleware for handlers
func ValidationMiddleware ¶
func ValidationMiddleware() HandlerMiddleware
ValidationMiddleware creates validation middleware for handlers
type HandlerRegistry ¶
type HandlerRegistry = core.HandlerRegistry
HandlerRegistry manages event handlers
func NewHandlerRegistry ¶
func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry
NewHandlerRegistry creates a new handler registry
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
PublishInterval time.Duration `yaml:"publish_interval" json:"publish_interval"`
EnablePerType bool `yaml:"enable_per_type" json:"enable_per_type"`
EnablePerHandler bool `yaml:"enable_per_handler" json:"enable_per_handler"`
}
MetricsConfig defines configuration for event metrics
type ProjectionManager ¶
type ProjectionManager = core.ProjectionManager
ProjectionManager manages event projections
type ReflectionEventHandler ¶
type ReflectionEventHandler = core.ReflectionEventHandler
ReflectionEventHandler uses reflection to call methods based on event type
func NewReflectionEventHandler ¶
func NewReflectionEventHandler(name string, target interface{}) *ReflectionEventHandler
NewReflectionEventHandler creates a new reflection-based event handler
type RetryPolicy ¶
type RetryPolicy = core.RetryPolicy
RetryPolicy defines retry behavior for event handlers
func NewRetryPolicy ¶
func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy
NewRetryPolicy creates a new retry policy
type Snapshot ¶
Snapshot represents a point-in-time snapshot of aggregate state
func NewSnapshot ¶
NewSnapshot creates a new snapshot
type StoreConfig ¶
type StoreConfig struct {
Type string `yaml:"type" json:"type"` // memory, postgres, mongodb
Database string `yaml:"database" json:"database"` // Database connection name (from database extension)
Table string `yaml:"table" json:"table"`
}
StoreConfig defines configuration for the event store
func (*StoreConfig) ToCoreStoreConfig ¶
func (c *StoreConfig) ToCoreStoreConfig() *core.EventStoreConfig
ToCoreStoreConfig converts to core.EventStoreConfig
type TransactionalEventStore ¶
type TransactionalEventStore = core.TransactionalEventStore
TransactionalEventStore extends EventStore with transaction support
type TypedEventHandler ¶
type TypedEventHandler = core.TypedEventHandler
TypedEventHandler is a handler for specific event types
func NewTypedEventHandler ¶
func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler
NewTypedEventHandler creates a new typed event handler
type WorkerStats ¶
type WorkerStats struct {
ID int `json:"id"`
EventsProcessed int64 `json:"events_processed"`
ErrorsEncountered int64 `json:"errors_encountered"`
TotalProcessingTime time.Duration `json:"total_processing_time"`
AverageProcessingTime time.Duration `json:"average_processing_time"`
LastEventTime *time.Time `json:"last_event_time,omitempty"`
IsRunning bool `json:"is_running"`
}
WorkerStats contains worker statistics