Documentation ¶
Overview ¶
Package `events` implements event sourcing with MongoDB as the event store. The main classes are `Journal` and `Engine`.
`Journal`: Event log backed by MongoDB collection. Event notification via Go channels.
`Engine`: Building block for event sourcing aggregates. See packages `fsomain`, `fsoregistry`, `fsorepos` as examples how to use `Engine`.
Index ¶
- Constants
- Variables
- func IsVersionConflictError(err error) bool
- type Advancer
- type Behavior
- type Command
- type CorruptedDataError
- type DBError
- type DuplicateEventMismatchError
- type DuplicateJournalEntryProtobufMismatchError
- type DuplicateJournalEntrySerialMismatchError
- type Engine
- func (eng *Engine) DeleteIdVid(id uuid.I, vid ulid.I, cmd Command) error
- func (eng *Engine) FindFromState(s State) (State, error)
- func (eng *Engine) FindId(id uuid.I) (State, error)
- func (eng *Engine) TellIdVid(id uuid.I, vid ulid.I, cmd Command) (ulid.I, error)
- func (eng *Engine) TellIdVidState(id uuid.I, vid ulid.I, cmd Command) (State, error)
- func (eng *Engine) TellState(s State, cmd Command) (State, error)
- type EpochTime
- type Event
- type EventDoc
- type EventError
- type EventIdMismatchError
- type EventMarshaler
- type EventUnmarshaler
- type EventsGarbageCollector
- func (gc *EventsGarbageCollector) Gc(ctx context.Context) error
- func (gc *EventsGarbageCollector) GcDeletedRefs(ctx context.Context) error
- func (gc *EventsGarbageCollector) GcDeletingJournals(ctx context.Context) error
- func (gc *EventsGarbageCollector) GcEvents(ctx context.Context) error
- func (gc *EventsGarbageCollector) GcJournalTail(ctx context.Context) error
- type InternalError
- type Iter
- type Journal
- func (j *Journal) Commit(historyId uuid.I, evs []Event) ([]Event, error)
- func (j *Journal) Delete(historyId uuid.I, head ulid.I) error
- func (j *Journal) Find(historyId uuid.I, after ulid.I) *Iter
- func (j *Journal) Head(historyId uuid.I) (ulid.I, error)
- func (j *Journal) Serve(ctx context.Context) error
- func (j *Journal) SetTrimPolicy(tp TrimPolicy)
- func (j *Journal) Subscribe(ch chan<- uuid.I, historyId uuid.I)
- func (j *Journal) Unsubscribe(ch chan<- uuid.I)
- type JournalDoc
- type JournalEntryParentMismatchError
- type Logger
- type MissingHeadEventError
- type MissingTailEventError
- type Op
- type PhaseCode
- type RefsDoc
- type RetryNoVCError
- type State
- type TrimPolicy
- type Trimmer
- type UnknownHistoryError
- type VersionConflictError
Constants ¶
const ( OpFindHead Op = "finding history in heads collection" OpInsertHead = "inserting into heads collection" OpUpdateHead = "updating heads collection" OpUpdateHeadSerial = "updating head serial" OpFindEvent = "finding event in events collection" OpFindPreviousEvent = "finding previous event in events collection" OpInsertEvent = "inserting into events collection" OpInsertJournal = "inserting journal entry" OpFindJournalDuplicate = "finding duplicate journal entry" OpFindJournal = "finding event in journal collection" OpScanJournal = "scanning journal" OpFindStart = "finding start" OpEventIds = "computing event ids" OpEncodeEventProto = "encoding event protobuf" OpDecodeJournalProto = "decoding journal protobuf" OpDecodeEventProto = "decoding event protobuf" OpParseEventId = "parsing event ID" OpCheckEventId = "checking event ID" OpParseEventParent = "parsing event parent ID" OpHeadEventLookup = "head event lookup" OpTailEventLookup = "tail event lookup" )
`OpX` are operation codes for errors. The `OpX` strings are chosen such that `${OpX} failed: ...` is valid English.
const ( KeyId = "_id" KeyProtobuf = "pb" KeyHead = "h" KeyEpoch = "e" KeyEpochLog = "el" KeyTail = "t" KeyPhase = "ph" KeyDtime = "dt" KeyTime = "ts" KeySerial = "s" // `KeyJournalIsUpToDate` is only used to migrate from the schema that // used a boolean to indicate that the serialized journal is up to date // without tracking the head serial. See `Commit()`. KeyJournalIsUpToDate = "u" )
`KeyX` is the Mongo field name for the Go field `X`.
const ConfigDeletedDays = 30
`ConfigDeletedDays` is the minimal duration since refs were marked for deletion before the refs doc may be removed from the database.
const ConfigDeletedDuration = ConfigDeletedDays * 24 * time.Hour
const ConfigDeletingDays = 15
`ConfigDeletingDays` is the minimal duration since refs were marked for deletion before the corresponding journal may be deleted.
const ConfigDeletingDuration = ConfigDeletingDays * 24 * time.Hour
const ConfigRecentDays = 30
`ConfigRecentDays` is the minimal age of an event before it may be deleted. The age is based on etime.
const ConfigRecentDuration = ConfigRecentDays * 24 * time.Hour
const ConfigTrimInterval = ConfigTrimIntervalDays * 24 * time.Hour
const ConfigTrimIntervalDays = 30
const HeadSerialUnspecified int64 = 0
Variables ¶
var EventEpoch ulid.I
`EventEpoch` is the id that indicates the beginning of an event history.
var NoVC = ulid.One
`NoVC` is a sentinel that indicates `TellIdVid()` to skip the version check.
var RetryNoVC = ulid.Two
`RetryNoVC` is a sentinel that indicates `TellIdVid()` to skip the version check like `NoVC` but retry the command a few times if committing events fails due to a concurrent update.
var WildcardTopic uuid.I
`WildcardTopic` indicates that a subscribed channel receives a signal for any `post()`.
Functions ¶
func IsVersionConflictError ¶
Types ¶
type Behavior ¶
type Behavior interface { NewState(id uuid.I) State NewEvent() Event NewAdvancer() Advancer Tell(State, Command) ([]Event, error) }
`Behavior` is used to customize `Engine` to implement a specific aggregate.
type Command ¶
type Command interface {
AggregateCommand()
}
A `Command` can be applied to change `State`.
type CorruptedDataError ¶
func (*CorruptedDataError) Error ¶
func (err *CorruptedDataError) Error() string
func (*CorruptedDataError) Unwrap ¶
func (err *CorruptedDataError) Unwrap() error
type DuplicateEventMismatchError ¶
func (*DuplicateEventMismatchError) Error ¶
func (err *DuplicateEventMismatchError) Error() string
type DuplicateJournalEntryProtobufMismatchError ¶
func (*DuplicateJournalEntryProtobufMismatchError) Error ¶
func (err *DuplicateJournalEntryProtobufMismatchError) Error() string
type DuplicateJournalEntrySerialMismatchError ¶
type DuplicateJournalEntrySerialMismatchError struct { HistoryId uuid.I EventId ulid.I Stored int64 Expected int64 }
func (*DuplicateJournalEntrySerialMismatchError) Error ¶
func (err *DuplicateJournalEntrySerialMismatchError) Error() string
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
`Engine` combines an event `Journal` with `Behavior` into a building block for an event sourcing aggregate. See `fsomain` as an example.
func (*Engine) DeleteIdVid ¶
`DeleteIdVid()` deletes a history. The history is not deleted immediately, but only marked for deletion. The actual deletion of the journal, refs, and events happens during garbage collection.
For `NoVC` and `RetryNoVC`, a missing or already deleted history does not cause an error.
For a real `vid`, the history must exist and its state must match the `vid`, and the `Tell()` behavior command handler must allow deletion by returning `nil, nil`.
func (*Engine) TellIdVidState ¶
type Event ¶
type Event interface { EventUnmarshaler EventMarshaler }
type EventError ¶
func (*EventError) Error ¶
func (err *EventError) Error() string
func (*EventError) Unwrap ¶
func (err *EventError) Unwrap() error
type EventIdMismatchError ¶
func (*EventIdMismatchError) Error ¶
func (err *EventIdMismatchError) Error() string
type EventMarshaler ¶
type EventUnmarshaler ¶
type EventsGarbageCollector ¶
type EventsGarbageCollector struct {
// contains filtered or unexported fields
}
func NewEventsGarbageCollector ¶
func NewEventsGarbageCollector( lg Logger, j *Journal, ) *EventsGarbageCollector
func (*EventsGarbageCollector) Gc ¶
func (gc *EventsGarbageCollector) Gc(ctx context.Context) error
`Gc()` runs all garbage collectors, specifically:
GcEvents() GcJournalTail() GcDeletedRefs() GcDeletingJournals()
The order of the `GcX()` calls prefers slow over aggressive garbage collection. Some garbage that could in principle be deleted right away may require another gc cycle to be actually deleted.
func (*EventsGarbageCollector) GcDeletedRefs ¶
func (gc *EventsGarbageCollector) GcDeletedRefs( ctx context.Context, ) error
`GcDeletedRefs()` deletes refs from the database that are in `PhaseDeleted` and have initially been marked for deletion more than `ConfigDeletedDuration` ago.
func (*EventsGarbageCollector) GcDeletingJournals ¶
func (gc *EventsGarbageCollector) GcDeletingJournals( ctx context.Context, ) error
`GcDeletedJournals()` removes journals of refs that are in `PhaseDeleting` and have been initially marked for deletion more than `ConfigDeletingDuration` ago. The refs are then changed to `PhaseDeleted`, so that they are deleted by a future `GcDeletedRefs()`.
func (*EventsGarbageCollector) GcEvents ¶
func (gc *EventsGarbageCollector) GcEvents(ctx context.Context) error
`GcEvents()` deletes unreachable events.
It uses mark and sweep with the following colors:
- unspecified: event is not in Mongo collection.
- white: event is old and unreachable.
- gray: event is recent or reachable via a head.
- black: event is a tail or has been painted.
Painting starts from gray nodes and walks along parents, marking the visited nodes in black, until the first black node is reached, i.e. each stroke stops either at a tail or at a node that has been already painted (and its parents have been painted, too). After painting, white events can be deleted.
All refs, including refs in `PhaseDeleting` or `PhaseDeleted`, prevent events from beeing deleted. Events will only be deleted after the refs doc has been deleted from the database.
func (*EventsGarbageCollector) GcJournalTail ¶
func (gc *EventsGarbageCollector) GcJournalTail(ctx context.Context) error
`GcJournalTail()` deletes journal entries before the tail.
type InternalError ¶
func (*InternalError) Error ¶
func (err *InternalError) Error() string
func (*InternalError) Unwrap ¶
func (err *InternalError) Unwrap() error
type Iter ¶
type Iter struct {
// contains filtered or unexported fields
}
Valid `Iter` states:
- `mongo == nil && err == nil`: empty iterator.
- `mongo == nil && err != nil`: error before Mongo query.
- `mongo != nil`: active Mongo query.
func (*Iter) Next ¶
func (it *Iter) Next(ev EventUnmarshaler) bool
type Journal ¶
type Journal struct {
// contains filtered or unexported fields
}
func NewJournal ¶
`NewJournal(conn, ns)` creates a new event journal that is backed by MongoDB collections `<ns>.events`, `<ns>.refs`, and `<ns>.journal`.
To activate event notification via Go channels, `go Serve()` with a context that is canceled during shutdown. Example, w/o error handling:
var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) // ... mainJ, err := events.NewJournal(mgs, "evjournal.fsomain") wg.Add(1) go func() { err := mainJ.Serve(ctx) wg.Done() }() // ... // Shutdown cancel()
func (*Journal) SetTrimPolicy ¶
func (j *Journal) SetTrimPolicy(tp TrimPolicy)
func (*Journal) Subscribe ¶
`Subscribe()` registers the channel `ch` to receive notifications after new events were added to the journal for `historyId`. Use the empty string `historyId=""` to receive notifications for any event.
Sends are non-blocking. Usually use a buffered channel of size 1.
func (*Journal) Unsubscribe ¶
type JournalDoc ¶
type JournalEntryParentMismatchError ¶
func (*JournalEntryParentMismatchError) Error ¶
func (err *JournalEntryParentMismatchError) Error() string
type MissingHeadEventError ¶
func (*MissingHeadEventError) Error ¶
func (err *MissingHeadEventError) Error() string
type MissingTailEventError ¶
func (*MissingTailEventError) Error ¶
func (err *MissingTailEventError) Error() string
type PhaseCode ¶
type PhaseCode int32
func (PhaseCode) IsActive ¶
`PhaseUnspecified` is treated as `PhaseActive` for backward compatibility with old MongoDB docs that were created without phase.
func (PhaseCode) IsInactive ¶
type RefsDoc ¶
type RefsDoc struct { Id uuid.I `bson:"_id"` Head ulid.I `bson:"h"` Epoch ulid.I `bson:"e"` EpochLog []EpochTime `bson:"el"` Tail ulid.I `bson:"t"` Serial int64 `bson:"s"` // `Phase` may be missing in old docs. A missing `Phase` is handled as // `PhaseActive`. Phase PhaseCode `bson:"ph"` // `Dtime` is the time when the deletion started, i.e. when `Phase` was // set to `PhaseDeleting`. Dtime time.Time `bson:"dt"` }
`Head` contains the latest event. `Epoch` contains the parent of the first visible event. `Head` may equal `Epoch` to indicate an empty history. `Tail` contains the last event that must not be deleted.
`Tail <= Epoch <= Head`.
`EpochLog` contains all epochs. When advancing `Epoch`, the epoch is also appended to `EpochLog`. See `advanceOneEpoch()`.
type RetryNoVCError ¶
type RetryNoVCError struct {
Err error
}
func (*RetryNoVCError) Error ¶
func (err *RetryNoVCError) Error() string
func (*RetryNoVCError) Unwrap ¶
func (err *RetryNoVCError) Unwrap() error
type State ¶
`State` is the general aggregate state as managed by the `Engine`. Specific aggregates, like `fsomain`, downcast to access their state.
type TrimPolicy ¶
type TrimPolicy interface { NewEvent() EventUnmarshaler IsNewEpoch(epoch, first EventUnmarshaler, now time.Time) bool IsNewTail(eventId ulid.I, epochTime, now time.Time) bool }
type UnknownHistoryError ¶
func (*UnknownHistoryError) Error ¶
func (err *UnknownHistoryError) Error() string
type VersionConflictError ¶
func (*VersionConflictError) Error ¶
func (err *VersionConflictError) Error() string