Documentation ¶
Overview ¶
Package ycq provides a CQRS reference implementation.
The implementation follows the classic reference implementation m-r by Greg Young as closely as possible.
The implementation differs in a number of respects because the original is written in C# and uses generics, which Go did not offer when this package was written. This implementation instead uses interfaces to deal with types in a generic manner and uses delegate functions to instantiate specific types.
Index ¶
- func Int(i int) *int
- func NewUUID() string
- type AggregateBase
- func (a *AggregateBase) AggregateID() string
- func (a *AggregateBase) ClearChanges()
- func (a *AggregateBase) CurrentVersion() int
- func (a *AggregateBase) GetChanges() []EventMessage
- func (a *AggregateBase) IncrementVersion()
- func (a *AggregateBase) OriginalVersion() int
- func (a *AggregateBase) TrackChange(event EventMessage)
- type AggregateFactory
- type AggregateRoot
- type CommandDescriptor
- type CommandHandler
- type CommandHandlerBase
- type CommandMessage
- type DelegateAggregateFactory
- type DelegateEventFactory
- type DelegateStreamNamer
- type Dispatcher
- type DomainRepository
- type ErrAggregateNotFound
- type ErrCommandExecution
- type ErrConcurrencyViolation
- type ErrRepositoryUnavailable
- type ErrUnauthorized
- type ErrUnexpected
- type EventBus
- type EventDescriptor
- func (c *EventDescriptor) AggregateID() string
- func (c *EventDescriptor) Event() interface{}
- func (c *EventDescriptor) EventType() string
- func (c *EventDescriptor) GetHeaders() map[string]interface{}
- func (c *EventDescriptor) SetHeader(key string, value interface{})
- func (c *EventDescriptor) Version() *int
- type EventFactory
- type EventHandler
- type EventMessage
- type GetEventStoreCommonDomainRepo
- func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error)
- func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error
- func (r *GetEventStoreCommonDomainRepo) SetAggregateFactory(factory AggregateFactory)
- func (r *GetEventStoreCommonDomainRepo) SetEventFactory(factory EventFactory)
- func (r *GetEventStoreCommonDomainRepo) SetStreamNameDelegate(delegate StreamNamer)
- type InMemoryDispatcher
- type InternalEventBus
- type StreamNamer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is a type that can be embedded in an AggregateRoot implementation to handle common aggregate behaviour.
All methods required to implement an aggregate are provided here. To satisfy the AggregateRoot interface, your aggregate only needs to implement the Apply method, which contains the behaviour specific to your aggregate.
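The embedding described above can be sketched roughly as follows. To stay compilable without the package, the sketch uses a local stand-in named `base` instead of the real AggregateBase. The `account` aggregate, the `deposited` event and the simplified `Apply(event interface{}, isNew bool)` signature are all hypothetical; in the package, Apply receives an EventMessage.

```go
package main

import "fmt"

// base is a hypothetical stand-in for AggregateBase: it holds the ID and
// the list of new, unpersisted events.
type base struct {
	id      string
	changes []interface{}
}

func (b *base) AggregateID() string       { return b.id }
func (b *base) TrackChange(e interface{}) { b.changes = append(b.changes, e) }
func (b *base) GetChanges() []interface{} { return b.changes }
func (b *base) ClearChanges()             { b.changes = nil }

// deposited is a hypothetical domain event.
type deposited struct{ Amount int }

// account embeds base and adds only the aggregate-specific Apply method.
type account struct {
	*base
	balance int
}

func newAccount(id string) *account {
	return &account{base: &base{id: id}}
}

// Apply updates state from an event. New events are also tracked as changes;
// events replayed from history are not.
func (a *account) Apply(event interface{}, isNew bool) {
	if isNew {
		a.TrackChange(event)
	}
	switch e := event.(type) {
	case deposited:
		a.balance += e.Amount
	}
}

func main() {
	acc := newAccount("acc-1")
	acc.Apply(deposited{Amount: 40}, true)
	acc.Apply(deposited{Amount: 2}, false) // replayed from history, not tracked
	fmt.Println(acc.AggregateID(), acc.balance, len(acc.GetChanges()))
}
```

The embedded type supplies the bookkeeping methods, so the aggregate itself only carries domain state and the event-to-state mapping.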
func NewAggregateBase ¶
func NewAggregateBase(id string) *AggregateBase
NewAggregateBase constructs a new AggregateBase.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() string
AggregateID returns the AggregateID
func (*AggregateBase) ClearChanges ¶
func (a *AggregateBase) ClearChanges()
ClearChanges removes all unpersisted events from the aggregate.
func (*AggregateBase) CurrentVersion ¶
func (a *AggregateBase) CurrentVersion() int
CurrentVersion returns the version of the aggregate as it was when it was instantiated or loaded from the repository.
Importantly, an aggregate with one event applied will be at version 0. This allows the aggregate version to match the version in the eventstore, where the first event is version 0.
func (*AggregateBase) GetChanges ¶
func (a *AggregateBase) GetChanges() []EventMessage
GetChanges returns the collection of new unpersisted events that have been applied to the aggregate.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the aggregate version number by one.
func (*AggregateBase) OriginalVersion ¶
func (a *AggregateBase) OriginalVersion() int
OriginalVersion returns the version of the aggregate as it was when it was instantiated or loaded from the repository.
Importantly, an aggregate with one event applied will be at version 0. This allows the aggregate version to match the version in the eventstore, where the first event is version 0.
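The zero-based numbering described above can be illustrated with a small sketch. The `versioned` type is a hypothetical stand-in for the version bookkeeping, and starting the counter at -1 is an assumption made here so that the first applied event lands on version 0.

```go
package main

import "fmt"

// versioned is a hypothetical stand-in for the version counter of an
// aggregate. Starting at -1 is an assumption of this sketch.
type versioned struct{ version int }

func newVersioned() *versioned { return &versioned{version: -1} }

// IncrementVersion is called once per applied event.
func (v *versioned) IncrementVersion() { v.version++ }

func (v *versioned) Version() int { return v.version }

func main() {
	agg := newVersioned()
	for i := 0; i < 3; i++ {
		agg.IncrementVersion() // one increment per applied event
	}
	fmt.Println(agg.Version()) // three events applied, numbered 0, 1 and 2
}
```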
func (*AggregateBase) TrackChange ¶
func (a *AggregateBase) TrackChange(event EventMessage)
TrackChange stores the EventMessage in the changes collection.
Changes are new, unpersisted events that have been applied to the aggregate.
type AggregateFactory ¶
type AggregateFactory interface {
GetAggregate(string, string) AggregateRoot
}
AggregateFactory returns aggregate instances of a specified type with the AggregateID set to the uuid provided.
An aggregate factory is typically a dependency of the repository, which delegates instantiation of aggregate instances to the aggregate factory.
type AggregateRoot ¶
type AggregateRoot interface {
	AggregateID() string
	OriginalVersion() int
	CurrentVersion() int
	IncrementVersion()
	Apply(events EventMessage, isNew bool)
	TrackChange(EventMessage)
	GetChanges() []EventMessage
	ClearChanges()
}
AggregateRoot is the interface that all aggregates should implement
type CommandDescriptor ¶
type CommandDescriptor struct {
// contains filtered or unexported fields
}
CommandDescriptor is an implementation of the command message interface.
func NewCommandMessage ¶
func NewCommandMessage(aggregateID string, command interface{}) *CommandDescriptor
NewCommandMessage returns a new command descriptor
func (*CommandDescriptor) AggregateID ¶
func (c *CommandDescriptor) AggregateID() string
AggregateID returns the ID of the aggregate that the command relates to.
func (*CommandDescriptor) Command ¶
func (c *CommandDescriptor) Command() interface{}
Command returns the actual command payload of the message.
func (*CommandDescriptor) CommandType ¶
func (c *CommandDescriptor) CommandType() string
CommandType returns the command type name as a string
func (*CommandDescriptor) Headers ¶
func (c *CommandDescriptor) Headers() map[string]interface{}
Headers returns the collection of headers for the command.
func (*CommandDescriptor) SetHeader ¶
func (c *CommandDescriptor) SetHeader(key string, value interface{})
SetHeader sets the value of the header with the specified key
type CommandHandler ¶
type CommandHandler interface {
Handle(CommandMessage) error
}
CommandHandler is the interface that all command handlers should implement.
type CommandHandlerBase ¶
type CommandHandlerBase struct {
// contains filtered or unexported fields
}
CommandHandlerBase is an embedded type that supports chaining of command handlers through provision of a next field that will hold a reference to the next handler in the chain.
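A rough sketch of the chaining idea follows. It does not use the package types: `chainBase` is a local stand-in for CommandHandlerBase, and the `command` struct, the `callNext` helper and both handlers are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// command is a hypothetical, simplified command message.
type command struct {
	Name string
	User string
}

// handler mirrors the shape of the CommandHandler interface.
type handler interface {
	Handle(command) error
}

// chainBase is a stand-in for CommandHandlerBase: it only stores the next
// handler in the chain.
type chainBase struct {
	next handler
}

// callNext forwards the command to the next handler, if there is one.
func (b *chainBase) callNext(c command) error {
	if b.next == nil {
		return nil
	}
	return b.next.Handle(c)
}

var errNoUser = errors.New("command has no user")

// authHandler stops the chain when the command carries no user.
type authHandler struct{ chainBase }

func (h *authHandler) Handle(c command) error {
	if c.User == "" {
		return errNoUser
	}
	return h.callNext(c)
}

// recordingHandler records the command name and then continues the chain.
type recordingHandler struct {
	chainBase
	handled []string
}

func (h *recordingHandler) Handle(c command) error {
	h.handled = append(h.handled, c.Name)
	return h.callNext(c)
}

func main() {
	rec := &recordingHandler{}
	auth := &authHandler{chainBase: chainBase{next: rec}}
	fmt.Println(auth.Handle(command{Name: "CreateOrder", User: "alice"}), rec.handled)
	fmt.Println(auth.Handle(command{Name: "CreateOrder"}), rec.handled)
}
```

Each handler decides whether to pass the command on, which keeps cross-cutting checks such as authorization separate from the handler that does the domain work.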
type CommandMessage ¶
type CommandMessage interface {
	// AggregateID returns the ID of the Aggregate that the command relates to
	AggregateID() string
	// Headers returns the key value collection of headers for the command.
	Headers() map[string]interface{}
	// SetHeader sets the value of the header specified by the key
	SetHeader(string, interface{})
	// Command returns the actual command which is the payload of the command message.
	Command() interface{}
	// CommandType returns a string descriptor of the command name
	CommandType() string
}
CommandMessage is the interface that a command message must implement.
type DelegateAggregateFactory ¶
type DelegateAggregateFactory struct {
// contains filtered or unexported fields
}
DelegateAggregateFactory is an implementation of the AggregateFactory interface that supports registration of delegate functions to perform aggregate instantiation.
func NewDelegateAggregateFactory ¶
func NewDelegateAggregateFactory() *DelegateAggregateFactory
NewDelegateAggregateFactory constructs a new DelegateAggregateFactory
func (*DelegateAggregateFactory) GetAggregate ¶
func (t *DelegateAggregateFactory) GetAggregate(typeName string, id string) AggregateRoot
GetAggregate calls the delegate for the type specified and returns the result.
func (*DelegateAggregateFactory) RegisterDelegate ¶
func (t *DelegateAggregateFactory) RegisterDelegate(aggregate AggregateRoot, delegate func(string) AggregateRoot) error
RegisterDelegate is used to register a new function for instantiation of an aggregate instance.
func(id string) AggregateRoot { return NewMyAggregateType(id) }
func(id string) AggregateRoot { return &MyAggregateType{AggregateBase: NewAggregateBase(id)} }
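The delegate registration described above might work along these lines. This is a self-contained sketch with local stand-in types, not the package's implementation: deriving the registry key from the bare struct name via reflection and rejecting duplicate registrations are both assumptions, and `order` is a hypothetical aggregate.

```go
package main

import (
	"fmt"
	"reflect"
)

// aggregateRoot is a reduced, hypothetical version of the AggregateRoot interface.
type aggregateRoot interface{ AggregateID() string }

// order is a hypothetical aggregate.
type order struct{ id string }

func (o *order) AggregateID() string { return o.id }

// delegateFactory maps a type name to a constructor function.
type delegateFactory struct {
	delegates map[string]func(string) aggregateRoot
}

func newDelegateFactory() *delegateFactory {
	return &delegateFactory{delegates: map[string]func(string) aggregateRoot{}}
}

// typeName derives the registry key from a sample value. Using the bare
// struct name is an assumption of this sketch.
func typeName(v interface{}) string {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

// RegisterDelegate stores the constructor under the sample's type name.
func (f *delegateFactory) RegisterDelegate(sample aggregateRoot, d func(string) aggregateRoot) error {
	name := typeName(sample)
	if _, ok := f.delegates[name]; ok {
		return fmt.Errorf("delegate already registered for %q", name)
	}
	f.delegates[name] = d
	return nil
}

// GetAggregate calls the delegate for the type name, or returns nil if none exists.
func (f *delegateFactory) GetAggregate(name, id string) aggregateRoot {
	if d, ok := f.delegates[name]; ok {
		return d(id)
	}
	return nil
}

func main() {
	f := newDelegateFactory()
	err := f.RegisterDelegate(&order{}, func(id string) aggregateRoot { return &order{id: id} })
	if err != nil {
		fmt.Println("register:", err)
		return
	}
	fmt.Println(f.GetAggregate("order", "42").AggregateID())
}
```

The sample value passed to RegisterDelegate is used only to find the type name; the delegate is what constructs instances later.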
type DelegateEventFactory ¶
type DelegateEventFactory struct {
// contains filtered or unexported fields
}
DelegateEventFactory uses delegate functions to instantiate event instances given the name of the event type as a string.
func NewDelegateEventFactory ¶
func NewDelegateEventFactory() *DelegateEventFactory
NewDelegateEventFactory constructs a new DelegateEventFactory
func (*DelegateEventFactory) GetEvent ¶
func (t *DelegateEventFactory) GetEvent(typeName string) interface{}
GetEvent returns an event instance given an event type as a string.
An appropriate delegate must be registered for the event type. If an appropriate delegate is not registered, the method will return nil.
func (*DelegateEventFactory) RegisterDelegate ¶
func (t *DelegateEventFactory) RegisterDelegate(event interface{}, delegate func() interface{}) error
RegisterDelegate registers a delegate that will return an event instance given an event type name as a string.
If an attempt is made to register multiple delegates for an event type, an error is returned.
type DelegateStreamNamer ¶
type DelegateStreamNamer struct {
// contains filtered or unexported fields
}
DelegateStreamNamer stores delegates per aggregate type allowing fine grained control of stream names for event streams.
func NewDelegateStreamNamer ¶
func NewDelegateStreamNamer() *DelegateStreamNamer
NewDelegateStreamNamer constructs a delegate stream namer
func (*DelegateStreamNamer) GetStreamName ¶
func (r *DelegateStreamNamer) GetStreamName(aggregateTypeName string, id string) (string, error)
GetStreamName gets the result of the stream name delegate registered for the aggregate type.
func (*DelegateStreamNamer) RegisterDelegate ¶
func (r *DelegateStreamNamer) RegisterDelegate(delegate func(string, string) string, aggregates ...AggregateRoot) error
RegisterDelegate allows registration of a stream name delegate function for the aggregates specified in the variadic aggregates argument.
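A stream name delegate has the shape `func(string, string) string`, taking the aggregate type name and the ID. The sketch below illustrates a per-type registry of such delegates. It is a simplification under stated assumptions: it registers by type name string rather than by AggregateRoot value, it rejects duplicates, and the `typeDashID` naming scheme is hypothetical.

```go
package main

import "fmt"

// streamNamer is a hypothetical registry of naming delegates keyed by
// aggregate type name.
type streamNamer struct {
	delegates map[string]func(string, string) string
}

func newStreamNamer() *streamNamer {
	return &streamNamer{delegates: map[string]func(string, string) string{}}
}

// RegisterDelegate registers one delegate for several aggregate type names.
// Nothing is registered if any of the names is already taken.
func (n *streamNamer) RegisterDelegate(d func(string, string) string, typeNames ...string) error {
	for _, name := range typeNames {
		if _, ok := n.delegates[name]; ok {
			return fmt.Errorf("stream name delegate already registered for %q", name)
		}
	}
	for _, name := range typeNames {
		n.delegates[name] = d
	}
	return nil
}

// GetStreamName returns the result of the delegate registered for the type.
func (n *streamNamer) GetStreamName(typeName, id string) (string, error) {
	d, ok := n.delegates[typeName]
	if !ok {
		return "", fmt.Errorf("no stream name delegate registered for %q", typeName)
	}
	return d(typeName, id), nil
}

// typeDashID is a hypothetical naming scheme: "<type>-<id>".
func typeDashID(typeName, id string) string { return typeName + "-" + id }

func main() {
	n := newStreamNamer()
	if err := n.RegisterDelegate(typeDashID, "Order", "Customer"); err != nil {
		fmt.Println("register:", err)
		return
	}
	name, err := n.GetStreamName("Order", "42")
	fmt.Println(name, err)
}
```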
type Dispatcher ¶
type Dispatcher interface {
	Dispatch(CommandMessage) error
	RegisterHandler(CommandHandler, ...interface{}) error
}
Dispatcher is the interface that should be implemented by a command dispatcher.
The dispatcher is the mechanism through which commands are distributed to the appropriate command handler.
Command handlers are registered with the dispatcher for a given command type. It is good practice in CQRS to have only one command handler for a given command. When a command is passed to the dispatcher, it looks up the registered command handler and calls that handler's Handle method, passing the command message as an argument.
Commands contained in a CommandMessage envelope are passed to the Dispatcher via the Dispatch method.
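The register-then-dispatch flow described above can be sketched as follows. All types here are local stand-ins, not the package's own. Keying handlers by the `%T` formatting of the command and returning an error on a second registration for the same command type are assumptions of the sketch; `createOrder` and `handlerFunc` are hypothetical.

```go
package main

import "fmt"

// commandMessage is a simplified, hypothetical command envelope.
type commandMessage struct {
	AggregateID string
	Command     interface{}
}

// commandHandler mirrors the shape of the CommandHandler interface.
type commandHandler interface {
	Handle(commandMessage) error
}

// handlerFunc adapts a plain function to commandHandler.
type handlerFunc func(commandMessage) error

func (f handlerFunc) Handle(m commandMessage) error { return f(m) }

// dispatcher holds at most one handler per command type.
type dispatcher struct {
	handlers map[string]commandHandler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: map[string]commandHandler{}}
}

// typeKey identifies a command by its dynamic type, e.g. "*main.createOrder".
func typeKey(v interface{}) string { return fmt.Sprintf("%T", v) }

// RegisterHandler registers h for the type of every sample command given.
func (d *dispatcher) RegisterHandler(h commandHandler, commands ...interface{}) error {
	for _, c := range commands {
		key := typeKey(c)
		if _, ok := d.handlers[key]; ok {
			return fmt.Errorf("handler already registered for %s", key)
		}
		d.handlers[key] = h
	}
	return nil
}

// Dispatch looks up the handler for the command's type and calls Handle.
func (d *dispatcher) Dispatch(m commandMessage) error {
	h, ok := d.handlers[typeKey(m.Command)]
	if !ok {
		return fmt.Errorf("no handler registered for %s", typeKey(m.Command))
	}
	return h.Handle(m)
}

// createOrder is a hypothetical command.
type createOrder struct{ Item string }

func main() {
	d := newDispatcher()
	h := handlerFunc(func(m commandMessage) error {
		fmt.Println("handling", m.Command.(*createOrder).Item, "for", m.AggregateID)
		return nil
	})
	if err := d.RegisterHandler(h, &createOrder{}); err != nil {
		fmt.Println("register:", err)
		return
	}
	fmt.Println(d.Dispatch(commandMessage{AggregateID: "o-1", Command: &createOrder{Item: "book"}}))
}
```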
type DomainRepository ¶
type DomainRepository interface {
	// Loads an aggregate of the given type and ID
	Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)
	// Saves the aggregate.
	Save(aggregate AggregateRoot, expectedVersion *int) error
}
DomainRepository is the interface that all domain repositories should implement.
type ErrAggregateNotFound ¶
ErrAggregateNotFound is the error returned when an aggregate is not found in the repository.
func (*ErrAggregateNotFound) Error ¶
func (e *ErrAggregateNotFound) Error() string
type ErrCommandExecution ¶
type ErrCommandExecution struct {
	Command CommandMessage
	Reason  string
}
ErrCommandExecution is the error returned in response to a failed command.
func (*ErrCommandExecution) Error ¶
func (e *ErrCommandExecution) Error() string
Error fulfills the error interface.
type ErrConcurrencyViolation ¶
type ErrConcurrencyViolation struct {
	Aggregate       AggregateRoot
	ExpectedVersion *int
	StreamName      string
}
ErrConcurrencyViolation is returned when a concurrency error is raised by the event store when events are persisted to a stream and the version of the stream does not match the expected version.
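The expected-version check behind this error can be sketched with an in-memory stream. Nothing here talks to a real event store. The `stream` and `concurrencyError` types are hypothetical, `intPtr` only mirrors the idea of a helper that returns a pointer to an int, and two behaviours are assumptions of the sketch: an empty stream is at version -1, and a nil expected version skips the check.

```go
package main

import "fmt"

// intPtr returns a pointer to i, so an optional version can be passed as *int.
func intPtr(i int) *int { return &i }

// concurrencyError is a hypothetical stand-in for ErrConcurrencyViolation.
type concurrencyError struct {
	StreamName string
	Expected   int
	Actual     int
}

func (e *concurrencyError) Error() string {
	return fmt.Sprintf("stream %s: expected version %d, found %d", e.StreamName, e.Expected, e.Actual)
}

// stream is an in-memory list of event names.
type stream struct {
	name   string
	events []string
}

// version is -1 for an empty stream and 0 after the first event.
func (s *stream) version() int { return len(s.events) - 1 }

// appendEvents appends only if the caller's expected version matches the
// stream's current version. A nil expected version disables the check.
func (s *stream) appendEvents(expected *int, events ...string) error {
	if expected != nil && *expected != s.version() {
		return &concurrencyError{StreamName: s.name, Expected: *expected, Actual: s.version()}
	}
	s.events = append(s.events, events...)
	return nil
}

func main() {
	s := &stream{name: "Order-42"}
	fmt.Println(s.appendEvents(intPtr(-1), "OrderPlaced")) // stream is empty, so -1 matches
	fmt.Println(s.appendEvents(intPtr(-1), "OrderPaid"))   // stale: stream is now at version 0
	fmt.Println(s.appendEvents(intPtr(0), "OrderPaid"))    // matches the current version
}
```

A caller that receives the concurrency error would typically reload the aggregate and retry the command against the fresh state.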
func (*ErrConcurrencyViolation) Error ¶
func (e *ErrConcurrencyViolation) Error() string
type ErrRepositoryUnavailable ¶
type ErrRepositoryUnavailable struct{}
ErrRepositoryUnavailable is returned when the eventstore is temporarily unavailable
func (*ErrRepositoryUnavailable) Error ¶
func (e *ErrRepositoryUnavailable) Error() string
type ErrUnauthorized ¶
type ErrUnauthorized struct{}
ErrUnauthorized is returned when a request to the repository is not authorized
func (*ErrUnauthorized) Error ¶
func (e *ErrUnauthorized) Error() string
type ErrUnexpected ¶
type ErrUnexpected struct {
Err error
}
ErrUnexpected is returned for all errors that are not otherwise represented explicitly.
The original error is available for inspection in the Err field.
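Inspecting the wrapped error might look like the sketch below. The `errUnexpected` type is a local stand-in with the same `Err` field, and `save`, `cause` and `errDisk` are hypothetical names; the standard library's `errors.As` is used to find the wrapper.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnexpected is a local stand-in for ErrUnexpected.
type errUnexpected struct {
	Err error
}

func (e *errUnexpected) Error() string {
	return fmt.Sprintf("unexpected error: %v", e.Err)
}

// errDisk is a hypothetical low-level failure.
var errDisk = errors.New("disk failure")

// save is a hypothetical repository call that wraps unknown failures.
func save(fail bool) error {
	if fail {
		return &errUnexpected{Err: errDisk}
	}
	return nil
}

// cause returns the original error when err is an errUnexpected,
// and err itself otherwise.
func cause(err error) error {
	var u *errUnexpected
	if errors.As(err, &u) {
		return u.Err
	}
	return err
}

func main() {
	err := save(true)
	fmt.Println(err)
	fmt.Println(cause(err) == errDisk)
}
```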
func (*ErrUnexpected) Error ¶
func (e *ErrUnexpected) Error() string
type EventBus ¶
type EventBus interface {
	PublishEvent(EventMessage)
	AddHandler(EventHandler, ...interface{})
}
EventBus is the interface that an event bus must implement.
type EventDescriptor ¶
type EventDescriptor struct {
// contains filtered or unexported fields
}
EventDescriptor is an implementation of the event message interface.
func NewEventMessage ¶
func NewEventMessage(aggregateID string, event interface{}, version *int) *EventDescriptor
NewEventMessage returns a new event descriptor
func (*EventDescriptor) AggregateID ¶
func (c *EventDescriptor) AggregateID() string
AggregateID returns the ID of the Aggregate that the event relates to.
func (*EventDescriptor) Event ¶
func (c *EventDescriptor) Event() interface{}
Event returns the event payload of the event message.
func (*EventDescriptor) EventType ¶
func (c *EventDescriptor) EventType() string
EventType returns the name of the event type as a string.
func (*EventDescriptor) GetHeaders ¶
func (c *EventDescriptor) GetHeaders() map[string]interface{}
GetHeaders returns the headers for the event.
func (*EventDescriptor) SetHeader ¶
func (c *EventDescriptor) SetHeader(key string, value interface{})
SetHeader sets the value of the header specified by the key
func (*EventDescriptor) Version ¶
func (c *EventDescriptor) Version() *int
Version returns the version of the event
type EventFactory ¶
type EventFactory interface {
GetEvent(string) interface{}
}
EventFactory is the interface that an event factory should implement.
An event factory returns instances of an event given the event type as a string. An event factory is required during deserialisation of events by the eventstore or repository depending on your implementation.
The eventstore will return a string describing the event type. To unmarshal the contents of the persisted event, which will typically be in some serialised format such as JSON, an instance of the event type will need to be created.
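The deserialisation step described above can be sketched as: look up an empty instance by type name, then unmarshal the stored bytes into it. The `eventFactory` here is a local stand-in, and the `orderPlaced` event, its JSON field names and the `decode` helper are hypothetical. The delegate returns a pointer so that `json.Unmarshal` can populate the instance.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderPlaced is a hypothetical event with hypothetical JSON field names.
type orderPlaced struct {
	OrderID string `json:"orderId"`
	Total   int    `json:"total"`
}

// eventFactory is a local stand-in for an EventFactory implementation.
type eventFactory struct {
	delegates map[string]func() interface{}
}

func newEventFactory() *eventFactory {
	return &eventFactory{delegates: map[string]func() interface{}{
		"orderPlaced": func() interface{} { return &orderPlaced{} },
	}}
}

// GetEvent returns a fresh instance for the type name, or nil if no
// delegate is registered.
func (f *eventFactory) GetEvent(typeName string) interface{} {
	if d, ok := f.delegates[typeName]; ok {
		return d()
	}
	return nil
}

// decode creates an instance for the stored type name and fills it from
// the stored JSON payload.
func decode(f *eventFactory, typeName string, data []byte) (interface{}, error) {
	ev := f.GetEvent(typeName)
	if ev == nil {
		return nil, fmt.Errorf("no delegate registered for event type %q", typeName)
	}
	if err := json.Unmarshal(data, ev); err != nil {
		return nil, err
	}
	return ev, nil
}

func main() {
	f := newEventFactory()
	ev, err := decode(f, "orderPlaced", []byte(`{"orderId":"o-1","total":3}`))
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("%+v\n", ev)
}
```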
type EventHandler ¶
type EventHandler interface {
Handle(EventMessage)
}
type EventMessage ¶
type EventMessage interface {
	// AggregateID returns the ID of the Aggregate that the event relates to
	AggregateID() string
	// GetHeaders returns the key value collection of headers for the event.
	//
	// Headers are metadata about the event that do not form part of the
	// actual event but are still required to be persisted alongside the event.
	GetHeaders() map[string]interface{}
	// SetHeader sets the value of the header specified by the key
	SetHeader(string, interface{})
	// Event returns the actual event which is the payload of the event message.
	Event() interface{}
	// EventType returns a string descriptor of the event name
	EventType() string
	// Version returns the version of the event
	Version() *int
}
EventMessage is the interface that an event message must implement.
type GetEventStoreCommonDomainRepo ¶
type GetEventStoreCommonDomainRepo struct {
// contains filtered or unexported fields
}
GetEventStoreCommonDomainRepo is an implementation of the DomainRepository that uses GetEventStore for persistence
func NewCommonDomainRepository ¶
func NewCommonDomainRepository(eventStore *goes.Client, eventBus EventBus) (*GetEventStoreCommonDomainRepo, error)
NewCommonDomainRepository constructs a new GetEventStoreCommonDomainRepo.
func (*GetEventStoreCommonDomainRepo) Load ¶
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error)
Load will load all events from a stream and apply those events to an aggregate of the type specified.
The aggregate type and id will be passed to the configured StreamNamer to get the stream name.
func (*GetEventStoreCommonDomainRepo) Save ¶
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error
Save persists an aggregate
func (*GetEventStoreCommonDomainRepo) SetAggregateFactory ¶
func (r *GetEventStoreCommonDomainRepo) SetAggregateFactory(factory AggregateFactory)
SetAggregateFactory sets the aggregate factory that should be used to instantiate aggregate instances.
Only one AggregateFactory can be registered at any one time. Any registration will overwrite the previous registration.
func (*GetEventStoreCommonDomainRepo) SetEventFactory ¶
func (r *GetEventStoreCommonDomainRepo) SetEventFactory(factory EventFactory)
SetEventFactory sets the event factory that should be used to instantiate event instances.
Only one event factory can be set at a time. Any subsequent registration will overwrite the previous factory.
func (*GetEventStoreCommonDomainRepo) SetStreamNameDelegate ¶
func (r *GetEventStoreCommonDomainRepo) SetStreamNameDelegate(delegate StreamNamer)
SetStreamNameDelegate sets the stream name delegate
type InMemoryDispatcher ¶
type InMemoryDispatcher struct {
// contains filtered or unexported fields
}
InMemoryDispatcher provides a lightweight and performant in-process dispatcher
func NewInMemoryDispatcher ¶
func NewInMemoryDispatcher() *InMemoryDispatcher
NewInMemoryDispatcher constructs a new in memory dispatcher
func (*InMemoryDispatcher) Dispatch ¶
func (b *InMemoryDispatcher) Dispatch(command CommandMessage) error
Dispatch passes the CommandMessage on to all registered command handlers.
func (*InMemoryDispatcher) RegisterHandler ¶
func (b *InMemoryDispatcher) RegisterHandler(handler CommandHandler, commands ...interface{}) error
RegisterHandler registers a command handler for the command types specified by the variadic commands parameter.
type InternalEventBus ¶
type InternalEventBus struct {
// contains filtered or unexported fields
}
InternalEventBus provides a lightweight in-process event bus
func NewInternalEventBus ¶
func NewInternalEventBus() *InternalEventBus
NewInternalEventBus constructs a new InternalEventBus
func (*InternalEventBus) AddHandler ¶
func (b *InternalEventBus) AddHandler(handler EventHandler, events ...interface{})
AddHandler registers an event handler for all of the events specified in the variadic events parameter.
func (*InternalEventBus) PublishEvent ¶
func (b *InternalEventBus) PublishEvent(event EventMessage)
PublishEvent publishes events to all registered event handlers
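As a rough picture of how AddHandler and PublishEvent fit together, the sketch below keeps a list of handlers per event type and calls each of them on publish. It uses local stand-in types only. Keying by the `%T` formatting of the event and delivering synchronously in registration order are assumptions of the sketch; `orderPlaced` and `handlerFunc` are hypothetical.

```go
package main

import "fmt"

// eventMessage is a simplified, hypothetical event envelope.
type eventMessage struct {
	AggregateID string
	Event       interface{}
}

// eventHandler mirrors the shape of the EventHandler interface.
type eventHandler interface {
	Handle(eventMessage)
}

// handlerFunc adapts a plain function to eventHandler.
type handlerFunc func(eventMessage)

func (f handlerFunc) Handle(m eventMessage) { f(m) }

// eventBus keeps any number of handlers per event type.
type eventBus struct {
	handlers map[string][]eventHandler
}

func newEventBus() *eventBus {
	return &eventBus{handlers: map[string][]eventHandler{}}
}

// typeKey identifies an event by its dynamic type, e.g. "*main.orderPlaced".
func typeKey(v interface{}) string { return fmt.Sprintf("%T", v) }

// AddHandler registers h for the type of every sample event given.
func (b *eventBus) AddHandler(h eventHandler, events ...interface{}) {
	for _, e := range events {
		key := typeKey(e)
		b.handlers[key] = append(b.handlers[key], h)
	}
}

// PublishEvent calls every handler registered for the event's type.
func (b *eventBus) PublishEvent(m eventMessage) {
	for _, h := range b.handlers[typeKey(m.Event)] {
		h.Handle(m)
	}
}

// orderPlaced is a hypothetical event.
type orderPlaced struct{ Total int }

func main() {
	bus := newEventBus()
	bus.AddHandler(handlerFunc(func(m eventMessage) {
		fmt.Println("read model saw order", m.AggregateID)
	}), &orderPlaced{})
	bus.AddHandler(handlerFunc(func(m eventMessage) {
		fmt.Println("mailer saw total", m.Event.(*orderPlaced).Total)
	}), &orderPlaced{})
	bus.PublishEvent(eventMessage{AggregateID: "o-1", Event: &orderPlaced{Total: 3}})
}
```

Unlike the command side, where one handler per command is the norm, an event can fan out to several independent handlers.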