eventstore

package
v2.19.0
Warning

This package is not in the latest version of its module.

Published: May 15, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNotSupported = errors.New("not supported")

ErrNotSupported is returned when the operation is not supported.

Functions

func ValidateEventsBeforeSave added in v2.16.0

func ValidateEventsBeforeSave(events []Event) error

ValidateEventsBeforeSave validates events before they are saved to the database. The following prerequisites must hold:

  1. AggregateID, GroupID and EventType are not empty
  2. The Version of each event is exactly 1 greater than the version of the previous event
  3. Only the first event can be a snapshot
  4. All events have the same AggregateID and GroupID
  5. Timestamps are non-zero
  6. Timestamps are non-decreasing
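The prerequisites above can be sketched as a single pass over the batch. This is an illustrative reimplementation, not the package's source: the flattened `event` struct, the `validate` function and the `sampleEvents` helper are hypothetical stand-ins for the `Event` interface and `ValidateEventsBeforeSave`, and the error messages are invented for the example.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical, flattened stand-in for the package's Event interface.
type event struct {
	Version     uint64
	EventType   string
	AggregateID string
	GroupID     string
	IsSnapshot  bool
	Timestamp   time.Time
}

// validate checks the six documented prerequisites in a single pass.
func validate(events []event) error {
	for i, e := range events {
		if e.AggregateID == "" || e.GroupID == "" || e.EventType == "" {
			return fmt.Errorf("event %d: AggregateID, GroupID and EventType are required", i)
		}
		if e.Timestamp.IsZero() {
			return fmt.Errorf("event %d: timestamp is zero", i)
		}
		if i == 0 {
			continue // the remaining rules compare an event with its predecessor
		}
		prev := events[i-1]
		switch {
		case e.Version != prev.Version+1:
			return fmt.Errorf("event %d: version %d does not follow %d", i, e.Version, prev.Version)
		case e.IsSnapshot:
			return fmt.Errorf("event %d: only the first event can be a snapshot", i)
		case e.AggregateID != prev.AggregateID || e.GroupID != prev.GroupID:
			return fmt.Errorf("event %d: AggregateID or GroupID differs from the previous event", i)
		case e.Timestamp.Before(prev.Timestamp):
			return fmt.Errorf("event %d: timestamp is earlier than the previous event", i)
		}
	}
	return nil
}

// sampleEvents returns a batch that satisfies every rule.
func sampleEvents() []event {
	start := time.Unix(1700000000, 0)
	return []event{
		{Version: 4, EventType: "snapshot", AggregateID: "a1", GroupID: "g1", IsSnapshot: true, Timestamp: start},
		{Version: 5, EventType: "changed", AggregateID: "a1", GroupID: "g1", Timestamp: start.Add(time.Second)},
	}
}

func main() {
	events := sampleEvents()
	fmt.Println(validate(events)) // the valid batch yields a nil error

	events[1].Version = 7 // introduce a version gap
	fmt.Println(validate(events))
}
```

Running the check before calling `Save` lets a caller reject a malformed batch without a round trip to the database.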

Types

type DeleteQuery

type DeleteQuery struct {
	GroupID string // filter by group ID, required
}

DeleteQuery selects the documents to delete by group ID.

type DeviceDocumentMetadata added in v2.16.0

type DeviceDocumentMetadata struct {
	DeviceID  string
	ServiceID string
}

type ETagData added in v2.10.0

type ETagData struct {
	ETag      []byte
	Timestamp int64
}

type Event

type Event = interface {
	Version() uint64
	EventType() string
	AggregateID() string
	GroupID() string
	IsSnapshot() bool
	ServiceID() (string, bool)
	Timestamp() time.Time
	ETag() *ETagData
	Types() []string
}

Event is the interface over an event created by the user.

type EventStore

type EventStore interface {
	// GetEvents loads events from the eventstore with a timestamp greater than the given value.
	// If timestamp is <= 0, the argument is ignored.
	GetEvents(ctx context.Context, queries []GetEventsQuery, timestamp int64, eventHandler Handler) error
	// Save saves events to the eventstore.
	// AggregateID, GroupID and EventType are required.
	// All events within one Save operation must have the same AggregateID and GroupID.
	// Versions must be unique and ascend without gaps.
	// Only the first event can be a snapshot.
	Save(ctx context.Context, events ...Event) (status SaveStatus, err error)
	LoadUpToVersion(ctx context.Context, queries []VersionQuery, eventHandler Handler) error
	LoadFromVersion(ctx context.Context, queries []VersionQuery, eventHandler Handler) error
	LoadFromSnapshot(ctx context.Context, queries []SnapshotQuery, eventHandler Handler) error
	RemoveUpToVersion(ctx context.Context, queries []VersionQuery) error
	Delete(ctx context.Context, queries []DeleteQuery) error

	LoadDeviceMetadataByServiceIDs(ctx context.Context, serviceIDs []string, limit int64) ([]DeviceDocumentMetadata, error)
	GetLatestDeviceETags(ctx context.Context, deviceID string, limit uint32) ([][]byte, error)
	Close(ctx context.Context) error
	Clear(ctx context.Context) error
}

EventStore provides an interface over the eventstore. Multiple aggregates can be grouped by groupID, but each aggregateID must be unique across the whole DB.

type EventUnmarshaler

type EventUnmarshaler = interface {
	Version() uint64
	EventType() string
	AggregateID() string
	GroupID() string
	IsSnapshot() bool
	Timestamp() time.Time
	Unmarshal(v interface{}) error
}

EventUnmarshaler provides access to an event's metadata and unmarshals its payload.

type FactoryModelFunc

type FactoryModelFunc func(ctx context.Context, groupID, aggregateID string) (Model, error)

FactoryModelFunc creates a user model.

type GetEventsQuery

type GetEventsQuery struct {
	GroupID     string   // filter by group ID, optional
	AggregateID string   // filter to certain aggregateID, optional
	Types       []string // filter to certain event types, optional
}

GetEventsQuery selects events with the given attributes. All filters are optional; if none are given, all events are returned.
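One way to read the optional filters is sketched below. The documentation does not say how the fields combine, so the semantics here are assumptions: every non-empty field must match, and `Types` matches when the event type equals any listed entry. The `storedEvent` struct and the `matches` function are hypothetical and not part of the package.

```go
package main

import "fmt"

// GetEventsQuery mirrors the documented struct.
type GetEventsQuery struct {
	GroupID     string
	AggregateID string
	Types       []string
}

// storedEvent is a hypothetical record used only for this sketch.
type storedEvent struct {
	GroupID     string
	AggregateID string
	EventType   string
}

// matches reports whether e passes every filter that is set in q.
// Assumption: empty fields are ignored and set fields are combined with AND.
func matches(q GetEventsQuery, e storedEvent) bool {
	if q.GroupID != "" && q.GroupID != e.GroupID {
		return false
	}
	if q.AggregateID != "" && q.AggregateID != e.AggregateID {
		return false
	}
	if len(q.Types) == 0 {
		return true
	}
	for _, t := range q.Types {
		if t == e.EventType {
			return true
		}
	}
	return false
}

func main() {
	e := storedEvent{GroupID: "g1", AggregateID: "a1", EventType: "changed"}

	fmt.Println(matches(GetEventsQuery{}, e))                           // no filters set
	fmt.Println(matches(GetEventsQuery{GroupID: "g2"}, e))              // different group
	fmt.Println(matches(GetEventsQuery{Types: []string{"changed"}}, e)) // type filter
}
```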

type Handler

type Handler = interface {
	Handle(ctx context.Context, iter Iter) (err error)
}

Handler handles events from an eventstore or eventbus.

type Iter

type Iter = interface {
	Next(ctx context.Context) (EventUnmarshaler, bool)
	Err() error
}

Iter provides an iterator over events from an eventstore or eventbus.
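A handler typically drains the iterator and then checks `Err`. The sketch below redeclares `Iter` and `Handler` locally and trims `EventUnmarshaler` to two methods so that it compiles on its own. The `fakeEvent`, `sliceIter`, `countingHandler` and `countTypes` names are hypothetical, and reading `Err` after `Next` returns false is an assumed convention rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// EventUnmarshaler is trimmed to the methods this sketch needs.
type EventUnmarshaler interface {
	Version() uint64
	EventType() string
}

// Iter and Handler mirror the documented interfaces.
type Iter interface {
	Next(ctx context.Context) (EventUnmarshaler, bool)
	Err() error
}

type Handler interface {
	Handle(ctx context.Context, iter Iter) error
}

// fakeEvent and sliceIter are hypothetical in-memory test doubles.
type fakeEvent struct {
	version   uint64
	eventType string
}

func (e fakeEvent) Version() uint64   { return e.version }
func (e fakeEvent) EventType() string { return e.eventType }

type sliceIter struct {
	events []fakeEvent
	pos    int
}

// Next returns the next event, or false when the slice is exhausted
// or the context has been cancelled.
func (it *sliceIter) Next(ctx context.Context) (EventUnmarshaler, bool) {
	if ctx.Err() != nil || it.pos >= len(it.events) {
		return nil, false
	}
	e := it.events[it.pos]
	it.pos++
	return e, true
}

func (it *sliceIter) Err() error { return nil }

// countingHandler counts the events it sees per event type.
type countingHandler struct {
	counts map[string]int
}

func (h *countingHandler) Handle(ctx context.Context, iter Iter) error {
	for {
		e, ok := iter.Next(ctx)
		if !ok {
			break
		}
		h.counts[e.EventType()]++
	}
	return iter.Err() // surface any error that ended the iteration early
}

// countTypes runs the handler over an in-memory iterator.
func countTypes(events []fakeEvent) (map[string]int, error) {
	h := &countingHandler{counts: map[string]int{}}
	var handler Handler = h
	err := handler.Handle(context.Background(), &sliceIter{events: events})
	return h.counts, err
}

func main() {
	counts, err := countTypes([]fakeEvent{{1, "created"}, {2, "updated"}, {3, "updated"}})
	fmt.Println(counts, err)
}
```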

type LoadedEvent

type LoadedEvent struct {
	// contains filtered or unexported fields
}

func NewLoadedEvent

func NewLoadedEvent(
	version uint64,
	eventType string,
	aggregateID string,
	groupID string,
	isSnapshot bool,
	timestamp time.Time,
	dataUnmarshaler func(v interface{}) error,
) LoadedEvent
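The `dataUnmarshaler` argument lets a store defer payload decoding until `Unmarshal` is called. The sketch below shows that pattern with a closure over raw bytes. It uses a trimmed, hypothetical `loadedEvent` type instead of the real `LoadedEvent`, and JSON is only an assumed encoding; the actual stores may use a different wire format. The `resourceChanged` payload type and the `fromJSON` and `decodeHref` helpers are invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// loadedEvent is a hypothetical, trimmed stand-in for LoadedEvent.
type loadedEvent struct {
	version         uint64
	eventType       string
	dataUnmarshaler func(v interface{}) error
}

func newLoadedEvent(version uint64, eventType string, dataUnmarshaler func(v interface{}) error) loadedEvent {
	return loadedEvent{version: version, eventType: eventType, dataUnmarshaler: dataUnmarshaler}
}

func (e loadedEvent) Version() uint64   { return e.version }
func (e loadedEvent) EventType() string { return e.eventType }

// Unmarshal delegates to the closure supplied at construction time.
func (e loadedEvent) Unmarshal(v interface{}) error { return e.dataUnmarshaler(v) }

// fromJSON wraps raw JSON so that decoding happens only when Unmarshal is called.
func fromJSON(version uint64, eventType string, raw []byte) loadedEvent {
	return newLoadedEvent(version, eventType, func(v interface{}) error {
		return json.Unmarshal(raw, v)
	})
}

// resourceChanged is a hypothetical event payload.
type resourceChanged struct {
	Href string `json:"href"`
}

// decodeHref builds an event from raw JSON and decodes its payload.
func decodeHref(raw string) (string, error) {
	e := fromJSON(1, "resourceChanged", []byte(raw))
	var rc resourceChanged
	if err := e.Unmarshal(&rc); err != nil {
		return "", err
	}
	return rc.Href, nil
}

func main() {
	href, err := decodeHref(`{"href":"/light/1"}`)
	fmt.Println(href, err)
}
```

Keeping the decoder behind a closure means events that a handler skips never pay the decoding cost.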

func (LoadedEvent) AggregateID

func (e LoadedEvent) AggregateID() string

func (LoadedEvent) EventType

func (e LoadedEvent) EventType() string

func (LoadedEvent) GroupID

func (e LoadedEvent) GroupID() string

func (LoadedEvent) IsSnapshot

func (e LoadedEvent) IsSnapshot() bool

func (LoadedEvent) Timestamp

func (e LoadedEvent) Timestamp() time.Time

func (LoadedEvent) Unmarshal

func (e LoadedEvent) Unmarshal(v interface{}) error

func (LoadedEvent) Version

func (e LoadedEvent) Version() uint64

type LogDebugfFunc

type LogDebugfFunc func(fmt string, args ...interface{})

LogDebugfFunc logs debug messages.

type Model

type Model interface {
	Handler
}

Model is a user-defined model onto which events from the eventstore are projected.

type Projection

type Projection struct {
	LogDebugfFunc LogDebugfFunc
	// contains filtered or unexported fields
}

Projection projects events from the eventstore onto a user model.

func NewProjection

func NewProjection(store EventStore, factoryModel FactoryModelFunc, logDebugfFunc LogDebugfFunc) *Projection

NewProjection creates a projection over the eventstore.

func (*Projection) Forget

func (p *Projection) Forget(queries []SnapshotQuery) (err error)

Forget drops the projections matching the queries. The Version in a query is ignored.

func (*Projection) Handle

func (p *Projection) Handle(ctx context.Context, iter Iter) error

Handle updates the projection with events.

func (*Projection) HandleWithReload

func (p *Projection) HandleWithReload(ctx context.Context, iter Iter) error

HandleWithReload updates the projection with events and reloads events when needed.

func (*Projection) Models

func (p *Projection) Models(queries []SnapshotQuery, onModel func(m Model) (wantNext bool))

Models returns models from the projection.

func (*Projection) Project

func (p *Projection) Project(ctx context.Context, queries []SnapshotQuery) (err error)

Project updates the projection from the snapshots defined by the queries. The Version in a query is ignored.

type SaveStatus

type SaveStatus int
const (
	Ok                   SaveStatus = 0  // events were stored
	ConcurrencyException SaveStatus = 1  // events with this version already exist
	SnapshotRequired     SaveStatus = 2  // event store requires aggregated snapshot before applying new event; snapshot shall not contain this new event
	Fail                 SaveStatus = -1 // error occurred
)
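A caller of `Save` usually branches on the returned status. The sketch below shows one possible policy: retry on `ConcurrencyException` and stop on every other non-`Ok` status. The `saveFunc` type and the `saveWithRetry` function are hypothetical; `saveFunc` stands in for reloading the aggregate, rebuilding the events and calling `EventStore.Save`. The retry limit and the error messages are assumptions of this example, not behavior defined by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// SaveStatus and its values mirror the documented constants.
type SaveStatus int

const (
	Ok                   SaveStatus = 0
	ConcurrencyException SaveStatus = 1
	SnapshotRequired     SaveStatus = 2
	Fail                 SaveStatus = -1
)

// saveFunc is a hypothetical stand-in for one reload-and-Save attempt.
type saveFunc func(attempt int) (SaveStatus, error)

// saveWithRetry retries on version conflicts and reports how many attempts were made.
func saveWithRetry(save saveFunc, maxAttempts int) (int, error) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		status, err := save(attempt)
		switch status {
		case Ok:
			return attempt, nil
		case ConcurrencyException:
			continue // another writer stored this version first; try again
		case SnapshotRequired:
			return attempt, errors.New("snapshot required before new events can be saved")
		default:
			if err == nil {
				err = errors.New("unknown error")
			}
			return attempt, fmt.Errorf("save returned status %d: %w", status, err)
		}
	}
	return maxAttempts, errors.New("too many concurrency conflicts")
}

func main() {
	// Simulated store: the first two attempts hit a version conflict.
	save := func(attempt int) (SaveStatus, error) {
		if attempt < 3 {
			return ConcurrencyException, nil
		}
		return Ok, nil
	}
	attempts, err := saveWithRetry(save, 5)
	fmt.Println(attempts, err)
}
```

For `SnapshotRequired`, the constant's comment says the store needs an aggregated snapshot that does not contain the new event, so a fuller caller would save that snapshot first instead of returning an error.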

type SnapshotQuery

type SnapshotQuery struct {
	GroupID     string   // filter by group ID
	AggregateID string   // filter to certain aggregateID, groupID is required
	Types       []string // filter to certain event types, optional
}

SnapshotQuery is used to load events from a snapshot.

type VersionQuery

type VersionQuery struct {
	GroupID     string // required
	AggregateID string // required
	Version     uint64 // required
}

VersionQuery is used to load events from a version.
