events

package
v0.0.0-...-4450389 Latest
Published: Nov 4, 2019 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package `events` implements event sourcing with MongoDB as the event store. The main types are `Journal` and `Engine`.

`Journal`: Event log backed by MongoDB collections, with event notification via Go channels.

`Engine`: Building block for event sourcing aggregates. See packages `fsomain`, `fsoregistry`, and `fsorepos` for examples of how to use `Engine`.

Constants

const (
	OpFindHead             Op = "finding history in heads collection"
	OpInsertHead              = "inserting into heads collection"
	OpUpdateHead              = "updating heads collection"
	OpUpdateHeadSerial        = "updating head serial"
	OpFindEvent               = "finding event in events collection"
	OpFindPreviousEvent       = "finding previous event in events collection"
	OpInsertEvent             = "inserting into events collection"
	OpInsertJournal           = "inserting journal entry"
	OpFindJournalDuplicate    = "finding duplicate journal entry"
	OpFindJournal             = "finding event in journal collection"
	OpScanJournal             = "scanning journal"
	OpFindStart               = "finding start"
	OpEventIds                = "computing event ids"
	OpEncodeEventProto        = "encoding event protobuf"
	OpDecodeJournalProto      = "decoding journal protobuf"
	OpDecodeEventProto        = "decoding event protobuf"
	OpParseEventId            = "parsing event ID"
	OpCheckEventId            = "checking event ID"
	OpParseEventParent        = "parsing event parent ID"
	OpHeadEventLookup         = "head event lookup"
	OpTailEventLookup         = "tail event lookup"
)

`OpX` are operation codes for errors. The `OpX` strings are chosen such that `${OpX} failed: ...` is valid English.
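As a sketch of the naming convention above, an error type that carries an `Op` can build its message as `<op> failed: <cause>`. The `Error()` body and the `describe()` helper below are assumptions for illustration; this page does not show how `DBError` actually formats its message.

```go
package main

import (
	"errors"
	"fmt"
)

// Op mirrors the package's operation code type.
type Op string

// OpInsertEvent is copied from the constants listed above.
const OpInsertEvent Op = "inserting into events collection"

// DBError has the same fields as the documented type. The message
// format is an assumption based on the "${OpX} failed: ..." convention.
type DBError struct {
	Op  Op
	Err error
}

func (err *DBError) Error() string {
	return fmt.Sprintf("%s failed: %v", err.Op, err.Err)
}

// Unwrap exposes the cause to errors.Is and errors.As.
func (err *DBError) Unwrap() error { return err.Err }

// describe is a hypothetical helper that builds a sample error and
// returns its message.
func describe() string {
	e := &DBError{Op: OpInsertEvent, Err: errors.New("connection reset")}
	return e.Error()
}

func main() {
	fmt.Println(describe())
	// inserting into events collection failed: connection reset
}
```

Phrasing each `Op` as a gerund clause keeps the final message readable without a per-operation format string.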

const (
	KeyId       = "_id"
	KeyProtobuf = "pb"
	KeyHead     = "h"
	KeyEpoch    = "e"
	KeyEpochLog = "el"
	KeyTail     = "t"
	KeyPhase    = "ph"
	KeyDtime    = "dt"
	KeyTime     = "ts"
	KeySerial   = "s"
	// `KeyJournalIsUpToDate` is only used to migrate from the schema that
	// used a boolean to indicate that the serialized journal is up to date
	// without tracking the head serial.  See `Commit()`.
	KeyJournalIsUpToDate = "u"
)

`KeyX` is the Mongo field name for the Go field `X`.

const ConfigDeletedDays = 30

`ConfigDeletedDays` is the minimum number of days since refs were marked for deletion before the refs doc may be removed from the database.

const ConfigDeletedDuration = ConfigDeletedDays * 24 * time.Hour
const ConfigDeletingDays = 15

`ConfigDeletingDays` is the minimum number of days since refs were marked for deletion before the corresponding journal may be deleted.

const ConfigDeletingDuration = ConfigDeletingDays * 24 * time.Hour
const ConfigRecentDays = 30

`ConfigRecentDays` is the minimum age, in days, of an event before it may be deleted. The age is based on etime.

const ConfigRecentDuration = ConfigRecentDays * 24 * time.Hour
const ConfigTrimInterval = ConfigTrimIntervalDays * 24 * time.Hour
const ConfigTrimIntervalDays = 30
const HeadSerialUnspecified int64 = 0
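
The `ConfigXDuration` constants are derived from the corresponding `ConfigXDays` constants by multiplying with `24 * time.Hour`. The following sketch repeats two of the definitions locally to show the resulting `time.Duration` values.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented constants.
const (
	ConfigDeletingDays     = 15
	ConfigDeletingDuration = ConfigDeletingDays * 24 * time.Hour
	ConfigDeletedDays      = 30
	ConfigDeletedDuration  = ConfigDeletedDays * 24 * time.Hour
)

func main() {
	fmt.Println(ConfigDeletingDuration) // 360h0m0s
	fmt.Println(ConfigDeletedDuration)  // 720h0m0s
}
```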

Variables

var EventEpoch ulid.I

`EventEpoch` is the id that indicates the beginning of an event history.

var NoVC = ulid.One

`NoVC` is a sentinel that tells `TellIdVid()` to skip the version check.

var RetryNoVC = ulid.Two

`RetryNoVC` is a sentinel that tells `TellIdVid()` to skip the version check like `NoVC`, but to retry the command a few times if committing events fails due to a concurrent update.

var WildcardTopic uuid.I

`WildcardTopic` indicates that a subscribed channel receives a signal for any `post()`.

Functions

func IsVersionConflictError

func IsVersionConflictError(err error) bool
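
The page does not show how `IsVersionConflictError()` is implemented. Since the package's error types provide `Unwrap()`, one plausible approach is `errors.As`, which also matches wrapped errors. The sketch below is an assumption, not the package's code: `isVersionConflict()`, `tellWithRetry()`, and `demo()` are hypothetical names, and strings stand in for `ulid.I`. It also illustrates the retry behavior described for `RetryNoVC`.

```go
package main

import (
	"errors"
	"fmt"
)

// VersionConflictError uses string fields as stand-ins for ulid.I.
type VersionConflictError struct {
	Stored   string
	Expected string
}

func (err *VersionConflictError) Error() string {
	return fmt.Sprintf(
		"version conflict: stored %s, expected %s",
		err.Stored, err.Expected,
	)
}

// isVersionConflict is a hypothetical equivalent of
// IsVersionConflictError. It also matches wrapped errors.
func isVersionConflict(err error) bool {
	var vc *VersionConflictError
	return errors.As(err, &vc)
}

// tellWithRetry is hypothetical. It calls tell up to attempts times
// and stops at the first result that is not a version conflict.
func tellWithRetry(attempts int, tell func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = tell(); !isVersionConflict(err) {
			return err
		}
	}
	return err
}

// demo simulates a command that hits two concurrent updates before
// it succeeds on the third attempt.
func demo() (calls int, err error) {
	err = tellWithRetry(3, func() error {
		calls++
		if calls < 3 {
			vc := &VersionConflictError{Stored: "B", Expected: "A"}
			return fmt.Errorf("commit: %w", vc)
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(demo()) // 3 <nil>
}
```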

Types

type Advancer

type Advancer interface {
	Advance(State, Event) State
}

`Advancer` applies events to `State` to produce updated `State`.
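
Applying events one by one amounts to a left fold over the event list. The sketch below shows that idea with a hypothetical counter aggregate. `State` and `Event` are reduced to empty interfaces to keep the example self-contained, and `counter`, `added`, `replay()`, and `total()` are illustrative names that are not part of the package.

```go
package main

import "fmt"

// State and Event are simplified stand-ins for the package interfaces.
type (
	State interface{}
	Event interface{}
)

// Advancer matches the documented interface shape.
type Advancer interface {
	Advance(State, Event) State
}

// counter and added are a hypothetical aggregate state and event.
type counter struct{ n int }

type added struct{ delta int }

type counterAdvancer struct{}

// Advance returns an updated copy of the state. Unknown events leave
// the state unchanged.
func (counterAdvancer) Advance(s State, ev Event) State {
	c := s.(counter)
	if a, ok := ev.(added); ok {
		c.n += a.delta
	}
	return c
}

// replay folds the events over the initial state in order.
func replay(adv Advancer, s State, evs []Event) State {
	for _, ev := range evs {
		s = adv.Advance(s, ev)
	}
	return s
}

// total replays two sample events and returns the final count.
func total() int {
	evs := []Event{added{delta: 2}, added{delta: 3}}
	s := replay(counterAdvancer{}, counter{}, evs)
	return s.(counter).n
}

func main() {
	fmt.Println(total()) // 5
}
```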

type Behavior

type Behavior interface {
	NewState(id uuid.I) State
	NewEvent() Event
	NewAdvancer() Advancer
	Tell(State, Command) ([]Event, error)
}

`Behavior` is used to customize `Engine` to implement a specific aggregate.

type Command

type Command interface {
	AggregateCommand()
}

A `Command` can be applied to change `State`.
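
A command handler in the style of `Behavior.Tell()` inspects the current state, validates the command, and returns the events that describe the change, without mutating the state itself. The following is a minimal sketch under stated assumptions: `State` and `Event` are empty-interface stand-ins, and `account`, `withdraw`, `withdrawn`, `tell()`, and `tryWithdraw()` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Command matches the documented marker interface.
type Command interface{ AggregateCommand() }

// State and Event are simplified stand-ins for the package interfaces.
type (
	State interface{}
	Event interface{}
)

// account, withdraw, and withdrawn are hypothetical aggregate types.
type account struct{ balance int }

type withdraw struct{ amount int }

func (withdraw) AggregateCommand() {}

type withdrawn struct{ amount int }

var errInsufficient = errors.New("insufficient balance")

// tell validates cmd against the state and returns the resulting
// events. It does not modify the state.
func tell(s State, cmd Command) ([]Event, error) {
	acc, ok := s.(account)
	if !ok {
		return nil, fmt.Errorf("unexpected state %T", s)
	}
	switch c := cmd.(type) {
	case withdraw:
		if c.amount > acc.balance {
			return nil, errInsufficient
		}
		return []Event{withdrawn{amount: c.amount}}, nil
	default:
		return nil, fmt.Errorf("unknown command %T", cmd)
	}
}

// tryWithdraw reports how many events a withdrawal would produce and
// whether the command was accepted.
func tryWithdraw(balance, amount int) (int, bool) {
	evs, err := tell(account{balance: balance}, withdraw{amount: amount})
	return len(evs), err == nil
}

func main() {
	fmt.Println(tryWithdraw(10, 4))  // 1 true
	fmt.Println(tryWithdraw(10, 40)) // 0 false
}
```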

type CorruptedDataError

type CorruptedDataError struct {
	Op  Op
	Err error
}

func (*CorruptedDataError) Error

func (err *CorruptedDataError) Error() string

func (*CorruptedDataError) Unwrap

func (err *CorruptedDataError) Unwrap() error

type DBError

type DBError struct {
	Op  Op
	Err error
}

func (*DBError) Error

func (err *DBError) Error() string

func (*DBError) Unwrap

func (err *DBError) Unwrap() error

type DuplicateEventMismatchError

type DuplicateEventMismatchError struct {
	Id ulid.I
}

func (*DuplicateEventMismatchError) Error

func (err *DuplicateEventMismatchError) Error() string

type DuplicateJournalEntryProtobufMismatchError

type DuplicateJournalEntryProtobufMismatchError struct {
	HistoryId uuid.I
	EventId   ulid.I
}

func (*DuplicateJournalEntryProtobufMismatchError) Error

type DuplicateJournalEntrySerialMismatchError

type DuplicateJournalEntrySerialMismatchError struct {
	HistoryId uuid.I
	EventId   ulid.I
	Stored    int64
	Expected  int64
}

func (*DuplicateJournalEntrySerialMismatchError) Error

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

`Engine` combines an event `Journal` with `Behavior` into a building block for an event sourcing aggregate. See `fsomain` as an example.

func NewEngine

func NewEngine(journal *Journal, behavior Behavior) *Engine

func (*Engine) DeleteIdVid

func (eng *Engine) DeleteIdVid(
	id uuid.I, vid ulid.I, cmd Command,
) error

`DeleteIdVid()` deletes a history. The history is not deleted immediately, but only marked for deletion. The actual deletion of the journal, refs, and events happens during garbage collection.

For `NoVC` and `RetryNoVC`, a missing or already deleted history does not cause an error.

For a real `vid`, the history must exist and its state must match the `vid`, and the `Tell()` behavior command handler must allow deletion by returning `nil, nil`.

func (*Engine) FindFromState

func (eng *Engine) FindFromState(s State) (State, error)

func (*Engine) FindId

func (eng *Engine) FindId(id uuid.I) (State, error)

func (*Engine) TellIdVid

func (eng *Engine) TellIdVid(
	id uuid.I, vid ulid.I, cmd Command,
) (ulid.I, error)

func (*Engine) TellIdVidState

func (eng *Engine) TellIdVidState(
	id uuid.I, vid ulid.I, cmd Command,
) (State, error)

func (*Engine) TellState

func (eng *Engine) TellState(s State, cmd Command) (State, error)

type EpochTime

type EpochTime struct {
	Epoch ulid.I    `bson:"e"`
	Time  time.Time `bson:"ts"`
}

type Event

type Event interface {
	EventUnmarshaler
	EventMarshaler
}

type EventDoc

type EventDoc struct {
	Id       ulid.I `bson:"_id"`
	Protobuf []byte `bson:"pb"`
}

type EventError

type EventError struct {
	Op  Op
	Err error
}

func (*EventError) Error

func (err *EventError) Error() string

func (*EventError) Unwrap

func (err *EventError) Unwrap() error

type EventIdMismatchError

type EventIdMismatchError struct {
	MongoId ulid.I
	ProtoId ulid.I
}

func (*EventIdMismatchError) Error

func (err *EventIdMismatchError) Error() string

type EventMarshaler

type EventMarshaler interface {
	MarshalProto() ([]byte, error)
	WithId(ulid.I) Event
	WithParent(ulid.I) Event
}

type EventUnmarshaler

type EventUnmarshaler interface {
	UnmarshalProto([]byte) error
	Id() ulid.I
	Parent() ulid.I
}

type EventsGarbageCollector

type EventsGarbageCollector struct {
	// contains filtered or unexported fields
}

func NewEventsGarbageCollector

func NewEventsGarbageCollector(
	lg Logger, j *Journal,
) *EventsGarbageCollector

func (*EventsGarbageCollector) Gc

`Gc()` runs all garbage collectors, specifically:

  • `GcEvents()`
  • `GcJournalTail()`
  • `GcDeletedRefs()`
  • `GcDeletingJournals()`

The order of the `GcX()` calls prefers slow over aggressive garbage collection. Some garbage that could in principle be deleted right away may require another gc cycle to be actually deleted.

func (*EventsGarbageCollector) GcDeletedRefs

func (gc *EventsGarbageCollector) GcDeletedRefs(
	ctx context.Context,
) error

`GcDeletedRefs()` deletes refs from the database that are in `PhaseDeleted` and have initially been marked for deletion more than `ConfigDeletedDuration` ago.

func (*EventsGarbageCollector) GcDeletingJournals

func (gc *EventsGarbageCollector) GcDeletingJournals(
	ctx context.Context,
) error

`GcDeletingJournals()` removes journals of refs that are in `PhaseDeleting` and were initially marked for deletion more than `ConfigDeletingDuration` ago. The refs are then changed to `PhaseDeleted`, so that they are deleted by a future `GcDeletedRefs()`.
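
The two deletion stages can be read as predicates on the phase and deletion time of a refs doc. The sketch below is an illustration based only on the descriptions on this page, not the package's implementation. The `refs` struct keeps just the `Phase` and `Dtime` fields of `RefsDoc`, and `journalMayBeDeleted()`, `refsMayBeDeleted()`, and `check()` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

type PhaseCode int32

const (
	PhaseUnspecified PhaseCode = iota
	PhaseActive
	PhaseDeleting
	PhaseDeleted
)

// Local copies of the documented durations.
const (
	ConfigDeletingDuration = 15 * 24 * time.Hour
	ConfigDeletedDuration  = 30 * 24 * time.Hour
)

// refs keeps only the RefsDoc fields that matter for this sketch.
type refs struct {
	Phase PhaseCode
	Dtime time.Time // when the deletion started
}

// journalMayBeDeleted mirrors the GcDeletingJournals() condition.
func journalMayBeDeleted(r refs, now time.Time) bool {
	return r.Phase == PhaseDeleting &&
		now.Sub(r.Dtime) > ConfigDeletingDuration
}

// refsMayBeDeleted mirrors the GcDeletedRefs() condition.
func refsMayBeDeleted(r refs, now time.Time) bool {
	return r.Phase == PhaseDeleted &&
		now.Sub(r.Dtime) > ConfigDeletedDuration
}

// check evaluates both predicates for refs in the given phase, the
// given number of days after they were marked for deletion.
func check(phase PhaseCode, days int) (journal, refsDoc bool) {
	marked := time.Date(2019, time.November, 4, 0, 0, 0, 0, time.UTC)
	now := marked.Add(time.Duration(days) * 24 * time.Hour)
	r := refs{Phase: phase, Dtime: marked}
	return journalMayBeDeleted(r, now), refsMayBeDeleted(r, now)
}

func main() {
	fmt.Println(check(PhaseDeleting, 10)) // false false
	fmt.Println(check(PhaseDeleting, 16)) // true false
	fmt.Println(check(PhaseDeleted, 31))  // false true
}
```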

func (*EventsGarbageCollector) GcEvents

func (gc *EventsGarbageCollector) GcEvents(ctx context.Context) error

`GcEvents()` deletes unreachable events.

It uses mark and sweep with the following colors:

  • unspecified: event is not in Mongo collection.
  • white: event is old and unreachable.
  • gray: event is recent or reachable via a head.
  • black: event is a tail or has been painted.

Painting starts from gray nodes and walks along parents, marking the visited nodes in black, until the first black node is reached, i.e. each stroke stops either at a tail or at a node that has been already painted (and its parents have been painted, too). After painting, white events can be deleted.

All refs, including refs in `PhaseDeleting` or `PhaseDeleted`, prevent events from being deleted. Events will only be deleted after the refs doc has been deleted from the database.
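
The painting scheme described above can be sketched on an in-memory parent map. This is an illustration only, not the package's implementation, which works on MongoDB collections. Event ids are plain strings, and `sweep()` and `demo()` are hypothetical names. Ids that are missing from the map play the role of "unspecified".

```go
package main

import (
	"fmt"
	"sort"
)

type color int

const (
	white color = iota // old and unreachable unless painted
	gray               // recent or a head, not yet painted
	black              // a tail or already painted
)

// sweep returns the sorted ids that are still white after painting.
// parent maps each stored event id to its parent id, roots are the
// ids that start gray, and tails are the ids that start black.
func sweep(parent map[string]string, roots, tails []string) []string {
	colors := make(map[string]color, len(parent))
	for id := range parent {
		colors[id] = white
	}
	for _, id := range tails {
		if _, ok := parent[id]; ok {
			colors[id] = black
		}
	}
	for _, id := range roots {
		if _, ok := parent[id]; ok && colors[id] != black {
			colors[id] = gray
		}
	}
	// Each stroke walks along parents and stops at the first black
	// node or at an id that is not stored.
	for _, id := range roots {
		for cur := id; ; cur = parent[cur] {
			c, ok := colors[cur]
			if !ok || c == black {
				break
			}
			colors[cur] = black
		}
	}
	var garbage []string
	for id, c := range colors {
		if c == white {
			garbage = append(garbage, id)
		}
	}
	sort.Strings(garbage)
	return garbage
}

// demo uses the chain e1 <- e2 <- e3 <- e4 with tail e2 and head e4,
// plus an old side branch x1 whose parent is e2.
func demo() []string {
	parent := map[string]string{
		"e1": "", "e2": "e1", "e3": "e2", "e4": "e3", "x1": "e2",
	}
	return sweep(parent, []string{"e4"}, []string{"e2"})
}

func main() {
	fmt.Println(demo()) // [e1 x1]
}
```

In the demo, the stroke from the head `e4` paints `e4` and `e3` and stops at the tail `e2`. The event before the tail (`e1`) and the unreachable branch (`x1`) stay white and are reported as garbage.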

func (*EventsGarbageCollector) GcJournalTail

func (gc *EventsGarbageCollector) GcJournalTail(ctx context.Context) error

`GcJournalTail()` deletes journal entries before the tail.

type InternalError

type InternalError struct {
	Op  Op
	Err error
}

func (*InternalError) Error

func (err *InternalError) Error() string

func (*InternalError) Unwrap

func (err *InternalError) Unwrap() error

type Iter

type Iter struct {
	// contains filtered or unexported fields
}

Valid `Iter` states:

  • `mongo == nil && err == nil`: empty iterator.
  • `mongo == nil && err != nil`: error before Mongo query.
  • `mongo != nil`: active Mongo query.

func (*Iter) Close

func (it *Iter) Close() error

func (*Iter) Next

func (it *Iter) Next(ev EventUnmarshaler) bool

type Journal

type Journal struct {
	// contains filtered or unexported fields
}

func NewJournal

func NewJournal(conn *mgo.Session, ns string) (*Journal, error)

`NewJournal(conn, ns)` creates a new event journal that is backed by MongoDB collections `<ns>.events`, `<ns>.refs`, and `<ns>.journal`.

To activate event notification via Go channels, run `Serve()` in a goroutine with a context that is canceled during shutdown. Example, without error handling:

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

// ...

// Errors are ignored for brevity.
mainJ, _ := events.NewJournal(mgs, "evjournal.fsomain")
wg.Add(1)
go func() {
	defer wg.Done()
	_ = mainJ.Serve(ctx)
}()

// ...
// Shutdown: cancel the context, then wait for the goroutine to exit.
cancel()
wg.Wait()

func (*Journal) Commit

func (j *Journal) Commit(
	historyId uuid.I, evs []Event,
) ([]Event, error)

func (*Journal) Delete

func (j *Journal) Delete(
	historyId uuid.I, head ulid.I,
) error

func (*Journal) Find

func (j *Journal) Find(historyId uuid.I, after ulid.I) *Iter

func (*Journal) Head

func (j *Journal) Head(historyId uuid.I) (ulid.I, error)

func (*Journal) Serve

func (j *Journal) Serve(ctx context.Context) error

func (*Journal) SetTrimPolicy

func (j *Journal) SetTrimPolicy(tp TrimPolicy)

func (*Journal) Subscribe

func (j *Journal) Subscribe(ch chan<- uuid.I, historyId uuid.I)

`Subscribe()` registers the channel `ch` to receive notifications after new events have been added to the journal for `historyId`. Pass `WildcardTopic` as `historyId` to receive notifications for any event.

Sends are non-blocking, so a buffered channel of size 1 is usually the right choice.
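
A non-blocking send is commonly written as a `select` with a `default` case. The sketch below shows why a buffer of size 1 fits this pattern: the first signal is stored, and further signals are dropped while one is still pending, so a slow subscriber sees a single coalesced wake-up. The `notify()` and `demo()` helpers are hypothetical, strings stand in for `uuid.I`, and this is not the package's internal code.

```go
package main

import "fmt"

// notify tries to send topic without blocking. It reports whether the
// value was delivered to the channel buffer or a waiting receiver.
func notify(ch chan<- string, topic string) bool {
	select {
	case ch <- topic:
		return true
	default:
		// Buffer full and no receiver ready: drop the signal.
		return false
	}
}

// demo sends two signals to a channel of size 1 that nobody reads.
func demo() (first, second bool, pending int) {
	ch := make(chan string, 1)
	first = notify(ch, "history-1")
	second = notify(ch, "history-1")
	return first, second, len(ch)
}

func main() {
	fmt.Println(demo()) // true false 1
}
```

With an unbuffered channel, a signal sent while the subscriber is busy would be lost entirely. With a buffer of size 1, at least one signal stays pending.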

func (*Journal) Unsubscribe

func (j *Journal) Unsubscribe(ch chan<- uuid.I)

type JournalDoc

type JournalDoc struct {
	Id       idid.I `bson:"_id"`
	Serial   int64  `bson:"s"`
	Protobuf []byte `bson:"pb"`
}

type JournalEntryParentMismatchError

type JournalEntryParentMismatchError struct {
	EventId  ulid.I
	Actual   ulid.I
	Expected ulid.I
}

func (*JournalEntryParentMismatchError) Error

type Logger

type Logger interface {
	Infow(msg string, kv ...interface{})
}

type MissingHeadEventError

type MissingHeadEventError struct {
	Id ulid.I
}

func (*MissingHeadEventError) Error

func (err *MissingHeadEventError) Error() string

type MissingTailEventError

type MissingTailEventError struct {
	Id ulid.I
}

func (*MissingTailEventError) Error

func (err *MissingTailEventError) Error() string

type Op

type Op string

type PhaseCode

type PhaseCode int32
const (
	PhaseUnspecified PhaseCode = iota
	PhaseActive
	PhaseDeleting
	PhaseDeleted
)

func (PhaseCode) IsActive

func (p PhaseCode) IsActive() bool

`PhaseUnspecified` is treated as `PhaseActive` for backward compatibility with old MongoDB docs that were created without phase.
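
A minimal sketch of this rule follows. The `isActive()` function is a hypothetical free-function equivalent of the method, written from the description above rather than copied from the package.

```go
package main

import "fmt"

type PhaseCode int32

const (
	PhaseUnspecified PhaseCode = iota
	PhaseActive
	PhaseDeleting
	PhaseDeleted
)

// isActive treats a missing phase (zero value) like PhaseActive.
func isActive(p PhaseCode) bool {
	return p == PhaseUnspecified || p == PhaseActive
}

func main() {
	var missing PhaseCode // zero value, as decoded from an old doc
	fmt.Println(isActive(missing))       // true
	fmt.Println(isActive(PhaseDeleting)) // false
}
```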

func (PhaseCode) IsInactive

func (p PhaseCode) IsInactive() bool

type RefsDoc

type RefsDoc struct {
	Id       uuid.I      `bson:"_id"`
	Head     ulid.I      `bson:"h"`
	Epoch    ulid.I      `bson:"e"`
	EpochLog []EpochTime `bson:"el"`
	Tail     ulid.I      `bson:"t"`

	Serial int64 `bson:"s"`

	// `Phase` may be missing in old docs.  A missing `Phase` is handled as
	// `PhaseActive`.
	Phase PhaseCode `bson:"ph"`

	// `Dtime` is the time when the deletion started, i.e. when `Phase` was
	// set to `PhaseDeleting`.
	Dtime time.Time `bson:"dt"`
}

`Head` contains the latest event. `Epoch` contains the parent of the first visible event. `Head` may equal `Epoch` to indicate an empty history. `Tail` contains the last event that must not be deleted.

`Tail <= Epoch <= Head`.

`EpochLog` contains all epochs. When advancing `Epoch`, the epoch is also appended to `EpochLog`. See `advanceOneEpoch()`.

type RetryNoVCError

type RetryNoVCError struct {
	Err error
}

func (*RetryNoVCError) Error

func (err *RetryNoVCError) Error() string

func (*RetryNoVCError) Unwrap

func (err *RetryNoVCError) Unwrap() error

type State

type State interface {
	AggregateState()
	Id() uuid.I
	Vid() ulid.I
	SetVid(ulid.I)
}

`State` is the general aggregate state as managed by the `Engine`. Specific aggregates, like `fsomain`, downcast to access their state.

type TrimPolicy

type TrimPolicy interface {
	NewEvent() EventUnmarshaler
	IsNewEpoch(epoch, first EventUnmarshaler, now time.Time) bool
	IsNewTail(eventId ulid.I, epochTime, now time.Time) bool
}

type Trimmer

type Trimmer struct {
	// contains filtered or unexported fields
}

func NewTrimmer

func NewTrimmer(
	lg Logger, j *Journal,
) *Trimmer

func (*Trimmer) Trim

func (t *Trimmer) Trim(ctx context.Context) error

`Trim()` first advances tails and then epochs, so that new epochs cannot immediately become tails, even if the `IsNewTail()` policy would allow it. New epochs can only become tails during future calls to `Trim()`.
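
The effect of this ordering can be seen in a strongly simplified model. The sketch below is an assumption for illustration only: positions in the history are plain integers, the policy always allows advancing, and the tail moves to the current epoch. The `refs` struct and the `trim()` function are hypothetical and do not reflect the package's actual data handling.

```go
package main

import "fmt"

// refs models tail, epoch, and head as positions in a history.
type refs struct{ tail, epoch, head int }

// trim advances the tail first and the epoch second, so the tail can
// only reach an epoch that existed before this call.
func trim(r refs) refs {
	r.tail = r.epoch // step 1: tail catches up with the old epoch
	r.epoch = r.head // step 2: a new epoch is set
	return r
}

func main() {
	r := trim(refs{tail: 0, epoch: 1, head: 5})
	fmt.Println(r.tail, r.epoch) // 1 5

	r = trim(r)
	fmt.Println(r.tail, r.epoch) // 5 5
}
```

After the first call, the tail sits at the old epoch (1), not at the epoch that was just created (5). The new epoch becomes the tail only in the second call.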

type UnknownHistoryError

type UnknownHistoryError struct {
	Id uuid.I
}

func (*UnknownHistoryError) Error

func (err *UnknownHistoryError) Error() string

type VersionConflictError

type VersionConflictError struct {
	Stored   ulid.I
	Expected ulid.I
}

func (*VersionConflictError) Error

func (err *VersionConflictError) Error() string
