events

package
v0.1.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEventNotFound is returned when an event is not found
	ErrEventNotFound = errors.New("event not found")

	// ErrSnapshotNotFound is returned when a snapshot is not found
	ErrSnapshotNotFound = errors.New("snapshot not found")

	// ErrInvalidEvent is returned when an event is invalid
	ErrInvalidEvent = errors.New("invalid event")

	// ErrInvalidSnapshot is returned when a snapshot is invalid
	ErrInvalidSnapshot = errors.New("invalid snapshot")

	// ErrBrokerNotFound is returned when a broker is not found
	ErrBrokerNotFound = errors.New("broker not found")

	// ErrBrokerAlreadyRegistered is returned when a broker is already registered
	ErrBrokerAlreadyRegistered = errors.New("broker already registered")

	// ErrHandlerNotFound is returned when a handler is not found
	ErrHandlerNotFound = errors.New("handler not found")

	// ErrEventBusNotStarted is returned when the event bus is not started
	ErrEventBusNotStarted = errors.New("event bus not started")

	// ErrEventStoreNotStarted is returned when the event store is not started
	ErrEventStoreNotStarted = errors.New("event store not started")
)

Functions

func NewEventBus

func NewEventBus(config EventBusOptions) (core.EventBus, error)

NewEventBus creates a new event bus

func NewExtension

func NewExtension() forge.Extension

NewExtension creates a new events extension

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new events extension with custom config

func WrapError

func WrapError(err error, message string) error

WrapError wraps an error with additional context

Types

type BrokerConfig

type BrokerConfig struct {
	Name     string                 `yaml:"name" json:"name"`
	Type     string                 `yaml:"type" json:"type"` // memory, nats, redis
	Enabled  bool                   `yaml:"enabled" json:"enabled"`
	Priority int                    `yaml:"priority" json:"priority"`
	Config   map[string]interface{} `yaml:"config" json:"config"`
}

BrokerConfig defines configuration for message brokers

type BusConfig

type BusConfig struct {
	DefaultBroker     string        `yaml:"default_broker" json:"default_broker"`
	MaxRetries        int           `yaml:"max_retries" json:"max_retries"`
	RetryDelay        time.Duration `yaml:"retry_delay" json:"retry_delay"`
	EnableMetrics     bool          `yaml:"enable_metrics" json:"enable_metrics"`
	EnableTracing     bool          `yaml:"enable_tracing" json:"enable_tracing"`
	BufferSize        int           `yaml:"buffer_size" json:"buffer_size"`
	WorkerCount       int           `yaml:"worker_count" json:"worker_count"`
	ProcessingTimeout time.Duration `yaml:"processing_timeout" json:"processing_timeout"`
}

BusConfig defines configuration for the event bus

type Config

type Config struct {
	// Event Bus configuration
	Bus BusConfig `yaml:"bus" json:"bus"`

	// Event Store configuration
	Store StoreConfig `yaml:"store" json:"store"`

	// Message Brokers
	Brokers []BrokerConfig `yaml:"brokers" json:"brokers"`

	// Metrics
	Metrics MetricsConfig `yaml:"metrics" json:"metrics"`
}

Config defines the configuration for the events extension

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration

type DomainEventHandler

type DomainEventHandler = core.DomainEventHandler

DomainEventHandler is a specialized handler for domain events

func NewDomainEventHandler

func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler

NewDomainEventHandler creates a new domain event handler

type EventBusConfig

type EventBusConfig struct {
	DefaultBroker     string        `yaml:"default_broker" json:"default_broker"`
	MaxRetries        int           `yaml:"max_retries" json:"max_retries"`
	RetryDelay        time.Duration `yaml:"retry_delay" json:"retry_delay"`
	EnableMetrics     bool          `yaml:"enable_metrics" json:"enable_metrics"`
	EnableTracing     bool          `yaml:"enable_tracing" json:"enable_tracing"`
	BufferSize        int           `yaml:"buffer_size" json:"buffer_size"`
	WorkerCount       int           `yaml:"worker_count" json:"worker_count"`
	ProcessingTimeout time.Duration `yaml:"processing_timeout" json:"processing_timeout"`
}

EventBusConfig defines configuration for the event bus

type EventBusImpl

type EventBusImpl struct {
	// contains filtered or unexported fields
}

EventBusImpl implements EventBus

func (*EventBusImpl) Dependencies

func (eb *EventBusImpl) Dependencies() []string

Dependencies implements core.Service

func (*EventBusImpl) GetBroker

func (eb *EventBusImpl) GetBroker(name string) (core.MessageBroker, error)

GetBroker implements EventBus

func (*EventBusImpl) GetBrokers

func (eb *EventBusImpl) GetBrokers() map[string]core.MessageBroker

GetBrokers implements EventBus

func (*EventBusImpl) GetStats

func (eb *EventBusImpl) GetStats() map[string]interface{}

GetStats implements EventBus

func (*EventBusImpl) HealthCheck

func (eb *EventBusImpl) HealthCheck(ctx context.Context) error

HealthCheck implements core.EventBus

func (*EventBusImpl) Name

func (eb *EventBusImpl) Name() string

Name implements core.Service

func (*EventBusImpl) Publish

func (eb *EventBusImpl) Publish(ctx context.Context, event *core.Event) error

Publish implements EventBus

func (*EventBusImpl) PublishTo

func (eb *EventBusImpl) PublishTo(ctx context.Context, brokerName string, event *core.Event) error

PublishTo implements EventBus

func (*EventBusImpl) RegisterBroker

func (eb *EventBusImpl) RegisterBroker(name string, broker core.MessageBroker) error

RegisterBroker implements EventBus

func (*EventBusImpl) SetDefaultBroker

func (eb *EventBusImpl) SetDefaultBroker(name string) error

SetDefaultBroker implements EventBus

func (*EventBusImpl) Start

func (eb *EventBusImpl) Start(ctx context.Context) error

OnStart implements core.Service

func (*EventBusImpl) Stop

func (eb *EventBusImpl) Stop(ctx context.Context) error

OnStop implements core.Service

func (*EventBusImpl) Subscribe

func (eb *EventBusImpl) Subscribe(eventType string, handler core.EventHandler) error

Subscribe implements EventBus

func (*EventBusImpl) UnregisterBroker

func (eb *EventBusImpl) UnregisterBroker(name string) error

UnregisterBroker implements EventBus

func (*EventBusImpl) Unsubscribe

func (eb *EventBusImpl) Unsubscribe(eventType string, handlerName string) error

Unsubscribe implements EventBus

type EventBusOptions

type EventBusOptions struct {
	Store           core.EventStore
	HandlerRegistry *core.HandlerRegistry
	Logger          forge.Logger
	Metrics         forge.Metrics
	Config          EventBusConfig
}

EventBusOptions defines configuration for EventBusImpl

type EventCriteria

type EventCriteria = core.EventCriteria

EventCriteria defines criteria for querying events

func NewEventCriteria

func NewEventCriteria() *EventCriteria

NewEventCriteria creates a new event criteria

type EventHandler

type EventHandler = core.EventHandler

EventHandler defines the interface for handling events

type EventHandlerFunc

type EventHandlerFunc = core.EventHandlerFunc

EventHandlerFunc is a function adapter for EventHandler

type EventProjection

type EventProjection = core.EventProjection

EventProjection represents a projection of events

type EventService

type EventService struct {
	// contains filtered or unexported fields
}

EventService provides event-driven architecture capabilities

func NewEventService

func NewEventService(config Config, logger forge.Logger, metrics forge.Metrics) *EventService

NewEventService creates a new event service

func (*EventService) GetEventBus

func (es *EventService) GetEventBus() core.EventBus

GetEventBus returns the event bus

func (*EventService) GetEventStore

func (es *EventService) GetEventStore() core.EventStore

GetEventStore returns the event store

func (*EventService) GetHandlerRegistry

func (es *EventService) GetHandlerRegistry() *core.HandlerRegistry

GetHandlerRegistry returns the handler registry

func (*EventService) HealthCheck

func (es *EventService) HealthCheck(ctx context.Context) error

HealthCheck checks the health of the event service

func (*EventService) Start

func (es *EventService) Start(ctx context.Context) error

Start starts the event service

func (*EventService) Stop

func (es *EventService) Stop(ctx context.Context) error

Stop stops the event service

type EventStore

type EventStore = core.EventStore

EventStore defines the interface for persisting and retrieving events

type EventStoreConfig

type EventStoreConfig = core.EventStoreConfig

EventStoreConfig defines configuration for event stores

func DefaultEventStoreConfig

func DefaultEventStoreConfig() *EventStoreConfig

DefaultEventStoreConfig returns default configuration

type EventStoreMetrics

type EventStoreMetrics = core.EventStoreMetrics

EventStoreMetrics defines metrics for event stores

type EventStoreMigration

type EventStoreMigration = core.EventStoreMigration

EventStoreMigration represents a migration for the event store

type EventStoreMigrator

type EventStoreMigrator = core.EventStoreMigrator

EventStoreMigrator manages event store migrations

type EventStoreStats

type EventStoreStats = core.EventStoreStats

EventStoreStats represents statistics for an event store

type EventStoreTransaction

type EventStoreTransaction = core.EventStoreTransaction

EventStoreTransaction represents a transaction in the event store

type EventStream

type EventStream = core.EventStream

EventStream represents a stream of events

type EventStreamConfig

type EventStreamConfig = core.EventStreamConfig

EventStreamConfig defines configuration for event streams

func DefaultEventStreamConfig

func DefaultEventStreamConfig() *EventStreamConfig

DefaultEventStreamConfig returns default stream configuration

type EventStreamHandler

type EventStreamHandler = core.EventStreamHandler

EventStreamHandler handles events from an event stream

type EventWorker

type EventWorker struct {
	// contains filtered or unexported fields
}

EventWorker processes events from the queue

func NewEventWorker

func NewEventWorker(id int, eventQueue <-chan *core.EventEnvelope, processor func(context.Context, *core.EventEnvelope) error, logger forge.Logger, metrics forge.Metrics) *EventWorker

NewEventWorker creates a new event worker

func (*EventWorker) GetStats

func (ew *EventWorker) GetStats() map[string]interface{}

GetStats returns worker statistics

func (*EventWorker) Start

func (ew *EventWorker) Start(ctx context.Context)

Start starts the worker

type Extension

type Extension struct {
	// contains filtered or unexported fields
}

Extension implements forge.Extension for events

func (*Extension) Dependencies

func (e *Extension) Dependencies() []string

Dependencies returns the extension dependencies

func (*Extension) Description

func (e *Extension) Description() string

Description returns the extension description

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks the health of the extension

func (*Extension) Name

func (e *Extension) Name() string

Name returns the extension name

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the extension with the app

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the extension

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the extension

func (*Extension) Version

func (e *Extension) Version() string

Version returns the extension version

type HandlerMiddleware

type HandlerMiddleware = core.HandlerMiddleware

HandlerMiddleware defines middleware for event handlers

func LoggingMiddleware

func LoggingMiddleware(l forge.Logger) HandlerMiddleware

LoggingMiddleware creates logging middleware for handlers

func MetricsMiddleware

func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware

MetricsMiddleware creates metrics middleware for handlers

func ValidationMiddleware

func ValidationMiddleware() HandlerMiddleware

ValidationMiddleware creates validation middleware for handlers

type HandlerRegistry

type HandlerRegistry = core.HandlerRegistry

HandlerRegistry manages event handlers

func NewHandlerRegistry

func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry

NewHandlerRegistry creates a new handler registry

type MetricsConfig

type MetricsConfig struct {
	Enabled          bool          `yaml:"enabled" json:"enabled"`
	PublishInterval  time.Duration `yaml:"publish_interval" json:"publish_interval"`
	EnablePerType    bool          `yaml:"enable_per_type" json:"enable_per_type"`
	EnablePerHandler bool          `yaml:"enable_per_handler" json:"enable_per_handler"`
}

MetricsConfig defines configuration for event metrics

type ProjectionManager

type ProjectionManager = core.ProjectionManager

ProjectionManager manages event projections

type ReflectionEventHandler

type ReflectionEventHandler = core.ReflectionEventHandler

ReflectionEventHandler uses reflection to call methods based on event type

func NewReflectionEventHandler

func NewReflectionEventHandler(name string, target interface{}) *ReflectionEventHandler

NewReflectionEventHandler creates a new reflection-based event handler

type RetryPolicy

type RetryPolicy = core.RetryPolicy

RetryPolicy defines retry behavior for event handlers

func NewRetryPolicy

func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy

NewRetryPolicy creates a new retry policy

type Snapshot

type Snapshot = core.Snapshot

Snapshot represents a point-in-time snapshot of aggregate state

func NewSnapshot

func NewSnapshot(aggregateID, snapshotType string, data interface{}, version int) *Snapshot

NewSnapshot creates a new snapshot

type StoreConfig

type StoreConfig struct {
	Type     string `yaml:"type" json:"type"`         // memory, postgres, mongodb
	Database string `yaml:"database" json:"database"` // Database connection name (from database extension)
	Table    string `yaml:"table" json:"table"`
}

StoreConfig defines configuration for the event store

func (*StoreConfig) ToCoreStoreConfig

func (c *StoreConfig) ToCoreStoreConfig() *core.EventStoreConfig

ToCoreStoreConfig converts to core.EventStoreConfig

type TransactionalEventStore

type TransactionalEventStore = core.TransactionalEventStore

TransactionalEventStore extends EventStore with transaction support

type TypedEventHandler

type TypedEventHandler = core.TypedEventHandler

TypedEventHandler is a handler for specific event types

func NewTypedEventHandler

func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler

NewTypedEventHandler creates a new typed event handler

type WorkerStats

type WorkerStats struct {
	ID                    int           `json:"id"`
	EventsProcessed       int64         `json:"events_processed"`
	ErrorsEncountered     int64         `json:"errors_encountered"`
	TotalProcessingTime   time.Duration `json:"total_processing_time"`
	AverageProcessingTime time.Duration `json:"average_processing_time"`
	LastEventTime         *time.Time    `json:"last_event_time,omitempty"`
	IsRunning             bool          `json:"is_running"`
}

WorkerStats contains worker statistics

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL