memory

package module
v0.7.1 Latest
Warning: This package is not in the latest version of its module.

Published: May 6, 2026 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Overview

Package memory is the in-process storage plugin for cyoda-go.

It serves both as the default backend (no external dependencies) and as the minimal reference implementation for plugin authors — it implements every required interface method, nothing more.

Registration happens at init() time via spi.Register. A binary picks up this plugin by blank-importing it:

import _ "github.com/cyoda-platform/cyoda-go/plugins/memory"

Configuration: none. The plugin reads no environment variables and does not implement spi.DescribablePlugin.
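The init-time registration described above can be sketched in a single file. This is only an illustration of the pattern: Plugin, Register, registry and memoryPlugin are hypothetical stand-ins, and the real spi.Register signature is not shown on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// Plugin is a hypothetical stand-in for the SPI plugin interface.
type Plugin interface {
	Name() string
}

// registry is a hypothetical process-wide plugin table.
var registry = map[string]Plugin{}

// Register is a hypothetical stand-in for spi.Register.
func Register(p Plugin) {
	if _, dup := registry[p.Name()]; dup {
		panic("plugin registered twice: " + p.Name())
	}
	registry[p.Name()] = p
}

type memoryPlugin struct{}

func (memoryPlugin) Name() string { return "memory" }

// init runs once when the package is linked into the binary. In a real
// layout this would sit in the plugin package, and a blank import would
// be enough to trigger it.
func init() {
	Register(memoryPlugin{})
}

// RegisteredNames returns the registered plugin names in sorted order.
func RegisteredNames() []string {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(RegisteredNames())
}
```

Because registration is a side effect of package initialization, the importing binary never needs to name anything from the plugin package, which is why a blank import is sufficient.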

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ApplyFunc added in v0.7.1

type ApplyFunc func(base []byte, delta spi.SchemaDelta) ([]byte, error)

ApplyFunc replays an opaque SchemaDelta onto a base schema.

type AsyncSearchStore

type AsyncSearchStore struct {
	// contains filtered or unexported fields
}

AsyncSearchStore is a tenant-scoped, in-memory implementation of spi.AsyncSearchStore.

func NewAsyncSearchStore

func NewAsyncSearchStore() *AsyncSearchStore

NewAsyncSearchStore creates a new in-memory AsyncSearchStore that uses the wall clock. Code inside this package should prefer the unexported newAsyncSearchStore so that the store shares the factory's clock.

func (*AsyncSearchStore) Cancel

func (s *AsyncSearchStore) Cancel(ctx context.Context, jobID string) error

Cancel marks the job as CANCELLED. Idempotent: cancelling a job already in a terminal state returns nil. Cancelling a non-existent job returns spi.ErrNotFound.
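The Cancel contract above (idempotent on terminal jobs, not-found for unknown IDs) could look roughly like the following. This is a minimal sketch, not the package's code: jobStore, isTerminal and the status names other than CANCELLED are assumptions, and ErrNotFound stands in for spi.ErrNotFound.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrNotFound is a stand-in for spi.ErrNotFound.
var ErrNotFound = errors.New("not found")

// jobStore is a hypothetical, heavily simplified job table.
type jobStore struct {
	mu     sync.Mutex
	status map[string]string
}

// isTerminal reports whether a status can no longer change.
// Only CANCELLED comes from the documentation; the rest are assumed names.
func isTerminal(status string) bool {
	switch status {
	case "SUCCESSFUL", "FAILED", "CANCELLED":
		return true
	}
	return false
}

// Cancel marks a job CANCELLED unless it is already terminal.
func (s *jobStore) Cancel(jobID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	current, ok := s.status[jobID]
	if !ok {
		return ErrNotFound
	}
	if isTerminal(current) {
		return nil // idempotent: leave the terminal state untouched
	}
	s.status[jobID] = "CANCELLED"
	return nil
}

func main() {
	s := &jobStore{status: map[string]string{"j1": "RUNNING", "j2": "SUCCESSFUL"}}
	fmt.Println(s.Cancel("j1"), s.status["j1"])
	fmt.Println(s.Cancel("j1"))
	fmt.Println(s.Cancel("j2"), s.status["j2"])
	fmt.Println(s.Cancel("missing"))
}
```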

func (*AsyncSearchStore) CreateJob

func (s *AsyncSearchStore) CreateJob(ctx context.Context, job *spi.SearchJob) error

func (*AsyncSearchStore) DeleteJob

func (s *AsyncSearchStore) DeleteJob(ctx context.Context, jobID string) error

func (*AsyncSearchStore) GetJob

func (s *AsyncSearchStore) GetJob(ctx context.Context, jobID string) (*spi.SearchJob, error)

func (*AsyncSearchStore) GetResultIDs

func (s *AsyncSearchStore) GetResultIDs(ctx context.Context, jobID string, offset, limit int) ([]string, int, error)

func (*AsyncSearchStore) ReapExpired

func (s *AsyncSearchStore) ReapExpired(ctx context.Context, ttl time.Duration) (int, error)

func (*AsyncSearchStore) SaveResults

func (s *AsyncSearchStore) SaveResults(ctx context.Context, jobID string, entityIDs []string) error

func (*AsyncSearchStore) UpdateJobStatus

func (s *AsyncSearchStore) UpdateJobStatus(ctx context.Context, jobID string, status string, resultCount int, errMsg string, finishTime time.Time, calcTimeMs int64) error

type Clock added in v0.7.1

type Clock interface {
	Now() time.Time
}

Clock abstracts time.Now so tests can advance it deterministically. Production uses wallClock; conformance tests use TestClock.

type EntityStore

type EntityStore struct {
	// contains filtered or unexported fields
}

func (*EntityStore) CompareAndSave

func (s *EntityStore) CompareAndSave(ctx context.Context, entity *spi.Entity, expectedTxID string) (int64, error)

func (*EntityStore) Count

func (s *EntityStore) Count(ctx context.Context, modelRef spi.ModelRef) (int64, error)

func (*EntityStore) CountByState added in v0.7.1

func (s *EntityStore) CountByState(ctx context.Context, modelRef spi.ModelRef, states []string) (map[string]int64, error)

CountByState returns counts of non-deleted entities grouped by state for the given model. See SPI godoc on EntityStore.CountByState for filter semantics.
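As a rough illustration of grouping non-deleted entities by state, consider the sketch below. The filter semantics live in the SPI godoc and are not reproduced on this page, so the rule used here (an empty states slice means "count every state") is an assumption, as are the entity type and the countByState helper.

```go
package main

import "fmt"

// entity is a hypothetical, minimal entity record.
type entity struct {
	state   string
	deleted bool
}

// countByState groups non-deleted entities by state.
// Assumption: an empty states slice applies no filter.
func countByState(entities []entity, states []string) map[string]int64 {
	wanted := make(map[string]bool, len(states))
	for _, s := range states {
		wanted[s] = true
	}
	counts := make(map[string]int64)
	for _, e := range entities {
		if e.deleted {
			continue // deleted entities never contribute
		}
		if len(wanted) > 0 && !wanted[e.state] {
			continue // state not requested by the caller
		}
		counts[e.state]++
	}
	return counts
}

func main() {
	entities := []entity{
		{state: "NEW"}, {state: "NEW"}, {state: "DONE"},
		{state: "NEW", deleted: true},
	}
	fmt.Println(countByState(entities, nil))
	fmt.Println(countByState(entities, []string{"DONE"}))
}
```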

func (*EntityStore) Delete

func (s *EntityStore) Delete(ctx context.Context, entityID string) error

func (*EntityStore) DeleteAll

func (s *EntityStore) DeleteAll(ctx context.Context, modelRef spi.ModelRef) error

func (*EntityStore) Exists

func (s *EntityStore) Exists(ctx context.Context, entityID string) (bool, error)

func (*EntityStore) Get

func (s *EntityStore) Get(ctx context.Context, entityID string) (*spi.Entity, error)

func (*EntityStore) GetAll

func (s *EntityStore) GetAll(ctx context.Context, modelRef spi.ModelRef) ([]*spi.Entity, error)

func (*EntityStore) GetAllAsAt

func (s *EntityStore) GetAllAsAt(ctx context.Context, modelRef spi.ModelRef, asAt time.Time) ([]*spi.Entity, error)

func (*EntityStore) GetAsAt

func (s *EntityStore) GetAsAt(ctx context.Context, entityID string, asAt time.Time) (*spi.Entity, error)

func (*EntityStore) GetVersionHistory

func (s *EntityStore) GetVersionHistory(ctx context.Context, entityID string) ([]spi.EntityVersion, error)

func (*EntityStore) Save

func (s *EntityStore) Save(ctx context.Context, entity *spi.Entity) (int64, error)

func (*EntityStore) SaveAll

func (s *EntityStore) SaveAll(ctx context.Context, entities iter.Seq[*spi.Entity]) ([]int64, error)

type KeyValueStore

type KeyValueStore struct {
	// contains filtered or unexported fields
}

func (*KeyValueStore) Delete

func (s *KeyValueStore) Delete(ctx context.Context, namespace string, key string) error

func (*KeyValueStore) Get

func (s *KeyValueStore) Get(ctx context.Context, namespace string, key string) ([]byte, error)

func (*KeyValueStore) List

func (s *KeyValueStore) List(ctx context.Context, namespace string) (map[string][]byte, error)

func (*KeyValueStore) Put

func (s *KeyValueStore) Put(ctx context.Context, namespace string, key string, value []byte) error

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

func (*MessageStore) Delete

func (s *MessageStore) Delete(_ context.Context, id string) error

func (*MessageStore) DeleteBatch

func (s *MessageStore) DeleteBatch(ctx context.Context, ids []string) error

func (*MessageStore) Get

func (*MessageStore) Save

func (s *MessageStore) Save(_ context.Context, id string, header spi.MessageHeader, metaData spi.MessageMetaData, payload io.Reader) error

type ModelStore

type ModelStore struct {
	// contains filtered or unexported fields
}

ModelStore is a tenant-scoped, in-memory implementation of spi.ModelStore.

func (*ModelStore) Delete

func (s *ModelStore) Delete(ctx context.Context, modelRef spi.ModelRef) error

func (*ModelStore) ExtendSchema added in v0.7.1

func (s *ModelStore) ExtendSchema(ctx context.Context, ref spi.ModelRef, delta spi.SchemaDelta) error

ExtendSchema applies the delta to the current schema via the injected ApplyFunc. The memory plugin has a single writer, so applying the delta in place under the factory's model mutex is correct.
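A minimal sketch of apply-in-place under a mutex follows. Everything here is illustrative: SchemaDelta is modelled as raw bytes (the real spi.SchemaDelta is opaque), models are keyed by a plain string, appendFields is a toy ApplyFunc, and the error returned when no apply function is installed is an assumption rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// SchemaDelta is a stand-in for the opaque spi.SchemaDelta.
type SchemaDelta []byte

// ApplyFunc mirrors the documented signature.
type ApplyFunc func(base []byte, delta SchemaDelta) ([]byte, error)

// modelStore is a hypothetical store keyed by a plain string reference.
type modelStore struct {
	mu      sync.Mutex
	schemas map[string][]byte
	apply   ApplyFunc
}

// ExtendSchema replays delta onto the stored schema while holding the mutex.
func (s *modelStore) ExtendSchema(ref string, delta SchemaDelta) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.apply == nil {
		return errors.New("no apply function installed") // assumed behaviour
	}
	base, ok := s.schemas[ref]
	if !ok {
		return errors.New("model not found: " + ref)
	}
	next, err := s.apply(base, delta)
	if err != nil {
		return err // the stored schema is left untouched on failure
	}
	s.schemas[ref] = next
	return nil
}

// appendFields is a toy ApplyFunc that appends a comma-separated field.
func appendFields(base []byte, delta SchemaDelta) ([]byte, error) {
	if len(delta) == 0 {
		return nil, errors.New("empty delta")
	}
	out := make([]byte, 0, len(base)+1+len(delta))
	out = append(out, base...)
	out = append(out, ',')
	out = append(out, delta...)
	return out, nil
}

func main() {
	s := &modelStore{schemas: map[string][]byte{"m": []byte("id")}, apply: appendFields}
	fmt.Println(s.ExtendSchema("m", SchemaDelta("name")), string(s.schemas["m"]))
}
```

With a single writer serialized by the mutex, there is no window in which another writer can observe or overwrite the base between the read and the store, which is the property the doc comment relies on.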

func (*ModelStore) Get

func (s *ModelStore) Get(ctx context.Context, modelRef spi.ModelRef) (*spi.ModelDescriptor, error)

func (*ModelStore) GetAll

func (s *ModelStore) GetAll(ctx context.Context) ([]spi.ModelRef, error)

func (*ModelStore) IsLocked

func (s *ModelStore) IsLocked(ctx context.Context, modelRef spi.ModelRef) (bool, error)

func (*ModelStore) Lock

func (s *ModelStore) Lock(ctx context.Context, modelRef spi.ModelRef) error

func (*ModelStore) Save

func (s *ModelStore) Save(ctx context.Context, desc *spi.ModelDescriptor) error

func (*ModelStore) SetChangeLevel

func (s *ModelStore) SetChangeLevel(ctx context.Context, modelRef spi.ModelRef, level spi.ChangeLevel) error

func (*ModelStore) Unlock

func (s *ModelStore) Unlock(ctx context.Context, modelRef spi.ModelRef) error

type Option added in v0.7.1

type Option func(*StoreFactory)

Option is a functional option for NewStoreFactory.

func WithApplyFunc added in v0.7.1

func WithApplyFunc(fn ApplyFunc) Option

WithApplyFunc installs the replay function used by ExtendSchema.

func WithClock added in v0.7.1

func WithClock(c Clock) Option

WithClock injects a custom Clock into the factory. Used by conformance tests to advance time deterministically.
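The functional-option pattern behind Option, WithApplyFunc and WithClock can be sketched as follows. The types here (storeFactory, wallClock, fixedClock, newStoreFactory) are local stand-ins written for illustration; only the Clock interface and the shape of Option follow the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the documented interface.
type Clock interface {
	Now() time.Time
}

// wallClock is a stand-in for the production clock.
type wallClock struct{}

func (wallClock) Now() time.Time { return time.Now() }

// fixedClock is a hypothetical clock that always returns the same instant.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// storeFactory is a hypothetical, stripped-down factory.
type storeFactory struct {
	clock Clock
}

// Option mutates a factory during construction.
type Option func(*storeFactory)

// WithClock returns an Option that replaces the factory's clock.
func WithClock(c Clock) Option {
	return func(f *storeFactory) { f.clock = c }
}

// newStoreFactory applies defaults first, then each option in order.
func newStoreFactory(opts ...Option) *storeFactory {
	f := &storeFactory{clock: wallClock{}}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

func main() {
	def := newStoreFactory()
	fixed := newStoreFactory(WithClock(fixedClock{t: time.Unix(0, 0)}))
	fmt.Printf("%T %T\n", def.clock, fixed.clock)
}
```

Options keep the constructor signature stable: callers that need no customization pass nothing, and new knobs can be added later without breaking them.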

type StateMachineAuditStore

type StateMachineAuditStore struct {
	// contains filtered or unexported fields
}

func (*StateMachineAuditStore) GetEvents

func (s *StateMachineAuditStore) GetEvents(ctx context.Context, entityID string) ([]spi.StateMachineEvent, error)

func (*StateMachineAuditStore) GetEventsByTransaction

func (s *StateMachineAuditStore) GetEventsByTransaction(ctx context.Context, entityID string, transactionID string) ([]spi.StateMachineEvent, error)

func (*StateMachineAuditStore) Record

func (s *StateMachineAuditStore) Record(ctx context.Context, entityID string, event spi.StateMachineEvent) error

type StoreFactory

type StoreFactory struct {
	// contains filtered or unexported fields
}

func NewStoreFactory

func NewStoreFactory(opts ...Option) *StoreFactory

func (*StoreFactory) AsyncSearchStore

func (f *StoreFactory) AsyncSearchStore(_ context.Context) (spi.AsyncSearchStore, error)

func (*StoreFactory) Close

func (f *StoreFactory) Close() error

func (*StoreFactory) EntityStore

func (f *StoreFactory) EntityStore(ctx context.Context) (spi.EntityStore, error)

func (*StoreFactory) GetTransactionManager

func (f *StoreFactory) GetTransactionManager() spi.TransactionManager

GetTransactionManager returns the registered TransactionManager, or nil if none has been registered.

func (*StoreFactory) KeyValueStore

func (f *StoreFactory) KeyValueStore(ctx context.Context) (spi.KeyValueStore, error)

func (*StoreFactory) MessageStore

func (f *StoreFactory) MessageStore(ctx context.Context) (spi.MessageStore, error)

func (*StoreFactory) ModelStore

func (f *StoreFactory) ModelStore(ctx context.Context) (spi.ModelStore, error)

func (*StoreFactory) NewTransactionManager

func (f *StoreFactory) NewTransactionManager(uuids spi.UUIDGenerator) *TransactionManager

NewTransactionManager creates and registers a TransactionManager on the StoreFactory.

func (*StoreFactory) SetApplyFunc added in v0.7.1

func (f *StoreFactory) SetApplyFunc(fn func(base []byte, delta spi.SchemaDelta) ([]byte, error))

SetApplyFunc installs the replay function used by ExtendSchema. May be called at most once — typically immediately after Plugin.NewFactory in app/app.go. Panics on double-call (programmer error).

The parameter is the unnamed function type (not memory.ApplyFunc) so that an interface type-assertion in app/app.go can satisfy the setter uniformly across plugins.
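The point about the unnamed function type follows from Go's rule that a method satisfies an interface only if its signature is identical. The sketch below shows the difference; applyFuncSetter, unnamedParamFactory and namedParamFactory are hypothetical names, and SchemaDelta is a local stand-in for spi.SchemaDelta.

```go
package main

import "fmt"

// SchemaDelta is a stand-in for spi.SchemaDelta.
type SchemaDelta []byte

// ApplyFunc mirrors the documented named function type.
type ApplyFunc func(base []byte, delta SchemaDelta) ([]byte, error)

// applyFuncSetter is a hypothetical interface an application could
// type-assert against without importing any plugin package.
type applyFuncSetter interface {
	SetApplyFunc(fn func(base []byte, delta SchemaDelta) ([]byte, error))
}

// unnamedParamFactory takes the unnamed function type, as documented.
type unnamedParamFactory struct{ apply ApplyFunc }

func (f *unnamedParamFactory) SetApplyFunc(fn func(base []byte, delta SchemaDelta) ([]byte, error)) {
	if f.apply != nil {
		panic("SetApplyFunc called twice")
	}
	f.apply = fn // assignable: identical underlying types, one side unnamed
}

// namedParamFactory takes the named type, so its method signature differs.
type namedParamFactory struct{ apply ApplyFunc }

func (f *namedParamFactory) SetApplyFunc(fn ApplyFunc) { f.apply = fn }

// satisfiesSetter reports whether v implements applyFuncSetter.
func satisfiesSetter(v any) bool {
	_, ok := v.(applyFuncSetter)
	return ok
}

func main() {
	fmt.Println(satisfiesSetter(&unnamedParamFactory{}))
	fmt.Println(satisfiesSetter(&namedParamFactory{}))
}
```

A setter declared with a plugin-specific named type would fail the assertion, so every plugin would need its own interface in the application; the unnamed type avoids that coupling.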

func (*StoreFactory) StateMachineAuditStore

func (f *StoreFactory) StateMachineAuditStore(ctx context.Context) (spi.StateMachineAuditStore, error)

func (*StoreFactory) TransactionManager

func (f *StoreFactory) TransactionManager(ctx context.Context) (spi.TransactionManager, error)

TransactionManager implements spi.StoreFactory. Returns the TM registered via NewTransactionManager, or an error if none has been registered.

func (*StoreFactory) WorkflowStore

func (f *StoreFactory) WorkflowStore(ctx context.Context) (spi.WorkflowStore, error)

type TestClock added in v0.7.1

type TestClock struct {
	// contains filtered or unexported fields
}

TestClock is a mutable clock for tests. Advance(d) moves it forward; Now returns the current virtual time. Safe for concurrent use.

func NewTestClock added in v0.7.1

func NewTestClock() *TestClock

NewTestClock returns a TestClock starting at the current wall-clock time.

func (*TestClock) Advance added in v0.7.1

func (c *TestClock) Advance(d time.Duration)

Advance moves the virtual clock forward by d. d must be > 0; d <= 0 panics. Matches the spitest.Harness.AdvanceClock contract.

func (*TestClock) Now added in v0.7.1

func (c *TestClock) Now() time.Time

Now returns the current virtual time.
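To show why an injectable clock helps, here is a small sketch of a manually advanced clock driving a TTL check. manualClock is a hypothetical re-implementation written for this example, not the package's TestClock, and the expired helper with its "age >= ttl" rule is an assumption about how a reaper might use the clock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Clock mirrors the documented interface.
type Clock interface {
	Now() time.Time
}

// manualClock is a hypothetical mutable clock, safe for concurrent use.
type manualClock struct {
	mu  sync.Mutex
	now time.Time
}

func newManualClock(start time.Time) *manualClock {
	return &manualClock{now: start}
}

// Now returns the current virtual time.
func (c *manualClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves the clock forward by d and panics if d is not positive.
func (c *manualClock) Advance(d time.Duration) {
	if d <= 0 {
		panic("Advance requires a positive duration")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// expired is an assumed TTL rule: an item expires once its age reaches ttl.
func expired(c Clock, created time.Time, ttl time.Duration) bool {
	return c.Now().Sub(created) >= ttl
}

func main() {
	c := newManualClock(time.Unix(0, 0))
	created := c.Now()
	fmt.Println(expired(c, created, time.Minute))
	c.Advance(2 * time.Minute)
	fmt.Println(expired(c, created, time.Minute))
}
```

A test can cross a TTL boundary instantly by calling Advance instead of sleeping, which keeps time-dependent tests fast and deterministic.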

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager implements spi.TransactionManager using Serializable Snapshot Isolation (SSI). It lives in the memory package because it needs direct access to StoreFactory's entityData map and mu lock for the atomic commit flush.

func (*TransactionManager) Begin

Begin starts a new transaction. It resolves the tenant from the context, generates a unique transaction ID, captures a snapshot time, and returns a new context carrying the TransactionState.

func (*TransactionManager) Commit

func (m *TransactionManager) Commit(ctx context.Context, txID string) error

Commit validates the transaction against the committed log for SSI conflicts, flushes the write buffer and deletes to the entity store, and records the commit in the log.
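The three commit phases (validate against the committed log, flush, record) can be sketched in a simplified, single-goroutine form. This is not the package's algorithm: the manager, tx and committed types are hypothetical, a logical sequence number replaces the snapshot time, the conflict rule shown is one plausible choice, and all locking is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict is a hypothetical conflict error.
var ErrConflict = errors.New("transaction conflict")

// committed is one entry in the committed log.
type committed struct {
	seq    int
	writes map[string]bool
}

// tx is a simplified transaction state.
type tx struct {
	snapshot int
	readSet  map[string]bool
	buffer   map[string]string
	deletes  map[string]bool
}

// manager holds the store, the commit sequence and the committed log.
type manager struct {
	seq  int
	log  []committed
	data map[string]string
}

func newManager() *manager { return &manager{data: map[string]string{}} }

// begin captures the current sequence number as the snapshot.
func (m *manager) begin() *tx {
	return &tx{
		snapshot: m.seq,
		readSet:  map[string]bool{},
		buffer:   map[string]string{},
		deletes:  map[string]bool{},
	}
}

// commit validates, flushes, then records the commit in the log.
func (m *manager) commit(t *tx) error {
	// Validate: any later commit that wrote something this tx touched conflicts.
	for _, c := range m.log {
		if c.seq <= t.snapshot {
			continue
		}
		for id := range c.writes {
			_, buffered := t.buffer[id]
			if t.readSet[id] || t.deletes[id] || buffered {
				return ErrConflict
			}
		}
	}
	// Flush buffered writes and deletes to the store.
	writes := map[string]bool{}
	for id, v := range t.buffer {
		m.data[id] = v
		writes[id] = true
	}
	for id := range t.deletes {
		delete(m.data, id)
		writes[id] = true
	}
	// Record the commit so later transactions can validate against it.
	m.seq++
	m.log = append(m.log, committed{seq: m.seq, writes: writes})
	return nil
}

func main() {
	m := newManager()
	t1, t2 := m.begin(), m.begin()
	t1.buffer["a"] = "1"
	t2.readSet["a"] = true
	t2.buffer["b"] = "2"
	fmt.Println(m.commit(t1), m.commit(t2))
}
```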

func (*TransactionManager) CommittedLogLen

func (m *TransactionManager) CommittedLogLen() int

CommittedLogLen returns the current length of the committed log. Exported for testing only.

func (*TransactionManager) GetSubmitTime

func (m *TransactionManager) GetSubmitTime(_ context.Context, txID string) (time.Time, error)

GetSubmitTime returns the submit time of a committed transaction. Returns an error if the transaction is still active or not found.

func (*TransactionManager) Join

Join returns a context carrying the TransactionState for an existing active transaction. This allows multiple goroutines to participate in the same transaction. Callers must coordinate access to the transaction's Buffer, ReadSet, WriteSet, and Deletes maps.

Locking discipline (issue #199 audit): Rollback writes tx.RolledBack inside m.mu only; Commit and Rollback both write tx.Closed in their defer under tx.OpMu.Lock only. Reading those fields requires tx.OpMu.RLock to be synchronised against the Closed-write — m.mu alone is not sufficient because Commit's defer runs outside the m.mu region.

func (*TransactionManager) ReleaseSavepoint

func (m *TransactionManager) ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error

ReleaseSavepoint releases a savepoint. The work done since the savepoint is already in the parent transaction's buffer, so this just removes the snapshot.

Locking discipline (issue #199): ReleaseSavepoint does not read or write tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — it only mutates m.savepoints. Holds m.mu only; tx.OpMu is not required because there is no tx-state field to coordinate against Commit/Rollback.

Tenant isolation (issue #199 PR-A review I-1): rejects cross-tenant callers — m.savepoints is tenant-scoped state.

func (*TransactionManager) Rollback

func (m *TransactionManager) Rollback(ctx context.Context, txID string) error

Rollback discards an active transaction without committing any changes.

func (*TransactionManager) RollbackToSavepoint

func (m *TransactionManager) RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error

RollbackToSavepoint restores the transaction's buffer maps from the snapshot captured when the savepoint was created, then removes the snapshot.

Locking discipline (issue #199): RollbackToSavepoint replaces tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — exclusive against every other tx-path op. Holds tx.OpMu.Lock (write) for the duration of the field replacement. Lock interleaving with m.mu follows Commit's pattern.

Tenant isolation (issue #199 PR-A review I-1): rejects cross-tenant callers — RollbackToSavepoint is destructive on tx-state.

func (*TransactionManager) Savepoint

func (m *TransactionManager) Savepoint(ctx context.Context, txID string) (string, error)

Savepoint creates a named savepoint within the given transaction by deep-copying the transaction's buffer maps.

Locking discipline (issue #199): Savepoint reads tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — the same fields Commit's flush phase iterates under tx.OpMu.Lock and that other tx-path ops (Save, Get, Delete, ...) mutate under tx.OpMu.RLock. Savepoint must therefore hold tx.OpMu.RLock across those reads. The lock interleaving with m.mu follows Commit's pattern: drop m.mu before taking tx.OpMu, re-take m.mu briefly for the m.savepoints update.

Tenant isolation (issue #199 PR-A review I-1): rejects callers whose UserContext tenant does not match the transaction's tenant, mirroring Commit/Rollback. Without this guard a caller authenticated as tenant A who learned a tenant B txID could record a snapshot against tenant B's tx-state.
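The savepoint lifecycle (snapshot by copy, restore on rollback, drop on release) can be sketched without any of the locking or tenant checks described above. The txState and savepoints types and the "sp-N" identifier scheme are hypothetical; the real transaction state also carries ReadSet and WriteSet.

```go
package main

import (
	"errors"
	"fmt"
	"maps"
)

// txState is a simplified transaction state with two of the four maps.
type txState struct {
	buffer  map[string]string
	deletes map[string]bool
}

// clone copies every map so later mutations do not leak into the snapshot.
func (s txState) clone() txState {
	return txState{buffer: maps.Clone(s.buffer), deletes: maps.Clone(s.deletes)}
}

// savepoints is a hypothetical snapshot table keyed by savepoint ID.
type savepoints struct {
	next  int
	snaps map[string]txState
}

// create snapshots the transaction and returns a generated ID.
func (sp *savepoints) create(tx *txState) string {
	sp.next++
	id := fmt.Sprintf("sp-%d", sp.next)
	sp.snaps[id] = tx.clone()
	return id
}

// rollbackTo restores the snapshot into tx, then removes the snapshot.
func (sp *savepoints) rollbackTo(tx *txState, id string) error {
	snap, ok := sp.snaps[id]
	if !ok {
		return errors.New("unknown savepoint: " + id)
	}
	*tx = snap
	delete(sp.snaps, id)
	return nil
}

// release drops the snapshot; work done since then stays in tx.
func (sp *savepoints) release(id string) error {
	if _, ok := sp.snaps[id]; !ok {
		return errors.New("unknown savepoint: " + id)
	}
	delete(sp.snaps, id)
	return nil
}

func main() {
	tx := &txState{buffer: map[string]string{"a": "1"}, deletes: map[string]bool{}}
	sp := &savepoints{snaps: map[string]txState{}}
	id := sp.create(tx)
	tx.buffer["b"] = "2"
	fmt.Println(sp.rollbackTo(tx, id), tx.buffer)
}
```

Copying the maps at creation time is what makes release cheap: nothing has to be merged back, because the live transaction state already holds all later work.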

type WorkflowStore

type WorkflowStore struct {
	// contains filtered or unexported fields
}

func (*WorkflowStore) Delete

func (s *WorkflowStore) Delete(ctx context.Context, modelRef spi.ModelRef) error

func (*WorkflowStore) Get

func (s *WorkflowStore) Get(ctx context.Context, modelRef spi.ModelRef) ([]spi.WorkflowDefinition, error)

func (*WorkflowStore) Save

func (s *WorkflowStore) Save(ctx context.Context, modelRef spi.ModelRef, workflows []spi.WorkflowDefinition) error
