core

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DomainEventHandler

type DomainEventHandler struct {
	*TypedEventHandler
	// contains filtered or unexported fields
}

DomainEventHandler is a specialized handler for domain events

func NewDomainEventHandler

func NewDomainEventHandler(name, aggregateType string, eventTypes []string, handler EventHandlerFunc) *DomainEventHandler

NewDomainEventHandler creates a new domain event handler

func (*DomainEventHandler) CanHandle

func (dh *DomainEventHandler) CanHandle(event *Event) bool

CanHandle implements EventHandler with aggregate type checking

type Event

type Event struct {
	// ID is the unique identifier for the event
	ID string `json:"id"`

	// Type identifies the type of event (e.g., "user.created", "order.completed")
	Type string `json:"type"`

	// AggregateID identifies the aggregate root that produced this event
	AggregateID string `json:"aggregate_id"`

	// Data contains the event payload
	Data interface{} `json:"data"`

	// Metadata contains additional event information
	Metadata map[string]interface{} `json:"metadata"`

	// Timestamp when the event occurred
	Timestamp time.Time `json:"timestamp"`

	// Version for event versioning and schema evolution
	Version int `json:"version"`

	// Source indicates where the event originated
	Source string `json:"source,omitempty"`

	// CorrelationID for request correlation
	CorrelationID string `json:"correlation_id,omitempty"`

	// CausationID for event causation tracking
	CausationID string `json:"causation_id,omitempty"`
}

Event represents a domain event in the system

func NewEvent

func NewEvent(eventType, aggregateID string, data interface{}) *Event

NewEvent creates a new event with required fields

func (*Event) Clone

func (e *Event) Clone() *Event

Clone creates a deep copy of the event

func (*Event) GetMetadata

func (e *Event) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves metadata by key

func (*Event) GetMetadataString

func (e *Event) GetMetadataString(key string) string

GetMetadataString retrieves string metadata by key

func (*Event) MarshalJSON

func (e *Event) MarshalJSON() ([]byte, error)

MarshalJSON customizes JSON marshaling

func (*Event) String

func (e *Event) String() string

String returns a string representation of the event

func (*Event) UnmarshalJSON

func (e *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON customizes JSON unmarshaling

func (*Event) Validate

func (e *Event) Validate() error

Validate validates the event structure

func (*Event) WithCausationID

func (e *Event) WithCausationID(causationID string) *Event

WithCausationID sets the causation ID for event causation tracking

func (*Event) WithCorrelationID

func (e *Event) WithCorrelationID(correlationID string) *Event

WithCorrelationID sets the correlation ID for request tracking

func (*Event) WithMetadata

func (e *Event) WithMetadata(key string, value interface{}) *Event

WithMetadata adds metadata to the event

func (*Event) WithSource

func (e *Event) WithSource(source string) *Event

WithSource sets the event source

func (*Event) WithVersion

func (e *Event) WithVersion(version int) *Event

WithVersion sets the event version for schema evolution

type EventBus

type EventBus interface {
	// Publish publishes an event to all registered brokers
	Publish(ctx context.Context, event *Event) error

	// PublishTo publishes an event to a specific broker
	PublishTo(ctx context.Context, brokerName string, event *Event) error

	// Subscribe subscribes to events of a specific type
	Subscribe(eventType string, handler EventHandler) error

	// Unsubscribe unsubscribes from events of a specific type
	Unsubscribe(eventType string, handlerName string) error

	// RegisterBroker registers a message broker
	RegisterBroker(name string, broker MessageBroker) error

	// UnregisterBroker unregisters a message broker
	UnregisterBroker(name string) error

	// GetBroker returns a broker by name
	GetBroker(name string) (MessageBroker, error)

	// GetBrokers returns all registered brokers
	GetBrokers() map[string]MessageBroker

	// SetDefaultBroker sets the default broker for publishing
	SetDefaultBroker(name string) error

	// GetStats returns event bus statistics
	GetStats() map[string]interface{}

	// Start starts the event bus
	Start(ctx context.Context) error

	// Stop stops the event bus
	Stop(ctx context.Context) error

	// HealthCheck checks the health of the event bus
	HealthCheck(ctx context.Context) error
}

EventBus defines the interface for the event bus

type EventCollection

type EventCollection struct {
	Events []Event `json:"events"`
	Total  int     `json:"total"`
	Offset int64   `json:"offset"`
	Limit  int     `json:"limit"`
}

EventCollection represents a collection of events

func NewEventCollection

func NewEventCollection(events []Event, total int, offset int64, limit int) *EventCollection

NewEventCollection creates a new event collection

func (*EventCollection) Add

func (ec *EventCollection) Add(event Event)

Add adds an event to the collection

func (*EventCollection) Filter

func (ec *EventCollection) Filter(filter EventFilter) *EventCollection

Filter filters events in the collection

func (*EventCollection) SortByTimestamp

func (ec *EventCollection) SortByTimestamp(ascending bool)

SortByTimestamp sorts events by timestamp

type EventCriteria

type EventCriteria struct {
	EventTypes   []string               `json:"event_types,omitempty"`
	AggregateIDs []string               `json:"aggregate_ids,omitempty"`
	Sources      []string               `json:"sources,omitempty"`
	StartTime    *time.Time             `json:"start_time,omitempty"`
	EndTime      *time.Time             `json:"end_time,omitempty"`
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
	Limit        int                    `json:"limit,omitempty"`
	Offset       int64                  `json:"offset,omitempty"`
	SortBy       string                 `json:"sort_by,omitempty"`    // timestamp, type, aggregate_id
	SortOrder    string                 `json:"sort_order,omitempty"` // asc, desc
}

EventCriteria defines criteria for querying events

func NewEventCriteria

func NewEventCriteria() *EventCriteria

NewEventCriteria creates a new event criteria

func (*EventCriteria) Validate

func (ec *EventCriteria) Validate() error

Validate validates the event criteria

func (*EventCriteria) WithAggregateIDs

func (ec *EventCriteria) WithAggregateIDs(aggregateIDs ...string) *EventCriteria

WithAggregateIDs adds aggregate IDs to the criteria

func (*EventCriteria) WithEndTime

func (ec *EventCriteria) WithEndTime(end time.Time) *EventCriteria

WithEndTime sets the end time for the criteria

func (*EventCriteria) WithEventTypes

func (ec *EventCriteria) WithEventTypes(eventTypes ...string) *EventCriteria

WithEventTypes adds event types to the criteria

func (*EventCriteria) WithLimit

func (ec *EventCriteria) WithLimit(limit int) *EventCriteria

WithLimit sets the limit for the criteria

func (*EventCriteria) WithMetadata

func (ec *EventCriteria) WithMetadata(key string, value interface{}) *EventCriteria

WithMetadata adds metadata filter to the criteria

func (*EventCriteria) WithOffset

func (ec *EventCriteria) WithOffset(offset int64) *EventCriteria

WithOffset sets the offset for the criteria

func (*EventCriteria) WithSort

func (ec *EventCriteria) WithSort(sortBy, sortOrder string) *EventCriteria

WithSort sets the sort parameters for the criteria

func (*EventCriteria) WithSources

func (ec *EventCriteria) WithSources(sources ...string) *EventCriteria

WithSources adds sources to the criteria

func (*EventCriteria) WithStartTime

func (ec *EventCriteria) WithStartTime(start time.Time) *EventCriteria

WithStartTime sets the start time for the criteria

func (*EventCriteria) WithTimeRange

func (ec *EventCriteria) WithTimeRange(start, end time.Time) *EventCriteria

WithTimeRange sets the time range for the criteria

type EventEnvelope

type EventEnvelope struct {
	Event     *Event                 `json:"event"`
	Topic     string                 `json:"topic"`
	Partition int                    `json:"partition,omitempty"`
	Offset    int64                  `json:"offset,omitempty"`
	Headers   map[string]string      `json:"headers,omitempty"`
	Retry     int                    `json:"retry,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

EventEnvelope wraps an event with delivery metadata

func NewEventEnvelope

func NewEventEnvelope(event *Event, topic string) *EventEnvelope

NewEventEnvelope creates a new event envelope

func (*EventEnvelope) WithHeader

func (ee *EventEnvelope) WithHeader(key, value string) *EventEnvelope

WithHeader adds a header to the envelope

func (*EventEnvelope) WithOffset

func (ee *EventEnvelope) WithOffset(offset int64) *EventEnvelope

WithOffset sets the offset for the envelope

func (*EventEnvelope) WithPartition

func (ee *EventEnvelope) WithPartition(partition int) *EventEnvelope

WithPartition sets the partition for the envelope

type EventFilter

type EventFilter func(event *Event) bool

EventFilter defines a filter for events

func AggregateFilter

func AggregateFilter(aggregateIDs ...string) EventFilter

AggregateFilter creates a filter for specific aggregate IDs

func CombineFilters

func CombineFilters(filters ...EventFilter) EventFilter

CombineFilters combines multiple filters with AND logic

func SourceFilter

func SourceFilter(sources ...string) EventFilter

SourceFilter creates a filter for specific event sources

func TypeFilter

func TypeFilter(eventTypes ...string) EventFilter

TypeFilter creates a filter for specific event types

type EventHandler

type EventHandler interface {
	// Handle processes an event
	Handle(ctx context.Context, event *Event) error

	// CanHandle checks if this handler can process the event
	CanHandle(event *Event) bool

	// Name returns the handler name
	Name() string
}

EventHandler defines the interface for handling events

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, event *Event) error

EventHandlerFunc is a function adapter for EventHandler

func (EventHandlerFunc) CanHandle

func (f EventHandlerFunc) CanHandle(event *Event) bool

CanHandle implements EventHandler - always returns true for function handlers

func (EventHandlerFunc) Handle

func (f EventHandlerFunc) Handle(ctx context.Context, event *Event) error

Handle implements EventHandler

func (EventHandlerFunc) Name

func (f EventHandlerFunc) Name() string

Name implements EventHandler

type EventProjection

type EventProjection interface {
	// Name returns the projection name
	Name() string

	// When defines when this projection should be applied
	When() map[string]EventHandler

	// Apply applies events to the projection
	Apply(ctx context.Context, event *Event) error

	// Reset resets the projection state
	Reset(ctx context.Context) error

	// GetState returns the current projection state
	GetState(ctx context.Context) (interface{}, error)

	// SaveState saves the projection state
	SaveState(ctx context.Context, state interface{}) error
}

EventProjection represents a projection of events

type EventStore

type EventStore interface {
	// SaveEvent persists a single event
	SaveEvent(ctx context.Context, event *Event) error

	// SaveEvents persists multiple events atomically
	SaveEvents(ctx context.Context, events []*Event) error

	// GetEvent retrieves a single event by ID
	GetEvent(ctx context.Context, eventID string) (*Event, error)

	// GetEvents retrieves events by various criteria
	GetEvents(ctx context.Context, criteria EventCriteria) (*EventCollection, error)

	// GetEventsByAggregate retrieves all events for a specific aggregate
	GetEventsByAggregate(ctx context.Context, aggregateID string, fromVersion int) ([]*Event, error)

	// GetEventsByType retrieves events of a specific type
	GetEventsByType(ctx context.Context, eventType string, limit int, offset int64) ([]*Event, error)

	// GetEventsSince retrieves events since a specific timestamp
	GetEventsSince(ctx context.Context, since time.Time, limit int, offset int64) ([]*Event, error)

	// GetEventsInRange retrieves events within a time range
	GetEventsInRange(ctx context.Context, start, end time.Time, limit int, offset int64) ([]*Event, error)

	// DeleteEvent deletes an event by ID (soft delete)
	DeleteEvent(ctx context.Context, eventID string) error

	// DeleteEventsByAggregate deletes all events for an aggregate (soft delete)
	DeleteEventsByAggregate(ctx context.Context, aggregateID string) error

	// GetLastEvent gets the last event for an aggregate
	GetLastEvent(ctx context.Context, aggregateID string) (*Event, error)

	// GetEventCount gets the total count of events
	GetEventCount(ctx context.Context) (int64, error)

	// GetEventCountByType gets the count of events by type
	GetEventCountByType(ctx context.Context, eventType string) (int64, error)

	// CreateSnapshot creates a snapshot of an aggregate's state
	CreateSnapshot(ctx context.Context, snapshot *Snapshot) error

	// GetSnapshot retrieves the latest snapshot for an aggregate
	GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error)

	// DeleteSnapshot deletes a snapshot
	DeleteSnapshot(ctx context.Context, snapshotID string) error

	// Close closes the event store connection
	Close(ctx context.Context) error

	// HealthCheck checks if the event store is healthy
	HealthCheck(ctx context.Context) error
}

EventStore defines the interface for persisting and retrieving events

type EventStoreConfig

type EventStoreConfig struct {
	Type             string        `yaml:"type" json:"type"`
	ConnectionName   string        `yaml:"connection_name" json:"connection_name"`
	ConnectionString string        `yaml:"connection_string" json:"connection_string"`
	Database         string        `yaml:"database" json:"database"`
	EventsTable      string        `yaml:"events_table" json:"events_table"`
	SnapshotsTable   string        `yaml:"snapshots_table" json:"snapshots_table"`
	MaxConnections   int           `yaml:"max_connections" json:"max_connections"`
	ConnTimeout      time.Duration `yaml:"connection_timeout" json:"connection_timeout"`
	ReadTimeout      time.Duration `yaml:"read_timeout" json:"read_timeout"`
	WriteTimeout     time.Duration `yaml:"write_timeout" json:"write_timeout"`
	EnableMetrics    bool          `yaml:"enable_metrics" json:"enable_metrics"`
	EnableTracing    bool          `yaml:"enable_tracing" json:"enable_tracing"`
}

EventStoreConfig defines configuration for event stores

func DefaultEventStoreConfig

func DefaultEventStoreConfig() *EventStoreConfig

DefaultEventStoreConfig returns default configuration

func (*EventStoreConfig) Validate

func (cfg *EventStoreConfig) Validate() error

Validate validates the event store configuration

type EventStoreMetrics

type EventStoreMetrics struct {
	EventsSaved       int64         `json:"events_saved"`
	EventsRead        int64         `json:"events_read"`
	SnapshotsCreated  int64         `json:"snapshots_created"`
	SnapshotsRead     int64         `json:"snapshots_read"`
	Errors            int64         `json:"errors"`
	AverageWriteTime  time.Duration `json:"average_write_time"`
	AverageReadTime   time.Duration `json:"average_read_time"`
	ConnectionsActive int           `json:"connections_active"`
}

EventStoreMetrics defines metrics for event stores

type EventStoreMigration

type EventStoreMigration interface {
	// Version returns the migration version
	Version() int

	// Description returns the migration description
	Description() string

	// Up applies the migration
	Up(ctx context.Context, store EventStore) error

	// Down reverts the migration
	Down(ctx context.Context, store EventStore) error
}

EventStoreMigration represents a migration for the event store

type EventStoreMigrator

type EventStoreMigrator interface {
	// AddMigration adds a migration
	AddMigration(migration EventStoreMigration) error

	// Migrate runs pending migrations
	Migrate(ctx context.Context) error

	// Rollback rolls back the last migration
	Rollback(ctx context.Context) error

	// GetVersion gets the current migration version
	GetVersion(ctx context.Context) (int, error)

	// GetPendingMigrations gets pending migrations
	GetPendingMigrations(ctx context.Context) ([]EventStoreMigration, error)
}

EventStoreMigrator manages event store migrations

type EventStoreStats

type EventStoreStats struct {
	TotalEvents     int64                  `json:"total_events"`
	EventsByType    map[string]int64       `json:"events_by_type"`
	TotalSnapshots  int64                  `json:"total_snapshots"`
	SnapshotsByType map[string]int64       `json:"snapshots_by_type"`
	OldestEvent     *time.Time             `json:"oldest_event,omitempty"`
	NewestEvent     *time.Time             `json:"newest_event,omitempty"`
	Metrics         *EventStoreMetrics     `json:"metrics"`
	Health          forge.HealthStatus     `json:"health"`
	ConnectionInfo  map[string]interface{} `json:"connection_info"`
}

EventStoreStats represents statistics for an event store

type EventStoreTransaction

type EventStoreTransaction interface {
	// SaveEvent saves an event within the transaction
	SaveEvent(ctx context.Context, event *Event) error

	// SaveEvents saves multiple events within the transaction
	SaveEvents(ctx context.Context, events []*Event) error

	// CreateSnapshot creates a snapshot within the transaction
	CreateSnapshot(ctx context.Context, snapshot *Snapshot) error

	// Commit commits the transaction
	Commit(ctx context.Context) error

	// Rollback rolls back the transaction
	Rollback(ctx context.Context) error
}

EventStoreTransaction represents a transaction in the event store

type EventStream

type EventStream interface {
	// Subscribe subscribes to the event stream
	Subscribe(ctx context.Context, handler EventStreamHandler) error

	// Unsubscribe unsubscribes from the event stream
	Unsubscribe(ctx context.Context) error

	// Position returns the current stream position
	Position() int64

	// SeekTo seeks to a specific position in the stream
	SeekTo(position int64) error

	// Close closes the event stream
	Close() error
}

EventStream represents a stream of events

type EventStreamConfig

type EventStreamConfig struct {
	StartPosition int64         `yaml:"start_position" json:"start_position"`
	BatchSize     int           `yaml:"batch_size" json:"batch_size"`
	BufferSize    int           `yaml:"buffer_size" json:"buffer_size"`
	PollInterval  time.Duration `yaml:"poll_interval" json:"poll_interval"`
	MaxRetries    int           `yaml:"max_retries" json:"max_retries"`
	RetryDelay    time.Duration `yaml:"retry_delay" json:"retry_delay"`
}

EventStreamConfig defines configuration for event streams

func DefaultEventStreamConfig

func DefaultEventStreamConfig() *EventStreamConfig

DefaultEventStreamConfig returns default stream configuration

type EventStreamHandler

type EventStreamHandler interface {
	// HandleEvent handles an event from the stream
	HandleEvent(ctx context.Context, event *Event) error

	// HandleError handles errors from the stream
	HandleError(ctx context.Context, err error)

	// HandleEnd handles the end of the stream
	HandleEnd(ctx context.Context)
}

EventStreamHandler handles events from an event stream

type HandlerMiddleware

type HandlerMiddleware func(next EventHandlerFunc) EventHandlerFunc

HandlerMiddleware defines middleware for event handlers

func LoggingMiddleware

func LoggingMiddleware(l forge.Logger) HandlerMiddleware

LoggingMiddleware creates logging middleware for handlers

func MetricsMiddleware

func MetricsMiddleware(metrics forge.Metrics) HandlerMiddleware

MetricsMiddleware creates metrics middleware for handlers

func ValidationMiddleware

func ValidationMiddleware() HandlerMiddleware

ValidationMiddleware creates validation middleware for handlers

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry manages event handlers

func NewHandlerRegistry

func NewHandlerRegistry(logger forge.Logger, metrics forge.Metrics) *HandlerRegistry

NewHandlerRegistry creates a new handler registry

func (*HandlerRegistry) GetAllHandlers

func (hr *HandlerRegistry) GetAllHandlers() map[string][]EventHandler

GetAllHandlers returns all registered handlers

func (*HandlerRegistry) GetHandlers

func (hr *HandlerRegistry) GetHandlers(eventType string) []EventHandler

GetHandlers returns all handlers for an event type

func (*HandlerRegistry) HandleEvent

func (hr *HandlerRegistry) HandleEvent(ctx context.Context, event *Event) error

HandleEvent dispatches an event to all registered handlers

func (*HandlerRegistry) Register

func (hr *HandlerRegistry) Register(eventType string, handler EventHandler) error

Register registers a handler for specific event types

func (*HandlerRegistry) Stats

func (hr *HandlerRegistry) Stats() map[string]interface{}

Stats returns handler registry statistics

func (*HandlerRegistry) Unregister

func (hr *HandlerRegistry) Unregister(eventType string, handlerName string) error

Unregister removes a handler for an event type

type MessageBroker

type MessageBroker interface {
	// Connect connects to the message broker
	Connect(ctx context.Context, config interface{}) error

	// Publish publishes an event to a topic
	Publish(ctx context.Context, topic string, event Event) error

	// Subscribe subscribes to a topic
	Subscribe(ctx context.Context, topic string, handler EventHandler) error

	// Unsubscribe unsubscribes from a topic
	Unsubscribe(ctx context.Context, topic string, handlerName string) error

	// Close closes the connection to the message broker
	Close(ctx context.Context) error

	// HealthCheck checks if the broker is healthy
	HealthCheck(ctx context.Context) error

	// GetStats returns broker statistics
	GetStats() map[string]interface{}
}

MessageBroker defines the interface for message brokers

type ProjectionManager

type ProjectionManager interface {
	// RegisterProjection registers a projection
	RegisterProjection(projection EventProjection) error

	// UnregisterProjection unregisters a projection
	UnregisterProjection(name string) error

	// GetProjection gets a projection by name
	GetProjection(name string) (EventProjection, error)

	// GetProjections gets all registered projections
	GetProjections() []EventProjection

	// RebuildProjection rebuilds a projection from scratch
	RebuildProjection(ctx context.Context, name string) error

	// RebuildAllProjections rebuilds all projections
	RebuildAllProjections(ctx context.Context) error

	// Start starts the projection manager
	Start(ctx context.Context) error

	// Stop stops the projection manager
	Stop(ctx context.Context) error
}

ProjectionManager manages event projections

type ReflectionEventHandler

type ReflectionEventHandler struct {
	// contains filtered or unexported fields
}

ReflectionEventHandler uses reflection to call methods based on event type

func NewReflectionEventHandler

func NewReflectionEventHandler(name string, target interface{}) *ReflectionEventHandler

NewReflectionEventHandler creates a new reflection-based event handler

func (*ReflectionEventHandler) CanHandle

func (rh *ReflectionEventHandler) CanHandle(event *Event) bool

CanHandle implements EventHandler

func (*ReflectionEventHandler) Handle

func (rh *ReflectionEventHandler) Handle(ctx context.Context, event *Event) error

Handle implements EventHandler using reflection

func (*ReflectionEventHandler) Name

func (rh *ReflectionEventHandler) Name() string

Name implements EventHandler

type RetryPolicy

type RetryPolicy struct {
	MaxRetries      int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	RetryableErrors []error
	ShouldRetryFunc func(error) bool
}

RetryPolicy defines retry behavior for event handlers

func NewRetryPolicy

func NewRetryPolicy(maxRetries int, initialDelay time.Duration) *RetryPolicy

NewRetryPolicy creates a new retry policy

func (*RetryPolicy) CalculateDelay

func (rp *RetryPolicy) CalculateDelay(attempt int) time.Duration

CalculateDelay calculates the delay for a retry attempt

func (*RetryPolicy) ShouldRetry

func (rp *RetryPolicy) ShouldRetry(err error) bool

ShouldRetry determines if an error should trigger a retry

func (*RetryPolicy) WithBackoffFactor

func (rp *RetryPolicy) WithBackoffFactor(factor float64) *RetryPolicy

WithBackoffFactor sets the backoff factor

func (*RetryPolicy) WithMaxDelay

func (rp *RetryPolicy) WithMaxDelay(maxDelay time.Duration) *RetryPolicy

WithMaxDelay sets the maximum delay between retries

func (*RetryPolicy) WithRetryableErrors

func (rp *RetryPolicy) WithRetryableErrors(errors ...error) *RetryPolicy

WithRetryableErrors sets specific errors that should trigger retries

func (*RetryPolicy) WithShouldRetryFunc

func (rp *RetryPolicy) WithShouldRetryFunc(fn func(error) bool) *RetryPolicy

WithShouldRetryFunc sets a custom function to determine if an error should trigger a retry

type Snapshot

type Snapshot struct {
	ID          string                 `json:"id"`
	AggregateID string                 `json:"aggregate_id"`
	Type        string                 `json:"type"`
	Data        interface{}            `json:"data"`
	Version     int                    `json:"version"`
	Timestamp   time.Time              `json:"timestamp"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

Snapshot represents a point-in-time snapshot of aggregate state

func NewSnapshot

func NewSnapshot(aggregateID, snapshotType string, data interface{}, version int) *Snapshot

NewSnapshot creates a new snapshot

func (*Snapshot) Validate

func (s *Snapshot) Validate() error

Validate validates the snapshot

func (*Snapshot) WithMetadata

func (s *Snapshot) WithMetadata(key string, value interface{}) *Snapshot

WithMetadata adds metadata to the snapshot

type TransactionalEventStore

type TransactionalEventStore interface {
	EventStore

	// BeginTransaction begins a new transaction
	BeginTransaction(ctx context.Context) (EventStoreTransaction, error)

	// ExecuteInTransaction executes a function within a transaction
	ExecuteInTransaction(ctx context.Context, fn func(tx EventStoreTransaction) error) error
}

TransactionalEventStore extends EventStore with transaction support

type TypedEventHandler

type TypedEventHandler struct {
	// contains filtered or unexported fields
}

TypedEventHandler is a handler for specific event types

func NewTypedEventHandler

func NewTypedEventHandler(name string, eventTypes []string, handler EventHandlerFunc) *TypedEventHandler

NewTypedEventHandler creates a new typed event handler

func (*TypedEventHandler) CanHandle

func (h *TypedEventHandler) CanHandle(event *Event) bool

CanHandle implements EventHandler

func (*TypedEventHandler) Handle

func (h *TypedEventHandler) Handle(ctx context.Context, event *Event) error

Handle implements EventHandler

func (*TypedEventHandler) Name

func (h *TypedEventHandler) Name() string

Name implements EventHandler

func (*TypedEventHandler) WithLogger

func (h *TypedEventHandler) WithLogger(logger forge.Logger) *TypedEventHandler

WithLogger sets the logger for the handler

func (*TypedEventHandler) WithMetrics

func (h *TypedEventHandler) WithMetrics(metrics forge.Metrics) *TypedEventHandler

WithMetrics sets the metrics collector for the handler

func (*TypedEventHandler) WithMiddleware

func (h *TypedEventHandler) WithMiddleware(middleware ...HandlerMiddleware) *TypedEventHandler

WithMiddleware adds middleware to the handler

func (*TypedEventHandler) WithRetryPolicy

func (h *TypedEventHandler) WithRetryPolicy(policy *RetryPolicy) *TypedEventHandler

WithRetryPolicy sets the retry policy for the handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL