Vortex - Local Cache for Elemental Objects

Vortex is a generic elemental cache, currently backed by MemDB. Other implementations are possible in the future.

Vortex works with three components:

  • A manipulator that allows Vortex to synchronize objects in the cache with a remote backend.

  • An optional subscriber that allows Vortex to synchronize events from an upstream subscriber.

  • A local datastore where it will cache objects. The datastore must be properly configured with any indexes as required by your queries.

The downstream manipulator is optional, and Vortex can also be used as a standalone embedded database that obeys the manipulate interfaces. Note that Vortex supports both the manipulator and subscriber interfaces, and it will propagate any events from the downstream to upstream callers.

For every object, you must provide a processor configuration that defines any hooks. Separate hooks can run before each stage of a transaction:

  • RemoteHook is applied before the transaction is sent upstream.

  • LocalHook is applied before the transaction is committed locally.

  • RetrieveManyHook is applied before retrieve many operations.

For every identity, you can configure the cache mode:

  • Write-through indicates that the object is first committed downstream, and only if that succeeds is it committed locally.

  • Write-back indicates that the object is stored in a local queue and sent to the downstream database later.
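
The difference between the two modes can be illustrated with a toy cache. This is only a sketch and not Vortex code: the backend, cache, and write types and all of their methods are hypothetical, and the local commit performed by writeBack is an assumption, since the text above only states that the object is queued.

```go
package main

import (
	"errors"
	"fmt"
)

// backend is a hypothetical stand-in for the remote store.
type backend struct {
	data map[string]string
	down bool
}

// put stores a value, or fails while the backend is marked as down.
func (b *backend) put(k, v string) error {
	if b.down {
		return errors.New("backend unavailable")
	}
	b.data[k] = v
	return nil
}

// write is one queued write-back operation.
type write struct{ key, value string }

// cache is a toy cache that supports both modes.
type cache struct {
	local   map[string]string
	remote  *backend
	pending []write
}

func newCache(b *backend) *cache {
	return &cache{local: map[string]string{}, remote: b}
}

// writeThrough commits to the backend first and locally only on success.
func (c *cache) writeThrough(k, v string) error {
	if err := c.remote.put(k, v); err != nil {
		return err
	}
	c.local[k] = v
	return nil
}

// writeBack queues the write for later delivery. Committing it locally
// right away is an assumption made for this sketch.
func (c *cache) writeBack(k, v string) {
	c.local[k] = v
	c.pending = append(c.pending, write{k, v})
}

// flush sends queued writes in order and stops at the first failure,
// leaving the remaining writes in the queue.
func (c *cache) flush() error {
	for len(c.pending) > 0 {
		w := c.pending[0]
		if err := c.remote.put(w.key, w.value); err != nil {
			return err
		}
		c.pending = c.pending[1:]
	}
	return nil
}

func main() {
	b := &backend{data: map[string]string{}, down: true}
	c := newCache(b)

	fmt.Println(c.writeThrough("a", "1") != nil) // true: nothing is cached
	c.writeBack("b", "2")                        // accepted while the backend is down
	b.down = false
	fmt.Println(c.flush(), len(c.pending)) // <nil> 0
}
```

In this sketch a write-through failure leaves the local cache untouched, while a write-back succeeds immediately and is delivered once the backend is reachable again.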

You can take a look at the test files for examples.

Here is a simple example:

// NewMemoryDB will create the DB.
func NewMemoryDB(
    ctx context.Context,
    b manipulate.TokenManager,
    api string,
    model elemental.ModelManager,
) (manipulate.Manipulator, context.CancelFunc, error) {

    connectContext, cancel := context.WithCancel(ctx)

    m, err := maniphttp.New(
        // Arguments are elided in the source.
    )
    if err != nil {
        // Release the context before returning.
        cancel()
        return nil, nil, err
    }

    subscriber := maniphttp.NewSubscriber(m, true)
    subscriber.Start(ctx, nil)

    indexConfig := map[string]*config.MemDBIdentity{
        testmodel.ListIdentity.Category: &config.MemDBIdentity{
            Identity: testmodel.ListIdentity,
            Indexes: []*config.IndexConfig{
                {
                    Name:      "id",
                    Type:      config.String,
                    Unique:    true,
                    Attribute: "ID",
                },
                {
                    Name:      "Name",
                    Type:      config.String,
                    Attribute: "Name",
                },
                {
                    Name:      "Slice",
                    Type:      config.Slice,
                    Attribute: "Slice",
                },
            },
        },
    }

    // Create a data store, register the identities and start it.
    datastore, err := memdbvortex.NewDatastore(indexConfig)
    if err != nil {
        return nil, cancel, fmt.Errorf("failed to create local memory db: %s", err)
    }

    // Create the processors and the vortex.
    processors := map[string]*config.ProcessorConfiguration{
        testmodel.ListIdentity.Name: &config.ProcessorConfiguration{
            Identity:      testmodel.ListIdentity,
            CommitOnEvent: true,
        },
    }

    v, err := memdbvortex.NewMemDBVortex(
        // Arguments are elided in the source.
    )

    return v, cancel, err
}




    Package manipvortex contains a Manipulator that can be used as cache in front of another Manipulator.





    func New

    func New(
    	ctx context.Context,
    	downstreamManipulator manipulate.Manipulator,
    	processors map[string]*Processor,
    	model elemental.ModelManager,
    	options ...Option,
    ) (manipulate.BufferedManipulator, error)

      New will create a new cache. The caller must provide a valid backend manipulator and subscriber. If the manipulator is nil, the cache is assumed to be standalone (i.e. there is no backend to synchronize with).

    func NewSubscriber

    func NewSubscriber(m manipulate.Manipulator, queueSize int) (manipulate.Subscriber, error)

      NewSubscriber creates a new vortex subscriber.


    type Option

    type Option func(*config)

      Option represents an option that can be passed to New.
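
      Option follows the functional options pattern: each constructor returns a closure that mutates the configuration, and the options are applied in order on top of defaults. The sketch below shows the pattern in isolation. The settings type, the with* constructors, newSettings, and the default values are all hypothetical stand-ins and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the package's unexported config type.
type settings struct {
	pageSize      int
	queueLength   int
	queueDuration time.Duration
}

// Option has the same shape as the package's Option type: a function that
// mutates the configuration.
type Option func(*settings)

// withPageSize returns an option that overrides the page size.
func withPageSize(n int) Option {
	return func(s *settings) { s.pageSize = n }
}

// withQueueLength returns an option that overrides the queue length.
func withQueueLength(n int) Option {
	return func(s *settings) { s.queueLength = n }
}

// withQueueDuration returns an option that overrides the queue duration.
func withQueueDuration(d time.Duration) Option {
	return func(s *settings) { s.queueDuration = d }
}

// newSettings starts from illustrative defaults and applies each option in
// order, so a later option overrides an earlier one.
func newSettings(options ...Option) *settings {
	s := &settings{pageSize: 100, queueLength: 1000, queueDuration: time.Minute}
	for _, opt := range options {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(withPageSize(500), withQueueDuration(5*time.Second))
	fmt.Println(s.pageSize, s.queueLength, s.queueDuration) // 500 1000 5s
}
```

      Fields that no option touches keep their defaults, which is why callers of a constructor with variadic options only pass the settings they want to change.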

    func OptionDefaultConsistency

    func OptionDefaultConsistency(read manipulate.ReadConsistency, write manipulate.WriteConsistency) Option

      OptionDefaultConsistency sets the default read and write consistency.

    func OptionDefaultPageSize

    func OptionDefaultPageSize(defaultPageSize int) Option

      OptionDefaultPageSize sets the default page size used during fetching.

    func OptionDisableCommitUpstream

    func OptionDisableCommitUpstream(disabled bool) Option

      OptionDisableCommitUpstream disables committing to the upstream manipulator when disabled is true.

    func OptionDownstreamReconciler

    func OptionDownstreamReconciler(r Reconciler) Option

      OptionDownstreamReconciler sets the global downstream Reconcilers to use.

    func OptionPrefetcher

    func OptionPrefetcher(p Prefetcher) Option

      OptionPrefetcher sets the Prefetcher to use.

    func OptionTransactionLog

    func OptionTransactionLog(filename string) Option

      OptionTransactionLog sets the transaction log file.

    func OptionTransactionQueueDuration

    func OptionTransactionQueueDuration(d time.Duration) Option

      OptionTransactionQueueDuration sets the default queue transaction duration. Once expired, the transaction is discarded.

    func OptionTransactionQueueLength

    func OptionTransactionQueueLength(n int) Option

      OptionTransactionQueueLength sets the queue length of the transaction queue.

    func OptionUpstreamManipulator

    func OptionUpstreamManipulator(manipulator manipulate.Manipulator) Option

      OptionUpstreamManipulator sets the upstream manipulator.

    func OptionUpstreamReconciler

    func OptionUpstreamReconciler(r Reconciler) Option

      OptionUpstreamReconciler sets the global upstream Reconcilers to use.

    func OptionUpstreamSubscriber

    func OptionUpstreamSubscriber(s manipulate.Subscriber) Option

      OptionUpstreamSubscriber sets the upstream subscriber. Note the given subscriber must NOT be started or the events will be received twice, needlessly loading the VortexDB.

    type Prefetcher

    type Prefetcher interface {
    	// WarmUp is called during Vortex initialization.
    	// Implementations can use the given manipulator to retrieve all the
    	// objects to add into the Vortex cache before starting.
    	WarmUp(context.Context, manipulate.Manipulator, elemental.ModelManager, elemental.Identity) (elemental.Identifiables, error)
    	// Prefetch is called before a Retrieve or RetrieveMany operation to perform
    	// cache prefetching. The given operation will be either elemental.OperationRetrieve or
    	// elemental.OperationRetrieveMany. The requested identity is passed, along
    	// with the request manipulate.Context, allowing additional conditional logic based on
    	// the initial request. It is a copy of the original context, so you cannot change anything
    	// in the original request.
    	// If Prefetch returns some elemental.Identifiables, all of them will be added to the local cache
    	// before performing the initial request.
    	// If Prefetch returns nil, the original operation continues with no additional processing. According
    	// to the requested consistency, the data will be either retrieved locally or from upstream.
    	// If Prefetch returns an error, the upstream operation will be canceled and the error returned.
    	// You can use the provided manipulator to retrieve the needed data.
    	Prefetch(context.Context, elemental.Operation, elemental.Identity, manipulate.Manipulator, manipulate.Context) (elemental.Identifiables, error)
    	// If the prefetcher uses some internal state
    	// it must reset it when this is called.
    }

      A Prefetcher is used to perform prefetching operations in a Vortex manipulator. Vortex will call the Prefetch method before performing a Retrieve (elemental.OperationRetrieve) or a RetrieveMany (elemental.OperationRetrieveMany) operation, so that more data than requested can be loaded lazily and cached in advance.
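
      The three outcomes of Prefetch described in the interface comments (objects returned, nil returned, error returned) can be sketched with simplified types. Nothing below is the package's API: item, prefetchFunc, retrieve, and errNotFound are hypothetical, and the sketch only reads from a local map instead of choosing between local and upstream reads based on consistency.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for elemental.Identifiable.
type item struct{ ID, Name string }

// prefetchFunc is a simplified, hypothetical stand-in for Prefetcher.Prefetch.
type prefetchFunc func(id string) ([]item, error)

// errNotFound is returned when the object is not in the local cache.
var errNotFound = errors.New("object not found")

// retrieve applies the prefetch contract before reading from the cache.
func retrieve(cache map[string]item, prefetch prefetchFunc, id string) (item, error) {
	extra, err := prefetch(id)
	if err != nil {
		// An error cancels the operation and is returned to the caller.
		return item{}, err
	}
	// Every returned object is cached before the read. A nil result adds
	// nothing and the read continues unchanged.
	for _, it := range extra {
		cache[it.ID] = it
	}
	it, ok := cache[id]
	if !ok {
		return item{}, errNotFound
	}
	return it, nil
}

func main() {
	cache := map[string]item{}
	// This prefetcher loads the requested object together with a sibling.
	withSibling := func(id string) ([]item, error) {
		return []item{
			{ID: id, Name: "requested"},
			{ID: id + "-sibling", Name: "sibling"},
		}, nil
	}
	it, err := retrieve(cache, withSibling, "a")
	fmt.Println(it.Name, err, len(cache)) // requested <nil> 2
}
```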

    func NewDefaultPrefetcher

    func NewDefaultPrefetcher() Prefetcher

      NewDefaultPrefetcher returns a new Prefetcher that will simply load everything into memory.

    type Processor

    type Processor struct {
    	// Identity is the identity of the object that is stored in the DB.
    	Identity elemental.Identity
    	// TODO: Mode is the type of default consistency mode required from the cache.
    	// This consistency can be overwritten by manipulate options.
    	WriteConsistency manipulate.WriteConsistency
    	// TODO: Mode is the type of default consistency mode required from the cache.
    	// This consistency can be overwritten by manipulate options.
    	ReadConsistency manipulate.ReadConsistency
    	// QueueingDuration is the maximum time that an object should be
    	// cached if the backend is not responding.
    	QueueingDuration time.Duration
    	// RetrieveManyHook is a hook function that can be called
    	// before a RetrieveMany call. It returns an error and a continue
    	// boolean. If continue is false, we can return without any
    	// additional calls.
    	RetrieveManyHook RetrieveManyHook
    	// DownstreamReconciler is the custom Reconciler for the processor that
    	// will be called to reconcile downstream writes.
    	DownstreamReconciler Reconciler
    	// UpstreamReconciler is the custom Reconciler for the processor that
    	// will be called to reconcile upstream writes.
    	UpstreamReconciler Reconciler
    	// CommitOnEvent will commit the event in the cache even if a client
    	// is subscribed for this event. If left false, it will only commit
    	// if no client has subscribed for this event. It will always forward
    	// the event to the clients.
    	CommitOnEvent bool
    	// LazySync will not sync all data of the identity on startup, but
    	// will only load data on demand based on the transactions.
    	LazySync bool
    }

      Processor configures the processing details for a specific identity.

    type Reconciler

    type Reconciler interface {
    	// Reconcile is called before a write operation to
    	// determine if the object needs reconciliation. If it returns
    	// false, the object is ignored.
    	// If it returns an error, the error will be forwarded to the caller.
    	// The Reconcile function may modify the object to perform transformations.
    	Reconcile(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error)
    }

      A Reconciler can be given to manipvortex to perform pre-write reconciliation.
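
      The Reconcile contract (transform the object, return false to have it ignored, or return an error that is forwarded) can be sketched with simplified types. This is an illustration only: object, reconcileFunc, normalize, and write are hypothetical, and the rules applied by normalize are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// object is a hypothetical stand-in for elemental.Identifiable.
type object struct{ Name string }

// reconcileFunc mirrors the shape of Reconciler.Reconcile with simplified,
// hypothetical types.
type reconcileFunc func(op string, o object) (object, bool, error)

var errEmptyName = errors.New("name must not be empty")

// normalize trims the name, rejects empty names with an error, and asks the
// caller to ignore objects whose name starts with "tmp-".
func normalize(op string, o object) (object, bool, error) {
	o.Name = strings.TrimSpace(o.Name)
	if o.Name == "" {
		return o, false, errEmptyName
	}
	if strings.HasPrefix(o.Name, "tmp-") {
		return o, false, nil
	}
	return o, true, nil
}

// write runs the reconciler before storing the object.
func write(store map[string]object, r reconcileFunc, op string, o object) error {
	out, ok, err := r(op, o)
	if err != nil {
		// Errors are forwarded to the caller.
		return err
	}
	if !ok {
		// The object is ignored and nothing is written.
		return nil
	}
	// The possibly transformed object is what gets stored.
	store[out.Name] = out
	return nil
}

func main() {
	store := map[string]object{}
	fmt.Println(write(store, normalize, "create", object{Name: "  list-1 "})) // <nil>
	fmt.Println(write(store, normalize, "create", object{Name: "tmp-x"}))     // <nil>
	fmt.Println(write(store, normalize, "create", object{Name: " "}))         // name must not be empty
	fmt.Println(len(store))                                                   // 1
}
```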

    type RetrieveManyHook

    type RetrieveManyHook func(m manipulate.Manipulator, mctx manipulate.Context, dest elemental.Identifiables) (reconcile bool, err error)

      RetrieveManyHook is the type of a hook for retrieve many operations.

    type TestPrefetcher

      A TestPrefetcher is a Prefetcher that can be used for testing purposes.

    func NewTestPrefetcher

    func NewTestPrefetcher() TestPrefetcher

      NewTestPrefetcher returns a new TestPrefetcher.

    type TestReconciler

    type TestReconciler interface {
    	MockReconcile(t *testing.T, impl func(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error))
    }

      A TestReconciler is a Reconciler that can be used for testing purposes.

    func NewTestReconciler

    func NewTestReconciler() TestReconciler

      NewTestReconciler returns a new TestReconciler.

    type Transaction

    type Transaction struct {
    	Date     time.Time
    	Object   elemental.Identifiable
    	Method   elemental.Operation
    	Deadline time.Time
    	// contains filtered or unexported fields
    }

      Transaction is the event that captures the transaction for later processing. It is also the structure stored in the transaction logs.