Documentation
¶
Overview ¶
Package spi defines the storage-plugin contract for cyoda-go.
Plugin Authoring — Minimal Example ¶
A storage plugin is a Go module that implements the Plugin interface and registers itself at package init() time:
package myplugin
import (
"context"
spi "github.com/cyoda-platform/cyoda-go-spi"
)
func init() { spi.Register(&plugin{}) }
type plugin struct{}
func (p *plugin) Name() string { return "myplugin" }
func (p *plugin) NewFactory(
ctx context.Context,
getenv func(string) string,
opts ...spi.FactoryOption,
) (spi.StoreFactory, error) {
// Parse config via getenv (not os.Getenv — the core injects a
// closure so tests can supply a fake environment).
dsn := getenv("MYPLUGIN_DSN")
// Resolve options (e.g., a ClusterBroadcaster for cluster-wide
// notifications). Plugins that don't need any option can skip.
cfg := spi.ApplyFactoryOptions(opts)
_ = cfg.ClusterBroadcaster() // nil if unset
// Connect, run migrations, return a ready factory. Use ctx for
// all blocking setup work so unreachable infra fails fast.
return newStoreFactory(ctx, dsn), nil
}
The binary is built with a blank import of the plugin:
import _ "example.com/myplugin"
which causes init() to run and Register to install the plugin in the process-global registry. The core resolves the active backend at startup by calling spi.GetPlugin(name).
The getenv injection ¶
Plugins read their configuration through the injected getenv function, not os.Getenv directly. In production the core passes os.Getenv; in tests the core passes a closure over a map[string]string so that test fixtures can provide exactly the variables the test needs without leaking into the process environment.
Plugin-owned TransactionManager ¶
Each plugin provides its own TransactionManager via StoreFactory's TransactionManager(ctx) method. The plugin's TM implementation is free to couple tightly to the plugin's stores (memory does) or be a lightweight lifecycle tracker (postgres does — the pgx.Tx is tracked in a plugin-internal registry and stores look it up by txID when called inside an active transaction). The pattern for bridging a logical txID to a physical transaction handle:
// Begin registers the handle in the plugin's internal registry.
func (tm *TM) Begin(ctx context.Context) (string, context.Context, error) {
phys := openPhysical(ctx)
txID := uuid.UUID(tm.uuids.NewTimeUUID()).String()
tm.registry.Register(txID, phys)
state := &spi.TransactionState{ID: txID}
return txID, spi.WithTransaction(ctx, state), nil
}
// Stores resolve the handle from context.
func (f *StoreFactory) queryExecutor(ctx context.Context) Querier {
if state := spi.GetTransaction(ctx); state != nil {
if phys, ok := f.tm.Lookup(state.ID); ok {
return phys
}
}
return f.defaultExecutor // e.g., a connection pool
}
Startable and Close — symmetry ¶
Plugins with background goroutines implement the optional Startable interface. The core calls Start(ctx) immediately after NewFactory and before any store-facing call (including TransactionManager), so plugins whose TransactionManager depends on Start's side effects (e.g. cassandra's shard-rebalance wait) can rely on the ordering. Plugins must tear down those goroutines in StoreFactory.Close(): each goroutine observes either ctx.Done() or a shutdown channel closed by Close(); Close() waits (bounded) for them to exit with a sync.WaitGroup. Leaked goroutines compound under test-driven create/destroy cycles.
Dependencies ¶
The spi package depends only on the Go standard library. Plugin authors depend only on this module; they do not depend on cyoda-go itself.
Predicate AST ¶
Submodule spi/predicate holds the search-predicate AST and JSON parse/marshal helpers. Plugins that translate predicates to their own query dialect (SQL, CQL) import spi/predicate for the types. Plugins with no search semantics may ignore it.
Index ¶
- Variables
- func DefaultSaveAll(store EntityStore, ctx context.Context, entities iter.Seq[*Entity]) ([]int64, error)
- func HasRole(roles []string, role string) bool
- func Register(p Plugin)
- func RegisteredPlugins() []string
- func WithTransaction(ctx context.Context, tx *TransactionState) context.Context
- func WithUserContext(ctx context.Context, uc *UserContext) context.Context
- type AsyncSearchStore
- type ChangeLevel
- type ClusterBroadcaster
- type ConfigVar
- type DescribablePlugin
- type Entity
- type EntityMeta
- type EntityStore
- type EntityVersion
- type ExecutionResult
- type FactoryConfig
- type FactoryOption
- type FieldSource
- type Filter
- type FilterOp
- type KeyValueStore
- type MessageHeader
- type MessageMetaData
- type MessageStore
- type ModelDescriptor
- type ModelRef
- type ModelState
- type ModelStore
- type OrderSpec
- type Plugin
- type ProcessorConfig
- type ProcessorDefinition
- type SchemaDelta
- type SearchJob
- type SearchOptions
- type Searcher
- type SelfExecutingSearchStore
- type Startable
- type StateDefinition
- type StateMachineAuditStore
- type StateMachineEvent
- type StateMachineEventType
- type StoreFactory
- type Tenant
- type TenantID
- type TransactionManager
- type TransactionState
- type TransitionDefinition
- type UUIDGenerator
- type UserContext
- type WorkflowDefinition
- type WorkflowStore
Constants ¶
This section is empty.
Variables ¶
var ErrConflict = errors.New("conflict: entity has been modified")
ErrConflict indicates the write conflicts with a concurrent modification.
var ErrEpochMismatch = errors.New("shard epoch mismatch")
ErrEpochMismatch indicates the caller's shard epoch is stale relative to the cluster view. Retry after refreshing.
var ErrNotFound = errors.New("not found")
ErrNotFound indicates the requested resource does not exist.
var ErrRetryExhausted = errors.New("retry budget exhausted")
ErrRetryExhausted indicates the plugin's retry budget for a transparently-retried operation was consumed without success. Returned by ExtendSchema when CYODA_SCHEMA_EXTEND_MAX_RETRIES attempts have completed without success AND the context was not cancelled. Callers may choose to retry at a higher level (with backoff) or surface the condition to the end user.
Distinct from ErrConflict: ErrConflict means a single attempt hit a conflict; ErrRetryExhausted means the plugin exhausted its configured retry budget.
Functions ¶
func DefaultSaveAll ¶
func DefaultSaveAll(store EntityStore, ctx context.Context, entities iter.Seq[*Entity]) ([]int64, error)
DefaultSaveAll is the sequential fallback for EntityStore.SaveAll. It calls store.Save for each entity in order and stops on the first error. Backends that don't need concurrent saves delegate their SaveAll to this.
func Register ¶
func Register(p Plugin)
Register adds p to the plugin registry. Register panics if another plugin has already been registered under the same Name — a naming collision at init time is always a programmer error. Matches the database/sql.Register convention.
func RegisteredPlugins ¶
func RegisteredPlugins() []string
RegisteredPlugins returns the names of all currently registered plugins, sorted by name for deterministic ordering.
func WithTransaction ¶
func WithTransaction(ctx context.Context, tx *TransactionState) context.Context
WithTransaction returns a new context carrying the given transaction state.
func WithUserContext ¶
func WithUserContext(ctx context.Context, uc *UserContext) context.Context
Types ¶
type AsyncSearchStore ¶
type AsyncSearchStore interface {
CreateJob(ctx context.Context, job *SearchJob) error
GetJob(ctx context.Context, jobID string) (*SearchJob, error)
UpdateJobStatus(ctx context.Context, jobID string, status string, resultCount int, errMsg string, finishTime time.Time, calcTimeMs int64) error
SaveResults(ctx context.Context, jobID string, entityIDs []string) error
GetResultIDs(ctx context.Context, jobID string, offset, limit int) (entityIDs []string, total int, err error)
DeleteJob(ctx context.Context, jobID string) error
ReapExpired(ctx context.Context, ttl time.Duration) (int, error)
// Cancel marks the job as CANCELLED. Idempotent: cancelling a job
// already in a terminal state returns nil. Cancelling a non-existent
// job returns ErrNotFound.
Cancel(ctx context.Context, jobID string) error
}
AsyncSearchStore provides persistence for async search jobs and their results.
type ChangeLevel ¶
type ChangeLevel string
ChangeLevel controls which structural changes are permitted during data ingestion.
const ( ChangeLevelArrayLength ChangeLevel = "ARRAY_LENGTH" ChangeLevelArrayElements ChangeLevel = "ARRAY_ELEMENTS" ChangeLevelType ChangeLevel = "TYPE" ChangeLevelStructural ChangeLevel = "STRUCTURAL" )
func ValidateChangeLevel ¶
func ValidateChangeLevel(s string) (ChangeLevel, error)
ValidateChangeLevel returns an error if the given string is not a known ChangeLevel.
type ClusterBroadcaster ¶
type ClusterBroadcaster interface {
Broadcast(topic string, payload []byte)
Subscribe(topic string, handler func(payload []byte))
}
ClusterBroadcaster delivers opaque payloads to peer nodes on a named topic. Semantics are fire-and-forget, best-effort: no ordering, no anti-entropy, no persistence. Payloads that need ordering or delivery guarantees should use a backend-internal transport (e.g. a message broker) rather than this interface.
Broadcast is non-blocking; it enqueues the payload and returns. Subscribe registers a handler called for every message received on the topic. Handlers run on the broadcaster's goroutines; they must not block indefinitely.
Typical use: a plugin needs eventually-consistent cluster-wide notifications (cache invalidation, clock gossip, topology hints).
type DescribablePlugin ¶
DescribablePlugin is an optional Plugin capability: it exposes the configuration variables the plugin reads, so --help can render them.
type Entity ¶
type Entity struct {
Meta EntityMeta
Data []byte
}
type EntityMeta ¶
type EntityMeta struct {
ID string
TenantID TenantID
ModelRef ModelRef
State string
Version int64
CreationDate time.Time
LastModifiedDate time.Time
TransactionID string
ChangeType string // "CREATED", "UPDATED", "DELETED"
ChangeUser string // user ID who performed the change
TransitionForLatestSave string
}
type EntityStore ¶
type EntityStore interface {
Save(ctx context.Context, entity *Entity) (int64, error)
// CompareAndSave saves the entity only if the current latest transaction ID matches expectedTxID.
// Returns ErrConflict if the transaction ID has changed.
CompareAndSave(ctx context.Context, entity *Entity, expectedTxID string) (int64, error)
// SaveAll saves multiple entities, returning versions in iteration order.
// Backends may execute saves concurrently. On error, returns the first
// error encountered; partially-saved entities within an uncommitted
// transaction are invisible to readers.
SaveAll(ctx context.Context, entities iter.Seq[*Entity]) ([]int64, error)
Get(ctx context.Context, entityID string) (*Entity, error)
GetAsAt(ctx context.Context, entityID string, asAt time.Time) (*Entity, error)
GetAll(ctx context.Context, modelRef ModelRef) ([]*Entity, error)
GetAllAsAt(ctx context.Context, modelRef ModelRef, asAt time.Time) ([]*Entity, error)
Delete(ctx context.Context, entityID string) error
DeleteAll(ctx context.Context, modelRef ModelRef) error
Exists(ctx context.Context, entityID string) (bool, error)
Count(ctx context.Context, modelRef ModelRef) (int64, error)
// CountByState returns the count of non-deleted entities grouped by state
// for the given model. If states is non-nil, only the listed states are
// included in the result. If states is nil, all states are returned.
// An empty (non-nil) states slice returns an empty map without querying
// the storage layer.
//
// Unknown model: returns an empty map with no error, matching Count's
// behavior (no model-registry check at this layer).
//
// Implementations MUST push the state filter down to the storage layer
// when feasible. Callers may invoke this from inside a transaction; the
// returned counts MUST reflect the transactional view (uncommitted writes
// from the current tx are visible, writes from other in-flight txs are not),
// matching the semantics of Count.
CountByState(ctx context.Context, modelRef ModelRef, states []string) (map[string]int64, error)
GetVersionHistory(ctx context.Context, entityID string) ([]EntityVersion, error)
}
type EntityVersion ¶
type EntityVersion struct {
Entity *Entity
ChangeType string
User string
Timestamp time.Time
Version int64
Deleted bool
}
EntityVersion represents a single version entry in an entity's change history.
type ExecutionResult ¶
ExecutionResult holds the outcome of a workflow engine execution.
type FactoryConfig ¶
type FactoryConfig struct {
// contains filtered or unexported fields
}
FactoryConfig is the read-only view plugins see after resolution.
func ApplyFactoryOptions ¶
func ApplyFactoryOptions(opts []FactoryOption) FactoryConfig
ApplyFactoryOptions resolves the variadic options into a read-only FactoryConfig. Plugins call this inside NewFactory.
func (FactoryConfig) ClusterBroadcaster ¶
func (c FactoryConfig) ClusterBroadcaster() ClusterBroadcaster
ClusterBroadcaster returns the cluster broadcaster supplied via WithClusterBroadcaster, or nil if none was supplied.
type FactoryOption ¶
type FactoryOption func(*factoryConfig)
FactoryOption configures a storage factory during NewFactory. Plugins receive options via the variadic parameter and resolve them with ApplyFactoryOptions.
func WithClusterBroadcaster ¶
func WithClusterBroadcaster(b ClusterBroadcaster) FactoryOption
WithClusterBroadcaster injects the cluster broadcaster for plugins that use ClusterBroadcaster for cluster-wide notifications.
type FieldSource ¶ added in v0.4.0
type FieldSource string
FieldSource indicates whether a filter path refers to entity data or metadata.
const ( SourceData FieldSource = "data" SourceMeta FieldSource = "meta" )
type Filter ¶ added in v0.4.0
type Filter struct {
Op FilterOp
Path string
Source FieldSource
Value any
Values []any
Children []Filter
}
Filter is a generic predicate tree for search pushdown. Leaf nodes carry Op, Path, Source, and Value/Values. Branch nodes (FilterAnd, FilterOr) carry Children.
type FilterOp ¶ added in v0.4.0
type FilterOp string
FilterOp defines a filter operation for search predicate pushdown.
const ( FilterAnd FilterOp = "and" FilterOr FilterOp = "or" FilterEq FilterOp = "eq" FilterNe FilterOp = "ne" FilterGt FilterOp = "gt" FilterLt FilterOp = "lt" FilterGte FilterOp = "gte" FilterLte FilterOp = "lte" FilterContains FilterOp = "contains" FilterStartsWith FilterOp = "starts_with" FilterEndsWith FilterOp = "ends_with" FilterLike FilterOp = "like" FilterIsNull FilterOp = "is_null" FilterNotNull FilterOp = "not_null" FilterBetween FilterOp = "between" FilterMatchesRegex FilterOp = "matches_regex" FilterIEq FilterOp = "ieq" FilterINe FilterOp = "ine" FilterIContains FilterOp = "icontains" FilterINotContains FilterOp = "inot_contains" FilterIStartsWith FilterOp = "istarts_with" FilterINotStartsWith FilterOp = "inot_starts_with" FilterIEndsWith FilterOp = "iends_with" FilterINotEndsWith FilterOp = "inot_ends_with" )
type KeyValueStore ¶
type KeyValueStore interface {
Put(ctx context.Context, namespace string, key string, value []byte) error
Get(ctx context.Context, namespace string, key string) ([]byte, error)
Delete(ctx context.Context, namespace string, key string) error
List(ctx context.Context, namespace string) (map[string][]byte, error)
}
type MessageHeader ¶
type MessageHeader struct {
Subject string
ContentType string
ContentLength int64
ContentEncoding string
MessageID string // custom message ID from X-Message-ID header
UserID string
Recipient string
ReplyTo string
CorrelationID string
}
MessageHeader holds the fixed AMQP-aligned headers for an edge message.
type MessageMetaData ¶
MessageMetaData holds arbitrary key-value metadata for an edge message. Values preserve their original JSON types (string, number, bool, etc.).
type MessageStore ¶
type MessageStore interface {
Save(ctx context.Context, id string, header MessageHeader, metaData MessageMetaData, payload io.Reader) error
Get(ctx context.Context, id string) (MessageHeader, MessageMetaData, io.ReadCloser, error)
Delete(ctx context.Context, id string) error
DeleteBatch(ctx context.Context, ids []string) error
}
type ModelDescriptor ¶
type ModelDescriptor struct {
Ref ModelRef
State ModelState
ChangeLevel ChangeLevel
UpdateDate time.Time
Schema []byte
}
ModelDescriptor holds the full metadata and schema for an entity model.
type ModelState ¶
type ModelState string
ModelState represents the lifecycle state of an entity model.
const ( ModelLocked ModelState = "LOCKED" ModelUnlocked ModelState = "UNLOCKED" )
type ModelStore ¶
type ModelStore interface {
Save(ctx context.Context, desc *ModelDescriptor) error
Get(ctx context.Context, modelRef ModelRef) (*ModelDescriptor, error)
GetAll(ctx context.Context) ([]ModelRef, error)
Delete(ctx context.Context, modelRef ModelRef) error
Lock(ctx context.Context, modelRef ModelRef) error
Unlock(ctx context.Context, modelRef ModelRef) error
IsLocked(ctx context.Context, modelRef ModelRef) (bool, error)
SetChangeLevel(ctx context.Context, modelRef ModelRef, level ChangeLevel) error
// ExtendSchema appends a schema delta for the model at ref. The
// delta is an opaque, plugin-agnostic blob that the plugin stores
// verbatim in its extension log; folding the log into the current
// schema is done on read via a plugin-injected ApplyFunc.
//
// Contract:
// - Success (nil return) means the extension is durably committed
// and visible to subsequent reads on this node.
// - A non-nil error means no persisted effect — no log entry,
// no savepoint, no partial state.
// - Plugins with a native conflict surface (sqlite SQLITE_BUSY,
// cassandra LWT applied:false) retry transparently up to a
// configurable budget. On exhaustion without ctx cancellation,
// return ErrRetryExhausted.
// - Context cancellation between retry attempts returns ctx.Err()
// (wrapped with attempt count), not ErrRetryExhausted. Mid-attempt
// cancellation follows backend-native behavior.
// - Plugins without a conflict surface (memory, postgres) commit
// immediately or fail with the backend's native error.
//
// Empty or nil deltas are a no-op and return nil.
ExtendSchema(ctx context.Context, ref ModelRef, delta SchemaDelta) error
}
type OrderSpec ¶ added in v0.4.0
type OrderSpec struct {
Path string
Source FieldSource
Desc bool
}
OrderSpec defines a single sort clause for search results. When OrderBy is empty, the default order is entity_id ascending.
type Plugin ¶
type Plugin interface {
Name() string
NewFactory(ctx context.Context, getenv func(string) string, opts ...FactoryOption) (StoreFactory, error)
}
Plugin is the storage-backend contract. Implementations register themselves at init time by calling Register.
type ProcessorConfig ¶
type ProcessorConfig struct {
AttachEntity bool `json:"attachEntity,omitempty"`
CalculationNodesTags string `json:"calculationNodesTags,omitempty"`
ResponseTimeoutMs int64 `json:"responseTimeoutMs,omitempty"`
RetryPolicy string `json:"retryPolicy,omitempty"`
Context string `json:"context,omitempty"`
// StartNewTxOnDispatch, when true and ExecutionMode is COMMIT_BEFORE_DISPATCH,
// causes the cascade engine to open a fresh transaction before dispatching
// the processor (so the processor may perform transactional work via that
// tx's token). When false (default) the processor runs with no transaction
// context and the connection is released entirely during dispatch.
// Ignored for any other ExecutionMode.
StartNewTxOnDispatch *bool `json:"startNewTxOnDispatch,omitempty"`
}
ProcessorConfig holds configuration for a processor.
type ProcessorDefinition ¶
type ProcessorDefinition struct {
Type string `json:"type"`
Name string `json:"name"`
ExecutionMode string `json:"executionMode,omitempty"`
Config ProcessorConfig `json:"config,omitempty"`
}
ProcessorDefinition represents a processor attached to a transition.
type SchemaDelta ¶ added in v0.6.0
type SchemaDelta []byte
SchemaDelta is an opaque, plugin-agnostic serialization of an additive schema change. Bytes are produced by the consuming application's schema diff logic (e.g. cyoda-go's internal/domain/model/schema) and replayed by an injected apply function in the plugin. Plugins persist bytes verbatim; they MUST NOT interpret them.
type SearchJob ¶
type SearchJob struct {
ID string
TenantID TenantID
Status string // RUNNING, SUCCESSFUL, FAILED, CANCELLED
ModelRef ModelRef
Condition json.RawMessage
PointInTime time.Time
SearchOpts json.RawMessage
ResultCount int
Error string
CreateTime time.Time
FinishTime *time.Time
CalcTimeMs int64
}
SearchJob represents the persistent state of an async search operation.
type SearchOptions ¶ added in v0.4.0
type SearchOptions struct {
ModelName string
ModelVersion string
PointInTime *time.Time
Limit int
Offset int
OrderBy []OrderSpec
}
SearchOptions configures pagination, ordering, and scoping for a search.
type Searcher ¶ added in v0.4.0
type Searcher interface {
Search(ctx context.Context, filter Filter, opts SearchOptions) ([]*Entity, error)
}
Searcher is an optional interface for storage plugins that support search predicate pushdown (e.g. SQL WHERE clauses). Plugins that implement Searcher get native query execution; those that don't fall back to in-memory filtering.
type SelfExecutingSearchStore ¶
type SelfExecutingSearchStore interface {
AsyncSearchStore
SelfExecuting()
}
SelfExecutingSearchStore is implemented by AsyncSearchStore variants whose CreateJob method also kicks off per-shard execution and result persistence. The domain SearchService detects this via a type assertion after CreateJob and skips its own background-execution goroutine for these stores.
Memory and Postgres do NOT implement this — their CreateJob only persists the job row, and the SearchService spawns a background goroutine to perform the actual search. A backend with native distributed execution can opt in by implementing this interface; its CreateJob is expected to dispatch work and persist results itself.
type Startable ¶
Startable is an optional StoreFactory capability: the core calls Start immediately after NewFactory and before any store-facing call (including TransactionManager). Plugins that need background goroutines — shard managers, consumer groups, rebalance waits, long-lived cluster connections — implement this. Start must complete (successfully) before the factory is expected to serve transactions; plugins whose TransactionManager depends on Start's side effects (rebalance-assigned shards, consumer group membership) can rely on this ordering.
Start is bounded by the caller's context (typically a startup timeout). Plugins must honor ctx.Done() for cancellation.
type StateDefinition ¶
type StateDefinition struct {
Transitions []TransitionDefinition `json:"transitions,omitempty"`
}
StateDefinition represents a state with its transitions.
type StateMachineAuditStore ¶
type StateMachineAuditStore interface {
Record(ctx context.Context, entityID string, event StateMachineEvent) error
GetEvents(ctx context.Context, entityID string) ([]StateMachineEvent, error)
GetEventsByTransaction(ctx context.Context, entityID string, transactionID string) ([]StateMachineEvent, error)
}
type StateMachineEvent ¶
type StateMachineEvent struct {
EventType StateMachineEventType `json:"eventType"`
EntityID string `json:"entityId"`
TimeUUID string `json:"timeUuid"`
State string `json:"state,omitempty"`
TransactionID string `json:"transactionId,omitempty"`
Details string `json:"details"`
Data map[string]any `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
StateMachineEvent represents a single event in a state machine execution.
type StateMachineEventType ¶
type StateMachineEventType string
StateMachineEventType represents the type of state machine event.
const ( SMEventStarted StateMachineEventType = "STATE_MACHINE_START" SMEventFinished StateMachineEventType = "STATE_MACHINE_FINISH" SMEventCancelled StateMachineEventType = "CANCEL" SMEventForcedSuccess StateMachineEventType = "FORCE_SUCCESS" SMEventWorkflowFound StateMachineEventType = "WORKFLOW_FOUND" SMEventWorkflowNotFound StateMachineEventType = "WORKFLOW_NOT_FOUND" SMEventWorkflowSkipped StateMachineEventType = "WORKFLOW_SKIP" SMEventTransitionMade StateMachineEventType = "TRANSITION_MAKE" SMEventTransitionNotFound StateMachineEventType = "TRANSITION_NOT_FOUND" SMEventTransitionCriterionNoMatch StateMachineEventType = "TRANSITION_NOT_MATCH_CRITERION" SMEventProcessCriterionNoMatch StateMachineEventType = "PROCESS_NOT_MATCH_CRITERION" SMEventProcessingPaused StateMachineEventType = "PAUSE_FOR_PROCESSING" SMEventStateProcessResult StateMachineEventType = "STATE_PROCESS_RESULT" )
type StoreFactory ¶
type StoreFactory interface {
EntityStore(ctx context.Context) (EntityStore, error)
ModelStore(ctx context.Context) (ModelStore, error)
KeyValueStore(ctx context.Context) (KeyValueStore, error)
MessageStore(ctx context.Context) (MessageStore, error)
WorkflowStore(ctx context.Context) (WorkflowStore, error)
StateMachineAuditStore(ctx context.Context) (StateMachineAuditStore, error)
AsyncSearchStore(ctx context.Context) (AsyncSearchStore, error)
TransactionManager(ctx context.Context) (TransactionManager, error)
Close() error
}
type TenantID ¶
type TenantID string
TenantID is a named type for tenant identifiers, preventing accidental use of bare strings.
const SystemTenantID TenantID = "SYSTEM"
SystemTenantID is the well-known tenant for system-level data.
type TransactionManager ¶
type TransactionManager interface {
// Begin starts a new transaction in the caller's tenant. Returns the
// txID and a child context carrying the new TransactionState. After
// Begin returns, the TransactionState's immutable fields (ID,
// TenantID, SnapshotTime) are safe to read without locks.
Begin(ctx context.Context) (txID string, txCtx context.Context, err error)
// Commit closes the transaction and applies its buffered writes to
// the underlying store. Commit acquires tx.OpMu.Lock for its
// duration, so it waits for any in-flight tx-path operation on the
// same tx (any SPI method invocation that holds OpMu.RLock) to drain
// before mutating or closing tx state. Implementations must verify
// that the caller's tenant matches tx.TenantID and reject
// mismatched-tenant calls.
Commit(ctx context.Context, txID string) error
// Rollback closes the transaction and discards its buffered writes.
// Acquires tx.OpMu.Lock; same tenant verification as Commit.
Rollback(ctx context.Context, txID string) error
// Join returns a context carrying the TransactionState for an existing
// active transaction, allowing multiple goroutines to participate in
// the same tx.
//
// Two distinct contracts apply to a joined tx:
//
// - Plugin contract (enforced by [TransactionState.OpMu]): the
// plugin's tx-path SPI methods hold OpMu.RLock; the plugin's
// closure SPI methods (Commit, Rollback, RollbackToSavepoint)
// hold OpMu.Lock. So closure waits for any in-flight SPI-method
// invocation to return before mutating or closing tx state. This
// contract covers SPI-method invocations only — application code
// that mutates tx state directly (e.g. through a [GetTransaction]
// handle) is outside the OpMu protection.
//
// - Application contract (NOT enforced by the plugin): the
// application must serialise its own concurrent in-flight ops on
// the same tx. OpMu.RLock allows multiple readers concurrently;
// two RLock-holding ops (e.g. two Save calls from different
// goroutines) will trigger Go's "concurrent map writes" runtime
// fatal because both write to tx.Buffer / tx.WriteSet / tx.Deletes
// without mutual exclusion. RLock does not protect map writes from
// each other regardless of key overlap. The plugin does not detect
// or recover from this contract violation.
//
// Implementations must verify that the caller's tenant matches
// tx.TenantID and reject mismatched-tenant joins. Implementations must
// read tx.RolledBack and tx.Closed under tx.OpMu.RLock (not under the
// manager mutex) — Commit's deferred Closed-write runs outside the
// manager-mutex region.
Join(ctx context.Context, txID string) (txCtx context.Context, err error)
GetSubmitTime(ctx context.Context, txID string) (time.Time, error)
// Savepoint creates a named savepoint within the given transaction by
// snapshotting tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes.
//
// Locking discipline: read-only on tx state. Implementations must
// acquire tx.OpMu.RLock for the snapshot read so the operation is
// serialised against Commit/Rollback (which take tx.OpMu.Lock)
// without blocking other in-flight readers.
//
// Tenant isolation: implementations must reject calls whose
// UserContext tenant does not match tx.TenantID.
Savepoint(ctx context.Context, txID string) (savepointID string, err error)
// RollbackToSavepoint rolls back all work done since the savepoint was
// created by replacing tx.Buffer / tx.ReadSet / tx.WriteSet /
// tx.Deletes with the snapshot taken at Savepoint time.
//
// Locking discipline: write on tx state — exclusive against every
// other tx-path op. Implementations must acquire tx.OpMu.Lock (write
// lock, not RLock) for the duration of the field replacement.
//
// Tenant isolation: implementations must reject mismatched-tenant
// callers — RollbackToSavepoint is destructive on tx-state.
RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error
// ReleaseSavepoint releases a savepoint, merging its work into the
// parent transaction. The work done since the savepoint already lives
// in tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — Release only
// removes the snapshot record from manager-side state.
//
// Locking discipline: does not touch any field of TransactionState
// (only manager-side savepoint records). Implementations need only
// the manager mutex; tx.OpMu is not required.
//
// Tenant isolation: implementations must reject mismatched-tenant
// callers — manager-side savepoint state is tenant-scoped.
ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
}
TransactionManager is the plugin-side surface for the snapshot-isolation transaction model. See TransactionState for the full concurrency contract that implementations must honour.
type TransactionState ¶
type TransactionState struct {
ID string
TenantID TenantID
SnapshotTime time.Time
ReadSet map[string]bool // entity IDs read; access under OpMu (see godoc)
WriteSet map[string]bool // entity IDs written; access under OpMu
Buffer map[string]*Entity // staged writes; access under OpMu
Deletes map[string]bool // staged deletes; access under OpMu
RolledBack bool // closure flag; written under OpMu.Lock, read under OpMu.RLock
OpMu sync.RWMutex // see TransactionState godoc above for full contract
Closed bool // closure flag; written under OpMu.Lock, read under OpMu.RLock
}
TransactionState holds the state of an active SSI transaction. All processor execution is sequential (no goroutines) — see docs/superpowers/specs/2026-04-01-workflow-processor-execution-design.md. SAVEPOINTs snapshot/restore these maps for ASYNC_NEW_TX rollback isolation.
Concurrency contract ¶
Plugin implementations of TransactionManager must coordinate concurrent access to TransactionState's mutable fields using OpMu. Two distinct concerns:
Cross-class serialisation — plugin's responsibility, enforced via OpMu. In-flight tx-path operations (Save, Get, Delete, Savepoint, etc., regardless of which plugin type — TransactionManager, EntityStore, or any other surface — defines them) hold OpMu.RLock; closure operations (Commit, Rollback, RollbackToSavepoint) hold OpMu.Lock. This guarantees Commit/Rollback wait for any in-flight SPI-method invocation on the same tx to drain before mutating or closing tx state. Every plugin method that reads or writes ReadSet, WriteSet, Buffer, Deletes, RolledBack, or Closed must acquire OpMu in the appropriate posture.
Within-class serialisation — application's responsibility, NOT enforced by OpMu. OpMu.RLock allows multiple readers concurrently; it does not mutually exclude RLock-holders from each other. If the application fires two RLock-holding ops on the same tx concurrently (e.g. two `Save` calls from different goroutines), the underlying Go map writes to tx.Buffer / tx.WriteSet / tx.Deletes will trigger the runtime's "concurrent map writes" fatal — RLock does not protect map writes from each other regardless of key overlap. The application must serialise its own ops on a given tx; the plugin does not detect or recover from this contract violation.
Lock posture per field ¶
- ReadSet, WriteSet, Buffer, Deletes: read or written under OpMu.RLock by in-flight ops; iterated or replaced under OpMu.Lock by Commit / Rollback / RollbackToSavepoint.
- Closed: written under OpMu.Lock by Commit/Rollback in their defer (so all return paths are covered); read under OpMu.RLock by every in-flight op so the op fails fast on a closed tx.
- RolledBack: written under OpMu.Lock by Rollback eagerly inside the OpMu region (not in defer); read under OpMu.RLock by every in-flight op.
- ID, TenantID, SnapshotTime: immutable after [TransactionManager.Begin] returns; safe to read without locks.
Lock order ¶
Plugin implementations acquire locks in this overall order to avoid deadlock:
tx.OpMu → factory's per-store mutex → manager's per-tx-table mutex
The manager's per-tx-table mutex is also acquired BEFORE tx.OpMu for the brief active-tx-table lookup at the top of every method, then released BEFORE tx.OpMu is taken. So in practice the manager mutex appears at two distinct points in the timeline:
- Brief lookup of the tx pointer in the manager's active-tx table. Released immediately. Never held across slow operations.
- Optional re-acquisition INSIDE the OpMu region for committedLog / savepoint-table maintenance, while still holding OpMu.
Holding the manager mutex across the tx.OpMu acquisition is a deadlock-bug — Commit holds tx.OpMu while waiting on the manager mutex for log maintenance, so any path that holds the manager mutex while waiting on tx.OpMu inverts the order.
Required reading for plugin authors ¶
New methods that touch *TransactionState — on TransactionManager, EntityStore, or any other plugin surface — must declare their OpMu posture in a code comment ("Locking discipline: ..."). See `.claude/rules/tx-state-locking.md` in the cyoda-go-spi repository for the review checklist enforced at code review.
func GetTransaction ¶
func GetTransaction(ctx context.Context) *TransactionState
GetTransaction returns the transaction state from the context, or nil if none.
type TransitionDefinition ¶
type TransitionDefinition struct {
Name string `json:"name"`
Next string `json:"next"`
Manual bool `json:"manual"`
Disabled bool `json:"disabled,omitempty"`
Criterion json.RawMessage `json:"criterion,omitempty"`
Processors []ProcessorDefinition `json:"processors,omitempty"`
}
TransitionDefinition represents a single transition from a state.
type UUIDGenerator ¶
type UUIDGenerator interface {
NewTimeUUID() [16]byte
}
UUIDGenerator produces identifiers for stored records.
The return type is [16]byte so this package remains stdlib-only. Callers that want the github.com/google/uuid type perform a zero-cost type conversion: uuid.UUID(gen.NewTimeUUID()).
Implementations should produce monotonic, time-ordered IDs (v1 UUIDs or equivalent) so that sorted IDs correspond to insertion order.
type UserContext ¶
UserContext carries the authenticated user's identity through the request lifecycle.
func GetUserContext ¶
func GetUserContext(ctx context.Context) *UserContext
func MustGetUserContext ¶
func MustGetUserContext(ctx context.Context) *UserContext
type WorkflowDefinition ¶
type WorkflowDefinition struct {
Version string `json:"version"`
Name string `json:"name"`
Description string `json:"desc,omitempty"`
InitialState string `json:"initialState"`
Active bool `json:"active"`
Criterion json.RawMessage `json:"criterion,omitempty"`
States map[string]StateDefinition `json:"states"`
}
WorkflowDefinition represents a complete workflow configuration.