Documentation ¶
Overview ¶
Package eventhorizon is a CQRS/ES toolkit for Go.
Index ¶
- Constants
- Variables
- func AggregateIDFromContext(ctx context.Context) (uuid.UUID, bool)
- func CheckCommand(cmd Command) error
- func CompareEventSlices(evts1, evts2 []Event, opts ...CompareOption) bool
- func CompareEvents(e1, e2 Event, options ...CompareOption) error
- func CopyContext(from, to context.Context) context.Context
- func MarshalContext(ctx context.Context) map[string]interface{}
- func NewContextWithAggregateID(ctx context.Context, aggregateID uuid.UUID) context.Context
- func NewContextWithAggregateType(ctx context.Context, aggregateType AggregateType) context.Context
- func NewContextWithCommandType(ctx context.Context, commandType CommandType) context.Context
- func RegisterAggregate(factory func(uuid.UUID) Aggregate)
- func RegisterCommand(factory func() Command)
- func RegisterContextMarshaler(f ContextMarshalFunc)
- func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
- func RegisterEventData(eventType EventType, factory func() EventData)
- func RegisterSnapshotData(aggregateType AggregateType, factory func(id uuid.UUID) SnapshotData)
- func RegisteredCommands() map[CommandType]func() Command
- func UnmarshalContext(ctx context.Context, vals map[string]interface{}) context.Context
- func UnregisterAggregate(aggregateType AggregateType)
- func UnregisterCommand(commandType CommandType)
- func UnregisterEventData(eventType EventType)
- type Aggregate
- type AggregateError
- type AggregateStore
- type AggregateStoreError
- type AggregateStoreOperation
- type AggregateType
- type BasicMongoDB
- func (db *BasicMongoDB) Close() error
- func (db *BasicMongoDB) CollectionDrop(ctx context.Context, collectionName string) error
- func (db *BasicMongoDB) CollectionExec(ctx context.Context, collectionName string, ...) error
- func (db *BasicMongoDB) CollectionExecWithTransaction(ctx context.Context, collectionName string, ...) error
- func (db *BasicMongoDB) CollectionWatchChangeStream(ctx context.Context, collectionName string, pipeline interface{}, ...) error
- func (db *BasicMongoDB) DatabaseExec(ctx context.Context, fn func(context.Context, *mongo.Database) error) error
- func (db *BasicMongoDB) DatabaseExecWithTransaction(ctx context.Context, fn func(mongo.SessionContext, *mongo.Database) error) error
- func (db *BasicMongoDB) Errors() <-chan error
- func (db *BasicMongoDB) Ping(ctx context.Context, rp *readpref.ReadPref) error
- type Command
- type CommandCodec
- type CommandFieldError
- type CommandHandler
- type CommandHandlerFunc
- type CommandHandlerMiddleware
- type CommandIDer
- type CommandType
- type CompareConfig
- type CompareOption
- type ContextMarshalFunc
- type ContextUnmarshalFunc
- type Entity
- type Event
- type EventBus
- type EventBusError
- type EventCodec
- type EventData
- type EventHandler
- type EventHandlerChain
- type EventHandlerError
- type EventHandlerFunc
- type EventHandlerMiddleware
- type EventHandlerType
- type EventMatcher
- type EventOption
- type EventSource
- type EventStore
- type EventStoreError
- type EventStoreMaintenance
- type EventStoreOperation
- type EventType
- type IsZeroer
- type Iter
- type MatchAggregates
- type MatchAll
- type MatchAny
- type MatchEvents
- type MongoDB
- type Outbox
- type OutboxError
- type ReadRepo
- type ReadWriteRepo
- type RepoError
- type RepoOperation
- type Snapshot
- type SnapshotData
- type SnapshotStore
- type SnapshotStrategy
- type Snapshotable
- type Versionable
- type WriteRepo
Constants ¶
const ( // Errors during loading of aggregates. AggregateStoreOpLoad = "load" // Errors during saving of aggregates. AggregateStoreOpSave = "save" )
const ( // Errors during loading of events. EventStoreOpLoad = "load" // Errors during saving of events. EventStoreOpSave = "save" // Errors during replacing of events. EventStoreOpReplace = "replace" // Errors during renaming of event types. EventStoreOpRename = "rename" // Errors during removing of events. EventStoreOpRemove = "remove" // Errors during clearing of the event store. EventStoreOpClear = "clear" // Errors during loading of snapshot. EventStoreOpLoadSnapshot = "load_snapshot" // Errors during saving of snapshot. EventStoreOpSaveSnapshot = "save_snapshot" )
const ( // Errors during finding of an entity. RepoOpFind = "find" // Errors during finding of all entities. RepoOpFindAll = "find all" // Errors during finding of entities by query. RepoOpFindQuery = "find query" // Errors during saving of an entity. RepoOpSave = "save" // Errors during removing of an entity. RepoOpRemove = "remove" // Errors during clearing of all entities. RepoOpClear = "clear" )
const DefaultMinVersionDeadline = 10 * time.Second
DefaultMinVersionDeadline is the deadline to use when creating a min version context that waits.
Variables ¶
var ( // ErrAggregateNotFound is when no aggregate can be found. ErrAggregateNotFound = errors.New("aggregate not found") // ErrAggregateNotRegistered is when no aggregate factory was registered. ErrAggregateNotRegistered = errors.New("aggregate not registered") )
var ( // ErrMissingCommand is when there is no command to be handled. ErrMissingCommand = errors.New("missing command") // ErrMissingAggregateID is when a command is missing an aggregate ID. ErrMissingAggregateID = errors.New("missing aggregate ID") )
var ( // ErrMissingMatcher is returned when calling AddHandler without a matcher. ErrMissingMatcher = errors.New("missing matcher") // ErrMissingHandler is returned when calling AddHandler with a nil handler. ErrMissingHandler = errors.New("missing handler") // ErrHandlerAlreadyAdded is returned when calling AddHandler weth the same handler twice. ErrHandlerAlreadyAdded = errors.New("handler already added") )
var ( // Missing events for save operation. ErrMissingEvents = errors.New("missing events") // Events in the same save operation is for different aggregate IDs. ErrMismatchedEventAggregateIDs = errors.New("mismatched event aggregate IDs") // Events in the same save operation is for different aggregate types. ErrMismatchedEventAggregateTypes = errors.New("mismatched event aggregate types") // Events in the same operation have non-serial versions or is not matching the original version. ErrIncorrectEventVersion = errors.New("incorrect event version") // Other events has been saved for this aggregate since the operation started. ErrEventConflictFromOtherSave = errors.New("event conflict from other save") // No matching event could be found (for maintenance operations etc). ErrEventNotFound = errors.New("event not found") )
var ( // ErrEntityNotFound is when a entity could not be found. ErrEntityNotFound = errors.New("could not find entity") // ErrEntityHasNoVersion is when an entity has no version number. ErrEntityHasNoVersion = errors.New("entity has no version") // ErrIncorrectEntityVersion is when an entity has an incorrect version. ErrIncorrectEntityVersion = errors.New("incorrect entity version") )
var ErrCommandNotRegistered = errors.New("command not registered")
ErrCommandNotRegistered is when no command factory was registered.
var ErrEventDataNotRegistered = errors.New("event data not registered")
ErrEventDataNotRegistered is when no event data factory was registered.
var ErrMissingEvent = errors.New("missing event")
ErrMissingEvent is when there is no event to be handled.
var ErrSnapshotDataNotRegistered = errors.New("snapshot data not registered")
Functions ¶
func AggregateIDFromContext ¶ added in v0.18.0
AggregateIDFromContext return the command type from the context.
func CompareEventSlices ¶ added in v0.18.0
func CompareEventSlices(evts1, evts2 []Event, opts ...CompareOption) bool
CompareEventSlices compares two slices of events, using options.
func CompareEvents ¶ added in v0.18.0
func CompareEvents(e1, e2 Event, options ...CompareOption) error
CompareEvents compares two events, with options for ignoring timestamp, version etc.
func CopyContext ¶ added in v0.18.0
CopyContext copies all values that are registered and exists in the `from` context to the `to` context. It basically runs a marshal/unmarshal back-to-back.
func MarshalContext ¶
MarshalContext marshals a context into a map.
func NewContextWithAggregateID ¶ added in v0.18.0
NewContextWithAggregateID adds a aggregate ID on the context.
func NewContextWithAggregateType ¶ added in v0.18.0
func NewContextWithAggregateType(ctx context.Context, aggregateType AggregateType) context.Context
NewContextWithAggregateType adds a aggregate type on the context.
func NewContextWithCommandType ¶ added in v0.18.0
func NewContextWithCommandType(ctx context.Context, commandType CommandType) context.Context
NewContextWithCommandType adds a command type on the context.
func RegisterAggregate ¶
RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.
An example would be:
RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
func RegisterCommand ¶
func RegisterCommand(factory func() Command)
RegisterCommand registers an command factory for a type. The factory is used to create concrete command types.
An example would be:
RegisterCommand(func() Command { return &MyCommand{} })
func RegisterContextMarshaler ¶
func RegisterContextMarshaler(f ContextMarshalFunc)
RegisterContextMarshaler registers a marshaler function used by MarshalContext.
func RegisterContextUnmarshaler ¶
func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
RegisterContextUnmarshaler registers a marshaler function used by UnmarshalContext.
func RegisterEventData ¶
RegisterEventData registers an event data factory for a type. The factory is used to create concrete event data structs when loading from the database.
An example would be:
RegisterEventData(MyEventType, func() Event { return &MyEventData{} })
func RegisterSnapshotData ¶ added in v0.18.0
func RegisterSnapshotData(aggregateType AggregateType, factory func(id uuid.UUID) SnapshotData)
RegisterSnapshotData registers an snapshot factory for a type. The factory is used to create concrete snapshot state type when unmarshalling.
An example would be:
RegisterSnapshotData("aggregateType1", func() SnapshotData { return &MySnapshotData{} })
func RegisteredCommands ¶ added in v0.18.0
func RegisteredCommands() map[CommandType]func() Command
ListCommands returns a list of all registered command types.
func UnmarshalContext ¶
UnmarshalContext unmarshals a context from a map.
func UnregisterAggregate ¶ added in v0.18.0
func UnregisterAggregate(aggregateType AggregateType)
RegisterAggregate un-registers an aggregate by a given type.
func UnregisterCommand ¶
func UnregisterCommand(commandType CommandType)
UnregisterCommand removes the registration of the command factory for a type. This is mainly useful in mainenance situations where the command type needs to be switched at runtime.
func UnregisterEventData ¶
func UnregisterEventData(eventType EventType)
UnregisterEventData removes the registration of the event data factory for a type. This is mainly useful in mainenance situations where the event data needs to be switched in a migrations.
Types ¶
type Aggregate ¶
type Aggregate interface { // Entity provides the ID of the aggregate. Entity // AggregateType returns the type name of the aggregate. // AggregateType() string AggregateType() AggregateType // CommandHandler is used to handle commands. CommandHandler }
Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates events that are stored.
The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
func CreateAggregate ¶
func CreateAggregate(aggregateType AggregateType, id uuid.UUID) (Aggregate, error)
CreateAggregate creates an aggregate of a type with an ID using the factory registered with RegisterAggregate.
type AggregateError ¶ added in v0.18.0
type AggregateError struct { // Err is the error. Err error }
AggregateError is an error caused in the aggregate when handling a command.
func (*AggregateError) Cause ¶ added in v0.18.0
func (e *AggregateError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*AggregateError) Error ¶ added in v0.18.0
func (e *AggregateError) Error() string
Error implements the Error method of the errors.Error interface.
func (*AggregateError) Unwrap ¶ added in v0.18.0
func (e *AggregateError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type AggregateStore ¶
type AggregateStore interface { // Load loads the most recent version of an aggregate with a type and id. Load(context.Context, AggregateType, uuid.UUID) (Aggregate, error) // Save saves the uncommitted events for an aggregate. Save(context.Context, Aggregate) error }
AggregateStore is responsible for loading and saving aggregates.
type AggregateStoreError ¶ added in v0.18.0
type AggregateStoreError struct { // Err is the error that happened when applying the event. Err error // Op is the operation for the error. Op AggregateStoreOperation // AggregateType of related operation. AggregateType AggregateType // AggregateID of related operation. AggregateID uuid.UUID }
AggregateStoreError contains related info about errors in the store.
func (*AggregateStoreError) Cause ¶ added in v0.18.0
func (e *AggregateStoreError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*AggregateStoreError) Error ¶ added in v0.18.0
func (e *AggregateStoreError) Error() string
Error implements the Error method of the error interface.
func (*AggregateStoreError) Unwrap ¶ added in v0.18.0
func (e *AggregateStoreError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type AggregateStoreOperation ¶ added in v0.18.0
type AggregateStoreOperation string
AggregateStoreOperation is the operation done when an error happened.
type AggregateType ¶
type AggregateType string
AggregateType is the type of an aggregate.
func AggregateTypeFromContext ¶ added in v0.18.0
func AggregateTypeFromContext(ctx context.Context) (AggregateType, bool)
AggregateTypeFromContext return the command type from the context.
func (AggregateType) String ¶ added in v0.18.0
func (at AggregateType) String() string
String returns the string representation of an aggregate type.
type BasicMongoDB ¶ added in v0.18.0
type BasicMongoDB struct {
// contains filtered or unexported fields
}
BasicMongoDB is a basic implementation of the MongoDB interface.
func NewMongoDB ¶ added in v0.18.0
func NewMongoDB(uri string, dbName string) (*BasicMongoDB, error)
NewMongoDBWithClient returns a new MongoDB instance.
func NewMongoDBWithClient ¶ added in v0.18.0
func NewMongoDBWithClient(client *mongo.Client, dbName string) *BasicMongoDB
NewMongoDBWithClient returns a new MongoDB instance.
func (*BasicMongoDB) Close ¶ added in v0.18.0
func (db *BasicMongoDB) Close() error
Close implements the Close method of the MongoDB interface.
func (*BasicMongoDB) CollectionDrop ¶ added in v0.18.0
func (db *BasicMongoDB) CollectionDrop(ctx context.Context, collectionName string) error
CollectionDrop implements the CollectionDrop method of the MongoDB interface.
func (*BasicMongoDB) CollectionExec ¶ added in v0.18.0
func (db *BasicMongoDB) CollectionExec(ctx context.Context, collectionName string, fn func(context.Context, *mongo.Collection) error) error
CollectionExec implements the CollectionExec method of the MongoDB interface.
func (*BasicMongoDB) CollectionExecWithTransaction ¶ added in v0.18.0
func (db *BasicMongoDB) CollectionExecWithTransaction(ctx context.Context, collectionName string, fn func(mongo.SessionContext, *mongo.Collection) error) error
CollectionExecWithTransaction implements the CollectionExecWithTransaction method of the MongoDB interface.
func (*BasicMongoDB) CollectionWatchChangeStream ¶ added in v0.18.0
func (db *BasicMongoDB) CollectionWatchChangeStream( ctx context.Context, collectionName string, pipeline interface{}, resumeToken *bson.Raw, fn func(context.Context, <-chan bson.Raw) error, opts ...*options.ChangeStreamOptions, ) error
CollectionWatchChangeStream implements the CollectionWatchChangeStream method of the MongoDB interface.
func (*BasicMongoDB) DatabaseExec ¶ added in v0.18.0
func (db *BasicMongoDB) DatabaseExec(ctx context.Context, fn func(context.Context, *mongo.Database) error) error
DatabaseExec implements the DatabaseExec method of the MongoDB interface.
func (*BasicMongoDB) DatabaseExecWithTransaction ¶ added in v0.18.0
func (db *BasicMongoDB) DatabaseExecWithTransaction(ctx context.Context, fn func(mongo.SessionContext, *mongo.Database) error) error
DatabaseExecWithTransaction implements the DatabaseExecWithTransaction method of the MongoDB interface.
func (*BasicMongoDB) Errors ¶ added in v0.18.0
func (db *BasicMongoDB) Errors() <-chan error
Errors implements the Errors method of the MongoDB interface.
type Command ¶
type Command interface { // AggregateID returns the ID of the aggregate that the command should be // handled by. AggregateID() uuid.UUID // AggregateType returns the type of the aggregate that the command can be // handled by. AggregateType() AggregateType // CommandType returns the type of the command. CommandType() CommandType }
Command is a domain command that is sent to a Dispatcher.
A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
func CreateCommand ¶
func CreateCommand(commandType CommandType) (Command, error)
CreateCommand creates an command of a type with an ID using the factory registered with RegisterCommand.
type CommandCodec ¶ added in v0.18.0
type CommandCodec interface { // MarshalCommand marshals a command and the supported parts of context into bytes. MarshalCommand(context.Context, Command) ([]byte, error) // UnmarshalCommand unmarshals a command and supported parts of context from bytes. UnmarshalCommand(context.Context, []byte) (Command, context.Context, error) }
CommandCodec is a codec for marshaling and unmarshaling commands to and from bytes.
type CommandFieldError ¶
type CommandFieldError struct {
Field string
}
CommandFieldError is returned by Dispatch when a field is incorrect.
func (*CommandFieldError) Error ¶
func (c *CommandFieldError) Error() string
Error implements the Error method of the error interface.
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
func UseCommandHandlerMiddleware ¶
func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerMiddleware) CommandHandler
UseCommandHandlerMiddleware wraps a CommandHandler in one or more middleware.
type CommandHandlerFunc ¶
CommandHandlerFunc is a function that can be used as a command handler.
func (CommandHandlerFunc) HandleCommand ¶
func (h CommandHandlerFunc) HandleCommand(ctx context.Context, cmd Command) error
HandleCommand implements the HandleCommand method of the CommandHandler.
type CommandHandlerMiddleware ¶
type CommandHandlerMiddleware func(CommandHandler) CommandHandler
CommandHandlerMiddleware is a function that middlewares can implement to be able to chain.
type CommandIDer ¶ added in v0.18.0
type CommandIDer interface { // CommandID returns the ID of the command instance being handled. CommandID() uuid.UUID }
CommandIDer provides a unique command ID to be used for request tracking etc.
type CommandType ¶
type CommandType string
CommandType is the type of a command, used as its unique identifier.
func CommandTypeFromContext ¶ added in v0.18.0
func CommandTypeFromContext(ctx context.Context) (CommandType, bool)
CommandTypeFromContext return the command type from the context.
func (CommandType) String ¶ added in v0.18.0
func (ct CommandType) String() string
String returns the string representation of a command type.
type CompareConfig ¶ added in v0.18.0
type CompareConfig struct {
// contains filtered or unexported fields
}
CompareConfig is a config for the ComparEvents function.
type CompareOption ¶ added in v0.18.0
type CompareOption func(*CompareConfig)
CompareOption is an option setter used to configure comparing of events.
func IgnorePositionMetadata ¶ added in v0.18.0
func IgnorePositionMetadata() CompareOption
IgnorePositionMetadata ignores the position in metadata when comparing events.
func IgnoreTimestamp ¶ added in v0.18.0
func IgnoreTimestamp() CompareOption
IgnoreTimestamp ignores the timestamps of events when comparing.
func IgnoreVersion ¶ added in v0.18.0
func IgnoreVersion() CompareOption
IgnoreVersion ignores the versions of events when comparing.
type ContextMarshalFunc ¶
ContextMarshalFunc is a function that marshalls any context values to a map, used for sending context on the wire.
type ContextUnmarshalFunc ¶
ContextUnmarshalFunc is a function that marshalls any context values to a map, used for sending context on the wire.
type Entity ¶
Entity is an item which is identified by an ID.
From http://cqrs.nu/Faq: "Entities or reference types are characterized by having an identity that's not tied to their attribute values. All attributes in an entity can change and it's still "the same" entity. Conversely, two entities might be equivalent in all their attributes, but will still be distinct".
type Event ¶
type Event interface { // EventType returns the type of the event. EventType() EventType // The data attached to the event. Data() EventData // Timestamp of when the event was created. Timestamp() time.Time // AggregateType is the type of the aggregate that the event can be // applied to. AggregateType() AggregateType // AggregateID is the ID of the aggregate that the event belongs to. AggregateID() uuid.UUID // Version is the version of the aggregate after the event has been applied. Version() int // Metadata is app-specific metadata such as request ID, originating user etc. Metadata() map[string]interface{} // A string representation of the event. String() string }
Event is a domain event describing a change that has happened to an aggregate.
An event struct and type name should:
- Be in past tense (CustomerMoved)
- Contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when applying/handling it.
func NewEvent ¶
func NewEvent(eventType EventType, data EventData, timestamp time.Time, options ...EventOption) Event
NewEvent creates a new event with a type and data, setting its timestamp.
func NewEventForAggregate ¶
func NewEventForAggregate(eventType EventType, data EventData, timestamp time.Time, aggregateType AggregateType, aggregateID uuid.UUID, version int, options ...EventOption) Event
NewEventForAggregate creates a new event with a type and data, setting its timestamp. It also sets the aggregate data on it. DEPRECATED, use NewEvent() with the WithAggregate() option instead.
type EventBus ¶
type EventBus interface { EventHandler // AddHandler adds a handler for an event. Returns an error if either the // matcher or handler is nil, the handler is already added or there was some // other problem adding the handler (for networked handlers for example). AddHandler(context.Context, EventMatcher, EventHandler) error // Errors returns an error channel where async handling errors are sent. Errors() <-chan error // Close closes the EventBus and waits for all handlers to finish. Close() error }
EventBus is an EventHandler that distributes published events to all matching handlers that are registered, but only one of each type will handle the event. Events are not guaranteed to be handeled in order.
type EventBusError ¶ added in v0.18.0
type EventBusError struct { // Err is the error. Err error // Ctx is the context used when the error happened. Ctx context.Context // Event is the event handeled when the error happened. Event Event }
EventBusError is an async error containing the error returned from a handler and the event that it happened on.
func (*EventBusError) Cause ¶ added in v0.18.0
func (e *EventBusError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*EventBusError) Error ¶ added in v0.18.0
func (e *EventBusError) Error() string
Error implements the Error method of the error interface.
func (*EventBusError) Unwrap ¶ added in v0.18.0
func (e *EventBusError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type EventCodec ¶ added in v0.18.0
type EventCodec interface { // MarshalEvent marshals an event and the supported parts of context into bytes. MarshalEvent(context.Context, Event) ([]byte, error) // UnmarshalEvent unmarshals an event and supported parts of context from bytes. UnmarshalEvent(context.Context, []byte) (Event, context.Context, error) }
EventCodec is a codec for marshaling and unmarshaling events to and from bytes.
type EventData ¶
type EventData interface{}
EventData is any additional data for an event.
func CreateEventData ¶
CreateEventData creates an event data of a type using the factory registered with RegisterEventData.
type EventHandler ¶
type EventHandler interface { // HandlerType is the type of the handler. HandlerType() EventHandlerType // HandleEvent handles an event. HandleEvent(context.Context, Event) error }
EventHandler is a handler of events. If registered on a bus as a handler only one handler of the same type will receive each event. If registered on a bus as an observer all handlers of the same type will receive each event.
func UseEventHandlerMiddleware ¶
func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler
UseEventHandlerMiddleware wraps a EventHandler in one or more middleware.
type EventHandlerChain ¶ added in v0.18.0
type EventHandlerChain interface {
InnerHandler() EventHandler
}
EventHandlerChain declares InnerHandler that returns the inner handler of a event handler middleware. This enables an endpoint or other middlewares to traverse the chain of handlers in order to find a specific middleware that can be interacted with.
For handlers who's intrinsic properties requires them to be the last responder of a chain, or can't produce an InnerHandler, a nil response can be implemented thereby hindering any further attempt to traverse the chain.
type EventHandlerError ¶ added in v0.18.0
type EventHandlerError struct { // Err is the error. Err error // Event is the event that failed to be handled. Event Event }
EventHandlerError is an error returned when an event could not be handled by an event handler.
func (*EventHandlerError) Cause ¶ added in v0.18.0
func (e *EventHandlerError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*EventHandlerError) Error ¶ added in v0.18.0
func (e *EventHandlerError) Error() string
Error implements the Error method of the errors.Error interface.
func (*EventHandlerError) Unwrap ¶ added in v0.18.0
func (e *EventHandlerError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type EventHandlerFunc ¶
EventHandlerFunc is a function that can be used as a event handler.
func (EventHandlerFunc) HandleEvent ¶
func (f EventHandlerFunc) HandleEvent(ctx context.Context, e Event) error
HandleEvent implements the HandleEvent method of the EventHandler.
func (EventHandlerFunc) HandlerType ¶ added in v0.18.0
func (f EventHandlerFunc) HandlerType() EventHandlerType
HandlerType implements the HandlerType method of the EventHandler by returning the name of the package and function: "github.com/Clarilab/eventhorizon.Function" becomes "eventhorizon-Function".
type EventHandlerMiddleware ¶
type EventHandlerMiddleware func(EventHandler) EventHandler
EventHandlerMiddleware is a function that middlewares can implement to be able to chain.
type EventHandlerType ¶ added in v0.18.0
type EventHandlerType string
EventHandlerType is the type of an event handler, used as its unique identifier.
func (EventHandlerType) String ¶ added in v0.18.0
func (ht EventHandlerType) String() string
String returns the string representation of an event handler type.
type EventMatcher ¶
type EventMatcher interface { // Match returns true if the matcher matches an event. Match(Event) bool }
EventMatcher matches, for example on event types, aggregate types etc.
type EventOption ¶ added in v0.18.0
type EventOption func(Event)
EventOption is an option to use when creating events.
func ForAggregate ¶ added in v0.18.0
func ForAggregate(aggregateType AggregateType, aggregateID uuid.UUID, version int) EventOption
ForAggregate adds aggregate data when creating an event.
func FromCommand ¶ added in v0.18.0
func FromCommand(cmd Command) EventOption
FromCommand adds metadata for the originating command when crating an event. Currently it adds the command type and optionally a command ID (if the CommandIDer interface is implemented).
func WithGlobalPosition ¶ added in v0.18.0
func WithGlobalPosition(position int) EventOption
WithGlobalPosition sets the global event position in the metadata.
func WithMetadata ¶ added in v0.18.0
func WithMetadata(metadata map[string]interface{}) EventOption
WithMetadata adds metadata when creating an event. Note that the values types must be supported by the event marshalers in use.
type EventSource ¶ added in v0.18.0
type EventSource interface { // UncommittedEvents returns events that are not committed to the event store, // or handeled in other ways (depending on the caller). UncommittedEvents() []Event // ClearUncommittedEvents clears uncommitted events, used after they have been // committed to the event store or handled in other ways (depending on the caller). ClearUncommittedEvents() }
EventSource is a source of events, used for getting events for handling, storing, publishing etc. Mostly used in the aggregate stores.
type EventStore ¶
type EventStore interface { // Save appends all events in the event stream to the store. Save(ctx context.Context, events []Event, originalVersion int) error // Load loads all events for the aggregate id from the store. Load(context.Context, uuid.UUID) ([]Event, error) // LoadFrom loads all events from version for the aggregate id from the store. LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]Event, error) // Close closes the EventStore. Close() error }
EventStore is an interface for an event sourcing event store.
type EventStoreError ¶
type EventStoreError struct { // Err is the error. Err error // Op is the operation for the error. Op EventStoreOperation // AggregateType of related operation. AggregateType AggregateType // AggregateID of related operation. AggregateID uuid.UUID // AggregateVersion of related operation. AggregateVersion int // Events of the related operation. Events []Event }
EventStoreError is an error in the event store.
func (*EventStoreError) Cause ¶ added in v0.18.0
func (e *EventStoreError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*EventStoreError) Error ¶
func (e *EventStoreError) Error() string
Error implements the Error method of the errors.Error interface.
func (*EventStoreError) Unwrap ¶ added in v0.18.0
func (e *EventStoreError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type EventStoreMaintenance ¶ added in v0.18.0
type EventStoreMaintenance interface { // Replace replaces an event, the version must match. Useful for maintenance actions. // Returns ErrAggregateNotFound if there is no aggregate. Replace(ctx context.Context, event Event) error // RenameEvent renames all instances of the event type. RenameEvent(ctx context.Context, from, to EventType) error // Remove removes all events for a given aggregate. Remove(ctx context.Context, aggregateID uuid.UUID) error // Clear clears the event storage. Clear(ctx context.Context) error }
EventStoreMaintenance is an interface with maintenance tools for an EventStore. NOTE: Should not be used in apps, useful for migration tools etc.
type EventStoreOperation ¶ added in v0.18.0
type EventStoreOperation string
EventStoreOperation is the operation done when an error happened.
type EventType ¶
type EventType string
EventType is the type of an event, used as its unique identifier.
type IsZeroer ¶ added in v0.18.0
type IsZeroer interface {
IsZero() bool
}
IsZeroer is used to check if a type is zero-valued, and in that case is not allowed to be used in a command. See CheckCommand.
type Iter ¶
type Iter interface { Next(context.Context) bool Value() interface{} // Close must be called after the last Next() to retrieve error if any Close(context.Context) error }
Iter is a stateful iterator object that when called Next() readies the next value that can be retrieved from Value(). Enables incremental object retrieval from repos that support it. You must call Close() on each Iter even when results were delivered without apparent error.
type MatchAggregates ¶ added in v0.18.0
type MatchAggregates []AggregateType
MatchAggregates matches any of the aggregate types, nil events never match.
func (MatchAggregates) Match ¶ added in v0.18.0
func (types MatchAggregates) Match(e Event) bool
Match implements the Match method of the EventMatcher interface.
type MatchEvents ¶ added in v0.18.0
type MatchEvents []EventType
MatchEvents matches any of the event types, nil events never match.
func (MatchEvents) Match ¶ added in v0.18.0
func (types MatchEvents) Match(e Event) bool
Match implements the Match method of the EventMatcher interface.
type MongoDB ¶ added in v0.18.0
type MongoDB interface { // Ping pings the MongoDB server. Ping(ctx context.Context, rp *readpref.ReadPref) error // Close gracefully closes the MongoDB connection. Close() error // Errors returns an error channel, that contains errors that occur concurrently. Errors() <-chan error // DatabaseExec executes one or more operations on the database. DatabaseExec(ctx context.Context, fn func(context.Context, *mongo.Database) error) error // DatabaseExec executes one or more operations on the database using transactions. DatabaseExecWithTransaction(ctx context.Context, fn func(mongo.SessionContext, *mongo.Database) error) error // CollectionExec executes one or more operations on the given collection. CollectionExec(ctx context.Context, collectionName string, fn func(context.Context, *mongo.Collection) error) error // CollectionExecWithTransaction executes one or more operations on the database using transactions. CollectionExecWithTransaction(ctx context.Context, collectionName string, fn func(mongo.SessionContext, *mongo.Collection) error) error // CollectionWatchChangeStream can be used to receive events from a collection change-stream. CollectionWatchChangeStream(ctx context.Context, collectionName string, pipeline interface{}, resumeToken *bson.Raw, fn func(context.Context, <-chan bson.Raw) error, opts ...*options.ChangeStreamOptions) error // CollectionDrop drops the given collection. CollectionDrop(ctx context.Context, collectionName string) error }
MongoDB is an interface for a MongoDB database.
type Outbox ¶ added in v0.18.0
type Outbox interface { EventHandler // AddHandler adds a handler for an event. Returns an error if either the // matcher or handler is nil, the handler is already added or there was some // other problem adding the handler. AddHandler(context.Context, EventMatcher, EventHandler) error // Start starts processing the outbox until the Close() is cancelled by // handling all events using the added handlers. Should be called after all // handlers are setup but before new events are starting to be handled. Start() // Close closes the Outbox and waits for all handlers to finish. Close() error // Errors returns an error channel where async handling errors are sent. Errors() <-chan error }
Outbox is an outbox for events. It ensures that all handled events get handled by all added handlers. Handling of events must be atomic to storing them for at least once handling of events, which is often provided by storage adapter using a transaction.
type OutboxError ¶ added in v0.18.0
type OutboxError struct { // Err is the error. Err error // Ctx is the context used when the error happened. Ctx context.Context // Event is the event handled when the error happened. Event Event }
OutboxError is an error in the outbox.
func (*OutboxError) Cause ¶ added in v0.18.0
func (e *OutboxError) Cause() error
Cause implements the github.com/pkg/errors Unwrap method.
func (*OutboxError) Error ¶ added in v0.18.0
func (e *OutboxError) Error() string
Error implements the Error method of the errors.Error interface.
func (*OutboxError) Unwrap ¶ added in v0.18.0
func (e *OutboxError) Unwrap() error
Unwrap implements the errors.Unwrap method.
type ReadRepo ¶
type ReadRepo interface { // InnerRepo returns the inner read repository, if there is one. // Useful for iterating a wrapped set of repositories to get a specific one. InnerRepo(context.Context) ReadRepo // Find returns an entity for an ID. Find(context.Context, uuid.UUID) (Entity, error) // FindAll returns all entities in the repository. FindAll(context.Context) ([]Entity, error) // Close closes the ReadRepo. Close() error }
ReadRepo is a read repository for entities.
type ReadWriteRepo ¶
ReadWriteRepo is a combined read and write repo, mainly useful for testing.
type RepoError ¶
type RepoError struct { // Err is the error. Err error // Op is the operation for the error. Op RepoOperation // EntityID of related operation. EntityID uuid.UUID }
RepoError is an error in the read repository.
func (*RepoError) Cause ¶ added in v0.18.0
Cause implements the github.com/pkg/errors Unwrap method.
type RepoOperation ¶ added in v0.18.0
type RepoOperation string
RepoOperation is the operation done when an error happened.
type Snapshot ¶ added in v0.18.0
type Snapshot struct { Version int AggregateType AggregateType Timestamp time.Time State interface{} }
Snapshot is a recording of the state of an aggregate at a point in time
type SnapshotData ¶ added in v0.18.0
type SnapshotData interface{}
func CreateSnapshotData ¶ added in v0.18.0
func CreateSnapshotData(AggregateID uuid.UUID, aggregateType AggregateType) (SnapshotData, error)
CreateSnapshotData create a concrete instance using the registered snapshot factories.
type SnapshotStore ¶ added in v0.18.0
type SnapshotStore interface { LoadSnapshot(ctx context.Context, id uuid.UUID) (*Snapshot, error) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot Snapshot) error }
SnapshotStore is an interface for snapshot store.
type SnapshotStrategy ¶ added in v0.18.0
type SnapshotStrategy interface { ShouldTakeSnapshot(lastSnapshotVersion int, lastSnapshotTimestamp time.Time, event Event) bool }
SnapshotStrategy determines if a snapshot should be taken or not.
type Snapshotable ¶ added in v0.18.0
Snapshotable is an interface for creating and applying a Snapshot record.
type Versionable ¶
type Versionable interface { // AggregateVersion returns the version of the item. AggregateVersion() int }
Versionable is an item that has a version number, used by version.ReadRepo and projector.EventHandler.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
aggregatestore
|
|
commandhandler
|
|
eventhandler
|
|
examples
|
|
guestlist/memory
Package memory contains an example of a CQRS/ES app using memory as DB.
|
Package memory contains an example of a CQRS/ES app using memory as DB. |
guestlist/mongodb
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
|
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter. |
guestlist/outbox
Package outbox contains an example of a CQRS/ES app using the outbox pattern.
|
Package outbox contains an example of a CQRS/ES app using the outbox pattern. |
hack
|
|
middleware
|
|
Package UUID provides an easy to replace UUID package.
|
Package UUID provides an easy to replace UUID package. |