README
Vortex - Local Cache for Elemental Objects
Vortex is a generic elemental cache, currently backed by MemDB. Other implementations are possible in the future.
Vortex requires three components:
-
A manipulator that allows Vortex to synchronize objects in the cache with a remote backend.
-
An optional subscriber that allows vortex to synchronize events from an upstream susbscriber.
-
A local datastore where it will cache objects. The datastore must be properly configured with any indexes as required by your queries.
The downstream manipulator is optional and vortex can also be used a standalone embedded database that obays the manipulate interfaces. Note, that vortex supports both the manipulator and subscriber interfaces and it will propagate any events from the downstream to upstream callers.
For every object you must provide a processor configuration that will define any hook configuration. Separate hooks are possible before its transaction:
-
RemoteHook will be applied before the transaction is send upstream.
-
LocalHost will be applied before the transaction is commited locally.
-
RetrieveManyHook is applied before retrieve many operations.
For every identity, you can configure the cache mode:
- Write-through indicates that the object will be first commited downstream and only if it succeeds it will be committed localy.
*Write-back indicates that the object will be stored in a local queue and later it will be send to the downstream database.
You can take a look at the test files for examples.
Here is a simple example:
// NewMemoryDB will create the DB.
func NewMemoryDB(
ctx context.Context,
b manipulate.TokenManager,
api string,
model elemental.ModelManager,
) (manipulate.Manipulator, context.CancelFunc, error) {
connectContext, cancel := context.WithCancel(ctx)
m, err := maniphttp.New(
connectContext,
api,
maniphttp.OptionNamespace(namespace),
maniphttp.OptionTokenManager(b),
)
if err != nil {
cancel()
return nil, nil, err
}
subscriber := maniphttp.NewSubscriber(m, true)
subscriber.Start(ctx, nil)
indexConfig := map[string]*config.MemDBIdentity{
testmodel.ListIdentity.Category: &config.MemDBIdentity{
Identity: testmodel.ListIdentity,
Indexes: []*config.IndexConfig{
&config.IndexConfig{
Name: "id",
Type: config.String,
Unique: true,
Attribute: "ID",
},
&config.IndexConfig{
Name: "Name",
Type: config.String,
Attribute: "Name",
,
&config.IndexConfig{
Name: "Slice",
Type: config.Slice,
Attribute: "Slice",
},
},
},
}
// Create a data store, register the identities and start it.
datastore, err := memdbvortex.NewDatastore(indexConfig)
if err != nil {
return nil, cancel, fmt.Errorf("failed to create local memory db: %s", err)
}
// Create the processors and the vortex.
processors := map[string]*config.ProcessorConfiguration{
testmodel.ListIdentity.Name: &config.ProcessorConfiguration{
Identity: testmodel.ListIdentity,
CommitOnEvent: true,
},
v, err := memdbvortex.NewMemDBVortex(
ctx,
datastore,
processors,
gaia.Manager(),
memdbvortex.OptionBackendManipulator(s),
memdbvortex.OptionBackendSubscriber(subscriber),
)
return v, cancel, err
}
Documentation
Overview ¶
Package manipvortex contains a Manipulator that can be used as cache in front of another Manipulator.
Index ¶
- func New(ctx context.Context, downstreamManipulator manipulate.Manipulator, ...) (manipulate.BufferedManipulator, error)
- func NewSubscriber(m manipulate.Manipulator, queueSize int) (manipulate.Subscriber, error)
- type Option
- func OptionDefaultConsistency(read manipulate.ReadConsistency, write manipulate.WriteConsistency) Option
- func OptionDefaultPageSize(defaultPageSize int) Option
- func OptionDisableCommitUpstream(disabled bool) Option
- func OptionDownstreamReconciler(r Reconciler) Option
- func OptionPrefetcher(p Prefetcher) Option
- func OptionTransactionLog(filename string) Option
- func OptionTransactionQueueDuration(d time.Duration) Option
- func OptionTransactionQueueLength(n int) Option
- func OptionUpstreamManipulator(manipulator manipulate.Manipulator) Option
- func OptionUpstreamReconciler(r Reconciler) Option
- func OptionUpstreamSubscriber(s manipulate.Subscriber) Option
- type Prefetcher
- type Processor
- type Reconciler
- type RetrieveManyHook
- type TestPrefetcher
- type TestReconciler
- type Transaction
Constants ¶
Variables ¶
Functions ¶
func New ¶
func New( ctx context.Context, downstreamManipulator manipulate.Manipulator, processors map[string]*Processor, model elemental.ModelManager, options ...Option, ) (manipulate.BufferedManipulator, error)
New will create a new cache. Caller must provide a valid backend manipulator and susbscriber. If the manipulator is nil, it will be assumed that the cache is standalone (ie there is no backend to synchronize with).
func NewSubscriber ¶
func NewSubscriber(m manipulate.Manipulator, queueSize int) (manipulate.Subscriber, error)
NewSubscriber creates a new vortex subscriber.
Types ¶
type Option ¶
type Option func(*config)
Option represents an option can can be passed to NewContext.
func OptionDefaultConsistency ¶
func OptionDefaultConsistency(read manipulate.ReadConsistency, write manipulate.WriteConsistency) Option
OptionDefaultConsistency sets the default read and write consistency.
func OptionDefaultPageSize ¶
OptionDefaultPageSize is the page size during fetching.
func OptionDisableCommitUpstream ¶
OptionDisableCommitUpstream sets the global upstream Reconcilers to use.
func OptionDownstreamReconciler ¶
func OptionDownstreamReconciler(r Reconciler) Option
OptionDownstreamReconciler sets the global downstream Reconcilers to use.
func OptionPrefetcher ¶
func OptionPrefetcher(p Prefetcher) Option
OptionPrefetcher sets the Prefetcher to use.
func OptionTransactionLog ¶
OptionTransactionLog sets the transaction log file.
func OptionTransactionQueueDuration ¶
OptionTransactionQueueDuration sets the default queue transaction duration. Once expired, the transaction is discarded.
func OptionTransactionQueueLength ¶
OptionTransactionQueueLength sets the queue length of the transaction queue.
func OptionUpstreamManipulator ¶
func OptionUpstreamManipulator(manipulator manipulate.Manipulator) Option
OptionUpstreamManipulator sets the upstream manipulator.
func OptionUpstreamReconciler ¶
func OptionUpstreamReconciler(r Reconciler) Option
OptionUpstreamReconciler sets the global upstream Reconcilers to use.
func OptionUpstreamSubscriber ¶
func OptionUpstreamSubscriber(s manipulate.Subscriber) Option
OptionUpstreamSubscriber sets the upstream subscriber. Note the given subscriber must NOT be started or the events will be received twice, needlessly loading the VortexDB.
type Prefetcher ¶
type Prefetcher interface { // WarmUp is called during Vortex initialization. // Implementations can use the given manipulator to retrieve all the // objects to add into the Vortex cache before starting. WarmUp(context.Context, manipulate.Manipulator, elemental.ModelManager, elemental.Identity) (elemental.Identifiables, error) // Prefetch is called before a Retrieve or RetrieveMany operation to perform // cache prefetching. The given operation will be either elemental.OperationRetrieve or // elemental.OperationRetrieveMany. The requested identity is passed, alongside // with the request manipulate.Context allowing to do additional conditional logic based on // the initial request. It is a copy of the original context, so you cannot change anything // in the original request. // // If Prefetch returns some elemental.Identifiables, all of them will be added to the local cache // before peforming the initial request. // // If Prefetch returns nil, the original operation continues with not additional processing. According // to the requested consistency, the data will be either retrieved locally or from upstream. // // If prefetch returns an error, the upstream operation will be canceled and the error returned. // You can use the provided manipulator to retrieve the needed data. Prefetch(context.Context, elemental.Operation, elemental.Identity, manipulate.Manipulator, manipulate.Context) (elemental.Identifiables, error) // If the prefetcher uses some internal state // if must reset it when this is called. Flush() }
A Prefetcher is used to perform prefetching operations in a Vortex manipulator. Vortex will call the Prefetch method before performing a Retrieve (elemental.OperationRetrieve) or a RetrieveMany (elemental.OperationRetrieveMany) operation to eventually lazy load more data than the one requested so they'll be cached in advanced.
func NewDefaultPrefetcher ¶
func NewDefaultPrefetcher() Prefetcher
NewDefaultPrefetcher returns a new Prefetcher that will simply load everything into memory
type Processor ¶
type Processor struct { // Identity is the identity of the object that is stored in the DB. Identity elemental.Identity // TODO: Mode is the type of default consistency mode required from the cache. // This consistency can be overwritten by manipulate options. WriteConsistency manipulate.WriteConsistency // TODO: Mode is the type of default consistency mode required from the cache. // This consistency can be overwritten by manipulate options. ReadConsistency manipulate.ReadConsistency // QueueingDuration is the maximum time that an object should be // cached if the backend is not responding. QueueingDuration time.Duration // RetrieveManyHook is a hook function that can be called // before a RetrieveMany call. It returns an error and a continue // boolean. If the continue false, we can return without any // additional calls. RetrieveManyHook RetrieveManyHook // DownstreamReconciler is the custom Reconciler for the processor that // will be called to reconcile downstream writes. DownstreamReconciler Reconciler // UpstreamReconciler is the custom Reconciler for the processor that // will be called to reconcile upstream writes. UpstreamReconciler Reconciler // CommitOnEvent with commit the event in the cache even if a client // is subscribed for this event. If left false, it will only commit // if no client has subscribed for this event. It will always forward // the event to the clients. CommitOnEvent bool // LazySync will not sync all data of the identity on startup, but // will only load data on demand based on the transactions. LazySync bool }
Processor configures the processing details for a specific identity.
type Reconciler ¶
type Reconciler interface { // Reconcile is called before a write operation to // to determine if the objects needs reconciliation. If it returns // false, the objects are ignored. // If it returns an error, the error will be forwarded to the caller. // The Reconcile function may modify the objects to perform transformations. Reconcile(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error) }
An Reconciler can be given to manipvortex to perform pre write reconciliation.
type RetrieveManyHook ¶
type RetrieveManyHook func(m manipulate.Manipulator, mctx manipulate.Context, dest elemental.Identifiables) (reconcile bool, err error)
RetrieveManyHook is the type of a hook for retrieve many
type TestPrefetcher ¶
type TestPrefetcher interface { Prefetcher MockWarmUp(t *testing.T, impl func(context.Context, manipulate.Manipulator, elemental.ModelManager, elemental.Identity) (elemental.Identifiables, error)) MockPrefetch(t *testing.T, impl func(context.Context, elemental.Operation, elemental.Identity, manipulate.Manipulator, manipulate.Context) (elemental.Identifiables, error)) MockFlush(t *testing.T, impl func()) }
A TestPrefetcher is prefetcher that can be used for testing purposes.
func NewTestPrefetcher ¶
func NewTestPrefetcher() TestPrefetcher
NewTestPrefetcher returns a new TestPrefetcher.
type TestReconciler ¶
type TestReconciler interface { Reconciler MockReconcile(t *testing.T, impl func(manipulate.Context, elemental.Operation, elemental.Identifiable) (elemental.Identifiable, bool, error)) }
A TestReconciler is an Reconciler that can be used for testing purposes.
func NewTestReconciler ¶
func NewTestReconciler() TestReconciler
NewTestReconciler returns a new TestReconciler.
type Transaction ¶
type Transaction struct { Date time.Time Object elemental.Identifiable Method elemental.Operation Deadline time.Time // contains filtered or unexported fields }
Transaction is the event that captures the transaction for later processing. It is also the structure stored in the transaction logs.