eventbox

package
v0.0.0-...-51f9457 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package eventbox batches incoming events for a single Datastore entity for processing.

Index

Constants

This section is empty.

Variables

View Source
var ErrContention = errors.New("Datastore Contention")

ErrContention indicates Datastore contention, usually on the mailbox recipient entity itself.

View Source
var TombstonesDelay = 5 * time.Minute

TombstonesDelay is exposed to mitigate frequent errors in CV e2e tests when tasks are run in parallel with fake clock.

Functions

func Emit

func Emit(ctx context.Context, value []byte, to Recipient) error

Emit emits a new event with provided value and auto-generated unique ID.

func IsErrContention

func IsErrContention(err error) bool

IsErrContention checks if error, possibly wrapped, is ErrContention.

Types

type EVersion

type EVersion int

EVersion is recipient entity version.

type Event

type Event dsset.Item

Event is an incoming event.

type Events

type Events []Event

Events are incoming events.

func List

func List(ctx context.Context, r Recipient) (Events, error)

List returns unprocessed events. For use in tests only.

type PostProcessFn

type PostProcessFn func(context.Context) error

PostProcessFn should be executed after event processing completes.

func ProcessBatch

func ProcessBatch(ctx context.Context, r Recipient, p Processor, maxEvents int) ([]PostProcessFn, error)

ProcessBatch reliably processes outstanding events, while transactionally modifying state and performing arbitrary side effects.

Returns:

  • a slice of non-nil post process functions which SHOULD be executed immediately after calling this function. Those are generally extra work that needs to be done as the result of state modification.
  • error while processing events. Returns wrapped ErrContention if entity's EVersion has changed or there is contention on Datastore entities involved in a transaction.

type Processor

type Processor interface {
	// LoadState is called to load the state before a transaction.
	LoadState(context.Context) (State, EVersion, error)
	// PrepareMutation is called before a transaction to compute transitions based
	// on a batch of events.
	//
	// The events in a batch are an arbitrary subset of all outstanding events.
	// Because loading of events isn't synchronized with event senders,
	// a recipient of events may see them in different order than the origination
	// order, even if events were produced by a single sender.
	//
	// All actions that must be done atomically with updating state must be
	// encapsulated inside Transition.SideEffectFn callback.
	//
	// Garbage events will be deleted non-transactionally before executing
	// transactional transitions. These events may still be processed by a
	// concurrent invocation of a Processor. The garbage events slice may re-use
	// the given events slice. The garbage will be deleted even if PrepareMutation returns
	// non-nil error.
	//
	// For correctness, two concurrent invocation of a Processor must choose the
	// same events to be deleted as garbage. Consider scenario of 2 events A and B
	// deemed semantically the same and 2 concurrent Processor invocations:
	//   P1: let me delete A and hope to transactionally process B.
	//   P2:  ............ B and ............................... A.
	// Then, it's a real possibility that A and B are both deleted AND no neither
	// P1 nor P2 commits a transaction, thus forever forgetting about A and B.
	PrepareMutation(context.Context, Events, State) (transitions []Transition, garbage Events, err error)
	// FetchEVersion is called at the beginning of a transaction.
	//
	// The returned EVersion is compared against the one associated with a state
	// loaded via GetState. If different, the transaction is aborted and new state
	// isn't saved.
	FetchEVersion(ctx context.Context) (EVersion, error)
	// SaveState is called in a transaction to save the state if it has changed.
	//
	// The passed eversion is incremented value of eversion of what GetState
	// returned before.
	SaveState(context.Context, State, EVersion) error
}

Processor defines safe way to process events in a batch.

type Recipient

type Recipient struct {
	// Key is the Datastore key of the recipient.
	//
	// The corresponding entity doesn't have to exist.
	Key *datastore.Key
	// MonitoringString is the value for the metric field "recipient".
	//
	// There should be very few distinct values.
	MonitoringString string
}

Recipient is the recipient of the events.

type SideEffectFn

type SideEffectFn func(context.Context) error

SideEffectFn performs side effects with a Datastore transaction context. See Transition.SideEffectFn doc.

func Chain

func Chain(fs ...SideEffectFn) SideEffectFn

Chain combines several SideEffectFn.

NOTE: modifies incoming ... slice.

type State

type State interface{}

State is an arbitrary object.

Use a pointer to an actual state.

type Transition

type Transition struct {
	// SideEffectFn is called in a transaction to atomically with the state change
	// execute any side effects of a state transition.
	//
	// Typical use is notifying other CV components via TQ tasks.
	// Can be nil, meaning there no side effects to execute.
	//
	// TODO(tandrii): introduce error tag to indicate that failure was clean and
	// should be treated as if Transition wasn't started, s.t. progress of all
	// transitions before can be saved.
	SideEffectFn SideEffectFn
	// Events to consume with this transition.
	Events Events
	// TransitionTo is a state to transition to.
	//
	// It's allowed to transition to the exact same state.
	TransitionTo State
	// PostProcessFn is the function to be called by the eventbox user after
	// event processing completes.
	//
	// Note that it will be called outside of the transaction of all state
	// transitions, so the operation inside this function is not expected
	// to be atomic with this state transition.
	PostProcessFn PostProcessFn
}

Transition is a state transition.

Directories

Path Synopsis
Package dsset implements a particular flavor of Datastore-on-Firestore backed set.
Package dsset implements a particular flavor of Datastore-on-Firestore backed set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL