Documentation
¶
Overview ¶
Package memory is the in-process storage plugin for cyoda-go.
It serves both as the default backend (no external dependencies) and as the minimal reference implementation for plugin authors — it implements every required interface method, nothing more.
Registration happens at init() time via spi.Register. A binary picks up this plugin by blank-importing it:
import _ "github.com/cyoda-platform/cyoda-go/plugins/memory"
Configuration: none. The plugin reads no environment variables and does not implement spi.DescribablePlugin.
Index ¶
- type ApplyFunc
- type AsyncSearchStore
- func (s *AsyncSearchStore) Cancel(ctx context.Context, jobID string) error
- func (s *AsyncSearchStore) CreateJob(ctx context.Context, job *spi.SearchJob) error
- func (s *AsyncSearchStore) DeleteJob(ctx context.Context, jobID string) error
- func (s *AsyncSearchStore) GetJob(ctx context.Context, jobID string) (*spi.SearchJob, error)
- func (s *AsyncSearchStore) GetResultIDs(ctx context.Context, jobID string, offset, limit int) ([]string, int, error)
- func (s *AsyncSearchStore) ReapExpired(ctx context.Context, ttl time.Duration) (int, error)
- func (s *AsyncSearchStore) SaveResults(ctx context.Context, jobID string, entityIDs []string) error
- func (s *AsyncSearchStore) UpdateJobStatus(ctx context.Context, jobID string, status string, resultCount int, ...) error
- type Clock
- type EntityStore
- func (s *EntityStore) CompareAndSave(ctx context.Context, entity *spi.Entity, expectedTxID string) (int64, error)
- func (s *EntityStore) Count(ctx context.Context, modelRef spi.ModelRef) (int64, error)
- func (s *EntityStore) CountByState(ctx context.Context, modelRef spi.ModelRef, states []string) (map[string]int64, error)
- func (s *EntityStore) Delete(ctx context.Context, entityID string) error
- func (s *EntityStore) DeleteAll(ctx context.Context, modelRef spi.ModelRef) error
- func (s *EntityStore) Exists(ctx context.Context, entityID string) (bool, error)
- func (s *EntityStore) Get(ctx context.Context, entityID string) (*spi.Entity, error)
- func (s *EntityStore) GetAll(ctx context.Context, modelRef spi.ModelRef) ([]*spi.Entity, error)
- func (s *EntityStore) GetAllAsAt(ctx context.Context, modelRef spi.ModelRef, asAt time.Time) ([]*spi.Entity, error)
- func (s *EntityStore) GetAsAt(ctx context.Context, entityID string, asAt time.Time) (*spi.Entity, error)
- func (s *EntityStore) GetVersionHistory(ctx context.Context, entityID string) ([]spi.EntityVersion, error)
- func (s *EntityStore) Save(ctx context.Context, entity *spi.Entity) (int64, error)
- func (s *EntityStore) SaveAll(ctx context.Context, entities iter.Seq[*spi.Entity]) ([]int64, error)
- type KeyValueStore
- func (s *KeyValueStore) Delete(ctx context.Context, namespace string, key string) error
- func (s *KeyValueStore) Get(ctx context.Context, namespace string, key string) ([]byte, error)
- func (s *KeyValueStore) List(ctx context.Context, namespace string) (map[string][]byte, error)
- func (s *KeyValueStore) Put(ctx context.Context, namespace string, key string, value []byte) error
- type MessageStore
- func (s *MessageStore) Delete(_ context.Context, id string) error
- func (s *MessageStore) DeleteBatch(ctx context.Context, ids []string) error
- func (s *MessageStore) Get(_ context.Context, id string) (spi.MessageHeader, spi.MessageMetaData, io.ReadCloser, error)
- func (s *MessageStore) Save(_ context.Context, id string, header spi.MessageHeader, ...) error
- type ModelStore
- func (s *ModelStore) Delete(ctx context.Context, modelRef spi.ModelRef) error
- func (s *ModelStore) ExtendSchema(ctx context.Context, ref spi.ModelRef, delta spi.SchemaDelta) error
- func (s *ModelStore) Get(ctx context.Context, modelRef spi.ModelRef) (*spi.ModelDescriptor, error)
- func (s *ModelStore) GetAll(ctx context.Context) ([]spi.ModelRef, error)
- func (s *ModelStore) IsLocked(ctx context.Context, modelRef spi.ModelRef) (bool, error)
- func (s *ModelStore) Lock(ctx context.Context, modelRef spi.ModelRef) error
- func (s *ModelStore) Save(ctx context.Context, desc *spi.ModelDescriptor) error
- func (s *ModelStore) SetChangeLevel(ctx context.Context, modelRef spi.ModelRef, level spi.ChangeLevel) error
- func (s *ModelStore) Unlock(ctx context.Context, modelRef spi.ModelRef) error
- type Option
- type StateMachineAuditStore
- func (s *StateMachineAuditStore) GetEvents(ctx context.Context, entityID string) ([]spi.StateMachineEvent, error)
- func (s *StateMachineAuditStore) GetEventsByTransaction(ctx context.Context, entityID string, transactionID string) ([]spi.StateMachineEvent, error)
- func (s *StateMachineAuditStore) Record(ctx context.Context, entityID string, event spi.StateMachineEvent) error
- type StoreFactory
- func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
- func (f *StoreFactory) Close() error
- func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
- func (f *StoreFactory) GetTransactionManager() spi.TransactionManager
- func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
- func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
- func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
- func (f *StoreFactory) NewTransactionManager(uuids spi.UUIDGenerator) *TransactionManager
- func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))
- func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
- func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
- func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
- type TestClock
- type TransactionManager
- func (m *TransactionManager) Begin(ctx context.Context) (string, context.Context, error)
- func (m *TransactionManager) Commit(ctx context.Context, txID string) error
- func (m *TransactionManager) CommittedLogLen() int
- func (m *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)
- func (m *TransactionManager) Join(ctx context.Context, txID string) (context.Context, error)
- func (m *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
- func (m *TransactionManager) Rollback(ctx context.Context, txID string) error
- func (m *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
- func (m *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)
- type WorkflowStore
- func (s *WorkflowStore) Delete(ctx context.Context, modelRef spi.ModelRef) error
- func (s *WorkflowStore) Get(ctx context.Context, modelRef spi.ModelRef) ([]spi.WorkflowDefinition, error)
- func (s *WorkflowStore) Save(ctx context.Context, modelRef spi.ModelRef, workflows []spi.WorkflowDefinition) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ApplyFunc ¶ added in v0.7.1
type ApplyFunc func(base []byte, delta spi.SchemaDelta) ([]byte, error)
ApplyFunc replays an opaque SchemaDelta onto a base schema.
type AsyncSearchStore ¶
type AsyncSearchStore struct {
// contains filtered or unexported fields
}
AsyncSearchStore is a tenant-scoped, in-memory implementation of spi.AsyncSearchStore.
func NewAsyncSearchStore ¶
func NewAsyncSearchStore() *AsyncSearchStore
NewAsyncSearchStore creates a new in-memory AsyncSearchStore using the wall clock. Prefer newAsyncSearchStore for internal construction so the factory clock is shared.
func (*AsyncSearchStore) Cancel ¶
func (s *AsyncSearchStore) Cancel(ctx context.Context, jobID string) error
Cancel marks the job as CANCELLED. Idempotent: cancelling a job already in a terminal state returns nil. Cancelling a non-existent job returns spi.ErrNotFound.
func (*AsyncSearchStore) DeleteJob ¶
func (s *AsyncSearchStore) DeleteJob(ctx context.Context, jobID string) error
func (*AsyncSearchStore) GetResultIDs ¶
func (*AsyncSearchStore) ReapExpired ¶
func (*AsyncSearchStore) SaveResults ¶
type Clock ¶ added in v0.7.1
Clock abstracts time.Now so tests can advance it deterministically. Production uses wallClock; conformance tests use TestClock.
type EntityStore ¶
type EntityStore struct {
// contains filtered or unexported fields
}
func (*EntityStore) CompareAndSave ¶
func (*EntityStore) CountByState ¶ added in v0.7.1
func (s *EntityStore) CountByState(ctx context.Context, modelRef spi.ModelRef, states []string) (map[string]int64, error)
CountByState returns counts of non-deleted entities grouped by state for the given model. See SPI godoc on EntityStore.CountByState for filter semantics.
func (*EntityStore) Delete ¶
func (s *EntityStore) Delete(ctx context.Context, entityID string) error
func (*EntityStore) GetAllAsAt ¶
func (*EntityStore) GetVersionHistory ¶
func (s *EntityStore) GetVersionHistory(ctx context.Context, entityID string) ([]spi.EntityVersion, error)
type KeyValueStore ¶
type KeyValueStore struct {
// contains filtered or unexported fields
}
type MessageStore ¶
type MessageStore struct {
// contains filtered or unexported fields
}
func (*MessageStore) DeleteBatch ¶
func (s *MessageStore) DeleteBatch(ctx context.Context, ids []string) error
func (*MessageStore) Get ¶
func (s *MessageStore) Get(_ context.Context, id string) (spi.MessageHeader, spi.MessageMetaData, io.ReadCloser, error)
func (*MessageStore) Save ¶
func (s *MessageStore) Save(_ context.Context, id string, header spi.MessageHeader, metaData spi.MessageMetaData, payload io.Reader) error
type ModelStore ¶
type ModelStore struct {
// contains filtered or unexported fields
}
ModelStore is a tenant-scoped, in-memory implementation of spi.ModelStore.
func (*ModelStore) ExtendSchema ¶ added in v0.7.1
func (s *ModelStore) ExtendSchema(ctx context.Context, ref spi.ModelRef, delta spi.SchemaDelta) error
ExtendSchema applies the delta to the current schema via the injected ApplyFunc. Memory is single-writer so apply-in-place under the factory's model mutex is correct.
func (*ModelStore) Get ¶
func (s *ModelStore) Get(ctx context.Context, modelRef spi.ModelRef) (*spi.ModelDescriptor, error)
func (*ModelStore) Save ¶
func (s *ModelStore) Save(ctx context.Context, desc *spi.ModelDescriptor) error
func (*ModelStore) SetChangeLevel ¶
func (s *ModelStore) SetChangeLevel(ctx context.Context, modelRef spi.ModelRef, level spi.ChangeLevel) error
type Option ¶ added in v0.7.1
type Option func(*StoreFactory)
Option is a functional option for NewStoreFactory.
func WithApplyFunc ¶ added in v0.7.1
WithApplyFunc installs the replay function used by ExtendSchema.
type StateMachineAuditStore ¶
type StateMachineAuditStore struct {
// contains filtered or unexported fields
}
func (*StateMachineAuditStore) GetEvents ¶
func (s *StateMachineAuditStore) GetEvents(ctx context.Context, entityID string) ([]spi.StateMachineEvent, error)
func (*StateMachineAuditStore) GetEventsByTransaction ¶
func (s *StateMachineAuditStore) GetEventsByTransaction(ctx context.Context, entityID string, transactionID string) ([]spi.StateMachineEvent, error)
func (*StateMachineAuditStore) Record ¶
func (s *StateMachineAuditStore) Record(ctx context.Context, entityID string, event spi.StateMachineEvent) error
type StoreFactory ¶
type StoreFactory struct {
// contains filtered or unexported fields
}
func NewStoreFactory ¶
func NewStoreFactory(opts ...Option) *StoreFactory
func (*StoreFactory) AsyncSearchStore ¶
func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)
func (*StoreFactory) Close ¶
func (f *StoreFactory) Close() error
func (*StoreFactory) EntityStore ¶
func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)
func (*StoreFactory) GetTransactionManager ¶
func (f *StoreFactory) GetTransactionManager() spi.TransactionManager
GetTransactionManager returns the registered TransactionManager, or nil.
func (*StoreFactory) KeyValueStore ¶
func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)
func (*StoreFactory) MessageStore ¶
func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)
func (*StoreFactory) ModelStore ¶
func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)
func (*StoreFactory) NewTransactionManager ¶
func (f *StoreFactory) NewTransactionManager(uuids spi.UUIDGenerator) *TransactionManager
NewTransactionManager creates and registers a TransactionManager on the StoreFactory.
func (*StoreFactory) SetApplyFunc ¶ added in v0.7.1
func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))
SetApplyFunc installs the replay function used by ExtendSchema. May be called at most once — typically immediately after Plugin.NewFactory in app/app.go. Panics on double-call (programmer error).
The parameter is the unnamed function type (not memory.ApplyFunc) so that an interface type-assertion in app/app.go can satisfy the setter uniformly across plugins.
func (*StoreFactory) StateMachineAuditStore ¶
func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)
func (*StoreFactory) TransactionManager ¶
func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)
TransactionManager implements spi.StoreFactory. Returns the TM registered via NewTransactionManager. Errors if none is set.
func (*StoreFactory) WorkflowStore ¶
func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)
type TestClock ¶ added in v0.7.1
type TestClock struct {
// contains filtered or unexported fields
}
TestClock is a mutable clock for tests. Advance(d) moves it forward; Now returns the current virtual time. Safe for concurrent use.
func NewTestClock ¶ added in v0.7.1
func NewTestClock() *TestClock
NewTestClock returns a TestClock starting at the current wall-clock time.
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager implements spi.TransactionManager using Snapshot Isolation (SSI). It lives in the memory package because it needs direct access to StoreFactory's entityData map and mu lock for the atomic commit flush.
func (*TransactionManager) Begin ¶
Begin starts a new transaction. It resolves the tenant from the context, generates a unique transaction ID, captures a snapshot time, and returns a new context carrying the TransactionState.
func (*TransactionManager) Commit ¶
func (m *TransactionManager) Commit(ctx context.Context, txID string) error
Commit validates the transaction against the committed log for SSI conflicts, flushes the write buffer and deletes to the entity store, and records the commit in the log.
func (*TransactionManager) CommittedLogLen ¶
func (m *TransactionManager) CommittedLogLen() int
CommittedLogLen returns the current length of the committed log. Exported for testing only.
func (*TransactionManager) GetSubmitTime ¶
GetSubmitTime returns the submit time of a committed transaction. Returns an error if the transaction is still active or not found.
func (*TransactionManager) Join ¶
Join returns a context carrying the TransactionState for an existing active transaction. This allows multiple goroutines to participate in the same transaction. Callers must coordinate access to the transaction's Buffer, ReadSet, WriteSet, and Deletes maps.
Locking discipline (issue #199 audit): Rollback writes tx.RolledBack inside m.mu only; Commit and Rollback both write tx.Closed in their defer under tx.OpMu.Lock only. Reading those fields requires tx.OpMu.RLock to be synchronised against the Closed-write — m.mu alone is not sufficient because Commit's defer runs outside the m.mu region.
func (*TransactionManager) ReleaseSavepoint ¶
func (m *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
ReleaseSavepoint releases a savepoint. The work done since the savepoint is already in the parent transaction's buffer, so this just removes the snapshot.
Locking discipline (issue #199): ReleaseSavepoint does not read or write tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — it only mutates m.savepoints. Holds m.mu only; tx.OpMu is not required because there is no tx-state field to coordinate against Commit/Rollback.
Tenant isolation (issue #199 PR-A review I-1): rejects cross-tenant callers — m.savepoints is tenant-scoped state.
func (*TransactionManager) Rollback ¶
func (m *TransactionManager) Rollback(ctx context.Context, txID string) error
Rollback discards an active transaction without committing any changes.
func (*TransactionManager) RollbackToSavepoint ¶
func (m *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
RollbackToSavepoint restores the transaction's buffer maps from the snapshot captured when the savepoint was created, then removes the snapshot.
Locking discipline (issue #199): RollbackToSavepoint replaces tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — exclusive against every other tx-path op. Holds tx.OpMu.Lock (write) for the duration of the field replacement. Lock interleaving with m.mu follows Commit's pattern.
Tenant isolation (issue #199 PR-A review I-1): rejects cross-tenant callers — RollbackToSavepoint is destructive on tx-state.
func (*TransactionManager) Savepoint ¶
Savepoint creates a named savepoint within the given transaction by deep-copying the transaction's buffer maps.
Locking discipline (issue #199): Savepoint reads tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — the same fields Commit's flush phase iterates under tx.OpMu.Lock and that other tx-path ops (Save, Get, Delete, ...) mutate under tx.OpMu.RLock. Savepoint must therefore hold tx.OpMu.RLock across those reads. The lock interleaving with m.mu follows Commit's pattern: drop m.mu before taking tx.OpMu, re-take m.mu briefly for the m.savepoints update.
Tenant isolation (issue #199 PR-A review I-1): rejects callers whose UserContext tenant does not match the transaction's tenant, mirroring Commit/Rollback. Without this guard a caller authenticated as tenant A who learned a tenant B txID could record a snapshot against tenant B's tx-state.
type WorkflowStore ¶
type WorkflowStore struct {
// contains filtered or unexported fields
}
func (*WorkflowStore) Get ¶
func (s *WorkflowStore) Get(ctx context.Context, modelRef spi.ModelRef) ([]spi.WorkflowDefinition, error)
func (*WorkflowStore) Save ¶
func (s *WorkflowStore) Save(ctx context.Context, modelRef spi.ModelRef, workflows []spi.WorkflowDefinition) error