Version: v1.125.0

This package is not in the latest version of its module.

Published: Sep 23, 2021 License: Apache-2.0 Imports: 10 Imported by: 0


Vortex - Local Cache for Elemental Objects

Vortex is a generic elemental cache, currently backed by MemDB. Other implementations are possible in the future.

Vortex requires three components:

  • A manipulator that allows Vortex to synchronize objects in the cache with a remote backend.

  • An optional subscriber that allows Vortex to synchronize events from an upstream subscriber.

  • A local datastore where it will cache objects. The datastore must be properly configured with any indexes as required by your queries.

The downstream manipulator is optional: Vortex can also be used as a standalone embedded database that obeys the manipulate interfaces. Note that Vortex supports both the manipulator and subscriber interfaces, and it propagates any events from the downstream to upstream callers.

For every object you must provide a processor configuration that defines its hooks. Separate hooks can be applied at different points before a transaction:

  • RemoteHook will be applied before the transaction is sent upstream.

  • LocalHook will be applied before the transaction is committed locally.

  • RetrieveManyHook is applied before retrieve many operations.

For every identity, you can configure the cache mode:

  • Write-through indicates that the object is first committed downstream and, only if that succeeds, committed locally.

  • Write-back indicates that the object is stored in a local queue and later sent to the downstream database.
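
The difference between the two modes can be illustrated with a small, library-independent sketch. The store, memStore, pending, and cache types below are hypothetical stand-ins and not part of Vortex; they only model the order of commits described above. The sketch also assumes that a write-back object is visible in the local cache immediately, which the description above does not state.

```go
package main

import "errors"

// store is a hypothetical stand-in for the remote backend.
type store interface {
	Put(key, value string) error
}

// memStore is a toy in-memory store; fail simulates an unreachable backend.
type memStore struct {
	data map[string]string
	fail bool
}

func newMemStore() *memStore { return &memStore{data: map[string]string{}} }

func (s *memStore) Put(key, value string) error {
	if s.fail {
		return errors.New("backend unavailable")
	}
	s.data[key] = value
	return nil
}

// pending is a queued write awaiting delivery to the backend.
type pending struct{ key, value string }

// cache models both modes: local is the cache, backend is the remote store.
type cache struct {
	local   *memStore
	backend store
	queue   []pending
}

// writeThrough commits to the backend first and commits locally only on success.
func (c *cache) writeThrough(key, value string) error {
	if err := c.backend.Put(key, value); err != nil {
		return err
	}
	return c.local.Put(key, value)
}

// writeBack queues the write for later delivery.
// Assumption: the object is also committed locally right away.
func (c *cache) writeBack(key, value string) error {
	if err := c.local.Put(key, value); err != nil {
		return err
	}
	c.queue = append(c.queue, pending{key, value})
	return nil
}

// flush sends queued writes to the backend and keeps the ones that fail.
func (c *cache) flush() {
	var remaining []pending
	for _, p := range c.queue {
		if err := c.backend.Put(p.key, p.value); err != nil {
			remaining = append(remaining, p)
		}
	}
	c.queue = remaining
}
```

With a failing backend, writeThrough returns the error and leaves the local store untouched, while writeBack succeeds and the write stays queued until a later flush can deliver it.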

You can take a look at the test files for examples.

Here is a simple example:

// NewMemoryDB will create the DB.
func NewMemoryDB(
    ctx context.Context,
    b manipulate.TokenManager,
    api string,
    model elemental.ModelManager,
) (manipulate.Manipulator, context.CancelFunc, error) {

    connectContext, cancel := context.WithCancel(ctx)

    m, err := maniphttp.New(
        // (arguments elided)
    )
    if err != nil {
        cancel() // Release the connect context before returning.
        return nil, nil, err
    }

    subscriber := maniphttp.NewSubscriber(m, true)
    subscriber.Start(ctx, nil)

    // Declare the indexes required by the queries on each identity.
    indexConfig := map[string]*config.MemDBIdentity{
        testmodel.ListIdentity.Category: &config.MemDBIdentity{
            Identity: testmodel.ListIdentity,
            Indexes: []*config.IndexConfig{
                {
                    Name:      "id",
                    Type:      config.String,
                    Unique:    true,
                    Attribute: "ID",
                },
                {
                    Name:      "Name",
                    Type:      config.String,
                    Attribute: "Name",
                },
                {
                    Name:      "Slice",
                    Type:      config.Slice,
                    Attribute: "Slice",
                },
            },
        },
    }

    // Create a data store, register the identities and start it.
    datastore, err := memdbvortex.NewDatastore(indexConfig)
    if err != nil {
        return nil, cancel, fmt.Errorf("failed to create local memory db: %s", err)
    }

    // Create the processors and the vortex.
    processors := map[string]*config.ProcessorConfiguration{
        testmodel.ListIdentity.Name: &config.ProcessorConfiguration{
            Identity:      testmodel.ListIdentity,
            CommitOnEvent: true,
        },
    }

    v, err := memdbvortex.NewMemDBVortex(
        // (arguments elided)
    )

    return v, cancel, err
}



Package manipvortex contains a Manipulator that can be used as a cache in front of another Manipulator.





func New

func New(
	ctx context.Context,
	downstreamManipulator manipulate.Manipulator,
	processors map[string]*Processor,
	model elemental.ModelManager,
	options ...Option,
) (manipulate.BufferedManipulator, error)

New creates a new cache. The caller must provide a valid backend manipulator and subscriber. If the manipulator is nil, the cache is assumed to be standalone (i.e., there is no backend to synchronize with).

func NewSubscriber

func NewSubscriber(m manipulate.Manipulator, queueSize int) (manipulate.Subscriber, error)

NewSubscriber creates a new vortex subscriber.


type Option

type Option func(*config)

Option represents an option that can be passed to New.
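
Since Option is a function over the unexported config type, options follow the functional options pattern. The sketch below is only an illustration of that pattern: the config fields, the default values, and the optionPageSize, optionQueueDuration, and newConfig helpers are hypothetical and do not reflect the package's actual internals.

```go
package main

import "time"

// config is a hypothetical stand-in for the package's unexported config type.
type config struct {
	defaultPageSize int
	queueDuration   time.Duration
}

// Option mirrors the documented shape: a function that mutates the config.
type Option func(*config)

// optionPageSize is an illustrative option, not the library's implementation.
func optionPageSize(n int) Option {
	return func(c *config) { c.defaultPageSize = n }
}

// optionQueueDuration is an illustrative option, not the library's implementation.
func optionQueueDuration(d time.Duration) Option {
	return func(c *config) { c.queueDuration = d }
}

// newConfig starts from assumed defaults and applies the options in order,
// so a later option overrides an earlier one that sets the same field.
func newConfig(options ...Option) *config {
	c := &config{defaultPageSize: 100, queueDuration: time.Second}
	for _, opt := range options {
		opt(c)
	}
	return c
}
```

A caller that passes only optionPageSize(500) gets a config with the page size overridden and every other field left at its default.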

func OptionDefaultConsistency

func OptionDefaultConsistency(read manipulate.ReadConsistency, write manipulate.WriteConsistency) Option

OptionDefaultConsistency sets the default read and write consistency.

func OptionDefaultPageSize

func OptionDefaultPageSize(defaultPageSize int) Option

OptionDefaultPageSize sets the default page size used during fetching.

func OptionDisableCommitUpstream added in v1.89.0

func OptionDisableCommitUpstream(disabled bool) Option

OptionDisableCommitUpstream sets whether committing to the upstream is disabled.

func OptionDownstreamReconciler

func OptionDownstreamReconciler(r Reconciler) Option

OptionDownstreamReconciler sets the global downstream Reconciler to use.

func OptionPrefetcher

func OptionPrefetcher(p Prefetcher) Option

OptionPrefetcher sets the Prefetcher to use.

func OptionTransactionLog

func OptionTransactionLog(filename string) Option

OptionTransactionLog sets the transaction log file.

func OptionTransactionQueueDuration

func OptionTransactionQueueDuration(d time.Duration) Option

OptionTransactionQueueDuration sets the default queue transaction duration. Once expired, the transaction is discarded.

func OptionTransactionQueueLength

func OptionTransactionQueueLength(n int) Option

OptionTransactionQueueLength sets the queue length of the transaction queue.

func OptionUpstreamManipulator

func OptionUpstreamManipulator(manipulator manipulate.Manipulator) Option

OptionUpstreamManipulator sets the upstream manipulator.

func OptionUpstreamReconciler

func OptionUpstreamReconciler(r Reconciler) Option

OptionUpstreamReconciler sets the global upstream Reconciler to use.

func OptionUpstreamSubscriber

func OptionUpstreamSubscriber(s manipulate.Subscriber) Option

OptionUpstreamSubscriber sets the upstream subscriber. Note that the given subscriber must NOT be started, or events will be received twice, needlessly loading the VortexDB.

type Prefetcher

type Prefetcher interface {

	// WarmUp is called during Vortex initialization.
	// Implementations can use the given manipulator to retrieve all the
	// objects to add into the Vortex cache before starting.
	WarmUp(context.Context, manipulate.Manipulator, elemental.ModelManager, elemental.Identity) (elemental.Identifiables, error)

	// Prefetch is called before a Retrieve or RetrieveMany operation to perform
	// cache prefetching. The given operation will be either elemental.OperationRetrieve or
	// elemental.OperationRetrieveMany. The requested identity is passed along
	// with the request manipulate.Context, allowing additional conditional logic based on
	// the initial request. It is a copy of the original context, so you cannot change anything
	// in the original request.
	// If Prefetch returns some elemental.Identifiables, all of them will be added to the local cache
	// before performing the initial request.
	// If Prefetch returns nil, the original operation continues with no additional processing. According
	// to the requested consistency, the data will be either retrieved locally or from upstream.
	// If Prefetch returns an error, the upstream operation will be canceled and the error returned.
	// You can use the provided manipulator to retrieve the needed data.
	Prefetch(context.Context, elemental.Operation, elemental.Identity, manipulate.Manipulator, manipulate.Context) (elemental.Identifiables, error)

	// If the prefetcher uses some internal state,
	// it must reset it when this is called.
}

A Prefetcher is used to perform prefetching operations in a Vortex manipulator. Vortex calls the Prefetch method before performing a Retrieve (elemental.OperationRetrieve) or a RetrieveMany (elemental.OperationRetrieveMany) operation, so that more data than was requested can be lazily loaded and cached in advance.
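
The flow described in the Prefetch comments can be sketched without the library. The object, prefetchFunc, and retrieve names below are hypothetical simplifications: the real method receives contexts, an operation, an identity, and a manipulator, and the final read may go upstream depending on the requested consistency. This sketch only reads from the local cache.

```go
package main

import "errors"

// errNotFound is returned when the object is not in the local cache.
var errNotFound = errors.New("object not found")

// object is a hypothetical stand-in for an elemental.Identifiable.
type object struct{ ID, Name string }

// prefetchFunc is a simplified stand-in for the Prefetch method.
type prefetchFunc func(requestedID string) ([]object, error)

// retrieve models the documented order: prefetch, cache the results, then serve.
func retrieve(cache map[string]object, prefetch prefetchFunc, id string) (object, error) {
	extra, err := prefetch(id)
	if err != nil {
		// A prefetch error cancels the operation and is returned to the caller.
		return object{}, err
	}
	// Everything returned by prefetch is cached before the request is served.
	// A nil result means the loop does nothing and the request proceeds as is.
	for _, o := range extra {
		cache[o.ID] = o
	}
	o, ok := cache[id]
	if !ok {
		return object{}, errNotFound
	}
	return o, nil
}
```

A prefetcher that returns the requested object together with related objects leaves all of them in the cache, so later requests for the related objects can be served locally.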

func NewDefaultPrefetcher

func NewDefaultPrefetcher() Prefetcher

NewDefaultPrefetcher returns a new Prefetcher that will simply load everything into memory.

type Processor

type Processor struct {

	// Identity is the identity of the object that is stored in the DB.
	Identity elemental.Identity

	// TODO: Mode is the type of default consistency mode required from the cache.
	// This consistency can be overwritten by manipulate options.
	WriteConsistency manipulate.WriteConsistency

	// TODO: Mode is the type of default consistency mode required from the cache.
	// This consistency can be overwritten by manipulate options.
	ReadConsistency manipulate.ReadConsistency

	// QueueingDuration is the maximum time that an object should be
	// cached if the backend is not responding.
	QueueingDuration time.Duration

	// RetrieveManyHook is a hook function that can be called
	// before a RetrieveMany call. It returns an error and a continue
	// boolean. If continue is false, we can return without any
	// additional calls.
	RetrieveManyHook RetrieveManyHook

	// DownstreamReconciler is the custom Reconciler for the processor that
	// will be called to reconcile downstream writes.
	DownstreamReconciler Reconciler

	// UpstreamReconciler is the custom Reconciler for the processor that
	// will be called to reconcile upstream writes.
	UpstreamReconciler Reconciler

	// CommitOnEvent will commit the event in the cache even if a client
	// is subscribed for this event. If left false, it will only commit
	// if no client has subscribed for this event. It will always forward
	// the event to the clients.
	CommitOnEvent bool

	// LazySync will not sync all data of the identity on startup, but
	// will only load data on demand based on the transactions.
	LazySync bool
}

Processor configures the processing details for a specific identity.
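
The CommitOnEvent rule reduces to a small truth table. The helper below is a hypothetical restatement of the field comment, not code from the package; subscriberCount stands for the number of clients subscribed to the event.

```go
package main

// shouldCommit reports whether an incoming event is committed to the cache,
// following the CommitOnEvent description: always when commitOnEvent is set,
// otherwise only when no client has subscribed to the event.
// Forwarding to clients is not modeled because it always happens.
func shouldCommit(commitOnEvent bool, subscriberCount int) bool {
	return commitOnEvent || subscriberCount == 0
}
```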

type Reconciler

type Reconciler interface {

	// Reconcile is called before a write operation
	// to determine if the object needs reconciliation. If it returns
	// false, the object is ignored.
	// If it returns an error, the error will be forwarded to the caller.
	// The Reconcile function may modify the object to perform transformations.
	Reconcile(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error)
}

A Reconciler can be given to manipvortex to perform pre-write reconciliation.
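
The three outcomes of Reconcile (transform, ignore, fail) can be sketched with plain types. Everything below is hypothetical: item stands in for an elemental.Identifiable, the manipulate.Context and elemental.Operation parameters are dropped, and the rules inside reconcile are invented purely for illustration.

```go
package main

import (
	"errors"
	"strings"
)

// item is a hypothetical stand-in for an elemental.Identifiable.
type item struct{ Name string }

// reconcile mimics the documented contract: it may transform the object,
// return false to have the write ignored, or return an error for the caller.
func reconcile(in item) (item, bool, error) {
	if in.Name == "" {
		return in, false, errors.New("name is required")
	}
	if strings.HasPrefix(in.Name, "tmp-") {
		return in, false, nil // ignored: the object is not written
	}
	in.Name = strings.ToLower(in.Name) // transformation before the write
	return in, true, nil
}

// write applies reconcile before committing to the given store.
func write(store map[string]item, in item) error {
	out, ok, err := reconcile(in)
	if err != nil {
		return err
	}
	if !ok {
		return nil
	}
	store[out.Name] = out
	return nil
}
```

Writing item{Name: "Alpha"} stores the transformed object under "alpha", a name starting with "tmp-" is silently skipped, and an empty name surfaces the error to the caller.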

type RetrieveManyHook

type RetrieveManyHook func(m manipulate.Manipulator, mctx manipulate.Context, dest elemental.Identifiables) (reconcile bool, err error)

RetrieveManyHook is the type of a hook for RetrieveMany operations.

type TestPrefetcher

A TestPrefetcher is a Prefetcher that can be used for testing purposes.

func NewTestPrefetcher

func NewTestPrefetcher() TestPrefetcher

NewTestPrefetcher returns a new TestPrefetcher.

type TestReconciler

type TestReconciler interface {
	MockReconcile(t *testing.T, impl func(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error))
}

A TestReconciler is a Reconciler that can be used for testing purposes.

func NewTestReconciler

func NewTestReconciler() TestReconciler

NewTestReconciler returns a new TestReconciler.

type Transaction

type Transaction struct {
	Date time.Time

	Object   elemental.Identifiable
	Method   elemental.Operation
	Deadline time.Time
	// contains filtered or unexported fields
}

Transaction is the event that captures the transaction for later processing. It is also the structure stored in the transaction logs.
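
The Date and Deadline fields suggest how a queued transaction can expire. The sketch below assumes, without confirmation from the package, that Deadline is Date plus the configured queue duration and that expired transactions are dropped when the queue is processed. The transaction type, newTransaction, and dropExpired are hypothetical.

```go
package main

import "time"

// transaction is a simplified stand-in for Transaction; only the
// time-related fields are modeled here.
type transaction struct {
	Date     time.Time
	Deadline time.Time
}

// newTransaction stamps a transaction with a deadline of now + queueDuration.
// Assumption: this is how the deadline relates to the queue duration.
func newTransaction(now time.Time, queueDuration time.Duration) transaction {
	return transaction{Date: now, Deadline: now.Add(queueDuration)}
}

// dropExpired returns the transactions whose deadline has not passed at now.
func dropExpired(queue []transaction, now time.Time) []transaction {
	kept := make([]transaction, 0, len(queue))
	for _, t := range queue {
		if now.Before(t.Deadline) {
			kept = append(kept, t)
		}
	}
	return kept
}
```

A transaction queued with a short duration disappears from the queue once the clock passes its deadline, while one with a longer duration survives for a later retry.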
