persistence

package
v0.0.0-...-eadad3b Latest
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package persistence provides abstractions for data persistence.

Constants

This section is empty.

Variables

var ErrDataStoreClosed = errors.New("data store is closed")

ErrDataStoreClosed is returned when performing any persistence operation on a closed data-store.

var ErrDataStoreLocked = errors.New("data store is locked")

ErrDataStoreLocked indicates that an application's data store cannot be opened because it is locked by another engine instance.
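As a rough illustration of how a caller might tell these two sentinel errors apart, the sketch below uses errors.Is from the standard library. The error variables are local stand-ins that mirror the declarations above so that the example compiles on its own, and openStore and describe are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's sentinel errors.
var (
	ErrDataStoreClosed = errors.New("data store is closed")
	ErrDataStoreLocked = errors.New("data store is locked")
)

// openStore is a hypothetical helper that simulates opening a data store
// that may be locked by another engine instance.
func openStore(locked bool) error {
	if locked {
		// Wrapping with %w keeps the sentinel discoverable via errors.Is.
		return fmt.Errorf("open data store: %w", ErrDataStoreLocked)
	}
	return nil
}

// describe classifies an error; errors.Is also matches wrapped errors.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrDataStoreLocked):
		return "locked"
	case errors.Is(err, ErrDataStoreClosed):
		return "closed"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(describe(openStore(true)))
	fmt.Println(describe(openStore(false)))
	fmt.Println(describe(ErrDataStoreClosed))
}
```

Comparing with errors.Is rather than == means the check keeps working if an implementation wraps the sentinel with extra context.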

Functions

This section is empty.

Types

type AggregateMetaData

type AggregateMetaData struct {
	// HandlerKey is the identity key of the aggregate message handler.
	HandlerKey string

	// InstanceID is the aggregate instance ID.
	InstanceID string

	// Revision is the instance's current version, used to enforce optimistic
	// concurrency control.
	Revision uint64

	// InstanceExists is true if the instance currently exists.
	//
	// When an aggregate instance is destroyed, its meta-data is retained but
	// this flag is set to false.
	InstanceExists bool

	// LastDestroyedBy is the ID of the last event message recorded when the
	// instance was most recently destroyed.
	LastDestroyedBy string
}

AggregateMetaData contains meta-data about an aggregate instance.

type AggregateRepository

type AggregateRepository interface {
	// LoadAggregateMetaData loads the meta-data for an aggregate instance.
	//
	// hk is the aggregate handler's identity key, id is the instance ID.
	LoadAggregateMetaData(
		ctx context.Context,
		hk, id string,
	) (AggregateMetaData, error)
}

AggregateRepository is an interface for reading aggregate state.

type Batch

type Batch []Operation

Batch is a set of operations that are committed to the data store atomically using a Persister.

func (Batch) AcceptVisitor

func (b Batch) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor visits each operation in the batch.

func (Batch) MustValidate

func (b Batch) MustValidate()

MustValidate panics if the batch contains any operations that operate on the same entity.
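The page does not show how an operation's target entity is identified (that is handled by unexported methods), so the following is only a sketch of the general idea behind such a check. The entityKey type and the mustValidate and panics helpers are hypothetical names invented for this example.

```go
package main

import "fmt"

// entityKey is a hypothetical way to identify the entity that an operation
// targets; the real package derives this through unexported methods.
type entityKey struct {
	Kind string // for example "process" or "queue"
	ID   string
}

// mustValidate panics if two operations in the batch target the same entity.
func mustValidate(batch []entityKey) {
	seen := make(map[entityKey]struct{}, len(batch))
	for _, k := range batch {
		if _, ok := seen[k]; ok {
			panic(fmt.Sprintf("batch contains multiple operations for %s %q", k.Kind, k.ID))
		}
		seen[k] = struct{}{}
	}
}

// panics reports whether fn panicked.
func panics(fn func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	fn()
	return false
}

func main() {
	distinct := []entityKey{{"process", "p1"}, {"queue", "m1"}}
	duplicate := []entityKey{{"process", "p1"}, {"process", "p1"}}

	fmt.Println(panics(func() { mustValidate(distinct) }))
	fmt.Println(panics(func() { mustValidate(duplicate) }))
}
```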

type ConflictError

type ConflictError struct {
	// Cause is the operation that caused the conflict.
	Cause Operation
}

ConflictError is an error indicating one or more operations within a batch caused an optimistic concurrency conflict.

func (ConflictError) Error

func (e ConflictError) Error() string
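One plausible way for a caller to react to a ConflictError is to detect it with errors.As and retry after reloading state. The sketch below assumes that strategy; it is not taken from the engine itself. Operation and ConflictError are simplified local stand-ins, and saveOffset, persist, isConflict and persistWithRetry are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
)

// Operation and ConflictError are simplified stand-ins for the package's
// types; the real Operation interface also has unexported methods.
type Operation interface{}

type ConflictError struct {
	Cause Operation
}

func (e ConflictError) Error() string {
	return fmt.Sprintf("optimistic concurrency conflict caused by %T", e.Cause)
}

// saveOffset is a hypothetical operation type used only for this sketch.
type saveOffset struct{ ApplicationKey string }

// persist simulates a Persist() call that conflicts on the first `failures`
// attempts and succeeds afterwards.
func persist(attempt, failures int) error {
	if attempt < failures {
		return fmt.Errorf("persist: %w", ConflictError{Cause: saveOffset{"app"}})
	}
	return nil
}

// isConflict reports whether err is, or wraps, a ConflictError.
func isConflict(err error) bool {
	var conflict ConflictError
	return errors.As(err, &conflict)
}

// persistWithRetry retries only on conflicts and returns the number of
// attempts made together with the final error.
func persistWithRetry(failures, maxAttempts int) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = persist(attempt, failures)
		if !isConflict(err) {
			return attempt + 1, err // success, or an error that is not a conflict
		}
		// A real caller would reload the affected state before retrying.
	}
	return maxAttempts, err
}

func main() {
	attempts, err := persistWithRetry(2, 5)
	fmt.Println(attempts, err)
}
```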

type DataStore

type DataStore interface {
	AggregateRepository
	EventRepository
	OffsetRepository
	ProcessRepository
	QueueRepository

	Persister

	// Close closes the data store.
	//
	// Closing a data-store causes any future calls to Persist() to return
	// ErrDataStoreClosed.
	//
	// The behavior of read operations on a closed data-store is
	// implementation-defined.
	//
	// In general use it is expected that all pending calls to Persist() will
	// have finished before a data-store is closed. Close() may block until any
	// in-flight calls to Persist() return, or may prevent any such calls from
	// succeeding.
	Close() error
}

DataStore is an interface used by the engine to persist and retrieve data for a specific application.

type DataStoreSet

type DataStoreSet struct {
	Provider Provider
	// contains filtered or unexported fields
}

DataStoreSet is a collection of data-stores for several applications.

func (*DataStoreSet) Close

func (s *DataStoreSet) Close() error

Close closes all data stores in the set.

func (*DataStoreSet) Get

func (s *DataStoreSet) Get(ctx context.Context, k string) (DataStore, error)

Get returns the data store for a given application.

If the set already contains a data-store for the given application it is returned. Otherwise it is opened and added to the set. The caller is NOT responsible for closing the data store.
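The open-on-first-use behaviour described above can be sketched as a mutex-guarded map. This is an assumption about the general shape only; the unexported fields of DataStoreSet are not shown on this page. The store and storeSet types are hypothetical stand-ins, and the context parameter is omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a stand-in for a DataStore.
type store struct{ appKey string }

// storeSet is a simplified stand-in for DataStoreSet.
type storeSet struct {
	open func(k string) (*store, error) // stand-in for Provider.Open

	m      sync.Mutex
	stores map[string]*store
}

// Get returns the cached store for application key k, opening it on first use.
func (s *storeSet) Get(k string) (*store, error) {
	s.m.Lock()
	defer s.m.Unlock()

	if ds, ok := s.stores[k]; ok {
		return ds, nil
	}

	ds, err := s.open(k)
	if err != nil {
		return nil, err
	}

	if s.stores == nil {
		s.stores = map[string]*store{}
	}
	s.stores[k] = ds

	return ds, nil
}

func main() {
	opens := 0
	set := &storeSet{open: func(k string) (*store, error) {
		opens++
		return &store{appKey: k}, nil
	}}

	set.Get("app-a")
	set.Get("app-a") // served from the set, not reopened
	set.Get("app-b")

	fmt.Println(opens)
}
```

Because the set owns every store it opens, closing them in one place (as DataStoreSet.Close does) is consistent with callers not closing stores themselves.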

type Event

type Event struct {
	Offset   uint64
	Envelope *envelopespec.Envelope
}

Event is a persisted event message.

func (Event) ID

func (e Event) ID() string

ID returns the ID of the message.

type EventRepository

type EventRepository interface {
	// NextEventOffset returns the next "unused" offset.
	NextEventOffset(ctx context.Context) (uint64, error)

	// LoadEventsByType loads events that match a specific set of message types.
	//
	// f is the set of message types to include in the result. The keys of f are
	// the "portable type name" produced when the events are marshaled.
	//
	// o specifies the (inclusive) lower-bound of the offset range to include in
	// the results.
	LoadEventsByType(
		ctx context.Context,
		f map[string]struct{},
		o uint64,
	) (EventResult, error)

	// LoadEventsBySource loads the events produced by a specific handler.
	//
	// hk is the handler's identity key.
	//
	// id is the instance ID, which must be empty if the handler type does not
	// use instances.
	//
	// m is the ID of a "barrier" message. If supplied, the results are limited to
	// events with higher offsets than the barrier message. If the message
	// cannot be found, UnknownMessageError is returned.
	LoadEventsBySource(
		ctx context.Context,
		hk, id, m string,
	) (EventResult, error)
}

EventRepository is an interface for reading event messages.
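To make the LoadEventsByType parameters concrete, the sketch below applies the same two rules (a set of type names and an inclusive minimum offset) to an in-memory slice. The event type and its PortableName field are hypothetical stand-ins; the real Event carries an envelope whose layout is not shown on this page.

```go
package main

import "fmt"

// event is a stand-in for a persisted event. PortableName is a hypothetical
// field holding the event's portable type name.
type event struct {
	Offset       uint64
	PortableName string
}

// loadEventsByType returns the events whose type name is a key of f and whose
// offset is greater than or equal to o (the lower bound is inclusive).
func loadEventsByType(all []event, f map[string]struct{}, o uint64) []event {
	var out []event
	for _, ev := range all {
		if ev.Offset < o {
			continue
		}
		if _, ok := f[ev.PortableName]; ok {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	all := []event{
		{0, "AccountOpened"},
		{1, "Deposited"},
		{2, "AccountOpened"},
		{3, "Withdrawn"},
	}
	filter := map[string]struct{}{
		"AccountOpened": {},
		"Withdrawn":     {},
	}

	for _, ev := range loadEventsByType(all, filter, 2) {
		fmt.Println(ev.Offset, ev.PortableName)
	}
}
```

Using map[string]struct{} as a set gives constant-time membership checks without storing a value per key.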

type EventResult

type EventResult interface {
	// Next returns the next event in the result.
	//
	// It returns false if there are no more events in the result.
	Next(ctx context.Context) (Event, bool, error)

	// Close closes the cursor.
	Close() error
}

EventResult is the result of a query made using an EventRepository.

EventResult values are not safe for concurrent use.
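A typical way to consume a cursor of this shape is to loop until Next reports false and to close the result on every path. The sketch below assumes that pattern. Event is reduced to its Offset field, and sliceResult, drain and drainAll are hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a stand-in for the package's Event type, without the envelope.
type Event struct{ Offset uint64 }

// EventResult mirrors the interface documented above.
type EventResult interface {
	Next(ctx context.Context) (Event, bool, error)
	Close() error
}

// sliceResult is a hypothetical in-memory EventResult.
type sliceResult struct {
	events []Event
	closed bool
}

func (r *sliceResult) Next(ctx context.Context) (Event, bool, error) {
	if err := ctx.Err(); err != nil {
		return Event{}, false, err
	}
	if len(r.events) == 0 {
		return Event{}, false, nil
	}
	ev := r.events[0]
	r.events = r.events[1:]
	return ev, true, nil
}

func (r *sliceResult) Close() error {
	r.closed = true
	return nil
}

// drain reads every remaining event from res and always closes it.
func drain(ctx context.Context, res EventResult) (offsets []uint64, err error) {
	defer func() {
		if closeErr := res.Close(); err == nil {
			err = closeErr
		}
	}()

	for {
		ev, ok, nextErr := res.Next(ctx)
		if nextErr != nil {
			return nil, nextErr
		}
		if !ok {
			return offsets, nil
		}
		offsets = append(offsets, ev.Offset)
	}
}

// drainAll is a convenience wrapper that uses a background context.
func drainAll(res EventResult) ([]uint64, error) {
	return drain(context.Background(), res)
}

func main() {
	res := &sliceResult{events: []Event{{Offset: 4}, {Offset: 5}}}
	offsets, err := drainAll(res)
	fmt.Println(offsets, err, res.closed)
}
```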

type OffsetRepository

type OffsetRepository interface {
	// LoadOffset loads the offset associated with a specific application.
	//
	// ak is the application's identity key.
	LoadOffset(ctx context.Context, ak string) (uint64, error)
}

OffsetRepository is an interface for reading event stream offsets associated with remote applications.

type Operation

type Operation interface {
	// AcceptVisitor calls the appropriate visit method on the given visitor.
	AcceptVisitor(context.Context, OperationVisitor) error
	// contains filtered or unexported methods
}

Operation is a persistence operation that can be performed as part of an atomic batch.

type OperationVisitor

type OperationVisitor interface {
	VisitSaveAggregateMetaData(context.Context, SaveAggregateMetaData) error
	VisitSaveEvent(context.Context, SaveEvent) error
	VisitSaveProcessInstance(context.Context, SaveProcessInstance) error
	VisitRemoveProcessInstance(context.Context, RemoveProcessInstance) error
	VisitSaveQueueMessage(context.Context, SaveQueueMessage) error
	VisitRemoveQueueMessage(context.Context, RemoveQueueMessage) error
	VisitSaveOffset(context.Context, SaveOffset) error
}

OperationVisitor visits persistence operations.
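The visitor arrangement lets code handle each operation type without type switches: every operation calls back the matching Visit method. The sketch below shows that double dispatch with only two of the seven operation types, and with simplified stand-in structs. The counter visitor and countOps helper are hypothetical, and stopping at the first error in Batch.AcceptVisitor is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for two of the package's operation types.
type SaveEvent struct{ ID string }
type SaveOffset struct{ ApplicationKey string }

// OperationVisitor is reduced to the two methods needed for this sketch.
type OperationVisitor interface {
	VisitSaveEvent(context.Context, SaveEvent) error
	VisitSaveOffset(context.Context, SaveOffset) error
}

type Operation interface {
	AcceptVisitor(context.Context, OperationVisitor) error
}

// Each operation dispatches to the visit method that matches its own type.
func (op SaveEvent) AcceptVisitor(ctx context.Context, v OperationVisitor) error {
	return v.VisitSaveEvent(ctx, op)
}

func (op SaveOffset) AcceptVisitor(ctx context.Context, v OperationVisitor) error {
	return v.VisitSaveOffset(ctx, op)
}

type Batch []Operation

// AcceptVisitor visits each operation in order, stopping at the first error
// (an assumption made for this sketch).
func (b Batch) AcceptVisitor(ctx context.Context, v OperationVisitor) error {
	for _, op := range b {
		if err := op.AcceptVisitor(ctx, v); err != nil {
			return err
		}
	}
	return nil
}

// counter is a hypothetical visitor that tallies operations by type.
type counter struct{ events, offsets int }

func (c *counter) VisitSaveEvent(context.Context, SaveEvent) error   { c.events++; return nil }
func (c *counter) VisitSaveOffset(context.Context, SaveOffset) error { c.offsets++; return nil }

// countOps runs a counter over the batch using a background context.
func countOps(b Batch) (events, offsets int, err error) {
	c := &counter{}
	err = b.AcceptVisitor(context.Background(), c)
	return c.events, c.offsets, err
}

func main() {
	b := Batch{SaveEvent{"e1"}, SaveOffset{"app"}, SaveEvent{"e2"}}
	fmt.Println(countOps(b))
}
```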

type Persister

type Persister interface {
	// Persist commits a batch of operations atomically.
	//
	// If any one of the operations causes an optimistic concurrency conflict
	// the entire batch is aborted and a ConflictError is returned.
	Persist(context.Context, Batch) (Result, error)
}

Persister is an interface for atomically committing batches of operations to the data store.
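The all-or-nothing behaviour can be illustrated with a small in-memory store that first checks every operation's revision and only then applies the batch. This is a sketch under several assumptions: revisions start at zero, a successful save increments the revision by one, and the batch holds at most one operation per entity. The saveInstance, conflictError and memoryStore types are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// saveInstance is a hypothetical operation. Revision is the revision the
// caller believes is currently persisted.
type saveInstance struct {
	ID       string
	Revision uint64
}

// conflictError is a stand-in for the package's ConflictError.
type conflictError struct{ Cause saveInstance }

func (e conflictError) Error() string {
	return fmt.Sprintf("optimistic concurrency conflict on %q", e.Cause.ID)
}

// memoryStore tracks the current revision of each entity.
type memoryStore struct {
	m         sync.Mutex
	revisions map[string]uint64
}

// Persist applies the whole batch or none of it.
func (s *memoryStore) Persist(batch []saveInstance) error {
	s.m.Lock()
	defer s.m.Unlock()

	// Phase 1: verify every operation before changing anything.
	for _, op := range batch {
		if s.revisions[op.ID] != op.Revision {
			return conflictError{Cause: op}
		}
	}

	// Phase 2: apply. Incrementing by one is an assumption of this sketch.
	if s.revisions == nil {
		s.revisions = map[string]uint64{}
	}
	for _, op := range batch {
		s.revisions[op.ID] = op.Revision + 1
	}

	return nil
}

// revision returns the current revision of the entity with the given ID.
func (s *memoryStore) revision(id string) uint64 {
	s.m.Lock()
	defer s.m.Unlock()
	return s.revisions[id]
}

func main() {
	s := &memoryStore{}
	fmt.Println(s.Persist([]saveInstance{{"a", 0}, {"b", 0}}))
	fmt.Println(s.Persist([]saveInstance{{"a", 1}, {"b", 0}})) // stale revision for b
	fmt.Println(s.revision("a"), s.revision("b"))
}
```

Separating the check phase from the apply phase is what keeps a late conflict from leaving earlier operations half-applied.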

type ProcessInstance

type ProcessInstance struct {
	// HandlerKey is the identity key of the process message handler.
	HandlerKey string

	// InstanceID is the process instance ID.
	InstanceID string

	// Revision is the instance's current version, used to enforce optimistic
	// concurrency control.
	Revision uint64

	// Packet contains the binary representation of the process state.
	Packet marshalkit.Packet
}

ProcessInstance contains the state of a process instance.

type ProcessRepository

type ProcessRepository interface {
	// LoadProcessInstance loads a process instance.
	//
	// hk is the process handler's identity key, id is the instance ID.
	LoadProcessInstance(
		ctx context.Context,
		hk, id string,
	) (ProcessInstance, error)
}

ProcessRepository is an interface for reading process state.

type Provider

type Provider interface {
	// Open returns a data-store for a specific application.
	//
	// k is the identity key of the application.
	//
	// Data stores are opened for exclusive use. If another engine instance has
	// already opened this application's data-store, ErrDataStoreLocked is
	// returned.
	Open(ctx context.Context, k string) (DataStore, error)
}

Provider is an interface used by the engine to obtain application-specific DataStore instances.

type QueueMessage

type QueueMessage struct {
	Revision      uint64
	FailureCount  uint
	NextAttemptAt time.Time
	Envelope      *envelopespec.Envelope
}

QueueMessage is a message persisted in the message queue.

func (QueueMessage) ID

func (m QueueMessage) ID() string

ID returns the ID of the message.

type QueueRepository

type QueueRepository interface {
	// LoadQueueMessages loads the next n messages from the queue.
	LoadQueueMessages(ctx context.Context, n int) ([]QueueMessage, error)
}

QueueRepository is an interface for reading queued messages.

type RemoveProcessInstance

type RemoveProcessInstance struct {
	// Instance is the instance to remove.
	//
	// Instance.Revision must be the revision of the process instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Instance ProcessInstance
}

RemoveProcessInstance is an Operation that removes a process instance.

The instance's pending timeout messages are removed from the message queue.

func (RemoveProcessInstance) AcceptVisitor

func (op RemoveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitRemoveProcessInstance().

type RemoveQueueMessage

type RemoveQueueMessage struct {
	// Message is the message to remove from the queue.
	//
	// The message's revision field must be the revision of the message as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Message QueueMessage
}

RemoveQueueMessage is an Operation that removes a message from the queue.

func (RemoveQueueMessage) AcceptVisitor

func (op RemoveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitRemoveQueueMessage().

type Result

type Result struct {
	// EventOffsets contains the offsets of the events saved within the batch,
	// keyed by their message ID.
	EventOffsets map[string]uint64
}

Result is the result of a successfully persisted batch of operations.

type SaveAggregateMetaData

type SaveAggregateMetaData struct {
	// MetaData is the meta-data to persist.
	//
	// MetaData.Revision must be the revision of the aggregate instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	MetaData AggregateMetaData
}

SaveAggregateMetaData is an Operation that creates or updates meta-data about an aggregate instance.

func (SaveAggregateMetaData) AcceptVisitor

func (op SaveAggregateMetaData) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitSaveAggregateMetaData().

type SaveEvent

type SaveEvent struct {
	// Envelope is the envelope containing the event to persist.
	Envelope *envelopespec.Envelope
}

SaveEvent is an Operation that persists an event message.

func (SaveEvent) AcceptVisitor

func (op SaveEvent) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitSaveEvent().

type SaveOffset

type SaveOffset struct {
	// ApplicationKey is the identity key of the source application.
	ApplicationKey string

	// CurrentOffset must be the offset currently associated with this application,
	// otherwise an optimistic concurrency conflict occurs and the entire batch
	// of operations is rejected.
	CurrentOffset uint64

	// NextOffset is the next offset to consume from this application.
	NextOffset uint64
}

SaveOffset is an Operation that persists the offset of the next event to be consumed from a specific application.
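CurrentOffset and NextOffset together behave like a compare-and-swap: the stored offset is replaced only if it still equals what the caller last read. The sketch below assumes that an application with no stored offset has offset zero, which this page does not state. SaveOffset is a local copy of the struct above, while offsetStore and errConflict are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// SaveOffset is a local copy of the operation documented above.
type SaveOffset struct {
	ApplicationKey string
	CurrentOffset  uint64
	NextOffset     uint64
}

// errConflict is a hypothetical stand-in for a ConflictError.
var errConflict = errors.New("optimistic concurrency conflict")

// offsetStore is a hypothetical in-memory offset table. A missing key reads
// as offset zero, which is an assumption of this sketch.
type offsetStore struct {
	offsets map[string]uint64
}

// apply performs the compare-and-swap described by op.
func (s *offsetStore) apply(op SaveOffset) error {
	if s.offsets[op.ApplicationKey] != op.CurrentOffset {
		return errConflict
	}
	if s.offsets == nil {
		s.offsets = map[string]uint64{}
	}
	s.offsets[op.ApplicationKey] = op.NextOffset
	return nil
}

func main() {
	s := &offsetStore{}

	// Consume events 0 through 2, so the next offset to consume is 3.
	fmt.Println(s.apply(SaveOffset{"app", 0, 3}))

	// A second consumer that still believes the offset is 0 is rejected.
	fmt.Println(s.apply(SaveOffset{"app", 0, 1}))

	fmt.Println(s.offsets["app"])
}
```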

func (SaveOffset) AcceptVisitor

func (op SaveOffset) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitSaveOffset().

type SaveProcessInstance

type SaveProcessInstance struct {
	// Instance is the instance to persist.
	//
	// Instance.Revision must be the revision of the process instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Instance ProcessInstance
}

SaveProcessInstance is an Operation that creates or updates a process instance.

func (SaveProcessInstance) AcceptVisitor

func (op SaveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitSaveProcessInstance().

type SaveQueueMessage

type SaveQueueMessage struct {
	// Message is the message to persist to the queue.
	//
	// The message's revision field must be the revision of the message as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Message QueueMessage
}

SaveQueueMessage is an Operation that saves a message to the queue, or updates a message that is already on the queue.

func (SaveQueueMessage) AcceptVisitor

func (op SaveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error

AcceptVisitor calls v.VisitSaveQueueMessage().

type UnknownMessageError

type UnknownMessageError struct {
	MessageID string
}

UnknownMessageError is the error returned when a message referenced by its ID does not exist.

func (UnknownMessageError) Error

func (e UnknownMessageError) Error() string

Error returns a string representation of UnknownMessageError.

Directories

Path Synopsis
internal
providertest
Package providertest contains a common test suite for persistence.Provider implementations.
provider
boltdb
Package boltdb is a BoltDB (bbolt) persistence provider.
memory
Package memory is an in-memory persistence provider.
sql
Package sql is an SQL-based persistence provider with drivers for several popular SQL database systems.
sql/mysql
Package mysql is a MySQL driver for the SQL persistence provider.
sql/postgres
Package postgres is a PostgreSQL driver for the SQL persistence provider.
sql/sqlite
Package sqlite is an SQLite v3 driver for the SQL persistence provider.
