event

package
v0.3.0-prerelease.4
Warning

This package is not in the latest version of its module.

Published: May 6, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package event contains types and implementations for dealing with Domain Events.

Types

type Appender

type Appender interface {
	Append(ctx context.Context, id StreamID, expected version.Check, events ...Envelope) (version.Version, error)
}

Appender is an event.Store trait used to append new Domain Events to an Event Stream.

type Envelope

type Envelope message.GenericEnvelope

Envelope contains a Domain Event and any metadata associated with it.

Due to the lack of sum types (a.k.a. enum types), Events cannot currently take advantage of the generics feature introduced in Go 1.18.

func ToEnvelope

func ToEnvelope(event Event) Envelope

ToEnvelope returns an Envelope instance with the provided Event instance and no Metadata.

func ToEnvelopes

func ToEnvelopes(events ...Event) []Envelope

ToEnvelopes returns a list of Envelopes from a list of Events. The returned Envelopes have no Metadata.

type Event

type Event message.Message

Event is a Message representing some Domain information that has happened in the past, and which is of vital importance to the Domain itself.

Event type names should be phrased in the past tense, to reinforce the notion that they describe something that has already happened.

type FusedStore

type FusedStore struct {
	Appender
	Streamer
}

FusedStore is a convenience type that fuses multiple Event Store interfaces, for cases where you need to extend the functionality of the Store only partially.

For example, you might want to extend the functionality of the Append() method but keep the Streamer methods the same.

If the extension wrapper does not implement the Streamer interface, the wrapper instance cannot be used as an Event Store where a full Store is required (e.g. the Aggregate Repository).

With a FusedStore you can fuse both instances together and use the result with the rest of the library ecosystem.
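The fusing described above can be sketched as follows. This is a minimal, self-contained illustration and not the package's own code: the types are simplified stand-ins (the `version.Check` and `version.Selector` parameters are omitted), and `memoryStore`, `countingAppender` and `fuseAndUse` are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the package's types (assumption: the real
// signatures also carry version.Check and version.Selector arguments).
type (
	StreamID  string
	Version   uint32
	Envelope  struct{ Message string }
	Persisted struct {
		StreamID
		Version
		Envelope
	}
	StreamWrite chan<- Persisted
)

type Appender interface {
	Append(ctx context.Context, id StreamID, events ...Envelope) (Version, error)
}

type Streamer interface {
	Stream(ctx context.Context, stream StreamWrite, id StreamID) error
}

// Store and FusedStore mirror the shapes documented in this package.
type Store interface {
	Appender
	Streamer
}

type FusedStore struct {
	Appender
	Streamer
}

// memoryStore is a hypothetical, non-thread-safe Store used only here.
type memoryStore struct{ events map[StreamID][]Envelope }

func (m *memoryStore) Append(_ context.Context, id StreamID, events ...Envelope) (Version, error) {
	m.events[id] = append(m.events[id], events...)
	return Version(len(m.events[id])), nil
}

func (m *memoryStore) Stream(ctx context.Context, stream StreamWrite, id StreamID) error {
	defer close(stream)
	for i, env := range m.events[id] {
		select {
		case stream <- Persisted{StreamID: id, Version: Version(i + 1), Envelope: env}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// countingAppender extends Append only; on its own it is not a Store.
type countingAppender struct {
	Appender
	calls int
}

func (c *countingAppender) Append(ctx context.Context, id StreamID, events ...Envelope) (Version, error) {
	c.calls++
	return c.Appender.Append(ctx, id, events...)
}

// fuseAndUse fuses the extended Appender with the original Streamer and
// uses the result wherever a full Store is expected.
func fuseAndUse() (calls, streamed int, err error) {
	inner := &memoryStore{events: map[StreamID][]Envelope{}}
	counter := &countingAppender{Appender: inner}

	var store Store = FusedStore{Appender: counter, Streamer: inner}

	ctx := context.Background()
	_, err = store.Append(ctx, "order-1",
		Envelope{Message: "OrderWasPlaced"}, Envelope{Message: "OrderWasShipped"})
	if err != nil {
		return 0, 0, err
	}

	stream := make(chan Persisted, 2) // buffered so the synchronous Stream call does not block
	if err = store.Stream(ctx, stream, "order-1"); err != nil {
		return 0, 0, err
	}
	for range stream {
		streamed++
	}
	return counter.calls, streamed, nil
}

func main() {
	fmt.Println(fuseAndUse())
}
```

The wrapper only has to implement the method it changes; the embedded fields of the fused value supply the rest of the Store interface.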

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a thread-safe, in-memory event.Store implementation.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

NewInMemoryStore creates a new event.InMemoryStore instance.

func (*InMemoryStore) Append

func (es *InMemoryStore) Append(
	_ context.Context,
	id StreamID,
	expected version.Check,
	events ...Envelope,
) (version.Version, error)

Append inserts the specified Domain Events into the Event Stream identified by `id`, returning the new version of the Event Stream.

`version.CheckExact` can be specified to enable an Optimistic Concurrency check on append, by using the expected version of the Event Stream prior to appending the new Events.

Alternatively, `version.Any` can be used if no Optimistic Concurrency check should be carried out.

An instance of `version.ConflictError` will be returned if the optimistic locking version check fails against the current version of the Event Stream.
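The optimistic concurrency behavior described above can be illustrated with a small stand-alone sketch. It does not use the real `version` package: `Check`, `Any`, `CheckExact` and `ConflictError` below are simplified stand-ins whose shapes are assumptions, and `store` and `demo` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Version uint32

// Check is a simplified stand-in for version.Check (assumed shape).
type Check struct {
	exact    bool
	expected Version
}

// Any skips the optimistic concurrency check.
var Any = Check{}

// CheckExact requires the stream to be at exactly version v before appending.
func CheckExact(v Version) Check { return Check{exact: true, expected: v} }

// ConflictError is a stand-in for version.ConflictError (assumed fields).
type ConflictError struct{ Expected, Actual Version }

func (e ConflictError) Error() string {
	return fmt.Sprintf("version conflict: expected %d, actual %d", e.Expected, e.Actual)
}

// store is a hypothetical in-memory store keyed by stream id.
type store struct {
	mu      sync.Mutex
	streams map[string][]string
}

func newStore() *store { return &store{streams: map[string][]string{}} }

// Append compares the expected version with the current one while holding
// the lock, so the check and the write happen atomically.
func (s *store) Append(id string, expected Check, events ...string) (Version, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current := Version(len(s.streams[id]))
	if expected.exact && expected.expected != current {
		return 0, ConflictError{Expected: expected.expected, Actual: current}
	}

	s.streams[id] = append(s.streams[id], events...)
	return Version(len(s.streams[id])), nil
}

// demo appends once without a check, then retries with a stale expected version.
func demo() (first Version, conflict bool) {
	s := newStore()
	first, _ = s.Append("order-1", Any, "OrderWasPlaced")

	// A stale writer still believes the stream is empty (version 0).
	_, err := s.Append("order-1", CheckExact(0), "OrderWasCancelled")

	var ce ConflictError
	return first, errors.As(err, &ce)
}

func main() {
	fmt.Println(demo())
}
```

A caller that receives a conflict would typically reload the stream, re-apply its decision against the fresh state, and retry with the updated expected version.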

func (*InMemoryStore) Stream

func (es *InMemoryStore) Stream(
	ctx context.Context,
	eventStream StreamWrite,
	id StreamID,
	selector version.Selector,
) error

Stream streams the committed events of the Event Stream identified by `id` onto the provided `eventStream`, selecting which events to send according to the provided `version.Selector`.

Note: this call is synchronous, and will return when all the Events have been successfully written to the provided EventStream, or when the context has been canceled.

This method fails only when the context is canceled.
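Because the call is synchronous, a consumer usually runs it in its own goroutine and reads from the channel concurrently. The sketch below shows that pattern with stand-in types. `streamEvents` is a hypothetical producer, and the assumption that the producer closes the channel when it returns is made for this example only; it is not stated by the documentation above.

```go
package main

import (
	"context"
	"fmt"
)

// Persisted is a simplified stand-in for event.Persisted.
type Persisted struct {
	StreamID string
	Version  uint32
	Payload  string
}

// streamEvents is a hypothetical synchronous producer: it returns once all
// events are written or the context is canceled. Assumption: it closes the
// channel on return so that readers can range over it.
func streamEvents(ctx context.Context, out chan<- Persisted, events []Persisted) error {
	defer close(out)
	for _, evt := range events {
		select {
		case out <- evt:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// collect runs the producer in a goroutine and drains the channel.
func collect(events []Persisted) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream := make(chan Persisted)
	errc := make(chan error, 1) // buffered so the producer goroutine never leaks
	go func() { errc <- streamEvents(ctx, stream, events) }()

	var payloads []string
	for evt := range stream {
		payloads = append(payloads, evt.Payload)
	}
	return payloads, <-errc
}

// streamCanceled calls the producer with an already-canceled context and an
// unbuffered channel nobody reads from, so only the cancellation can fire.
func streamCanceled(events []Persisted) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return streamEvents(ctx, make(chan Persisted), events)
}

func main() {
	events := []Persisted{
		{StreamID: "order-1", Version: 1, Payload: "OrderWasPlaced"},
		{StreamID: "order-1", Version: 2, Payload: "OrderWasShipped"},
	}
	fmt.Println(collect(events))
	fmt.Println(streamCanceled(events))
}
```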

type Persisted

type Persisted struct {
	StreamID
	version.Version
	Envelope
}

Persisted represents a Domain Event that has been persisted into the Event Store.

type Processor

type Processor interface {
	Process(ctx context.Context, event Persisted) error
}

Processor represents a component that can process persisted Domain Events.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, event Persisted) error

ProcessorFunc is a functional implementation of the Processor interface.

func (ProcessorFunc) Process

func (pf ProcessorFunc) Process(ctx context.Context, event Persisted) error

Process implements the event.Processor interface.
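The function-adapter pattern behind ProcessorFunc can be sketched as below. The Processor and ProcessorFunc declarations mirror the ones documented above, while `Persisted` is a simplified stand-in and `processAll` and `countEvents` are hypothetical helpers added for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Persisted is a simplified stand-in for event.Persisted.
type Persisted struct {
	StreamID string
	Version  uint32
	Payload  string
}

type Processor interface {
	Process(ctx context.Context, event Persisted) error
}

type ProcessorFunc func(ctx context.Context, event Persisted) error

// Process calls the underlying function, which makes any function with the
// right signature usable as a Processor.
func (pf ProcessorFunc) Process(ctx context.Context, event Persisted) error {
	return pf(ctx, event)
}

// processAll feeds events to any Processor and stops at the first error.
func processAll(ctx context.Context, p Processor, events ...Persisted) error {
	for _, evt := range events {
		if err := p.Process(ctx, evt); err != nil {
			return fmt.Errorf("processing %s@%d: %w", evt.StreamID, evt.Version, err)
		}
	}
	return nil
}

// countEvents runs a closure-based processor over n events and reports how
// many events it saw.
func countEvents(n int) (int, error) {
	seen := 0
	counter := ProcessorFunc(func(_ context.Context, _ Persisted) error {
		seen++
		return nil
	})

	err := processAll(context.Background(), counter, make([]Persisted, n)...)
	return seen, err
}

func main() {
	fmt.Println(countEvents(3))
}
```

The adapter avoids declaring a dedicated struct type when the processing logic fits in a single closure.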

type Store

type Store interface {
	Appender
	Streamer
}

Store represents an Event Store, a stateful data source where Domain Events can be safely stored, and easily replayed.

type Stream

type Stream = chan Persisted

Stream represents a stream of persisted Domain Events coming from some stream-able source of data, like an Event Store.

func SliceToStream

func SliceToStream(events []Persisted) Stream

SliceToStream converts a slice of event.Persisted domain events to an event.Stream type.

The event.Stream channel has the same buffer size as the input slice.

The channel returned by the function contains all the original slice elements and is already closed.
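The documented behavior (buffer sized to the slice, all elements written, channel already closed) can be reproduced with a short sketch. `sliceToStream` below is a hypothetical re-implementation written from that description, not the package's source, and `Persisted` is a simplified stand-in.

```go
package main

import "fmt"

// Persisted is a simplified stand-in for event.Persisted.
type Persisted struct {
	StreamID string
	Version  uint32
	Payload  string
}

// Stream mirrors the documented alias: a channel of persisted events.
type Stream = chan Persisted

// sliceToStream writes every element into a channel whose buffer matches
// the slice length, then closes the channel before returning it.
func sliceToStream(events []Persisted) Stream {
	ch := make(chan Persisted, len(events))
	defer close(ch)

	for _, evt := range events {
		ch <- evt // never blocks: the buffer has room for every element
	}

	return ch
}

// drain reads the stream until it is closed and returns the payloads in order.
func drain(s Stream) []string {
	var payloads []string
	for evt := range s {
		payloads = append(payloads, evt.Payload)
	}
	return payloads
}

func main() {
	stream := sliceToStream([]Persisted{
		{StreamID: "order-1", Version: 1, Payload: "OrderWasPlaced"},
		{StreamID: "order-1", Version: 2, Payload: "OrderWasShipped"},
	})

	fmt.Println(cap(stream), drain(stream))
}
```

Because the channel is closed but still holds its buffered elements, a plain `range` loop reads all of them and then terminates, which is convenient in tests.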

type StreamID

type StreamID string

StreamID identifies an Event Stream, which is a log of ordered Domain Events.

type StreamRead

type StreamRead <-chan Persisted

StreamRead provides read-only access to an event.Stream object.

type StreamWrite

type StreamWrite chan<- Persisted

StreamWrite provides write-only access to an event.Stream object.

type Streamer

type Streamer interface {
	Stream(ctx context.Context, stream StreamWrite, id StreamID, selector version.Selector) error
}

Streamer is an event.Store trait used to open a specific Event Stream and stream it back to the application.

type TrackingStore

type TrackingStore struct {
	Appender
	// contains filtered or unexported fields
}

TrackingStore is an Event Store wrapper to track the Events committed to the inner Event Store.

Useful for test assertions.

func NewTrackingStore

func NewTrackingStore(appender Appender) *TrackingStore

NewTrackingStore wraps an Event Store to capture events that get appended to it.

func (*TrackingStore) Append

func (es *TrackingStore) Append(
	ctx context.Context,
	id StreamID,
	expected version.Check,
	events ...Envelope,
) (version.Version, error)

Append forwards the call to the wrapped Event Store instance and, if the operation concludes successfully, records these events internally.

The recorded events can be accessed by calling Recorded().

func (*TrackingStore) Recorded

func (es *TrackingStore) Recorded() []Persisted

Recorded returns the list of Events that have been appended to the Event Store.

Please note: these events do not record the Sequence Number assigned by the Event Store. Usually you should not need it in test assertions, since the order of Events in the returned slice always follows the global order of the Event Stream (or the Event Store).
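The record-on-success behavior described for Append and Recorded can be sketched as follows. The types are deliberately simplified stand-ins (plain strings instead of StreamID and Envelope, no context or version check), and `failingOnEmptyID`, `trackingStore` and `recordedAfterMixedAppends` are hypothetical names for this example only.

```go
package main

import (
	"errors"
	"fmt"
)

// Appender is a simplified stand-in for event.Appender.
type Appender interface {
	Append(id string, events ...string) (uint32, error)
}

// failingOnEmptyID is a hypothetical inner store that rejects empty stream ids.
type failingOnEmptyID struct{ version uint32 }

func (f *failingOnEmptyID) Append(id string, events ...string) (uint32, error) {
	if id == "" {
		return 0, errors.New("empty stream id")
	}
	f.version += uint32(len(events))
	return f.version, nil
}

// trackingStore forwards Append to the wrapped Appender and remembers the
// events only when the inner call succeeds. Not safe for concurrent use.
type trackingStore struct {
	Appender
	recorded []string
}

func (t *trackingStore) Append(id string, events ...string) (uint32, error) {
	v, err := t.Appender.Append(id, events...)
	if err != nil {
		return 0, err
	}
	t.recorded = append(t.recorded, events...)
	return v, nil
}

// Recorded returns the successfully appended events in append order.
func (t *trackingStore) Recorded() []string { return t.recorded }

// recordedAfterMixedAppends performs two successful appends and one failing
// append, then returns what the wrapper recorded.
func recordedAfterMixedAppends() []string {
	store := &trackingStore{Appender: &failingOnEmptyID{}}
	_, _ = store.Append("order-1", "OrderWasPlaced")
	_, _ = store.Append("", "NeverStored") // rejected by the inner store
	_, _ = store.Append("order-1", "OrderWasShipped")
	return store.Recorded()
}

func main() {
	fmt.Println(recordedAfterMixedAppends())
}
```

In a test, the recorded slice can be compared directly against the events the code under test is expected to emit.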
