eventstore

Version: v1.0.0
Published: May 6, 2023 License: MIT Imports: 11 Imported by: 0

README

Go EventStore

Embeddable EventStore implementation written in Go. It uses gorm as the underlying persistence mechanism, so it should work with almost any database that gorm supports (tested with sqlite and postgres); just use the respective gorm driver.

Features

  • Appending (saving) events to a particular stream
  • Reading events from the stream
  • Reading all events
  • Subscribing (streaming) all events from the event store (real-time)
  • Fault-tolerant projection system (Projector)

Upcoming

Add offset handling and retry mechanism to the default Projector.

Example

I provided a simple example that showcases basic usage with sqlite.

Documentation

Overview

Package eventstore provides a simple, lightweight event store implementation that uses gorm for its backing storage (tested with sqlite and postgres). Apart from the event store, it provides mechanisms for building projections and working with aggregate roots.

Index

Constants

const (
	// InitialStreamVersion can be used as an initial expectedVer for
	// new streams (as an argument to AppendStream)
	InitialStreamVersion int = 0
)

Variables

var (
	// ErrStreamNotFound indicates that the requested stream does not exist in the event store
	ErrStreamNotFound = errors.New("stream not found")

	// ErrConcurrencyCheckFailed indicates that stream entry related to a particular version already exists
	ErrConcurrencyCheckFailed = errors.New("optimistic concurrency check failed: stream version exists")

	// ErrSubscriptionClosedByClient is produced by sub.Err if client cancels the subscription using sub.Close()
	ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
)

Functions

This section is empty.

Types

type AggregateRoot

type AggregateRoot struct {
	// contains filtered or unexported fields
}

AggregateRoot represents a reusable, DDD and Event Sourcing friendly aggregate base type that provides helpers for easy aggregate initialization and event handler execution.

func (*AggregateRoot) Apply

func (a *AggregateRoot) Apply(evts ...interface{}) error

Apply mutates the aggregate (calls the respective event handler) and appends each event to an internal slice so that it can be retrieved with the Events method. For Apply to work, the derived aggregate struct needs to implement an event handler method for every event it produces.

For example, if it produces an event of type SomethingImportantHappened, the derived aggregate should implement the following method: func (a *Aggr) OnSomethingImportantHappened(e SomethingImportantHappened) error
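The handler naming convention can be illustrated with a minimal, standard-library-only sketch. It assumes the handler is looked up by name ("On" plus the event's type name) via reflection; the package's actual lookup may differ. Aggr, its Notes field, and the dispatch helper are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// SomethingImportantHappened is a hypothetical domain event.
type SomethingImportantHappened struct {
	Note string
}

// Aggr is a hypothetical aggregate with a single event handler.
type Aggr struct {
	Notes []string
}

// OnSomethingImportantHappened follows the On<EventType> naming convention.
func (a *Aggr) OnSomethingImportantHappened(e SomethingImportantHappened) error {
	a.Notes = append(a.Notes, e.Note)
	return nil
}

// dispatch is an illustrative helper (not part of the package): it finds the
// On<EventType> method on aggr by name and calls it with evt.
// It assumes the handler returns exactly one value of type error.
func dispatch(aggr interface{}, evt interface{}) error {
	name := "On" + reflect.TypeOf(evt).Name()
	m := reflect.ValueOf(aggr).MethodByName(name)
	if !m.IsValid() {
		return fmt.Errorf("missing handler %s", name)
	}
	out := m.Call([]reflect.Value{reflect.ValueOf(evt)})
	if err, ok := out[0].Interface().(error); ok && err != nil {
		return err
	}
	return nil
}

func main() {
	a := &Aggr{}
	if err := dispatch(a, SomethingImportantHappened{Note: "hello"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(a.Notes)
}
```

Note that the handler is declared on the pointer receiver, so the lookup has to be done on a pointer to the aggregate; this is presumably why Init takes a pointer to the derived type.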

func (*AggregateRoot) Events

func (a *AggregateRoot) Events() []interface{}

Events returns uncommitted domain events (produced by calling Apply)

func (*AggregateRoot) Init

func (a *AggregateRoot) Init(aggrPtr interface{}, evts ...interface{}) error

Init is used to initialize aggregate (store pointer to the derived type) and/or initialize it with provided events (execute all event handlers)

func (*AggregateRoot) Version

func (a *AggregateRoot) Version() int

Version returns current version of the aggregate (incremented every time Apply is successfully called)

type AppendStreamConfig

type AppendStreamConfig struct {
	// contains filtered or unexported fields
}

AppendStreamConfig (configure using AppendStreamOpt)

type AppendStreamOpt

type AppendStreamOpt func(AppendStreamConfig) AppendStreamConfig

AppendStreamOpt represents append to stream option

func WithMetaData

func WithMetaData(meta map[string]string) AppendStreamOpt

WithMetaData is an AppendStream option that can be used to associate arbitrary metadata with a batch of events to store
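AppendStreamOpt has the shape of a functional option: a function that takes a config value and returns a modified copy. The following sketch shows how such options are typically folded into a config. The config, opt, withMetaData, and applyOpts names are local stand-ins, and how the package applies its options internally is an assumption.

```go
package main

import "fmt"

// config is a hypothetical stand-in for AppendStreamConfig.
type config struct {
	meta map[string]string
}

// opt mirrors the shape of AppendStreamOpt: config in, config out.
type opt func(config) config

// withMetaData returns an option that sets the metadata on the config.
func withMetaData(meta map[string]string) opt {
	return func(c config) config {
		c.meta = meta
		return c
	}
}

// applyOpts starts from the zero config and applies each option in order.
func applyOpts(opts ...opt) config {
	var c config
	for _, o := range opts {
		c = o(c)
	}
	return c
}

func main() {
	c := applyOpts(withMetaData(map[string]string{"ip": "127.0.0.1"}))
	fmt.Println(c.meta["ip"])
}
```

SubAllOpt has the same shape, so the same folding approach would apply to WithOffset, WithBatchSize, and WithPollInterval.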

type EncodedEvt

type EncodedEvt struct {
	Data string
	Type string
}

EncodedEvt represents encoded event used by a specific encoder implementation

type Encoder

type Encoder interface {
	Encode(interface{}) (*EncodedEvt, error)
	Decode(*EncodedEvt) (interface{}, error)
}

Encoder is used by the event store in order to correctly marshal and unmarshal event types

type EventData

type EventData struct {
	Event interface{}
	Meta  map[string]string
	Type  string
}

EventData holds stored event data and meta data

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore represents a gorm-backed event store implementation (tested with sqlite and postgres)

func New

func New(dial gorm.Dialector, enc Encoder) (*EventStore, error)

New constructs a new event store. dial is the gorm dialector for the underlying database (for example, the one provided by the sqlite or postgres gorm driver); enc is a specific encoder implementation (see the bundled JsonEncoder).

func (*EventStore) AppendStream

func (es *EventStore) AppendStream(
	ctx context.Context,
	stream string,
	expectedVer int,
	evts []interface{},
	opts ...AppendStreamOpt) error

AppendStream will encode the provided event slice and try to append the events to the indicated stream. If the stream does not exist, it will be created. If the stream already exists, an optimistic concurrency check will be performed using a compound key (stream-expectedVer). expectedVer should be InitialStreamVersion for new streams and the latest stream version for existing streams; otherwise a concurrency error will be raised.
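The expectedVer rule can be illustrated with a small in-memory sketch. It is not the package's implementation: the package describes a compound key check in the database, whereas this sketch substitutes a simple comparison against the number of events already stored, and it assumes the stream version equals that count. memStore and appendStream are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrency stands in for ErrConcurrencyCheckFailed.
var errConcurrency = errors.New("optimistic concurrency check failed: stream version exists")

// initialStreamVersion stands in for InitialStreamVersion.
const initialStreamVersion = 0

// memStore is a hypothetical in-memory store keyed by stream name.
type memStore struct {
	streams map[string][]interface{}
}

func newMemStore() *memStore {
	return &memStore{streams: map[string][]interface{}{}}
}

// appendStream accepts the write only if expectedVer matches the current
// version of the stream (assumed here to be its event count).
func (s *memStore) appendStream(stream string, expectedVer int, evts []interface{}) error {
	if len(s.streams[stream]) != expectedVer {
		return errConcurrency
	}
	s.streams[stream] = append(s.streams[stream], evts...)
	return nil
}

func main() {
	s := newMemStore()
	// New stream: the initial version is expected.
	fmt.Println(s.appendStream("account-1", initialStreamVersion, []interface{}{"opened"}))
	// Stale writer: still believes the stream is empty.
	fmt.Println(s.appendStream("account-1", initialStreamVersion, []interface{}{"stale write"}))
	// Up-to-date writer: passes the latest version.
	fmt.Println(s.appendStream("account-1", 1, []interface{}{"deposited"}))
}
```

A caller that receives the concurrency error would typically re-read the stream, rebuild its state, and retry with the new version.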

func (*EventStore) Close

func (es *EventStore) Close() error

Close should be called as part of the cleanup process in order to close the underlying sql connection

func (*EventStore) ReadAll

func (es *EventStore) ReadAll(ctx context.Context, opts ...SubAllOpt) ([]EventData, error)

ReadAll will read all events from the event store by internally creating a subscription and depleting it until io.EOF is encountered. WARNING: Use with caution, as this method will read the entire event store in a blocking fashion (probably best used in combination with the offset option).

func (*EventStore) ReadStream

func (es *EventStore) ReadStream(ctx context.Context, stream string) ([]EventData, error)

ReadStream will read all events associated with the provided stream. If there are no events stored for the given stream, ErrStreamNotFound will be returned.

func (*EventStore) SubscribeAll

func (es *EventStore) SubscribeAll(ctx context.Context, opts ...SubAllOpt) (Subscription, error)

SubscribeAll will create a subscription which can be used to stream all events in an orderly fashion. This mechanism is probably most useful for building projections.

type EventStreamer

type EventStreamer interface {
	SubscribeAll(context.Context, ...SubAllOpt) (Subscription, error)
}

EventStreamer represents an event stream that can be subscribed to. This package offers EventStore as an EventStreamer implementation.

type JsonEncoder

type JsonEncoder struct {
	// contains filtered or unexported fields
}

JsonEncoder provides the default JSON Encoder implementation. It will marshal and unmarshal events to/from JSON and store the type name.

func NewJSONEncoder

func NewJSONEncoder(evts ...interface{}) *JsonEncoder

NewJSONEncoder constructs a JSON encoder. It receives a slice of event types it should be able to encode/decode.
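One way an encoder of this kind can work is to keep a registry from type name to Go type, store the type name next to the JSON payload, and use the registry to rebuild the concrete type on decode. The sketch below shows that approach using only the standard library. It is an assumption about the general technique, not the bundled JsonEncoder's code; registry, encoded, and AccountOpened are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// encoded is a local stand-in for EncodedEvt.
type encoded struct {
	Data string
	Type string
}

// registry maps event type names to their Go types.
type registry struct {
	types map[string]reflect.Type
}

// newRegistry registers the types of the sample event values passed in.
func newRegistry(evts ...interface{}) *registry {
	r := &registry{types: map[string]reflect.Type{}}
	for _, e := range evts {
		t := reflect.TypeOf(e)
		r.types[t.Name()] = t
	}
	return r
}

// encode marshals evt to JSON and records its type name.
func (r *registry) encode(evt interface{}) (*encoded, error) {
	data, err := json.Marshal(evt)
	if err != nil {
		return nil, err
	}
	return &encoded{Data: string(data), Type: reflect.TypeOf(evt).Name()}, nil
}

// decode allocates a value of the registered type and unmarshals into it.
func (r *registry) decode(e *encoded) (interface{}, error) {
	t, ok := r.types[e.Type]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", e.Type)
	}
	v := reflect.New(t)
	if err := json.Unmarshal([]byte(e.Data), v.Interface()); err != nil {
		return nil, err
	}
	return v.Elem().Interface(), nil
}

// AccountOpened is a hypothetical domain event.
type AccountOpened struct {
	ID string
}

func main() {
	r := newRegistry(AccountOpened{})
	enc, err := r.encode(AccountOpened{ID: "42"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	evt, err := r.decode(enc)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%s %+v\n", enc.Type, evt)
}
```

In this sketch, an event type that was never registered cannot be decoded, which is why every event type the store is expected to read has to be passed to the constructor up front.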

func (*JsonEncoder) Decode

func (e *JsonEncoder) Decode(evt *EncodedEvt) (interface{}, error)

Decode unmarshals the incoming event to its corresponding Go type

func (*JsonEncoder) Encode

func (e *JsonEncoder) Encode(evt interface{}) (*EncodedEvt, error)

Encode marshals the incoming event to its JSON representation

type Projection

type Projection func(EventData) error

Projection represents a projection that should be able to handle projected events

func FlushAfter added in v0.2.0

func FlushAfter(
	p Projection,
	flush func() error,
	flushInt time.Duration) Projection

FlushAfter wraps the given projection. The wrapper calls the projection as new events arrive (as usual) and, in addition, calls the provided flush function periodically, each time the flush interval expires.
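The wrapping idea can be sketched with a deliberately simplified, event-driven variant: the wrapper below only checks whether the interval has elapsed when an event arrives, and it takes an injectable clock so the behavior is deterministic. The package's FlushAfter may well flush from a timer independently of incoming events, so treat this as an illustration of the decorator shape only. eventData, projection, flushAfterSketch, and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// eventData and projection are local stand-ins for EventData and Projection.
type eventData struct{ Type string }
type projection func(eventData) error

// flushAfterSketch wraps p so that flush is called after an event is handled
// whenever at least interval has passed since the previous flush.
// now is an injectable clock (an assumption made for testability).
func flushAfterSketch(p projection, flush func() error, interval time.Duration, now func() time.Time) projection {
	last := now()
	return func(e eventData) error {
		if err := p(e); err != nil {
			return err
		}
		if now().Sub(last) >= interval {
			last = now()
			return flush()
		}
		return nil
	}
}

// demo drives the wrapper with a fake clock and reports how many events
// were handled and how many flushes happened.
func demo() (handled, flushes int) {
	clock := time.Unix(0, 0)
	now := func() time.Time { return clock }
	p := flushAfterSketch(
		func(eventData) error { handled++; return nil },
		func() error { flushes++; return nil },
		time.Second, now)

	_ = p(eventData{Type: "A"}) // interval not yet elapsed: no flush
	clock = clock.Add(2 * time.Second)
	_ = p(eventData{Type: "B"}) // interval elapsed: flush
	_ = p(eventData{Type: "C"}) // flushed a moment ago: no flush
	return handled, flushes
}

func main() {
	h, f := demo()
	fmt.Println(h, f)
}
```

A wrapper like this suits projections that buffer writes (for example, batching rows in memory) and only need to persist them every so often.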

type Projector

type Projector struct {
	// contains filtered or unexported fields
}

Projector is an event projector which will subscribe to an event stream (event store) and project events to each individual projection in an asynchronous manner

func NewProjector

func NewProjector(s EventStreamer) *Projector

NewProjector constructs a Projector. TODO: Configure logger, pollInterval, and retry

func (*Projector) Add

func (p *Projector) Add(projections ...Projection)

Add registers a projection with the projector. Make sure to add all of your projections before calling Run.

func (*Projector) Run

func (p *Projector) Run(ctx context.Context) error

Run will start the projector

type SubAllConfig

type SubAllConfig struct {
	// contains filtered or unexported fields
}

SubAllConfig (configure using SubAllOpt)

type SubAllOpt

type SubAllOpt func(SubAllConfig) SubAllConfig

SubAllOpt represents subscribe to all events option

func WithBatchSize

func WithBatchSize(size int) SubAllOpt

WithBatchSize is a subscription/read all option that specifies the read batch size (limit) when reading events from the event store

func WithOffset

func WithOffset(offset int) SubAllOpt

WithOffset is a subscription / read all option that indicates an offset in the event store from which to start reading events (exclusive)
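How an exclusive offset and a batch size interact can be shown with an in-memory sketch. It assumes events carry sequence numbers starting at 1, so an offset of 2 skips events 1 and 2 and resumes at event 3; the package's actual numbering and query are not shown in this documentation. readBatch and readAll are hypothetical helpers.

```go
package main

import "fmt"

// readBatch returns up to size events with a sequence number greater than
// offset. Event i+1 lives at index i, so the exclusive offset doubles as the
// start index.
func readBatch(log []string, offset, size int) []string {
	if offset < 0 {
		offset = 0
	}
	if size <= 0 || offset >= len(log) {
		return nil
	}
	end := offset + size
	if end > len(log) {
		end = len(log)
	}
	return log[offset:end]
}

// readAll keeps reading batches, advancing the offset by the number of
// events received, until an empty batch signals that it has caught up.
func readAll(log []string, offset, size int) []string {
	var out []string
	for {
		batch := readBatch(log, offset, size)
		if len(batch) == 0 {
			return out
		}
		out = append(out, batch...)
		offset += len(batch)
	}
}

func main() {
	log := []string{"e1", "e2", "e3", "e4", "e5"}
	fmt.Println(readBatch(log, 2, 2))
	fmt.Println(readAll(log, 1, 2))
}
```

Persisting the last processed offset and passing it back on restart is the usual way to resume a reader without replaying the whole store.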

func WithPollInterval added in v0.2.0

func WithPollInterval(d time.Duration) SubAllOpt

WithPollInterval is a subscription/read all option that specifies the polling interval of the underlying database

type Subscription

type Subscription struct {
	// Err chan will produce any errors that might occur while reading events
	// If Err produces io.EOF error, that indicates that we have caught up
	// with the event store and that there are no more events to read after which
	// the subscription itself will continue polling the event store for new events
	// each time we empty the Err channel. This means that reading from Err (in
	// case of io.EOF) can be strategically used in order to achieve backpressure
	Err       chan error
	EventData chan EventData
	// contains filtered or unexported fields
}

Subscription represents a ReadAll subscription that is used for streaming incoming events
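The consumption pattern implied by the Err and EventData channels (receive events until Err yields io.EOF, which signals that the reader has caught up) can be sketched with a fake subscription. The sub type, startSub producer, and drain helper below are hypothetical and use strings instead of EventData; the real Subscription keeps polling after io.EOF, which this sketch does not model.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// sub is a local stand-in for Subscription with the same two channels.
type sub struct {
	Err       chan error
	EventData chan string
}

// startSub is a fake producer: it emits the given events in order and then
// reports io.EOF on Err to signal that there is nothing more to read.
func startSub(events []string) sub {
	s := sub{Err: make(chan error), EventData: make(chan string)}
	go func() {
		for _, e := range events {
			s.EventData <- e
		}
		s.Err <- io.EOF
	}()
	return s
}

// drain collects events until io.EOF arrives; any other error is returned
// to the caller together with the events read so far.
func drain(s sub) ([]string, error) {
	var out []string
	for {
		select {
		case e := <-s.EventData:
			out = append(out, e)
		case err := <-s.Err:
			if errors.Is(err, io.EOF) {
				return out, nil
			}
			return out, err
		}
	}
}

func main() {
	events, err := drain(startSub([]string{"e1", "e2", "e3"}))
	fmt.Println(events, err)
}
```

Because the channels here are unbuffered, the producer cannot run ahead of the consumer, which is the same property that lets a slow reader apply backpressure by delaying its receive from Err.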

func (Subscription) Close

func (s Subscription) Close()

Close closes the subscription and halts the polling of the underlying SQL database
