events

package
v0.16.0
Published: Dec 8, 2022 License: Apache-2.0 Imports: 6 Imported by: 30

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidEventStore is when a dispatcher is created with a nil event store.
	ErrInvalidEventStore = errors.New("invalid event store")
	// ErrAggregateNotVersioned is when the aggregate does not implement the VersionedAggregate interface.
	ErrAggregateNotVersioned = errors.New("aggregate is not versioned")
	// ErrMismatchedEventType occurs when events loaded for an ID do not match the aggregate type.
	ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")
)

Functions

This section is empty.

Types

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is an event-sourced aggregate base to embed in a domain aggregate.

A typical example:

type UserAggregate struct {
    *events.AggregateBase

    name string
}

Using a new function to create aggregates and setting up the aggregate base is recommended:

func NewUserAggregate(id uuid.UUID) *UserAggregate {
    return &UserAggregate{
        AggregateBase: events.NewAggregateBase(UserAggregateType, id),
    }
}

The aggregate must also be registered, in this case:

func init() {
    eh.RegisterAggregate(func(id uuid.UUID) eh.Aggregate {
        return NewUserAggregate(id)
    })
}

The aggregate must return an error if the event cannot be applied, or nil to signal success (which will increment the version).

func (a *UserAggregate) ApplyEvent(ctx context.Context, event eh.Event) error {
    switch event.EventType() {
    case AddUserEvent:
        // Apply the event data to the aggregate.
    }
    // Returning nil signals that the event was applied successfully.
    return nil
}

See the examples folder for a complete use case.

func NewAggregateBase

func NewAggregateBase(t eh.AggregateType, id uuid.UUID) *AggregateBase

NewAggregateBase creates an aggregate base with the given aggregate type and ID.

func (*AggregateBase) AggregateType

func (a *AggregateBase) AggregateType() eh.AggregateType

AggregateType implements the AggregateType method of the eh.Aggregate interface.

func (*AggregateBase) AggregateVersion added in v0.13.0

func (a *AggregateBase) AggregateVersion() int

AggregateVersion implements the AggregateVersion method of the VersionedAggregate interface.

func (*AggregateBase) AppendEvent added in v0.7.0

func (a *AggregateBase) AppendEvent(t eh.EventType, data eh.EventData, timestamp time.Time, options ...eh.EventOption) eh.Event

AppendEvent appends an event for later retrieval by UncommittedEvents().

func (*AggregateBase) ClearUncommittedEvents added in v0.13.0

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents implements the ClearUncommittedEvents method of the eh.EventSource interface.

func (*AggregateBase) EntityID

func (a *AggregateBase) EntityID() uuid.UUID

EntityID implements the EntityID method of the eh.Entity and eh.Aggregate interfaces.

func (*AggregateBase) SetAggregateVersion added in v0.13.0

func (a *AggregateBase) SetAggregateVersion(v int)

SetAggregateVersion implements the SetAggregateVersion method of the VersionedAggregate interface.

func (*AggregateBase) UncommittedEvents added in v0.13.0

func (a *AggregateBase) UncommittedEvents() []eh.Event

UncommittedEvents implements the UncommittedEvents method of the eh.EventSource interface.
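The three event-buffer methods above (AppendEvent, UncommittedEvents, ClearUncommittedEvents) form a simple lifecycle. The following is a minimal standard-library sketch of that lifecycle, not the package's implementation: the event and base types are hypothetical stand-ins for eh.Event and AggregateBase, timestamps and event data are omitted, and the way versions are numbered is an assumption.

```go
package main

import "fmt"

// event is a hypothetical stand-in for eh.Event, reduced to two fields.
type event struct {
	Type    string
	Version int
}

// base is a hypothetical stand-in for AggregateBase.
type base struct {
	version int     // version of the last committed event
	pending []event // events appended but not yet saved
}

// appendEvent buffers a new event. Numbering it after the committed version
// and any already buffered events is an assumption of this sketch.
func (b *base) appendEvent(t string) event {
	e := event{Type: t, Version: b.version + len(b.pending) + 1}
	b.pending = append(b.pending, e)
	return e
}

// uncommittedEvents returns the buffered events in append order.
func (b *base) uncommittedEvents() []event { return b.pending }

// clearUncommittedEvents empties the buffer, as a store would after saving.
func (b *base) clearUncommittedEvents() { b.pending = nil }

func main() {
	b := &base{version: 2}
	b.appendEvent("UserRenamed")
	b.appendEvent("UserDeleted")
	fmt.Println(b.uncommittedEvents())
	b.clearUncommittedEvents()
	fmt.Println(len(b.uncommittedEvents()))
}
```

In this sketch the committed version does not change when events are appended; only a successful save would be expected to advance it.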

type AggregateStore

type AggregateStore struct {
	// contains filtered or unexported fields
}

AggregateStore is an aggregate store using event sourcing. It uses an event store for loading and saving events used to build the aggregate and an event handler to handle resulting events.

func NewAggregateStore

func NewAggregateStore(store eh.EventStore, options ...Option) (*AggregateStore, error)

NewAggregateStore creates an aggregate store backed by the given event store. Optional Option values, such as WithSnapshotStrategy, configure the store at creation.

func (*AggregateStore) Load

func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateType, id uuid.UUID) (eh.Aggregate, error)

Load implements the Load method of the eventhorizon.AggregateStore interface. It loads an aggregate from the event store by creating a new aggregate of the type with the ID and then applies all events to it, thus making it the most current version of the aggregate.
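The replay that Load describes can be illustrated with a small standard-library sketch. It is not the package's code: the event and user types and the load function are hypothetical stand-ins, and setting the version from each applied event is an assumption about how replay tracks versions.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for a stored eh.Event.
type event struct {
	Type    string
	Name    string
	Version int
}

// user is a hypothetical domain aggregate.
type user struct {
	name    string
	version int
}

// applyEvent mutates the aggregate, or returns an error if it cannot.
func (u *user) applyEvent(e event) error {
	switch e.Type {
	case "UserRenamed":
		u.name = e.Name
		return nil
	default:
		return errors.New("unknown event type: " + e.Type)
	}
}

// load builds a fresh aggregate and replays the stored events in order.
// The version advances only after an event has been applied successfully.
func load(stored []event) (*user, error) {
	u := &user{}
	for _, e := range stored {
		if err := u.applyEvent(e); err != nil {
			return nil, fmt.Errorf("apply event version %d: %w", e.Version, err)
		}
		u.version = e.Version
	}
	return u, nil
}

func main() {
	u, err := load([]event{
		{Type: "UserRenamed", Name: "alice", Version: 1},
		{Type: "UserRenamed", Name: "bob", Version: 2},
	})
	fmt.Println(u.name, u.version, err)
}
```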

func (*AggregateStore) Save

func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error

Save implements the Save method of the eventhorizon.AggregateStore interface. It saves all uncommitted events from an aggregate to the event store.
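A save of uncommitted events is commonly paired with an optimistic version check. The sketch below shows that pattern with the standard library only; memStore, aggregate, errConflict, and saveAggregate are hypothetical names, and the conflict rule (stored event count must equal the aggregate's committed version) is an assumption rather than a description of any particular eh.EventStore.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical error for a failed optimistic version check.
var errConflict = errors.New("version conflict")

// event is a hypothetical stand-in for eh.Event.
type event struct {
	Type    string
	Version int
}

// memStore is a hypothetical in-memory event store for a single aggregate.
type memStore struct{ events []event }

// save appends events only if the caller's committed version is current.
func (s *memStore) save(events []event, originalVersion int) error {
	if len(s.events) != originalVersion {
		return errConflict
	}
	s.events = append(s.events, events...)
	return nil
}

// aggregate is a hypothetical versioned aggregate with an event buffer.
type aggregate struct {
	version     int
	uncommitted []event
}

// record buffers an event numbered after everything known so far.
func (a *aggregate) record(t string) {
	v := a.version + len(a.uncommitted) + 1
	a.uncommitted = append(a.uncommitted, event{Type: t, Version: v})
}

// saveAggregate persists the buffer, clears it, then advances the version.
func saveAggregate(s *memStore, a *aggregate) error {
	events := a.uncommitted
	if len(events) == 0 {
		return nil // nothing to save
	}
	if err := s.save(events, a.version); err != nil {
		return fmt.Errorf("save events: %w", err)
	}
	a.uncommitted = nil
	a.version = events[len(events)-1].Version
	return nil
}

func main() {
	s, a := &memStore{}, &aggregate{}
	a.record("UserAdded")
	a.record("UserRenamed")
	fmt.Println(saveAggregate(s, a), a.version, len(s.events))

	stale := &aggregate{} // still believes the stream is empty
	stale.record("UserAdded")
	fmt.Println(errors.Is(saveAggregate(s, stale), errConflict))
}
```

On a conflict the buffer is left intact, so the caller can reload the aggregate and retry.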

type EveryNumberEventSnapshotStrategy added in v0.16.0

type EveryNumberEventSnapshotStrategy struct {
	// contains filtered or unexported fields
}

EveryNumberEventSnapshotStrategy takes a snapshot every n events.

func NewEveryNumberEventSnapshotStrategy added in v0.16.0

func NewEveryNumberEventSnapshotStrategy(threshold int) *EveryNumberEventSnapshotStrategy

func (*EveryNumberEventSnapshotStrategy) ShouldTakeSnapshot added in v0.16.0

func (s *EveryNumberEventSnapshotStrategy) ShouldTakeSnapshot(lastSnapshotVersion int,
	_ time.Time,
	event eh.Event) bool
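One plausible reading of an every-n-events rule is shown in the sketch below. It is a stand-alone illustration, not the package's implementation: the everyN type is hypothetical, the event is reduced to its version number, and the comparison (event version minus last snapshot version reaches the threshold) is an assumption.

```go
package main

import "fmt"

// everyN is a hypothetical stand-in for EveryNumberEventSnapshotStrategy.
type everyN struct{ threshold int }

// shouldTakeSnapshot reports whether at least threshold events have been
// recorded since the last snapshot. The comparison is an assumption.
func (s everyN) shouldTakeSnapshot(lastSnapshotVersion, eventVersion int) bool {
	return eventVersion-lastSnapshotVersion >= s.threshold
}

func main() {
	s := everyN{threshold: 3}
	for v := 1; v <= 4; v++ {
		fmt.Println(v, s.shouldTakeSnapshot(0, v))
	}
}
```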

type NoSnapshotStrategy added in v0.16.0

type NoSnapshotStrategy struct {
}

NoSnapshotStrategy is a strategy under which no snapshot is ever taken.

func (*NoSnapshotStrategy) ShouldTakeSnapshot added in v0.16.0

func (s *NoSnapshotStrategy) ShouldTakeSnapshot(_ int,
	_ time.Time,
	_ eh.Event) bool

type Option added in v0.16.0

type Option func(*AggregateStore) error

Option is an option setter used to configure creation.

func WithSnapshotStrategy added in v0.16.0

func WithSnapshotStrategy(s eh.SnapshotStrategy) Option

WithSnapshotStrategy sets the strategy used to determine whether a snapshot should be taken.
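Option follows the functional options pattern: each option is a function that mutates the store under construction and may fail. The sketch below illustrates the pattern with the standard library only. All names in it (strategy, store, option, withStrategy, newStore) are hypothetical stand-ins, and both the "never" default and the nil check are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// strategy is a hypothetical stand-in for eh.SnapshotStrategy.
type strategy interface{ name() string }

type never struct{}

func (never) name() string { return "never" }

type everyN struct{ n int }

func (everyN) name() string { return "every-n" }

// store is a hypothetical stand-in for AggregateStore.
type store struct{ strategy strategy }

// option mirrors the shape of Option: a setter that may return an error.
type option func(*store) error

// withStrategy returns an option that installs s on the store.
// Rejecting nil is an assumption of this sketch.
func withStrategy(s strategy) option {
	return func(st *store) error {
		if s == nil {
			return errors.New("nil snapshot strategy")
		}
		st.strategy = s
		return nil
	}
}

// newStore applies the options in order and stops at the first failure.
func newStore(opts ...option) (*store, error) {
	st := &store{strategy: never{}} // assumed default: no snapshots
	for _, opt := range opts {
		if err := opt(st); err != nil {
			return nil, fmt.Errorf("apply option: %w", err)
		}
	}
	return st, nil
}

func main() {
	st, err := newStore(withStrategy(everyN{n: 10}))
	fmt.Println(st.strategy.name(), err)
}
```

Returning an error from each option lets the constructor reject invalid configuration instead of building a half-configured store.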

type PeriodSnapshotStrategy added in v0.16.0

type PeriodSnapshotStrategy struct {
	// contains filtered or unexported fields
}

PeriodSnapshotStrategy takes a snapshot each time a period has elapsed, for example every hour.

func NewPeriodSnapshotStrategy added in v0.16.0

func NewPeriodSnapshotStrategy(threshold time.Duration) *PeriodSnapshotStrategy

func (*PeriodSnapshotStrategy) ShouldTakeSnapshot added in v0.16.0

func (s *PeriodSnapshotStrategy) ShouldTakeSnapshot(_ int,
	lastSnapshotTimestamp time.Time,
	event eh.Event) bool
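A period-based rule can be sketched as a comparison of elapsed time against a threshold. The example below is a stand-alone illustration with hypothetical names (periodStrategy, hourlyDue). It measures elapsed time from the last snapshot to the event's timestamp, which is an assumption; an implementation could equally measure against the current wall-clock time.

```go
package main

import (
	"fmt"
	"time"
)

// periodStrategy is a hypothetical stand-in for PeriodSnapshotStrategy.
type periodStrategy struct{ threshold time.Duration }

// shouldTakeSnapshot reports whether at least threshold has elapsed between
// the last snapshot and the event. Using the event time is an assumption.
func (s periodStrategy) shouldTakeSnapshot(lastSnapshot, eventTime time.Time) bool {
	return eventTime.Sub(lastSnapshot) >= s.threshold
}

// hourlyDue is a hypothetical helper: it reports whether an hourly snapshot
// is due for an event recorded the given number of minutes after the last one.
func hourlyDue(minutes int) bool {
	last := time.Date(2022, time.December, 8, 12, 0, 0, 0, time.UTC)
	s := periodStrategy{threshold: time.Hour}
	return s.shouldTakeSnapshot(last, last.Add(time.Duration(minutes)*time.Minute))
}

func main() {
	fmt.Println(hourlyDue(59), hourlyDue(60))
}
```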

type VersionedAggregate added in v0.13.0

type VersionedAggregate interface {
	// Provides all the basic aggregate data.
	eh.Aggregate

	// Provides events to persist and publish from the aggregate.
	eh.EventSource

	// AggregateVersion returns the version of the aggregate.
	AggregateVersion() int
	// SetAggregateVersion sets the version of the aggregate. It should only be
	// called after an event has been successfully applied, often by EH.
	SetAggregateVersion(int)

	// ApplyEvent applies an event on the aggregate by setting its values.
	// If there are no errors the version should be incremented by calling
	// IncrementVersion.
	ApplyEvent(context.Context, eh.Event) error
}

VersionedAggregate is an interface representing a versioned aggregate created from events. It receives commands and generates events that are stored.

The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
