es

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2021 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Overview

Package es provides event store structures and abstractions.

Index

Constants

This section is empty.

Variables

View Source
var File_event_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

Aggregate is an interface used for the aggregate models

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is the base of the aggregate which

func (*AggregateBase) CommittedEvents

func (a *AggregateBase) CommittedEvents() []*Event

CommittedEvents gets the committed event messages.

func (*AggregateBase) DecodeEventAs

func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error

DecodeEventAs decodes provided in put eventData into the structure of eventMsg. THe eventMsg is expected to be a pointer to the event msg.

func (*AggregateBase) ID

func (a *AggregateBase) ID() string

ID gets the aggregate identifier.

func (*AggregateBase) LatestCommittedEvent

func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)

LatestCommittedEvent gets the latest committed event message.

func (*AggregateBase) MarkEventsCommitted

func (a *AggregateBase) MarkEventsCommitted()

MarkEventsCommitted marks the aggregate events as committed. NOTE: Use this function carefully, as the event store wouldn't try to commit events, already marked as committed.

func (*AggregateBase) MustLatestCommittedEvent

func (a *AggregateBase) MustLatestCommittedEvent() *Event

MustLatestCommittedEvent gets the latest committed event message or panics.

func (*AggregateBase) NewEvent added in v0.0.22

func (a *AggregateBase) NewEvent(id string, msg EventMessage, timestamp time.Time) (*Event, error)

NewEvent creates a new event with given identifier, at given timestamp and with given message. An event is added to uncommitted events of the aggregate base. NOTE: Created event is not applied to given aggregate base.

func (*AggregateBase) Revision

func (a *AggregateBase) Revision() int64

Revision gets aggregate current revision.

func (*AggregateBase) SetEvent

func (a *AggregateBase) SetEvent(eventMsg EventMessage) error

SetEvent sets new event message into given aggregate.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets aggregate id.

func (*AggregateBase) Timestamp

func (a *AggregateBase) Timestamp() int64

Timestamp gets the aggregate base timestamp.

func (*AggregateBase) Type

func (a *AggregateBase) Type() string

Type gets the aggregate type.

func (*AggregateBase) UncommittedEvents added in v0.0.17

func (a *AggregateBase) UncommittedEvents() []*Event

UncommittedEvents gets the slice of uncommitted event messages.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version gets aggregate version.

type AggregateBaseSetter

type AggregateBaseSetter struct {
	// contains filtered or unexported fields
}

AggregateBaseSetter is a structure responsible for setting the aggregate base.

func NewAggregateBaseSetter

func NewAggregateBaseSetter(eventCodec, snapCodec codec.Codec, idGen IdGenerator) *AggregateBaseSetter

NewAggregateBaseSetter creates new aggregate setter.

func (*AggregateBaseSetter) SetAggregateBase

func (a *AggregateBaseSetter) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

type Config

type Config struct {
	BufferSize int
}

Config is the configuration for the eventsource storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig sets up the default config for the event store.

func (*Config) Validate

func (c *Config) Validate() error

type Event

type Event struct {
	EventId       string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	EventType     string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
	AggregateId   string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
	EventData     []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
	Timestamp     int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Revision      int64  `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
	// contains filtered or unexported fields
}

Event is the event source message model.

func (*Event) Copy

func (x *Event) Copy() *Event

Copy creates a copy of given event.

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetAggregateId

func (x *Event) GetAggregateId() string

func (*Event) GetAggregateType

func (x *Event) GetAggregateType() string

func (*Event) GetEventData

func (x *Event) GetEventData() []byte

func (*Event) GetEventId

func (x *Event) GetEventId() string

func (*Event) GetEventType

func (x *Event) GetEventType() string

func (*Event) GetRevision

func (x *Event) GetRevision() int64

func (*Event) GetTimestamp

func (x *Event) GetTimestamp() int64

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) Time

func (x *Event) Time() time.Time

type EventCodec

type EventCodec codec.Codec

EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.

type EventHandlingFailed added in v0.0.20

type EventHandlingFailed struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	Err         string `protobuf:"bytes,2,opt,name=err,proto3" json:"err,omitempty"`
	ErrCode     int32  `protobuf:"varint,3,opt,name=err_code,json=errCode,proto3" json:"err_code,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingFailed is an event message occurred on a failure when handling given event.

func (*EventHandlingFailed) Descriptor deprecated added in v0.0.20

func (*EventHandlingFailed) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingFailed.ProtoReflect.Descriptor instead.

func (*EventHandlingFailed) GetErr added in v0.0.20

func (x *EventHandlingFailed) GetErr() string

func (*EventHandlingFailed) GetErrCode added in v0.0.20

func (x *EventHandlingFailed) GetErrCode() int32

func (*EventHandlingFailed) GetHandlerName added in v0.0.20

func (x *EventHandlingFailed) GetHandlerName() string

func (*EventHandlingFailed) ProtoMessage added in v0.0.20

func (*EventHandlingFailed) ProtoMessage()

func (*EventHandlingFailed) ProtoReflect added in v0.0.20

func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message

func (*EventHandlingFailed) Reset added in v0.0.20

func (x *EventHandlingFailed) Reset()

func (*EventHandlingFailed) String added in v0.0.20

func (x *EventHandlingFailed) String() string

type EventHandlingFinished added in v0.0.20

type EventHandlingFinished struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingFinished is an event message occurred when given handler just finished successfully handling an event.

func (*EventHandlingFinished) Descriptor deprecated added in v0.0.20

func (*EventHandlingFinished) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingFinished.ProtoReflect.Descriptor instead.

func (*EventHandlingFinished) GetHandlerName added in v0.0.20

func (x *EventHandlingFinished) GetHandlerName() string

func (*EventHandlingFinished) ProtoMessage added in v0.0.20

func (*EventHandlingFinished) ProtoMessage()

func (*EventHandlingFinished) ProtoReflect added in v0.0.20

func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message

func (*EventHandlingFinished) Reset added in v0.0.20

func (x *EventHandlingFinished) Reset()

func (*EventHandlingFinished) String added in v0.0.20

func (x *EventHandlingFinished) String() string

type EventHandlingStarted added in v0.0.20

type EventHandlingStarted struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingStarted is an event message occurred when given handler just started handling an event.

func (*EventHandlingStarted) Descriptor deprecated added in v0.0.20

func (*EventHandlingStarted) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingStarted.ProtoReflect.Descriptor instead.

func (*EventHandlingStarted) GetHandlerName added in v0.0.20

func (x *EventHandlingStarted) GetHandlerName() string

func (*EventHandlingStarted) ProtoMessage added in v0.0.20

func (*EventHandlingStarted) ProtoMessage()

func (*EventHandlingStarted) ProtoReflect added in v0.0.20

func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message

func (*EventHandlingStarted) Reset added in v0.0.20

func (x *EventHandlingStarted) Reset()

func (*EventHandlingStarted) String added in v0.0.20

func (x *EventHandlingStarted) String() string

type EventMessage

type EventMessage interface {
	MessageType() string
}

EventMessage is an interface that defines event messages.

type EventStore

type EventStore interface {
	// LoadEvents loads all events for given aggregate.
	LoadEvents(ctx context.Context, aggregate Aggregate) error
	// LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it.
	LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error
	// Commit commits the event changes done in given aggregate.
	Commit(ctx context.Context, aggregate Aggregate) error
	// SaveSnapshot saves the snapshot of given aggregate.
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error
	// StreamEvents opens stream events that matches given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
	// SetAggregateBase sets the AggregateBase within an aggregate.
	SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
}

EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.

type EventUnhandled added in v0.0.20

type EventUnhandled struct {
	// contains filtered or unexported fields
}

EventUnhandled is an event message which states that an event is marked as unhandled.

func (*EventUnhandled) Descriptor deprecated added in v0.0.20

func (*EventUnhandled) Descriptor() ([]byte, []int)

Deprecated: Use EventUnhandled.ProtoReflect.Descriptor instead.

func (*EventUnhandled) ProtoMessage added in v0.0.20

func (*EventUnhandled) ProtoMessage()

func (*EventUnhandled) ProtoReflect added in v0.0.20

func (x *EventUnhandled) ProtoReflect() protoreflect.Message

func (*EventUnhandled) Reset added in v0.0.20

func (x *EventUnhandled) Reset()

func (*EventUnhandled) String added in v0.0.20

func (x *EventUnhandled) String() string

type IdGenerator

type IdGenerator interface {
	GenerateId() string
}

IdGenerator is the interface used by identity generators.

type Snapshot

type Snapshot struct {
	AggregateId      string `json:"aggregate_id,omitempty"`
	AggregateType    string `json:"aggregate_type,omitempty"`
	AggregateVersion int64  `json:"aggregate_version,omitempty"`
	Revision         int64  `json:"revision,omitempty"`
	Timestamp        int64  `json:"timestamp,omitempty"`
	SnapshotData     []byte `json:"snapshot_data,omitempty"`
}

Snapshot is a structure that define basic fields of the aggregate snapshot.

type SnapshotCodec

type SnapshotCodec codec.Codec

SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.

type Storage

type Storage interface {
	BeginTx(ctx context.Context) (TxStorage, error)
	StorageBase
}

Storage is a transaction beginner.

type StorageBase added in v0.0.18

type StorageBase interface {
	// SaveEvents all input events atomically in the storage.
	SaveEvents(ctx context.Context, es []*Event) error
	// ListEvents lists all events for given aggregate type with given id.
	ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error)
	// SaveSnapshot stores a snapshot.
	SaveSnapshot(ctx context.Context, snap *Snapshot) error
	// GetSnapshot gets the snapshot of the aggregate with it's id, type and version.
	GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)
	// ListEventsAfterRevision gets the event stream for given aggregate id, type starting after given revision.
	ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)
	// StreamEvents streams the events that matching given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
	// As allows drivers to expose driver-specific types.
	As(dst interface{}) error
	// ErrorCode gets the error code from the storage.
	ErrorCode(err error) cgerrors.ErrorCode
}

StorageBase is the interface used by the event store as a storage for its events and snapshots.

type Store

type Store struct {
	*AggregateBaseSetter
	// contains filtered or unexported fields
}

Store is the default implementation for the EventStore interface.

func New

func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage StorageBase) (*Store, error)

New creates new EventStore implementation.

func (*Store) Commit

func (e *Store) Commit(ctx context.Context, agg Aggregate) error

Commit commits all uncommitted events within given aggregate.

func (*Store) LoadEvents

func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error

LoadEvents gets the event stream and applies on provided aggregate.

func (*Store) LoadEventsWithSnapshot

func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error

LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.

func (*Store) SaveSnapshot

func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error

SaveSnapshot stores the snapshot

func (*Store) StreamEvents

func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

StreamEvents opens an event stream that matches given request.

func (*Store) WithStorage added in v0.0.20

func (e *Store) WithStorage(base StorageBase) *Store

WithStorage creates a copy of the event store with given storage base.

type StreamEventsRequest

type StreamEventsRequest struct {
	// AggregateTypes streams the events for selected aggregate types.
	AggregateTypes []string
	// AggregateIDs is the filter that streams events for selected aggregate ids.
	AggregateIDs []string
	// ExcludeEventTypes is the filter that provides a stream with excluded event types.
	ExcludeEventTypes []string
	// EventTypes is the filter that gets only selected event types.
	EventTypes []string
	// BuffSize defines the size of the stream channel buffer.
	BuffSize int
}

StreamEventsRequest is a request for the stream events query.

type TxStorage added in v0.0.18

type TxStorage interface {
	StorageBase
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

TxStorage is the interface that describes a storage base with a started transaction.

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.

func (UUIDGenerator) GenerateId

func (u UUIDGenerator) GenerateId() string

GenerateId generates identified. Implements IdGenerator interface.

Directories

Path Synopsis
esxsql module
esxsql_test Module
Package mockes is a generated GoMock package.
Package mockes is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL