cqrs

package module
v4.0.0-beta.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2019 License: MIT Imports: 8 Imported by: 0

README

go-cqrs

Build Status GoDoc GoCover GoReportCard

CQRS/es implementation in go

Aggregates

todo

Commands

todo

Events

todo

Event store and streams

todo

Factories

todo

Aggregate repository

todo

Projections

todo

Middleware for command handler

todo

Event publishing

todo

Examples

Examples are in the _example directory.

Inventory example This is the GO version of the C# Simple CQRS example from Gregory Young.

Passing events by value not by pointer reference

Events are always passed by value, never passed by pointer reference. This is to ensure immutability of the events data.

  • The Load method of the aggregate repository will always convert pointer events and pass them by value to the aggregates Apply function.
  • The Save method of the aggregate repository will convert any pointer referenced events and pass them by value to the aggregate Apply method and to the publish event hooks (PublishEventFunc)

Always refer to the class name of an event in your aggregates and projections. Never to the pointer variant, it will probably never be picked up

EventFactory

The event factory must always return a pointer to the newly created event. This is of the event stream needs to scan/unmarshal the data into the event instance and can only do this for pointer instances. Later on the newly created event will be passed by value to the aggregates and or projections.

Todo
  • Test the domain aggregate repository.
  • More real world examples.
  • See how it performs in my projects.
  • Documentation documentation documentation.
  • Write test for Snapshot repository
  • Create Mysql and Postgres variant for SnapshotStore

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRepositoryNotFound = errors.New("repository not found")
)
View Source
var ErrUnknownCommand = errors.New("Cannot handle unknown Command")
View Source
var ErrUnknownEvent = errors.New("Cannot handle unknown Event")

Functions

func AggregateCommandHandler

func AggregateCommandHandler(repository AggregateRepository) commandbus.CommandHandler

AggregateCommandHandler is a command handler middleware who loads the aggregate calls the aggregate command handler to execute the business logic and saves the events to the aggregate store afterwards.

func AggregateCommandHandlerCallback

func AggregateCommandHandlerCallback(repository AggregateRepository, handler AggregateCommandHandlerFunc) commandbus.CommandHandler

func EventbusWrapper

func EventbusWrapper(handler EventHandler) eventbus.EventHandlerFunc

Types

type Aggregate

type Aggregate interface {
	AggregateContext

	// AggregateName returns the type name of the aggregate.
	AggregateName() string

	// Apply applies an event to the aggregate by setting its values.
	Apply(Event) error
}

type AggregateBuilder

type AggregateBuilder func(aggregateId AggregateId) (Aggregate, error)

AggregateBuilder is the builder function to create new aggregate compositions. This could be used to introduce new strategies how to build a aggregate like the snapshot implementation

func DefaultAggregateBuilder

func DefaultAggregateBuilder(factory AggregateFactoryFunc) AggregateBuilder

func SnapshotAggregateBuilder

func SnapshotAggregateBuilder(factory AggregateFactoryFunc, snapshotStore SnapshotStore) AggregateBuilder

SnapshotAggregateBuilder

type AggregateCommand

type AggregateCommand interface {
	Command
	AggregateName() string
}

type AggregateCommandHandlerFunc

type AggregateCommandHandlerFunc func(aggregate Aggregate, command Command) error

type AggregateComposition

type AggregateComposition interface {
	Context() AggregateContext
	Aggregate() Aggregate
}

type AggregateContext

type AggregateContext interface {
	// AggregateId returns the id of the aggregate.
	AggregateId() AggregateId

	// Version returns the version of the aggregate.
	Version() int

	// OriginalVersion returns the version of the aggregate without the current event modifications.
	OriginalVersion() int

	// StoreEvent stores an event as uncommitted event.
	StoreEvent(EventData)
	// contains filtered or unexported methods
}

func NewAggregateContext

func NewAggregateContext(id AggregateId, version int) AggregateContext

type AggregateFactory

type AggregateFactory interface {
	//MakeAggregate will return a clean Aggregate based on the type provided
	MakeAggregate(string, AggregateContext) Aggregate
}

AggregateFactory returns aggregate instances of a specified type with the AggregateId set to the uuid provided.

type AggregateFactoryFunc

type AggregateFactoryFunc func(AggregateContext) Aggregate

type AggregateHandlesCommands

type AggregateHandlesCommands interface {
	Aggregate

	// HandleCommand handles a command and stores events.
	HandleCommand(Command) error
}

AggregateHandlesCommands indicates a aggregate can directly handle a command

type AggregateId

type AggregateId = uuid.UUID

type AggregateRepository

type AggregateRepository interface {
	//Loads an aggregate based on the aggregate ID
	Load(aggregateId AggregateId) (Aggregate, error)

	//Saves the aggregate.
	Save(aggregate Aggregate) error
}

AggregateRepository is the interface that a specific aggregate repositories should implement.

func NewAggregateRepository

func NewAggregateRepository(
	eventStore EventStore,
	aggregateBuilder AggregateBuilder,
	eventFactory EventFactory) AggregateRepository

NewAggregateRepository is the constructor of the repository

publishEventHooks get called when a new event is successfully persisted to the eventstore. This is very useful to wire it to an eventbus for publishing the event to other listeners (projections)

func NewSnapshotAggregateRepository

func NewSnapshotAggregateRepository(
	snapshotStore SnapshotStore,
	differenceOffset int,
	aggregateRepository AggregateRepository) AggregateRepository

NewSnapshotAggregateRepository is the constructor of the aggregate repository with snapshot functionality A snapshot will be created when the differenceOffset between the snapshot version and the current version is equal or greater than the `differenceOffset`

When the differenceOffset is set to 10 than: - aggregate version 7 (snapshot version 0) will not create a snapshot - aggregate version 10 (snapshot version 0) will create a snapshot for version 10 - aggregate version 13 (snapshot version 0) will create a snapshot for version 13 - aggregate version 21 (snapshot version 13) will not create a snapshot - aggregate version 54 (snapshot version 13) will create a snapshot for version 54

type AggregateRepositoryManager

type AggregateRepositoryManager interface {
	//RepositoryFor will return the repository for the specific named aggregate
	RepositoryFor(aggregateName string) (AggregateRepository, error)

	//Loads an aggregate based on the aggregate ID and aggregateName
	Load(aggregateName string, aggregateId AggregateId) (Aggregate, error)

	//Saves the aggregate.
	Save(aggregate Aggregate) error
}

AggregateRepositoryManager is the managing interface who provide aggregate repository access

type CallbackAggregateFactory

type CallbackAggregateFactory struct {
	// contains filtered or unexported fields
}

CallbackAggregateFactory is an implementation of the AggregateFactory interface that supports registration of delegate/callback functions to perform aggregate instantiation.

func NewCallbackAggregateFactory

func NewCallbackAggregateFactory() *CallbackAggregateFactory

NewCallbackAggregateFactory creates a new CallbackAggregateFactory

func (*CallbackAggregateFactory) MakeAggregate

func (t *CallbackAggregateFactory) MakeAggregate(typeName string, ctx AggregateContext) Aggregate

MakeAggregate calls the callback for the specified type and returns the result.

func (*CallbackAggregateFactory) RegisterCallback

func (t *CallbackAggregateFactory) RegisterCallback(callback AggregateFactoryFunc) error

RegisterCallback is used to register a new function for instantiation of an aggregate instance.

type CallbackEventFactory

type CallbackEventFactory struct {
	// contains filtered or unexported fields
}

CallbackEventFactory uses callback/delegate functions to instantiate event instances given the name of the event type as a string.

func NewCallbackEventFactory

func NewCallbackEventFactory() *CallbackEventFactory

NewCallbackEventFactory constructs a new CallbackEventFactory

func (*CallbackEventFactory) MakeEvent

func (t *CallbackEventFactory) MakeEvent(eventType EventType) EventData

MakeEvent returns an event instance given an event type as a string.

An appropriate delegate must be registered for the event type. If an appropriate delegate is not registered, the method will return nil.

func (*CallbackEventFactory) RegisterCallback

func (t *CallbackEventFactory) RegisterCallback(callback EventFactoryFunc) error

RegisterCallback registers a delegate that will return an event instance given an event type name as a string.

type Command

type Command interface {
	CommandName() string
	AggregateId() AggregateId
}

type DomainAggregateRepository

type DomainAggregateRepository struct {
	// contains filtered or unexported fields
}

func NewCommonDomainRepository

func NewCommonDomainRepository(eventStore EventStore, eventFactory EventFactory, aggregateFactory AggregateFactory) *DomainAggregateRepository

NewRepository instantiates a new repository resolver who accepts a stream resolver

func (*DomainAggregateRepository) Load

func (r *DomainAggregateRepository) Load(aggregateName string, aggregateId AggregateId) (Aggregate, error)

Loads an aggregate of the given type and ID

func (*DomainAggregateRepository) RepositoryFor

func (r *DomainAggregateRepository) RepositoryFor(aggregateName string) (AggregateRepository, error)

func (*DomainAggregateRepository) Save

func (r *DomainAggregateRepository) Save(aggregate Aggregate) error

Save will save all the events to the event store.

type ErrorAggregateCannotHandleCommand

type ErrorAggregateCannotHandleCommand string

func (ErrorAggregateCannotHandleCommand) Error

type ErrorAggregateFactoryAlreadyRegistered

type ErrorAggregateFactoryAlreadyRegistered string

func (ErrorAggregateFactoryAlreadyRegistered) Error

type ErrorAggregateNotFound

type ErrorAggregateNotFound string

func (ErrorAggregateNotFound) Error

func (e ErrorAggregateNotFound) Error() string

type ErrorEventFactoryAlreadyRegistered

type ErrorEventFactoryAlreadyRegistered string

func (ErrorEventFactoryAlreadyRegistered) Error

type ErrorEventFactoryNotReturningPointer

type ErrorEventFactoryNotReturningPointer string

func (ErrorEventFactoryNotReturningPointer) Error

type ErrorNotAnAggregateCommand

type ErrorNotAnAggregateCommand string

func (ErrorNotAnAggregateCommand) Error

type Event

type Event interface {
	EventData
	Version() int
	Timestamp() time.Time
	Data() interface{}
}

Event

func NewEvent

func NewEvent(version int, timestamp time.Time, data EventData) Event

NewEvent constructor with plain version

func NewEventFromAggregate

func NewEventFromAggregate(aggregate AggregateContext, data EventData) Event

NewEventFromAggregate constructor will create a new event based on the latest aggregate state

type EventData

type EventData interface {
	AggregateId() AggregateId
	EventType() EventType
}

EventData is the actual data of the event

type EventFactory

type EventFactory interface {
	MakeEvent(EventType) EventData
}

EventFactory is the interface that an event store should implement. An event factory returns instances of an event given the event type as a string.

type EventFactoryFunc

type EventFactoryFunc func() EventData

EventFactoryFunc should create an Event and return the pointer to the instance.

type EventHandler

type EventHandler interface {
	Handle(Event) error
}

type EventHandlerFunc

type EventHandlerFunc func(event Event) error

func (EventHandlerFunc) Handle

func (h EventHandlerFunc) Handle(event Event) error

type EventPublisher

type EventPublisher interface {
	Publish(...Event) error
}

type EventPublisherFunc

type EventPublisherFunc func(event ...Event) error

func (EventPublisherFunc) Publish

func (h EventPublisherFunc) Publish(event ...Event) error

type EventStore

type EventStore interface {
	LoadStream(aggregateName string, aggregateId AggregateId, version int) (EventStream, error)
	WriteEvent(string, ...Event) error
}

func NewEventPublishingEventStore

func NewEventPublishingEventStore(publisher EventPublisher, store EventStore) EventStore

type EventStream

type EventStream interface {
	EventType() EventType
	AggregateId() AggregateId
	Version() int
	Timestamp() time.Time

	Next() bool
	Error() error
	Scan(EventData) error
}

type EventType

type EventType = eventbus.EventType

type SnapshotStore

type SnapshotStore interface {
	Load(aggregateId AggregateId, aggregate Aggregate) (int, error)
	Write(aggregate Aggregate) error
}

type Validate

type Validate interface {
	Validate() error
}

Validate is the interface an aggregate command can implement to perform validation prior to executing domain logic

Directories

Path Synopsis
_example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL