Documentation
¶
Index ¶
- type DomainEventHandler
- type Event
- func (e *Event) Clone() *Event
- func (e *Event) GetMetadata(key string) (interface{}, bool)
- func (e *Event) GetMetadataString(key string) string
- func (e *Event) MarshalJSON() ([]byte, error)
- func (e *Event) String() string
- func (e *Event) UnmarshalJSON(data []byte) error
- func (e *Event) Validate() error
- func (e *Event) WithCausationID(causationID string) *Event
- func (e *Event) WithCorrelationID(correlationID string) *Event
- func (e *Event) WithMetadata(key string, value interface{}) *Event
- func (e *Event) WithSource(source string) *Event
- func (e *Event) WithVersion(version int) *Event
- type EventBus
- type EventCollection
- type EventCriteria
- func (ec *EventCriteria) Validate() error
- func (ec *EventCriteria) WithAggregateIDs(aggregateIDs ...string) *EventCriteria
- func (ec *EventCriteria) WithEndTime(end time.Time) *EventCriteria
- func (ec *EventCriteria) WithEventTypes(eventTypes ...string) *EventCriteria
- func (ec *EventCriteria) WithLimit(limit int) *EventCriteria
- func (ec *EventCriteria) WithMetadata(key string, value interface{}) *EventCriteria
- func (ec *EventCriteria) WithOffset(offset int64) *EventCriteria
- func (ec *EventCriteria) WithSort(sortBy, sortOrder string) *EventCriteria
- func (ec *EventCriteria) WithSources(sources ...string) *EventCriteria
- func (ec *EventCriteria) WithStartTime(start time.Time) *EventCriteria
- func (ec *EventCriteria) WithTimeRange(start, end time.Time) *EventCriteria
- type EventEnvelope
- type EventFilter
- type EventHandler
- type EventHandlerFunc
- type EventProjection
- type EventStore
- type EventStoreConfig
- type EventStoreMetrics
- type EventStoreMigration
- type EventStoreMigrator
- type EventStoreStats
- type EventStoreTransaction
- type EventStream
- type EventStreamConfig
- type EventStreamHandler
- type HandlerMiddleware
- type HandlerRegistry
- func (hr *HandlerRegistry) GetAllHandlers() map[string][]EventHandler
- func (hr *HandlerRegistry) GetHandlers(eventType string) []EventHandler
- func (hr *HandlerRegistry) HandleEvent(ctx context.Context, event *Event) error
- func (hr *HandlerRegistry) Register(eventType string, handler EventHandler) error
- func (hr *HandlerRegistry) Stats() map[string]interface{}
- func (hr *HandlerRegistry) Unregister(eventType string, handlerName string) error
- type MessageBroker
- type ProjectionManager
- type ReflectionEventHandler
- type RetryPolicy
- func (rp *RetryPolicy) CalculateDelay(attempt int) time.Duration
- func (rp *RetryPolicy) ShouldRetry(err error) bool
- func (rp *RetryPolicy) WithBackoffFactor(factor float64) *RetryPolicy
- func (rp *RetryPolicy) WithMaxDelay(maxDelay time.Duration) *RetryPolicy
- func (rp *RetryPolicy) WithRetryableErrors(errors ...error) *RetryPolicy
- func (rp *RetryPolicy) WithShouldRetryFunc(fn func(error) bool) *RetryPolicy
- type Snapshot
- type TransactionalEventStore
- type TypedEventHandler
- func (h *TypedEventHandler) CanHandle(event *Event) bool
- func (h *TypedEventHandler) Handle(ctx context.Context, event *Event) error
- func (h *TypedEventHandler) Name() string
- func (h *TypedEventHandler) WithLogger(logger forge.Logger) *TypedEventHandler
- func (h *TypedEventHandler) WithMetrics(metrics forge.Metrics) *TypedEventHandler
- func (h *TypedEventHandler) WithMiddleware(middleware ...HandlerMiddleware) *TypedEventHandler
- func (h *TypedEventHandler) WithRetryPolicy(policy *RetryPolicy) *TypedEventHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DomainEventHandler ¶
type DomainEventHandler struct {
*TypedEventHandler
// contains filtered or unexported fields
}
DomainEventHandler is a specialized handler for domain events
func NewDomainEventHandler ¶
func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler
NewDomainEventHandler creates a new domain event handler
func (*DomainEventHandler) CanHandle ¶
func (dh *DomainEventHandler) CanHandle(event *Event) bool
CanHandle implements EventHandler with aggregate type checking
type Event ¶
type Event struct {
// ID is the unique identifier for the event
ID string `json:"id"`
// Type identifies the type of event (e.g., "user.created", "order.completed")
Type string `json:"type"`
// AggregateID identifies the aggregate root that produced this event
AggregateID string `json:"aggregate_id"`
// Data contains the event payload
Data interface{} `json:"data"`
// Metadata contains additional event information
Metadata map[string]interface{} `json:"metadata"`
// Timestamp when the event occurred
Timestamp time.Time `json:"timestamp"`
// Version for event versioning and schema evolution
Version int `json:"version"`
// Source indicates where the event originated
Source string `json:"source,omitempty"`
// CorrelationID for request correlation
CorrelationID string `json:"correlation_id,omitempty"`
// CausationID for event causation tracking
CausationID string `json:"causation_id,omitempty"`
}
Event represents a domain event in the system
func (*Event) GetMetadata ¶
GetMetadata retrieves metadata by key
func (*Event) GetMetadataString ¶
GetMetadataString retrieves string metadata by key
func (*Event) MarshalJSON ¶
MarshalJSON customizes JSON marshaling
func (*Event) UnmarshalJSON ¶
UnmarshalJSON customizes JSON unmarshaling
func (*Event) WithCausationID ¶
WithCausationID sets the causation ID for event causation tracking
func (*Event) WithCorrelationID ¶
WithCorrelationID sets the correlation ID for request tracking
func (*Event) WithMetadata ¶
WithMetadata adds metadata to the event
func (*Event) WithSource ¶
WithSource sets the event source
func (*Event) WithVersion ¶
WithVersion sets the event version for schema evolution
type EventBus ¶
type EventBus interface {
// Publish publishes an event to all registered brokers
Publish(ctx context.Context, event *Event) error
// PublishTo publishes an event to a specific broker
PublishTo(ctx context.Context, brokerName string, event *Event) error
// Subscribe subscribes to events of a specific type
Subscribe(eventType string, handler EventHandler) error
// Unsubscribe unsubscribes from events of a specific type
Unsubscribe(eventType string, handlerName string) error
// RegisterBroker registers a message broker
RegisterBroker(name string, broker MessageBroker) error
// UnregisterBroker unregisters a message broker
UnregisterBroker(name string) error
// GetBroker returns a broker by name
GetBroker(name string) (MessageBroker, error)
// GetBrokers returns all registered brokers
GetBrokers() map[string]MessageBroker
// SetDefaultBroker sets the default broker for publishing
SetDefaultBroker(name string) error
// GetStats returns event bus statistics
GetStats() map[string]interface{}
// Start starts the event bus
Start(ctx context.Context) error
// Stop stops the event bus
Stop(ctx context.Context) error
// HealthCheck checks the health of the event bus
HealthCheck(ctx context.Context) error
}
EventBus defines the interface for the event bus
type EventCollection ¶
type EventCollection struct {
Events []Event `json:"events"`
Total int `json:"total"`
Offset int64 `json:"offset"`
Limit int `json:"limit"`
}
EventCollection represents a collection of events
func NewEventCollection ¶
func NewEventCollection(events []Event, total int, offset int64, limit int) *EventCollection
NewEventCollection creates a new event collection
func (*EventCollection) Add ¶
func (ec *EventCollection) Add(event Event)
Add adds an event to the collection
func (*EventCollection) Filter ¶
func (ec *EventCollection) Filter(filter EventFilter) *EventCollection
Filter filters events in the collection
func (*EventCollection) SortByTimestamp ¶
func (ec *EventCollection) SortByTimestamp(ascending bool)
SortByTimestamp sorts events by timestamp
type EventCriteria ¶
type EventCriteria struct {
EventTypes []string `json:"event_types,omitempty"`
AggregateIDs []string `json:"aggregate_ids,omitempty"`
Sources []string `json:"sources,omitempty"`
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int64 `json:"offset,omitempty"`
SortBy string `json:"sort_by,omitempty"` // timestamp, type, aggregate_id
SortOrder string `json:"sort_order,omitempty"` // asc, desc
}
EventCriteria defines criteria for querying events
func NewEventCriteria ¶
func NewEventCriteria() *EventCriteria
NewEventCriteria creates a new event criteria
func (*EventCriteria) Validate ¶
func (ec *EventCriteria) Validate() error
Validate validates the event criteria
func (*EventCriteria) WithAggregateIDs ¶
func (ec *EventCriteria) WithAggregateIDs(aggregateIDs ...string) *EventCriteria
WithAggregateIDs adds aggregate IDs to the criteria
func (*EventCriteria) WithEndTime ¶
func (ec *EventCriteria) WithEndTime(end time.Time) *EventCriteria
WithEndTime sets the end time for the criteria
func (*EventCriteria) WithEventTypes ¶
func (ec *EventCriteria) WithEventTypes(eventTypes ...string) *EventCriteria
WithEventTypes adds event types to the criteria
func (*EventCriteria) WithLimit ¶
func (ec *EventCriteria) WithLimit(limit int) *EventCriteria
WithLimit sets the limit for the criteria
func (*EventCriteria) WithMetadata ¶
func (ec *EventCriteria) WithMetadata(key string, value interface{}) *EventCriteria
WithMetadata adds metadata filter to the criteria
func (*EventCriteria) WithOffset ¶
func (ec *EventCriteria) WithOffset(offset int64) *EventCriteria
WithOffset sets the offset for the criteria
func (*EventCriteria) WithSort ¶
func (ec *EventCriteria) WithSort(sortBy, sortOrder string) *EventCriteria
WithSort sets the sort parameters for the criteria
func (*EventCriteria) WithSources ¶
func (ec *EventCriteria) WithSources(sources ...string) *EventCriteria
WithSources adds sources to the criteria
func (*EventCriteria) WithStartTime ¶
func (ec *EventCriteria) WithStartTime(start time.Time) *EventCriteria
WithStartTime sets the start time for the criteria
func (*EventCriteria) WithTimeRange ¶
func (ec *EventCriteria) WithTimeRange(start, end time.Time) *EventCriteria
WithTimeRange sets the time range for the criteria
type EventEnvelope ¶
type EventEnvelope struct {
Event *Event `json:"event"`
Topic string `json:"topic"`
Partition int `json:"partition,omitempty"`
Offset int64 `json:"offset,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Retry int `json:"retry,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
EventEnvelope wraps an event with delivery metadata
func NewEventEnvelope ¶
func NewEventEnvelope(event *Event, topic string) *EventEnvelope
NewEventEnvelope creates a new event envelope
func (*EventEnvelope) WithHeader ¶
func (ee *EventEnvelope) WithHeader(key, value string) *EventEnvelope
WithHeader adds a header to the envelope
func (*EventEnvelope) WithOffset ¶
func (ee *EventEnvelope) WithOffset(offset int64) *EventEnvelope
WithOffset sets the offset for the envelope
func (*EventEnvelope) WithPartition ¶
func (ee *EventEnvelope) WithPartition(partition int) *EventEnvelope
WithPartition sets the partition for the envelope
type EventFilter ¶
EventFilter defines a filter for events
func AggregateFilter ¶
func AggregateFilter(aggregateIDs ...string) EventFilter
AggregateFilter creates a filter for specific aggregate IDs
func CombineFilters ¶
func CombineFilters(filters ...EventFilter) EventFilter
CombineFilters combines multiple filters with AND logic
func SourceFilter ¶
func SourceFilter(sources ...string) EventFilter
SourceFilter creates a filter for specific event sources
func TypeFilter ¶
func TypeFilter(eventTypes ...string) EventFilter
TypeFilter creates a filter for specific event types
type EventHandler ¶
type EventHandler interface {
// Handle processes an event
Handle(ctx context.Context, event *Event) error
// CanHandle checks if this handler can process the event
CanHandle(event *Event) bool
// Name returns the handler name
Name() string
}
EventHandler defines the interface for handling events
type EventHandlerFunc ¶
EventHandlerFunc is a function adapter for EventHandler
func (EventHandlerFunc) CanHandle ¶
func (f EventHandlerFunc) CanHandle(event *Event) bool
CanHandle implements EventHandler - always returns true for function handlers
type EventProjection ¶
type EventProjection interface {
// Name returns the projection name
Name() string
// When defines when this projection should be applied
When() map[string]EventHandler
// Apply applies events to the projection
Apply(ctx context.Context, event *Event) error
// Reset resets the projection state
Reset(ctx context.Context) error
// GetState returns the current projection state
GetState(ctx context.Context) (interface{}, error)
// SaveState saves the projection state
SaveState(ctx context.Context, state interface{}) error
}
EventProjection represents a projection of events
type EventStore ¶
type EventStore interface {
// SaveEvent persists a single event
SaveEvent(ctx context.Context, event *Event) error
// SaveEvents persists multiple events atomically
SaveEvents(ctx context.Context, events []*Event) error
// GetEvent retrieves a single event by ID
GetEvent(ctx context.Context, eventID string) (*Event, error)
// GetEvents retrieves events by various criteria
GetEvents(ctx context.Context, criteria EventCriteria) (*EventCollection, error)
// GetEventsByAggregate retrieves all events for a specific aggregate
GetEventsByAggregate(ctx context.Context, aggregateID string, fromVersion int) ([]*Event, error)
// GetEventsByType retrieves events of a specific type
GetEventsByType(ctx context.Context, eventType string, limit int, offset int64) ([]*Event, error)
// GetEventsSince retrieves events since a specific timestamp
GetEventsSince(ctx context.Context, since time.Time, limit int, offset int64) ([]*Event, error)
// GetEventsInRange retrieves events within a time range
GetEventsInRange(ctx context.Context, start, end time.Time, limit int, offset int64) ([]*Event, error)
// DeleteEvent deletes an event by ID (soft delete)
DeleteEvent(ctx context.Context, eventID string) error
// DeleteEventsByAggregate deletes all events for an aggregate (soft delete)
DeleteEventsByAggregate(ctx context.Context, aggregateID string) error
// GetLastEvent gets the last event for an aggregate
GetLastEvent(ctx context.Context, aggregateID string) (*Event, error)
// GetEventCount gets the total count of events
GetEventCount(ctx context.Context) (int64, error)
// GetEventCountByType gets the count of events by type
GetEventCountByType(ctx context.Context, eventType string) (int64, error)
// CreateSnapshot creates a snapshot of an aggregate's state
CreateSnapshot(ctx context.Context, snapshot *Snapshot) error
// GetSnapshot retrieves the latest snapshot for an aggregate
GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error)
// DeleteSnapshot deletes a snapshot
DeleteSnapshot(ctx context.Context, snapshotID string) error
// Close closes the event store connection
Close(ctx context.Context) error
// HealthCheck checks if the event store is healthy
HealthCheck(ctx context.Context) error
}
EventStore defines the interface for persisting and retrieving events
type EventStoreConfig ¶
type EventStoreConfig struct {
Type string `yaml:"type" json:"type"`
ConnectionName string `yaml:"connection_name" json:"connection_name"`
ConnectionString string `yaml:"connection_string" json:"connection_string"`
Database string `yaml:"database" json:"database"`
EventsTable string `yaml:"events_table" json:"events_table"`
SnapshotsTable string `yaml:"snapshots_table" json:"snapshots_table"`
MaxConnections int `yaml:"max_connections" json:"max_connections"`
ConnTimeout time.Duration `yaml:"connection_timeout" json:"connection_timeout"`
ReadTimeout time.Duration `yaml:"read_timeout" json:"read_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout" json:"write_timeout"`
EnableMetrics bool `yaml:"enable_metrics" json:"enable_metrics"`
EnableTracing bool `yaml:"enable_tracing" json:"enable_tracing"`
}
EventStoreConfig defines configuration for event stores
func DefaultEventStoreConfig ¶
func DefaultEventStoreConfig() *EventStoreConfig
DefaultEventStoreConfig returns default configuration
func (*EventStoreConfig) Validate ¶
func (cfg *EventStoreConfig) Validate() error
Validate validates the event store configuration
type EventStoreMetrics ¶
type EventStoreMetrics struct {
EventsSaved int64 `json:"events_saved"`
EventsRead int64 `json:"events_read"`
SnapshotsCreated int64 `json:"snapshots_created"`
SnapshotsRead int64 `json:"snapshots_read"`
Errors int64 `json:"errors"`
AverageWriteTime time.Duration `json:"average_write_time"`
AverageReadTime time.Duration `json:"average_read_time"`
ConnectionsActive int `json:"connections_active"`
}
EventStoreMetrics defines metrics for event stores
type EventStoreMigration ¶
type EventStoreMigration interface {
// Version returns the migration version
Version() int
// Description returns the migration description
Description() string
// Up applies the migration
Up(ctx context.Context, store EventStore) error
// Down reverts the migration
Down(ctx context.Context, store EventStore) error
}
EventStoreMigration represents a migration for the event store
type EventStoreMigrator ¶
type EventStoreMigrator interface {
// AddMigration adds a migration
AddMigration(migration EventStoreMigration) error
// Migrate runs pending migrations
Migrate(ctx context.Context) error
// Rollback rolls back the last migration
Rollback(ctx context.Context) error
// GetVersion gets the current migration version
GetVersion(ctx context.Context) (int, error)
// GetPendingMigrations gets pending migrations
GetPendingMigrations(ctx context.Context) ([]EventStoreMigration, error)
}
EventStoreMigrator manages event store migrations
type EventStoreStats ¶
type EventStoreStats struct {
TotalEvents int64 `json:"total_events"`
EventsByType map[string]int64 `json:"events_by_type"`
TotalSnapshots int64 `json:"total_snapshots"`
SnapshotsByType map[string]int64 `json:"snapshots_by_type"`
OldestEvent *time.Time `json:"oldest_event,omitempty"`
NewestEvent *time.Time `json:"newest_event,omitempty"`
Metrics *EventStoreMetrics `json:"metrics"`
Health forge.HealthStatus `json:"health"`
ConnectionInfo map[string]interface{} `json:"connection_info"`
}
EventStoreStats represents statistics for an event store
type EventStoreTransaction ¶
type EventStoreTransaction interface {
// SaveEvent saves an event within the transaction
SaveEvent(ctx context.Context, event *Event) error
// SaveEvents saves multiple events within the transaction
SaveEvents(ctx context.Context, events []*Event) error
// CreateSnapshot creates a snapshot within the transaction
CreateSnapshot(ctx context.Context, snapshot *Snapshot) error
// Commit commits the transaction
Commit(ctx context.Context) error
// Rollback rolls back the transaction
Rollback(ctx context.Context) error
}
EventStoreTransaction represents a transaction in the event store
type EventStream ¶
type EventStream interface {
// Subscribe subscribes to the event stream
Subscribe(ctx context.Context, handler EventStreamHandler) error
// Unsubscribe unsubscribes from the event stream
Unsubscribe(ctx context.Context) error
// Position returns the current stream position
Position() int64
// SeekTo seeks to a specific position in the stream
SeekTo(position int64) error
// Close closes the event stream
Close() error
}
EventStream represents a stream of events
type EventStreamConfig ¶
type EventStreamConfig struct {
StartPosition int64 `yaml:"start_position" json:"start_position"`
BatchSize int `yaml:"batch_size" json:"batch_size"`
BufferSize int `yaml:"buffer_size" json:"buffer_size"`
PollInterval time.Duration `yaml:"poll_interval" json:"poll_interval"`
MaxRetries int `yaml:"max_retries" json:"max_retries"`
RetryDelay time.Duration `yaml:"retry_delay" json:"retry_delay"`
}
EventStreamConfig defines configuration for event streams
func DefaultEventStreamConfig ¶
func DefaultEventStreamConfig() *EventStreamConfig
DefaultEventStreamConfig returns default stream configuration
type EventStreamHandler ¶
type EventStreamHandler interface {
// HandleEvent handles an event from the stream
HandleEvent(ctx context.Context, event *Event) error
// HandleError handles errors from the stream
HandleError(ctx context.Context, err error)
// HandleEnd handles the end of the stream
HandleEnd(ctx context.Context)
}
EventStreamHandler handles events from an event stream
type HandlerMiddleware ¶
type HandlerMiddleware func(next EventHandlerFunc) EventHandlerFunc
HandlerMiddleware defines middleware for event handlers
func LoggingMiddleware ¶
func LoggingMiddleware(l forge.Logger) HandlerMiddleware
LoggingMiddleware creates logging middleware for handlers
func MetricsMiddleware ¶
func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware
MetricsMiddleware creates metrics middleware for handlers
func ValidationMiddleware ¶
func ValidationMiddleware() HandlerMiddleware
ValidationMiddleware creates validation middleware for handlers
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry manages event handlers
func NewHandlerRegistry ¶
func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry
NewHandlerRegistry creates a new handler registry
func (*HandlerRegistry) GetAllHandlers ¶
func (hr *HandlerRegistry) GetAllHandlers() map[string][]EventHandler
GetAllHandlers returns all registered handlers
func (*HandlerRegistry) GetHandlers ¶
func (hr *HandlerRegistry) GetHandlers(eventType string) []EventHandler
GetHandlers returns all handlers for an event type
func (*HandlerRegistry) HandleEvent ¶
func (hr *HandlerRegistry) HandleEvent(ctx context.Context, event *Event) error
HandleEvent dispatches an event to all registered handlers
func (*HandlerRegistry) Register ¶
func (hr *HandlerRegistry) Register(eventType string, handler EventHandler) error
Register registers a handler for specific event types
func (*HandlerRegistry) Stats ¶
func (hr *HandlerRegistry) Stats() map[string]interface{}
Stats returns handler registry statistics
func (*HandlerRegistry) Unregister ¶
func (hr *HandlerRegistry) Unregister(eventType string, handlerName string) error
Unregister removes a handler for an event type
type MessageBroker ¶
type MessageBroker interface {
// Connect connects to the message broker
Connect(ctx context.Context, config interface{}) error
// Publish publishes an event to a topic
Publish(ctx context.Context, topic string, event Event) error
// Subscribe subscribes to a topic
Subscribe(ctx context.Context, topic string, handler EventHandler) error
// Unsubscribe unsubscribes from a topic
Unsubscribe(ctx context.Context, topic string, handlerName string) error
// Close closes the connection to the message broker
Close(ctx context.Context) error
// HealthCheck checks if the broker is healthy
HealthCheck(ctx context.Context) error
// GetStats returns broker statistics
GetStats() map[string]interface{}
}
MessageBroker defines the interface for message brokers
type ProjectionManager ¶
type ProjectionManager interface {
// RegisterProjection registers a projection
RegisterProjection(projection EventProjection) error
// UnregisterProjection unregisters a projection
UnregisterProjection(name string) error
// GetProjection gets a projection by name
GetProjection(name string) (EventProjection, error)
// GetProjections gets all registered projections
GetProjections() []EventProjection
// RebuildProjection rebuilds a projection from scratch
RebuildProjection(ctx context.Context, name string) error
// RebuildAllProjections rebuilds all projections
RebuildAllProjections(ctx context.Context) error
// Start starts the projection manager
Start(ctx context.Context) error
// Stop stops the projection manager
Stop(ctx context.Context) error
}
ProjectionManager manages event projections
type ReflectionEventHandler ¶
type ReflectionEventHandler struct {
// contains filtered or unexported fields
}
ReflectionEventHandler uses reflection to call methods based on event type
func NewReflectionEventHandler ¶
func NewReflectionEventHandler(name string, target interface{}) *ReflectionEventHandler
NewReflectionEventHandler creates a new reflection-based event handler
func (*ReflectionEventHandler) CanHandle ¶
func (rh *ReflectionEventHandler) CanHandle(event *Event) bool
CanHandle implements EventHandler
func (*ReflectionEventHandler) Handle ¶
func (rh *ReflectionEventHandler) Handle(ctx context.Context, event *Event) error
Handle implements EventHandler using reflection
func (*ReflectionEventHandler) Name ¶
func (rh *ReflectionEventHandler) Name() string
Name implements EventHandler
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
RetryableErrors []error
ShouldRetryFunc func(error) bool
}
RetryPolicy defines retry behavior for event handlers
func NewRetryPolicy ¶
func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy
NewRetryPolicy creates a new retry policy
func (*RetryPolicy) CalculateDelay ¶
func (rp *RetryPolicy) CalculateDelay(attempt int) time.Duration
CalculateDelay calculates the delay for a retry attempt
func (*RetryPolicy) ShouldRetry ¶
func (rp *RetryPolicy) ShouldRetry(err error) bool
ShouldRetry determines if an error should trigger a retry
func (*RetryPolicy) WithBackoffFactor ¶
func (rp *RetryPolicy) WithBackoffFactor(factor float64) *RetryPolicy
WithBackoffFactor sets the backoff factor
func (*RetryPolicy) WithMaxDelay ¶
func (rp *RetryPolicy) WithMaxDelay(maxDelay time.Duration) *RetryPolicy
WithMaxDelay sets the maximum delay between retries
func (*RetryPolicy) WithRetryableErrors ¶
func (rp *RetryPolicy) WithRetryableErrors(errors ...error) *RetryPolicy
WithRetryableErrors sets specific errors that should trigger retries
func (*RetryPolicy) WithShouldRetryFunc ¶
func (rp *RetryPolicy) WithShouldRetryFunc(fn func(error) bool) *RetryPolicy
WithShouldRetryFunc sets a custom function to determine if an error should trigger a retry
type Snapshot ¶
type Snapshot struct {
ID string `json:"id"`
AggregateID string `json:"aggregate_id"`
Type string `json:"type"`
Data interface{} `json:"data"`
Version int `json:"version"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Snapshot represents a point-in-time snapshot of aggregate state
func NewSnapshot ¶
NewSnapshot creates a new snapshot
func (*Snapshot) WithMetadata ¶
WithMetadata adds metadata to the snapshot
type TransactionalEventStore ¶
type TransactionalEventStore interface {
EventStore
// BeginTransaction begins a new transaction
BeginTransaction(ctx context.Context) (EventStoreTransaction, error)
// ExecuteInTransaction executes a function within a transaction
ExecuteInTransaction(ctx context.Context, fn func(tx EventStoreTransaction) error) error
}
TransactionalEventStore extends EventStore with transaction support
type TypedEventHandler ¶
type TypedEventHandler struct {
// contains filtered or unexported fields
}
TypedEventHandler is a handler for specific event types
func NewTypedEventHandler ¶
func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler
NewTypedEventHandler creates a new typed event handler
func (*TypedEventHandler) CanHandle ¶
func (h *TypedEventHandler) CanHandle(event *Event) bool
CanHandle implements EventHandler
func (*TypedEventHandler) Handle ¶
func (h *TypedEventHandler) Handle(ctx context.Context, event *Event) error
Handle implements EventHandler
func (*TypedEventHandler) Name ¶
func (h *TypedEventHandler) Name() string
Name implements EventHandler
func (*TypedEventHandler) WithLogger ¶
func (h *TypedEventHandler) WithLogger(logger forge.Logger) *TypedEventHandler
WithLogger sets the logger for the handler
func (*TypedEventHandler) WithMetrics ¶
func (h *TypedEventHandler) WithMetrics(metrics forge.Metrics) *TypedEventHandler
WithMetrics sets the metrics collector for the handler
func (*TypedEventHandler) WithMiddleware ¶
func (h *TypedEventHandler) WithMiddleware(middleware ...HandlerMiddleware) *TypedEventHandler
WithMiddleware adds middleware to the handler
func (*TypedEventHandler) WithRetryPolicy ¶
func (h *TypedEventHandler) WithRetryPolicy(policy *RetryPolicy) *TypedEventHandler
WithRetryPolicy sets the retry policy for the handler