Documentation ¶
Overview ¶
Package eventstore provides a simple light-weight event store implementation that uses sqlite as a backing storage. Apart from the event store, mechanisms for building projections and working with aggregate roots are provided
Index ¶
- Constants
- Variables
- type AggregateRoot
- type AppendStreamConfig
- type AppendStreamOpt
- type EncodedEvt
- type Encoder
- type EventData
- type EventStore
- func (es *EventStore) AppendStream(ctx context.Context, stream string, expectedVer int, evts []interface{}, ...) error
- func (es *EventStore) Close() error
- func (es *EventStore) ReadAll(ctx context.Context, opts ...SubAllOpt) ([]EventData, error)
- func (es *EventStore) ReadStream(ctx context.Context, stream string) ([]EventData, error)
- func (es *EventStore) SubscribeAll(ctx context.Context, opts ...SubAllOpt) (Subscription, error)
- type EventStreamer
- type JsonEncoder
- type Projection
- type Projector
- type SubAllConfig
- type SubAllOpt
- type Subscription
Constants ¶
const ( // InitialStreamVersion can be used as an initial expectedVer for // new streams (as an argument to AppendStream) InitialStreamVersion int = 0 )
Variables ¶
var ( // ErrStreamNotFound indicates that the requested stream does not exist in the event store ErrStreamNotFound = errors.New("stream not found") // ErrConcurrencyCheckFailed indicates that stream entry related to a particular version already exists ErrConcurrencyCheckFailed = errors.New("optimistic concurrency check failed: stream version exists") // ErrSubscriptionClosedByClient is produced by sub.Err if client cancels the subscription using sub.Close() ErrSubscriptionClosedByClient = errors.New("subscription closed by client") )
Functions ¶
This section is empty.
Types ¶
type AggregateRoot ¶
type AggregateRoot struct {
// contains filtered or unexported fields
}
AggregateRoot represents reusable DDD Event Sourcing friendly Aggregate base type which provides helpers for easy aggregate initialization and event handler execution
func (*AggregateRoot) Apply ¶
func (a *AggregateRoot) Apply(evts ...interface{}) error
Apply mutates aggregate (calls respective event handle) and appends event to internal slice so they can be retrieved with Events method In order for Apply to work the derived aggregate struct needs to implement an event handler method for all events it produces eg:
If it produces event of type: SomethingImportantHappened Derived aggregate should have the following method implemented: func (a *Aggr) OnSomethingImportantHappened(e SomethingImportantHappened) error
func (*AggregateRoot) Events ¶
func (a *AggregateRoot) Events() []interface{}
Events returns uncommitted domain events (produced by calling Apply)
func (*AggregateRoot) Init ¶
func (a *AggregateRoot) Init(aggrPtr interface{}, evts ...interface{}) error
Init is used to initialize aggregate (store pointer to the derived type) and/or initialize it with provided events (execute all event handlers)
func (*AggregateRoot) Version ¶
func (a *AggregateRoot) Version() int
Version returns current version of the aggregate (incremented every time Apply is successfully called)
type AppendStreamConfig ¶
type AppendStreamConfig struct {
// contains filtered or unexported fields
}
AppendStreamConfig (configure using AppendStreamOpt)
type AppendStreamOpt ¶
type AppendStreamOpt func(AppendStreamConfig) AppendStreamConfig
AppendStreamOpt represents append to stream option
func WithMetaData ¶
func WithMetaData(meta map[string]string) AppendStreamOpt
WithMetaData is an AppendStream option that can be used to associate arbitrary meta data to a batch of events to store
type EncodedEvt ¶
EncodedEvt represents encoded event used by a specific encoder implementation
type Encoder ¶
type Encoder interface { Encode(interface{}) (*EncodedEvt, error) Decode(*EncodedEvt) (interface{}, error) }
Encoder is used by the event store in order to correctly marshal and unmarshal event types
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore represents a sqlite event store implementation
func New ¶
func New(dial gorm.Dialector, enc Encoder) (*EventStore, error)
New construct new event store dbname - a path to sqlite database on disk enc - a specific encoder implementation (see bundled JsonEncoder)
func (*EventStore) AppendStream ¶
func (es *EventStore) AppendStream( ctx context.Context, stream string, expectedVer int, evts []interface{}, opts ...AppendStreamOpt) error
AppendStream will encode provided event slice and try to append them to an indicated stream. If the stream does not exist it will be created. If the stream already exists an optimistic concurrency check will be performed using a compound key (stream-expectedVer). expectedVer should be InitialStreamVersion for new streams and the latest stream version for existing streams, otherwise a concurrency error will be raised
func (*EventStore) Close ¶
func (es *EventStore) Close() error
Close should be called as a part of cleanup process in order to close the underlying sql connection
func (*EventStore) ReadAll ¶
ReadAll will read all events from the event store by internally creating a a subscription and depleting it until io.EOF is encountered WARNING: Use with caution as this method will read the entire event store in a blocking fashion (porbably best used in combination with offset option)
func (*EventStore) ReadStream ¶
ReadStream will read all events associated with provided stream If there are no events stored for a given stream ErrStreamNotFound will be returned
func (*EventStore) SubscribeAll ¶
func (es *EventStore) SubscribeAll(ctx context.Context, opts ...SubAllOpt) (Subscription, error)
SubscribeAll will create a subscription which can be used to stream all events in an orderly fashion. This mechanism should probably be mostly useful for building projections
type EventStreamer ¶
type EventStreamer interface {
SubscribeAll(context.Context, ...SubAllOpt) (Subscription, error)
}
EventStreamer represents an event stream that can be subscribed to This package offers EventStore as EventStreamer implementation
type JsonEncoder ¶
type JsonEncoder struct {
// contains filtered or unexported fields
}
JsonEncoder provides default json Encoder implementation It will marshal and unmarshal events to/from json and store the type name
func NewJSONEncoder ¶
func NewJSONEncoder(evts ...interface{}) *JsonEncoder
NewJSONEncoder constructs json encoder It receives a slice of event types it should be able to encode/decode
func (*JsonEncoder) Decode ¶
func (e *JsonEncoder) Decode(evt *EncodedEvt) (interface{}, error)
Decode unmarshals incoming event to it's corresponding go type
func (*JsonEncoder) Encode ¶
func (e *JsonEncoder) Encode(evt interface{}) (*EncodedEvt, error)
Encode marshals incoming event to it's json representation
type Projection ¶
Projection represents a projection that should be able to handle projected events
func FlushAfter ¶ added in v0.2.0
func FlushAfter( p Projection, flush func() error, flushInt time.Duration) Projection
FlushAfter wraps the projection passed in and it calls the projection itself as new events come (as usual) in addition to calling the provided flush function periodically each time flush interval expires
type Projector ¶
type Projector struct {
// contains filtered or unexported fields
}
Projector is an event projector which will subscribe to an event stream (evet store) and project events to each individual projection in an asynchronous manner
func NewProjector ¶
func NewProjector(s EventStreamer) *Projector
NewProjector constructs a Projector TODO Configure logger, pollInterval, and retry
func (*Projector) Add ¶
func (p *Projector) Add(projections ...Projection)
Add effectively registers a projection with the projector Make sure to add all of your projections before calling Run
type SubAllConfig ¶
type SubAllConfig struct {
// contains filtered or unexported fields
}
SubAllConfig (configure using SubAllOpt)
type SubAllOpt ¶
type SubAllOpt func(SubAllConfig) SubAllConfig
SubAllOpt represents subscribe to all events option
func WithBatchSize ¶
WithBatchSize is a subscription/read all option that specifies the read batch size (limit) when reading events from the event store
func WithOffset ¶
WithOffset is a subscription / read all option that indicates an offset in the event store from which to start reading events (exclusive)
func WithPollInterval ¶ added in v0.2.0
WithPollInterval is a subscription/read all option that specifies the poolling interval of the underlying sqlite database
type Subscription ¶
type Subscription struct { // Err chan will produce any errors that might occur while reading events // If Err produces io.EOF error, that indicates that we have caught up // with the event store and that there are no more events to read after which // the subscription itself will continue polling the event store for new events // each time we empty the Err channel. This means that reading from Err (in // case of io.EOF) can be strategically used in order to achieve backpressure Err chan error EventData chan EventData // contains filtered or unexported fields }
Subscription represents ReadAll subscription that is used for streaming incoming events
func (Subscription) Close ¶
func (s Subscription) Close()
Close closes the subscription and halts the polling of sqldb