spi

v0.7.1
Published: May 6, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

README

cyoda-go-spi

Storage-plugin contract for cyoda-go.

This module defines the interfaces and value types that any storage backend must implement. Plugin authors depend only on this module.

Packages

  • spi — core interfaces, value types, sentinel errors, UUIDGenerator, ClusterBroadcaster, and Plugin registration machinery.
  • spi/predicate — search-predicate AST types and JSON parse/marshal.

Dependencies

Standard library only.

Versioning & Compatibility

License

Apache 2.0.

For Plugin Authors

Every spi.StoreFactory implementation should run the spitest conformance harness to verify it meets the SPI contract.

Wiring the Harness

package myplugin_test

import (
    "testing"
    "time"

    spitest "github.com/cyoda-platform/cyoda-go-spi/spitest"
    "github.com/your-org/myplugin"
)

func TestConformance(t *testing.T) {
    factory := myplugin.NewStoreFactory(/* ... */)
    spitest.StoreFactoryConformance(t, spitest.Harness{
        Factory:      factory,
        AdvanceClock: func(d time.Duration) { time.Sleep(d) },
    })
}

AdvanceClock Contract

The harness calls AdvanceClock(d time.Duration) between writes that need distinct timestamps. After AdvanceClock returns, every subsequent timestamp the plugin assigns must strictly dominate every timestamp assigned before the call. The duration d is always positive (d > 0).

Plugins wire this to whatever clock mechanism they use:

  • In-memory / app-side clock: inject a TestClock with an Advance(d) method via a factory option; AdvanceClock calls that.
  • DB-side clock (e.g., PostgreSQL): use time.Sleep(d). The DB's monotonic wall clock satisfies the contract; ~1–5ms gaps are sufficient.
  • Logical clock (e.g., Cassandra HLC): advance the physical component via a test-only hook.

Harness.Now (optional)

If your plugin uses an injected TestClock, also set Harness.Now to the clock's Now() method so the harness's temporal assertions use the same clock as the plugin. Harness.Now defaults to time.Now, which matches wall-clock-based plugins such as postgres.

Error-Assertion Contract

The harness uses errors.Is() against SPI sentinels (spi.ErrNotFound, spi.ErrConflict). Plugins MUST translate backend-native errors at the SPI boundary into errors that wrap the matching sentinel with %w:

// WRONG — harness will fail
return pgx.ErrNoRows

// RIGHT — harness passes
return fmt.Errorf("entity %q: %w", id, spi.ErrNotFound)

State Isolation

Every subtest runs under a fresh tenant. The harness never calls Reset, Truncate, or any teardown hook. A single factory handles all subtests across different tenants. Factory.Close() is called once when the suite finishes.

Cross-tenant leakage is caught by the explicit TenantIsolation/* subtests, not by infrastructure.

Known Limitations / Harness.Skip

Backends with structural incompatibilities can register documented skips via Harness.Skip. The key is the subtest path below the root test name (the part after the first /). Mistyped keys cause the suite to fail with an "unused skip key" error, preventing stale entries from silently accumulating.

spitest.StoreFactoryConformance(t, spitest.Harness{
    Factory:      factory,
    AdvanceClock: testClock.Advance,
    Skip: map[string]string{
        "Transaction/Join":                        "pending #42: Join does not share write-set",
        "AsyncSearch/SaveAndGetResults/Pagination": "pending #43: SaveResults not yet implemented",
    },
})

Documentation

Overview

Package spi defines the storage-plugin contract for cyoda-go.

Plugin Authoring — Minimal Example

A storage plugin is a Go module that implements the Plugin interface and registers itself at package init() time:

package myplugin

import (
	"context"
	spi "github.com/cyoda-platform/cyoda-go-spi"
)

func init() { spi.Register(&plugin{}) }

type plugin struct{}

func (p *plugin) Name() string { return "myplugin" }

func (p *plugin) NewFactory(
	ctx context.Context,
	getenv func(string) string,
	opts ...spi.FactoryOption,
) (spi.StoreFactory, error) {
	// Parse config via getenv (not os.Getenv — the core injects a
	// closure so tests can supply a fake environment).
	dsn := getenv("MYPLUGIN_DSN")

	// Resolve options (e.g., a ClusterBroadcaster for cluster-wide
	// notifications). Plugins that need no options can skip this step.
	cfg := spi.ApplyFactoryOptions(opts)
	_ = cfg.ClusterBroadcaster() // nil if unset

	// Connect, run migrations, return a ready factory. Use ctx for
	// all blocking setup work so unreachable infra fails fast.
	return newStoreFactory(ctx, dsn), nil
}

The binary is built with a blank import of the plugin:

import _ "example.com/myplugin"

which causes init() to run and Register to install the plugin in the process-global registry. The core resolves the active backend at startup by calling spi.GetPlugin(name).

The getenv injection

Plugins read their configuration through the injected getenv function, not os.Getenv directly. In production the core passes os.Getenv; in tests the core passes a closure over a map[string]string so that test fixtures can provide exactly the variables the test needs without leaking into the process environment.
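
A map-backed getenv of the kind described above can be sketched in a few lines. The helper names mapGetenv and dsnFrom are hypothetical; MYPLUGIN_DSN is the variable used in the minimal example.

```go
package main

import (
	"errors"
	"fmt"
)

// mapGetenv returns a getenv-style lookup backed by a map. Missing keys
// yield "", mirroring os.Getenv.
func mapGetenv(env map[string]string) func(string) string {
	return func(key string) string { return env[key] }
}

// dsnFrom is a hypothetical config reader that depends only on the injected
// lookup, never on the process environment.
func dsnFrom(getenv func(string) string) (string, error) {
	dsn := getenv("MYPLUGIN_DSN")
	if dsn == "" {
		return "", errors.New("MYPLUGIN_DSN is not set")
	}
	return dsn, nil
}

func main() {
	getenv := mapGetenv(map[string]string{"MYPLUGIN_DSN": "postgres://localhost/test"})
	dsn, err := dsnFrom(getenv)
	fmt.Println(dsn, err) // postgres://localhost/test <nil>

	_, err = dsnFrom(mapGetenv(nil))
	fmt.Println(err) // MYPLUGIN_DSN is not set
}
```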

Plugin-owned TransactionManager

Each plugin provides its own TransactionManager via StoreFactory's TransactionManager(ctx) method. The plugin's TM implementation is free to couple tightly to the plugin's stores (memory does) or be a lightweight lifecycle tracker (postgres does — the pgx.Tx is tracked in a plugin-internal registry and stores look it up by txID when called inside an active transaction). The pattern for bridging a logical txID to a physical transaction handle:

// Begin registers the handle in the plugin's internal registry.
func (tm *TM) Begin(ctx context.Context) (string, context.Context, error) {
	phys := openPhysical(ctx)
	txID := uuid.UUID(tm.uuids.NewTimeUUID()).String()
	tm.registry.Register(txID, phys)
	state := &spi.TransactionState{ID: txID}
	return txID, spi.WithTransaction(ctx, state), nil
}

// Stores resolve the handle from context.
func (f *StoreFactory) queryExecutor(ctx context.Context) Querier {
	if state := spi.GetTransaction(ctx); state != nil {
		if phys, ok := f.tm.Lookup(state.ID); ok {
			return phys
		}
	}
	return f.defaultExecutor  // e.g., a connection pool
}

Startable and Close — symmetry

Plugins with background goroutines implement the optional Startable interface. The core calls Start(ctx) immediately after NewFactory and before any store-facing call (including TransactionManager), so plugins whose TransactionManager depends on Start's side effects (e.g. cassandra's shard-rebalance wait) can rely on the ordering. Plugins must tear down those goroutines in StoreFactory.Close(): each goroutine observes either ctx.Done() or a shutdown channel closed by Close(); Close() waits (bounded) for them to exit with a sync.WaitGroup. Leaked goroutines compound under test-driven create/destroy cycles.

Dependencies

The spi package depends only on the Go standard library. Plugin authors depend only on this module; they do not depend on cyoda-go itself.

Predicate AST

Submodule spi/predicate holds the search-predicate AST and JSON parse/marshal helpers. Plugins that translate predicates to their own query dialect (SQL, CQL) import spi/predicate for the types. Plugins with no search semantics may ignore it.

Index

Constants

This section is empty.

Variables

var ErrConflict = errors.New("conflict: entity has been modified")

ErrConflict indicates the write conflicts with a concurrent modification.

var ErrEpochMismatch = errors.New("shard epoch mismatch")

ErrEpochMismatch indicates the caller's shard epoch is stale relative to the cluster view. Retry after refreshing.

var ErrNotFound = errors.New("not found")

ErrNotFound indicates the requested resource does not exist.

var ErrRetryExhausted = errors.New("retry budget exhausted")

ErrRetryExhausted indicates the plugin's retry budget for a transparently-retried operation was consumed without success. Returned by ExtendSchema when CYODA_SCHEMA_EXTEND_MAX_RETRIES attempts have completed without success AND the context was not cancelled. Callers may choose to retry at a higher level (with backoff) or surface the condition to the end user.

Distinct from ErrConflict: ErrConflict means a single attempt hit a conflict; ErrRetryExhausted means the plugin exhausted its configured retry budget.

Functions

func DefaultSaveAll

func DefaultSaveAll(store EntityStore, ctx context.Context, entities iter.Seq[*Entity]) ([]int64, error)

DefaultSaveAll is the sequential fallback for EntityStore.SaveAll. It calls store.Save for each entity in order and stops on the first error. Backends that don't need concurrent saves delegate their SaveAll to this.

func HasRole

func HasRole(roles []string, role string) bool

HasRole checks whether the target role is present in the roles slice.

func Register

func Register(p Plugin)

Register adds p to the plugin registry. Register panics if another plugin has already been registered under the same Name — a naming collision at init time is always a programmer error. Matches the database/sql.Register convention.

func RegisteredPlugins

func RegisteredPlugins() []string

RegisteredPlugins returns the names of all currently registered plugins, sorted by name for deterministic ordering.
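
For readers unfamiliar with the database/sql-style registry, the following sketch shows the general shape: a mutex-guarded map, a panic on duplicate names, and sorted name listing. It illustrates the convention only and is not the module's implementation; the registry and named types are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Plugin is reduced to the one method the registry needs in this sketch.
type Plugin interface{ Name() string }

// registry is a hypothetical process-global plugin table.
type registry struct {
	mu      sync.RWMutex
	plugins map[string]Plugin
}

// Register adds p and panics on a duplicate name, since a collision at
// init time is a programmer error.
func (r *registry) Register(p Plugin) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.plugins[p.Name()]; dup {
		panic("registry: Register called twice for plugin " + p.Name())
	}
	if r.plugins == nil {
		r.plugins = make(map[string]Plugin)
	}
	r.plugins[p.Name()] = p
}

// Get returns the plugin registered under name, if any.
func (r *registry) Get(name string) (Plugin, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.plugins[name]
	return p, ok
}

// Names returns all registered names in sorted order.
func (r *registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.plugins))
	for n := range r.plugins {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// named is a trivial Plugin used for the demonstration.
type named string

func (n named) Name() string { return string(n) }

func main() {
	var r registry
	r.Register(named("postgres"))
	r.Register(named("memory"))
	fmt.Println(r.Names()) // [memory postgres]
}
```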

func WithTransaction

func WithTransaction(ctx context.Context, tx *TransactionState) context.Context

WithTransaction returns a new context carrying the given transaction state.

func WithUserContext

func WithUserContext(ctx context.Context, uc *UserContext) context.Context

Types

type AsyncSearchStore

type AsyncSearchStore interface {
	CreateJob(ctx context.Context, job *SearchJob) error
	GetJob(ctx context.Context, jobID string) (*SearchJob, error)
	UpdateJobStatus(ctx context.Context, jobID string, status string, resultCount int, errMsg string, finishTime time.Time, calcTimeMs int64) error
	SaveResults(ctx context.Context, jobID string, entityIDs []string) error
	GetResultIDs(ctx context.Context, jobID string, offset, limit int) (entityIDs []string, total int, err error)
	DeleteJob(ctx context.Context, jobID string) error
	ReapExpired(ctx context.Context, ttl time.Duration) (int, error)
	// Cancel marks the job as CANCELLED. Idempotent: cancelling a job
	// already in a terminal state returns nil. Cancelling a non-existent
	// job returns ErrNotFound.
	Cancel(ctx context.Context, jobID string) error
}

AsyncSearchStore provides persistence for async search jobs and their results.

type ChangeLevel

type ChangeLevel string

ChangeLevel controls which structural changes are permitted during data ingestion.

const (
	ChangeLevelArrayLength   ChangeLevel = "ARRAY_LENGTH"
	ChangeLevelArrayElements ChangeLevel = "ARRAY_ELEMENTS"
	ChangeLevelType          ChangeLevel = "TYPE"
	ChangeLevelStructural    ChangeLevel = "STRUCTURAL"
)

func ValidateChangeLevel

func ValidateChangeLevel(s string) (ChangeLevel, error)

ValidateChangeLevel returns an error if the given string is not a known ChangeLevel.

type ClusterBroadcaster

type ClusterBroadcaster interface {
	Broadcast(topic string, payload []byte)
	Subscribe(topic string, handler func(payload []byte))
}

ClusterBroadcaster delivers opaque payloads to peer nodes on a named topic. Semantics are fire-and-forget, best-effort: no ordering, no anti-entropy, no persistence. Payloads that need ordering or delivery guarantees should use a backend-internal transport (e.g. a message broker) rather than this interface.

Broadcast is non-blocking; it enqueues the payload and returns. Subscribe registers a handler called for every message received on the topic. Handlers run on the broadcaster's goroutines; they must not block indefinitely.

Typical use: a plugin needs eventually-consistent cluster-wide notifications (cache invalidation, clock gossip, topology hints).

type ConfigVar

type ConfigVar struct {
	Name        string
	Description string
	Default     string
	Required    bool
}

ConfigVar documents a single environment variable a plugin reads.

type DescribablePlugin

type DescribablePlugin interface {
	Plugin
	ConfigVars() []ConfigVar
}

DescribablePlugin is an optional Plugin capability: it exposes the configuration variables the plugin reads, so --help can render them.

type Entity

type Entity struct {
	Meta EntityMeta
	Data []byte
}

type EntityMeta

type EntityMeta struct {
	ID                      string
	TenantID                TenantID
	ModelRef                ModelRef
	State                   string
	Version                 int64
	CreationDate            time.Time
	LastModifiedDate        time.Time
	TransactionID           string
	ChangeType              string // "CREATED", "UPDATED", "DELETED"
	ChangeUser              string // user ID who performed the change
	TransitionForLatestSave string
}

type EntityStore

type EntityStore interface {
	Save(ctx context.Context, entity *Entity) (int64, error)
	// CompareAndSave saves the entity only if the current latest transaction ID matches expectedTxID.
	// Returns ErrConflict if the transaction ID has changed.
	CompareAndSave(ctx context.Context, entity *Entity, expectedTxID string) (int64, error)
	// SaveAll saves multiple entities, returning versions in iteration order.
	// Backends may execute saves concurrently. On error, returns the first
	// error encountered; partially-saved entities within an uncommitted
	// transaction are invisible to readers.
	SaveAll(ctx context.Context, entities iter.Seq[*Entity]) ([]int64, error)
	Get(ctx context.Context, entityID string) (*Entity, error)
	GetAsAt(ctx context.Context, entityID string, asAt time.Time) (*Entity, error)
	GetAll(ctx context.Context, modelRef ModelRef) ([]*Entity, error)
	GetAllAsAt(ctx context.Context, modelRef ModelRef, asAt time.Time) ([]*Entity, error)
	Delete(ctx context.Context, entityID string) error
	DeleteAll(ctx context.Context, modelRef ModelRef) error
	Exists(ctx context.Context, entityID string) (bool, error)
	Count(ctx context.Context, modelRef ModelRef) (int64, error)
	// CountByState returns the count of non-deleted entities grouped by state
	// for the given model. If states is non-nil, only the listed states are
	// included in the result. If states is nil, all states are returned.
	// An empty (non-nil) states slice returns an empty map without querying
	// the storage layer.
	//
	// Unknown model: returns an empty map with no error, matching Count's
	// behavior (no model-registry check at this layer).
	//
	// Implementations MUST push the state filter down to the storage layer
	// when feasible. Callers may invoke this from inside a transaction; the
	// returned counts MUST reflect the transactional view (uncommitted writes
	// from the current tx are visible, writes from other in-flight txs are not),
	// matching the semantics of Count.
	CountByState(ctx context.Context, modelRef ModelRef, states []string) (map[string]int64, error)
	GetVersionHistory(ctx context.Context, entityID string) ([]EntityVersion, error)
}
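
The nil versus empty distinction in CountByState is easy to get wrong. The following in-memory sketch shows one way to honour it; the record type and the countByState function are hypothetical, and a real backend would push the filter down to its query layer instead of scanning.

```go
package main

import "fmt"

// record is a hypothetical stored row: a state plus a soft-delete flag.
type record struct {
	state   string
	deleted bool
}

// countByState counts non-deleted records grouped by state.
//   - states == nil: every state is counted.
//   - states empty but non-nil: empty result, no scan.
//   - otherwise: only the listed states are counted.
func countByState(records []record, states []string) map[string]int64 {
	counts := make(map[string]int64)
	if states != nil && len(states) == 0 {
		return counts
	}
	var wanted map[string]struct{}
	if states != nil {
		wanted = make(map[string]struct{}, len(states))
		for _, s := range states {
			wanted[s] = struct{}{}
		}
	}
	for _, r := range records {
		if r.deleted {
			continue
		}
		if wanted != nil {
			if _, ok := wanted[r.state]; !ok {
				continue
			}
		}
		counts[r.state]++
	}
	return counts
}

func main() {
	rows := []record{
		{state: "NEW"}, {state: "NEW"}, {state: "APPROVED"},
		{state: "NEW", deleted: true},
	}
	fmt.Println(countByState(rows, nil))             // map[APPROVED:1 NEW:2]
	fmt.Println(countByState(rows, []string{"NEW"})) // map[NEW:2]
	fmt.Println(countByState(rows, []string{}))      // map[]
}
```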

type EntityVersion

type EntityVersion struct {
	Entity     *Entity
	ChangeType string
	User       string
	Timestamp  time.Time
	Version    int64
	Deleted    bool
}

EntityVersion represents a single version entry in an entity's change history.

type ExecutionResult

type ExecutionResult struct {
	State      string
	Success    bool
	StopReason string
	Error      error
}

ExecutionResult holds the outcome of a workflow engine execution.

type FactoryConfig

type FactoryConfig struct {
	// contains filtered or unexported fields
}

FactoryConfig is the read-only view plugins see after resolution.

func ApplyFactoryOptions

func ApplyFactoryOptions(opts []FactoryOption) FactoryConfig

ApplyFactoryOptions resolves the variadic options into a read-only FactoryConfig. Plugins call this inside NewFactory.

func (FactoryConfig) ClusterBroadcaster

func (c FactoryConfig) ClusterBroadcaster() ClusterBroadcaster

ClusterBroadcaster returns the cluster broadcaster supplied via WithClusterBroadcaster, or nil if none was supplied.

type FactoryOption

type FactoryOption func(*factoryConfig)

FactoryOption configures a storage factory during NewFactory. Plugins receive options via the variadic parameter and resolve them with ApplyFactoryOptions.

func WithClusterBroadcaster

func WithClusterBroadcaster(b ClusterBroadcaster) FactoryOption

WithClusterBroadcaster injects the cluster broadcaster for plugins that use ClusterBroadcaster for cluster-wide notifications.

type FieldSource added in v0.4.0

type FieldSource string

FieldSource indicates whether a filter path refers to entity data or metadata.

const (
	SourceData FieldSource = "data"
	SourceMeta FieldSource = "meta"
)

type Filter added in v0.4.0

type Filter struct {
	Op       FilterOp
	Path     string
	Source   FieldSource
	Value    any
	Values   []any
	Children []Filter
}

Filter is a generic predicate tree for search pushdown. Leaf nodes carry Op, Path, Source, and Value/Values. Branch nodes (FilterAnd, FilterOr) carry Children.
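
To make the leaf/branch structure concrete, the sketch below builds a small tree and evaluates it against a flat document. It uses trimmed local stand-ins for Filter and FilterOp (no Source or Values fields), supports only four operators, and treats Path as a plain map key; all of these are simplifying assumptions, and the match function is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// FilterOp and Filter are trimmed stand-ins for the spi types.
type FilterOp string

const (
	FilterAnd    FilterOp = "and"
	FilterOr     FilterOp = "or"
	FilterEq     FilterOp = "eq"
	FilterIsNull FilterOp = "is_null"
)

type Filter struct {
	Op       FilterOp
	Path     string
	Value    any
	Children []Filter
}

// match evaluates f against doc. Branch nodes recurse into Children; leaf
// nodes compare doc[Path] with Value. Unsupported operators are an error.
func match(f Filter, doc map[string]any) (bool, error) {
	switch f.Op {
	case FilterAnd:
		for _, c := range f.Children {
			ok, err := match(c, doc)
			if err != nil || !ok {
				return false, err
			}
		}
		return true, nil
	case FilterOr:
		for _, c := range f.Children {
			ok, err := match(c, doc)
			if err != nil {
				return false, err
			}
			if ok {
				return true, nil
			}
		}
		return false, nil
	case FilterEq:
		// DeepEqual is type-strict: int(1) does not equal float64(1).
		return reflect.DeepEqual(doc[f.Path], f.Value), nil
	case FilterIsNull:
		v, present := doc[f.Path]
		return !present || v == nil, nil
	default:
		return false, fmt.Errorf("unsupported filter op %q", f.Op)
	}
}

func main() {
	f := Filter{Op: FilterAnd, Children: []Filter{
		{Op: FilterEq, Path: "status", Value: "OPEN"},
		{Op: FilterOr, Children: []Filter{
			{Op: FilterEq, Path: "region", Value: "EU"},
			{Op: FilterIsNull, Path: "region"},
		}},
	}}
	ok, err := match(f, map[string]any{"status": "OPEN"})
	fmt.Println(ok, err) // true <nil>
}
```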

type FilterOp added in v0.4.0

type FilterOp string

FilterOp defines a filter operation for search predicate pushdown.

const (
	FilterAnd FilterOp = "and"
	FilterOr  FilterOp = "or"

	FilterEq  FilterOp = "eq"
	FilterNe  FilterOp = "ne"
	FilterGt  FilterOp = "gt"
	FilterLt  FilterOp = "lt"
	FilterGte FilterOp = "gte"
	FilterLte FilterOp = "lte"

	FilterContains   FilterOp = "contains"
	FilterStartsWith FilterOp = "starts_with"
	FilterEndsWith   FilterOp = "ends_with"
	FilterLike       FilterOp = "like"

	FilterIsNull  FilterOp = "is_null"
	FilterNotNull FilterOp = "not_null"

	FilterBetween      FilterOp = "between"
	FilterMatchesRegex FilterOp = "matches_regex"

	FilterIEq            FilterOp = "ieq"
	FilterINe            FilterOp = "ine"
	FilterIContains      FilterOp = "icontains"
	FilterINotContains   FilterOp = "inot_contains"
	FilterIStartsWith    FilterOp = "istarts_with"
	FilterINotStartsWith FilterOp = "inot_starts_with"
	FilterIEndsWith      FilterOp = "iends_with"
	FilterINotEndsWith   FilterOp = "inot_ends_with"
)

type KeyValueStore

type KeyValueStore interface {
	Put(ctx context.Context, namespace string, key string, value []byte) error
	Get(ctx context.Context, namespace string, key string) ([]byte, error)
	Delete(ctx context.Context, namespace string, key string) error
	List(ctx context.Context, namespace string) (map[string][]byte, error)
}

type MessageHeader

type MessageHeader struct {
	Subject         string
	ContentType     string
	ContentLength   int64
	ContentEncoding string
	MessageID       string // custom message ID from X-Message-ID header
	UserID          string
	Recipient       string
	ReplyTo         string
	CorrelationID   string
}

MessageHeader holds the fixed AMQP-aligned headers for an edge message.

type MessageMetaData

type MessageMetaData struct {
	Values        map[string]any
	IndexedValues map[string]any
}

MessageMetaData holds arbitrary key-value metadata for an edge message. Values preserve their original JSON types (string, number, bool, etc.).

type MessageStore

type MessageStore interface {
	Save(ctx context.Context, id string, header MessageHeader, metaData MessageMetaData, payload io.Reader) error
	Get(ctx context.Context, id string) (MessageHeader, MessageMetaData, io.ReadCloser, error)
	Delete(ctx context.Context, id string) error
	DeleteBatch(ctx context.Context, ids []string) error
}

type ModelDescriptor

type ModelDescriptor struct {
	Ref         ModelRef
	State       ModelState
	ChangeLevel ChangeLevel
	UpdateDate  time.Time
	Schema      []byte
}

ModelDescriptor holds the full metadata and schema for an entity model.

type ModelRef

type ModelRef struct {
	EntityName   string
	ModelVersion string
}

func (ModelRef) String

func (r ModelRef) String() string

type ModelState

type ModelState string

ModelState represents the lifecycle state of an entity model.

const (
	ModelLocked   ModelState = "LOCKED"
	ModelUnlocked ModelState = "UNLOCKED"
)

type ModelStore

type ModelStore interface {
	Save(ctx context.Context, desc *ModelDescriptor) error
	Get(ctx context.Context, modelRef ModelRef) (*ModelDescriptor, error)
	GetAll(ctx context.Context) ([]ModelRef, error)
	Delete(ctx context.Context, modelRef ModelRef) error
	Lock(ctx context.Context, modelRef ModelRef) error
	Unlock(ctx context.Context, modelRef ModelRef) error
	IsLocked(ctx context.Context, modelRef ModelRef) (bool, error)
	SetChangeLevel(ctx context.Context, modelRef ModelRef, level ChangeLevel) error
	// ExtendSchema appends a schema delta for the model at ref. The
	// delta is an opaque, plugin-agnostic blob that the plugin stores
	// verbatim in its extension log; folding the log into the current
	// schema is done on read via a plugin-injected ApplyFunc.
	//
	// Contract:
	//   - Success (nil return) means the extension is durably committed
	//     and visible to subsequent reads on this node.
	//   - A non-nil error means no persisted effect — no log entry,
	//     no savepoint, no partial state.
	//   - Plugins with a native conflict surface (sqlite SQLITE_BUSY,
	//     cassandra LWT applied:false) retry transparently up to a
	//     configurable budget. On exhaustion without ctx cancellation,
	//     return ErrRetryExhausted.
	//   - Context cancellation between retry attempts returns ctx.Err()
	//     (wrapped with attempt count), not ErrRetryExhausted. Mid-attempt
	//     cancellation follows backend-native behavior.
	//   - Plugins without a conflict surface (memory, postgres) commit
	//     immediately or fail with the backend's native error.
	//
	// Empty or nil deltas are a no-op and return nil.
	ExtendSchema(ctx context.Context, ref ModelRef, delta SchemaDelta) error
}

type OrderSpec added in v0.4.0

type OrderSpec struct {
	Path   string
	Source FieldSource
	Desc   bool
}

OrderSpec defines a single sort clause for search results. When OrderBy is empty, the default order is entity_id ascending.

type Plugin

type Plugin interface {
	Name() string
	NewFactory(ctx context.Context, getenv func(string) string, opts ...FactoryOption) (StoreFactory, error)
}

Plugin is the storage-backend contract. Implementations register themselves at init time by calling Register.

func GetPlugin

func GetPlugin(name string) (Plugin, bool)

GetPlugin returns the registered plugin with the given name.

type ProcessorConfig

type ProcessorConfig struct {
	AttachEntity         bool   `json:"attachEntity,omitempty"`
	CalculationNodesTags string `json:"calculationNodesTags,omitempty"`
	ResponseTimeoutMs    int64  `json:"responseTimeoutMs,omitempty"`
	RetryPolicy          string `json:"retryPolicy,omitempty"`
	Context              string `json:"context,omitempty"`
	// StartNewTxOnDispatch, when true and ExecutionMode is COMMIT_BEFORE_DISPATCH,
	// causes the cascade engine to open a fresh transaction before dispatching
	// the processor (so the processor may perform transactional work via that
	// tx's token). When false (default) the processor runs with no transaction
	// context and the connection is released entirely during dispatch.
	// Ignored for any other ExecutionMode.
	StartNewTxOnDispatch *bool `json:"startNewTxOnDispatch,omitempty"`
}

ProcessorConfig holds configuration for a processor.

type ProcessorDefinition

type ProcessorDefinition struct {
	Type          string          `json:"type"`
	Name          string          `json:"name"`
	ExecutionMode string          `json:"executionMode,omitempty"`
	Config        ProcessorConfig `json:"config,omitempty"`
}

ProcessorDefinition represents a processor attached to a transition.

type SchemaDelta added in v0.6.0

type SchemaDelta []byte

SchemaDelta is an opaque, plugin-agnostic serialization of an additive schema change. Bytes are produced by the consuming application's schema diff logic (e.g. cyoda-go's internal/domain/model/schema) and replayed by an injected apply function in the plugin. Plugins persist bytes verbatim; they MUST NOT interpret them.

type SearchJob

type SearchJob struct {
	ID          string
	TenantID    TenantID
	Status      string // RUNNING, SUCCESSFUL, FAILED, CANCELLED
	ModelRef    ModelRef
	Condition   json.RawMessage
	PointInTime time.Time
	SearchOpts  json.RawMessage
	ResultCount int
	Error       string
	CreateTime  time.Time
	FinishTime  *time.Time
	CalcTimeMs  int64
}

SearchJob represents the persistent state of an async search operation.

type SearchOptions added in v0.4.0

type SearchOptions struct {
	ModelName    string
	ModelVersion string
	PointInTime  *time.Time
	Limit        int
	Offset       int
	OrderBy      []OrderSpec
}

SearchOptions configures pagination, ordering, and scoping for a search.

type Searcher added in v0.4.0

type Searcher interface {
	Search(ctx context.Context, filter Filter, opts SearchOptions) ([]*Entity, error)
}

Searcher is an optional interface for storage plugins that support search predicate pushdown (e.g. SQL WHERE clauses). Plugins that implement Searcher get native query execution; those that don't fall back to in-memory filtering.

type SelfExecutingSearchStore

type SelfExecutingSearchStore interface {
	AsyncSearchStore
	SelfExecuting()
}

SelfExecutingSearchStore is implemented by AsyncSearchStore variants whose CreateJob method also kicks off per-shard execution and result persistence. The domain SearchService detects this via a type assertion after CreateJob and skips its own background-execution goroutine for these stores.

Memory and Postgres do NOT implement this — their CreateJob only persists the job row, and the SearchService spawns a background goroutine to perform the actual search. A backend with native distributed execution can opt in by implementing this interface; its CreateJob is expected to dispatch work and persist results itself.

type Startable

type Startable interface {
	Start(ctx context.Context) error
}

Startable is an optional StoreFactory capability: the core calls Start immediately after NewFactory and before any store-facing call (including TransactionManager). Plugins that need background goroutines — shard managers, consumer groups, rebalance waits, long-lived cluster connections — implement this. Start must complete (successfully) before the factory is expected to serve transactions; plugins whose TransactionManager depends on Start's side effects (rebalance-assigned shards, consumer group membership) can rely on this ordering.

Start is bounded by the caller's context (typically a startup timeout). Plugins must honor ctx.Done() for cancellation.

type StateDefinition

type StateDefinition struct {
	Transitions []TransitionDefinition `json:"transitions,omitempty"`
}

StateDefinition represents a state with its transitions.

type StateMachineAuditStore

type StateMachineAuditStore interface {
	Record(ctx context.Context, entityID string, event StateMachineEvent) error
	GetEvents(ctx context.Context, entityID string) ([]StateMachineEvent, error)
	GetEventsByTransaction(ctx context.Context, entityID string, transactionID string) ([]StateMachineEvent, error)
}

type StateMachineEvent

type StateMachineEvent struct {
	EventType     StateMachineEventType `json:"eventType"`
	EntityID      string                `json:"entityId"`
	TimeUUID      string                `json:"timeUuid"`
	State         string                `json:"state,omitempty"`
	TransactionID string                `json:"transactionId,omitempty"`
	Details       string                `json:"details"`
	Data          map[string]any        `json:"data,omitempty"`
	Timestamp     time.Time             `json:"timestamp"`
}

StateMachineEvent represents a single event in a state machine execution.

type StateMachineEventType

type StateMachineEventType string

StateMachineEventType represents the type of state machine event.

const (
	SMEventStarted                    StateMachineEventType = "STATE_MACHINE_START"
	SMEventFinished                   StateMachineEventType = "STATE_MACHINE_FINISH"
	SMEventCancelled                  StateMachineEventType = "CANCEL"
	SMEventForcedSuccess              StateMachineEventType = "FORCE_SUCCESS"
	SMEventWorkflowFound              StateMachineEventType = "WORKFLOW_FOUND"
	SMEventWorkflowNotFound           StateMachineEventType = "WORKFLOW_NOT_FOUND"
	SMEventWorkflowSkipped            StateMachineEventType = "WORKFLOW_SKIP"
	SMEventTransitionMade             StateMachineEventType = "TRANSITION_MAKE"
	SMEventTransitionNotFound         StateMachineEventType = "TRANSITION_NOT_FOUND"
	SMEventTransitionCriterionNoMatch StateMachineEventType = "TRANSITION_NOT_MATCH_CRITERION"
	SMEventProcessCriterionNoMatch    StateMachineEventType = "PROCESS_NOT_MATCH_CRITERION"
	SMEventProcessingPaused           StateMachineEventType = "PAUSE_FOR_PROCESSING"
	SMEventStateProcessResult         StateMachineEventType = "STATE_PROCESS_RESULT"
)

type StoreFactory

type StoreFactory interface {
	EntityStore(ctx context.Context) (EntityStore, error)
	ModelStore(ctx context.Context) (ModelStore, error)
	KeyValueStore(ctx context.Context) (KeyValueStore, error)
	MessageStore(ctx context.Context) (MessageStore, error)
	WorkflowStore(ctx context.Context) (WorkflowStore, error)
	StateMachineAuditStore(ctx context.Context) (StateMachineAuditStore, error)
	AsyncSearchStore(ctx context.Context) (AsyncSearchStore, error)
	TransactionManager(ctx context.Context) (TransactionManager, error)
	Close() error
}

type Tenant

type Tenant struct {
	ID   TenantID
	Name string
}

Tenant is a first-class domain entity representing a tenant.

type TenantID

type TenantID string

TenantID is a named type for tenant identifiers, preventing accidental use of bare strings.

const SystemTenantID TenantID = "SYSTEM"

SystemTenantID is the well-known tenant for system-level data.

type TransactionManager

type TransactionManager interface {
	// Begin starts a new transaction in the caller's tenant. Returns the
	// txID and a child context carrying the new TransactionState. After
	// Begin returns, the TransactionState's immutable fields (ID,
	// TenantID, SnapshotTime) are safe to read without locks.
	Begin(ctx context.Context) (txID string, txCtx context.Context, err error)

	// Commit closes the transaction and applies its buffered writes to
	// the underlying store. Commit acquires tx.OpMu.Lock for its
	// duration, so it waits for any in-flight tx-path operation on the
	// same tx (any SPI method invocation that holds OpMu.RLock) to drain
	// before mutating or closing tx state. Implementations must verify
	// that the caller's tenant matches tx.TenantID and reject
	// mismatched-tenant calls.
	Commit(ctx context.Context, txID string) error

	// Rollback closes the transaction and discards its buffered writes.
	// Acquires tx.OpMu.Lock; same tenant verification as Commit.
	Rollback(ctx context.Context, txID string) error

	// Join returns a context carrying the TransactionState for an existing
	// active transaction, allowing multiple goroutines to participate in
	// the same tx.
	//
	// Two distinct contracts apply to a joined tx:
	//
	//   - Plugin contract (enforced by [TransactionState.OpMu]): the
	//     plugin's tx-path SPI methods hold OpMu.RLock; the plugin's
	//     closure SPI methods (Commit, Rollback, RollbackToSavepoint)
	//     hold OpMu.Lock. So closure waits for any in-flight SPI-method
	//     invocation to return before mutating or closing tx state. This
	//     contract covers SPI-method invocations only — application code
	//     that mutates tx state directly (e.g. through a [GetTransaction]
	//     handle) is outside the OpMu protection.
	//
	//   - Application contract (NOT enforced by the plugin): the
	//     application must serialise its own concurrent in-flight ops on
	//     the same tx. OpMu.RLock allows multiple readers concurrently;
	//     two RLock-holding ops (e.g. two Save calls from different
	//     goroutines) will trigger Go's "concurrent map writes" runtime
	//     fatal because both write to tx.Buffer / tx.WriteSet / tx.Deletes
	//     without mutual exclusion. RLock does not protect map writes from
	//     each other regardless of key overlap. The plugin does not detect
	//     or recover from this contract violation.
	//
	// Implementations must verify that the caller's tenant matches
	// tx.TenantID and reject mismatched-tenant joins. Implementations must
	// read tx.RolledBack and tx.Closed under tx.OpMu.RLock (not under the
	// manager mutex) — Commit's deferred Closed-write runs outside the
	// manager-mutex region.
	Join(ctx context.Context, txID string) (txCtx context.Context, err error)

	GetSubmitTime(ctx context.Context, txID string) (time.Time, error)

	// Savepoint creates a named savepoint within the given transaction by
	// snapshotting tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes.
	//
	// Locking discipline: read-only on tx state. Implementations must
	// acquire tx.OpMu.RLock for the snapshot read so the operation is
	// serialised against Commit/Rollback (which take tx.OpMu.Lock)
	// without blocking other in-flight readers.
	//
	// Tenant isolation: implementations must reject calls whose
	// UserContext tenant does not match tx.TenantID.
	Savepoint(ctx context.Context, txID string) (savepointID string, err error)

	// RollbackToSavepoint rolls back all work done since the savepoint was
	// created by replacing tx.Buffer / tx.ReadSet / tx.WriteSet /
	// tx.Deletes with the snapshot taken at Savepoint time.
	//
	// Locking discipline: write on tx state — exclusive against every
	// other tx-path op. Implementations must acquire tx.OpMu.Lock (write
	// lock, not RLock) for the duration of the field replacement.
	//
	// Tenant isolation: implementations must reject mismatched-tenant
	// callers — RollbackToSavepoint is destructive on tx-state.
	RollbackToSavepoint(ctx context.Context, txID string, savepointID string) error

	// ReleaseSavepoint releases a savepoint, merging its work into the
	// parent transaction. The work done since the savepoint already lives
	// in tx.Buffer / tx.ReadSet / tx.WriteSet / tx.Deletes — Release only
	// removes the snapshot record from manager-side state.
	//
	// Locking discipline: does not touch any field of TransactionState
	// (only manager-side savepoint records). Implementations need only
	// the manager mutex; tx.OpMu is not required.
	//
	// Tenant isolation: implementations must reject mismatched-tenant
	// callers — manager-side savepoint state is tenant-scoped.
	ReleaseSavepoint(ctx context.Context, txID string, savepointID string) error
}

TransactionManager is the plugin-side surface for the snapshot-isolation transaction model. See TransactionState for the full concurrency contract that implementations must honour.
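The savepoint locking rules described in the interface comments can be sketched as follows. This is a minimal illustration, not the package's implementation: txState, snapshot, manager, and cloneMap are hypothetical stand-ins defined here, Buffer holds strings instead of *Entity, and tenant verification is omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// txState is a hypothetical stand-in for spi.TransactionState.
type txState struct {
	OpMu     sync.RWMutex
	Buffer   map[string]string // the real Buffer holds *Entity values
	WriteSet map[string]bool
}

// snapshot is a hypothetical manager-side savepoint record.
type snapshot struct {
	buffer   map[string]string
	writeSet map[string]bool
}

// manager is a hypothetical manager that owns the savepoint table.
type manager struct {
	mu         sync.Mutex
	savepoints map[string]snapshot
	nextID     int
}

// cloneMap returns a shallow copy of m.
func cloneMap[V any](m map[string]V) map[string]V {
	out := make(map[string]V, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

// Savepoint only reads tx state, so it copies the maps under OpMu.RLock.
func (m *manager) Savepoint(tx *txState) string {
	tx.OpMu.RLock()
	snap := snapshot{buffer: cloneMap(tx.Buffer), writeSet: cloneMap(tx.WriteSet)}
	tx.OpMu.RUnlock()

	m.mu.Lock()
	defer m.mu.Unlock()
	m.nextID++
	id := fmt.Sprintf("sp-%d", m.nextID)
	m.savepoints[id] = snap
	return id
}

// RollbackToSavepoint replaces tx fields, so it takes the write lock.
func (m *manager) RollbackToSavepoint(tx *txState, id string) error {
	m.mu.Lock()
	snap, ok := m.savepoints[id]
	m.mu.Unlock()
	if !ok {
		return fmt.Errorf("unknown savepoint %q", id)
	}
	tx.OpMu.Lock()
	defer tx.OpMu.Unlock()
	tx.Buffer = cloneMap(snap.buffer)
	tx.WriteSet = cloneMap(snap.writeSet)
	return nil
}

// ReleaseSavepoint touches only manager-side records; tx.OpMu is not needed.
func (m *manager) ReleaseSavepoint(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.savepoints[id]; !ok {
		return fmt.Errorf("unknown savepoint %q", id)
	}
	delete(m.savepoints, id)
	return nil
}

func main() {
	tx := &txState{Buffer: map[string]string{"a": "1"}, WriteSet: map[string]bool{"a": true}}
	m := &manager{savepoints: map[string]snapshot{}}
	id := m.Savepoint(tx)
	tx.Buffer["b"] = "2" // work done after the savepoint
	tx.WriteSet["b"] = true
	if err := m.RollbackToSavepoint(tx, id); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(tx.Buffer), len(tx.WriteSet))
	_ = m.ReleaseSavepoint(id)
}
```

In this sketch the manager mutex is never held while tx.OpMu is being acquired, which matches the lock order described under TransactionState.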

type TransactionState

type TransactionState struct {
	ID           string
	TenantID     TenantID
	SnapshotTime time.Time
	ReadSet      map[string]bool    // entity IDs read; access under OpMu (see godoc)
	WriteSet     map[string]bool    // entity IDs written; access under OpMu
	Buffer       map[string]*Entity // staged writes; access under OpMu
	Deletes      map[string]bool    // staged deletes; access under OpMu
	RolledBack   bool               // closure flag; written under OpMu.Lock, read under OpMu.RLock
	OpMu         sync.RWMutex       // see TransactionState godoc above for full contract
	Closed       bool               // closure flag; written under OpMu.Lock, read under OpMu.RLock
}

TransactionState holds the state of an active SSI (serializable snapshot isolation) transaction. All processor execution is sequential (no goroutines); see docs/superpowers/specs/2026-04-01-workflow-processor-execution-design.md. SAVEPOINTs snapshot/restore these maps for ASYNC_NEW_TX rollback isolation.

Concurrency contract

Plugin implementations of TransactionManager must coordinate concurrent access to TransactionState's mutable fields using OpMu. Two distinct concerns:

  1. Cross-class serialisation — plugin's responsibility, enforced via OpMu. In-flight tx-path operations (Save, Get, Delete, Savepoint, etc., regardless of which plugin type — TransactionManager, EntityStore, or any other surface — defines them) hold OpMu.RLock; closure operations (Commit, Rollback, RollbackToSavepoint) hold OpMu.Lock. This guarantees Commit/Rollback wait for any in-flight SPI-method invocation on the same tx to drain before mutating or closing tx state. Every plugin method that reads or writes ReadSet, WriteSet, Buffer, Deletes, RolledBack, or Closed must acquire OpMu in the appropriate posture.

  2. Within-class serialisation — application's responsibility, NOT enforced by OpMu. OpMu.RLock allows multiple readers concurrently; it does not mutually exclude RLock-holders from each other. If the application fires two RLock-holding ops on the same tx concurrently (e.g. two `Save` calls from different goroutines), the underlying Go map writes to tx.Buffer / tx.WriteSet / tx.Deletes will trigger the runtime's "concurrent map writes" fatal — RLock does not protect map writes from each other regardless of key overlap. The application must serialise its own ops on a given tx; the plugin does not detect or recover from this contract violation.
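One way an application might meet the within-class requirement is to funnel all of its operations on a given tx through its own mutex. The sketch below is an assumption about application code, not something this package provides: txHandle and saveConcurrently are hypothetical names, and a plain map stands in for tx.Buffer.

```go
package main

import (
	"fmt"
	"sync"
)

// txHandle is a hypothetical application-side wrapper around one transaction.
// Its mutex serialises the application's own ops on that tx.
type txHandle struct {
	mu   sync.Mutex
	save func(id string) // stands in for a Save call bound to the tx context
}

// Save runs the underlying save while holding the application mutex, so two
// goroutines never write the tx maps at the same time.
func (h *txHandle) Save(id string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.save(id)
}

// saveConcurrently fires n Save calls from n goroutines and returns the
// number of buffered entries.
func saveConcurrently(n int) int {
	buffer := map[string]bool{} // stands in for tx.Buffer
	h := &txHandle{save: func(id string) { buffer[id] = true }}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h.Save(fmt.Sprintf("entity-%d", i))
		}(i)
	}
	wg.Wait()
	return len(buffer)
}

func main() {
	fmt.Println(saveConcurrently(100))
}
```

Without the mutex in txHandle.Save, the same program would write the map from several goroutines at once, which is the situation the contract warns about.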

Lock posture per field

  • ReadSet, WriteSet, Buffer, Deletes: read or written under OpMu.RLock by in-flight ops; iterated or replaced under OpMu.Lock by Commit / Rollback / RollbackToSavepoint.
  • Closed: written under OpMu.Lock by Commit/Rollback in their defer (so all return paths are covered); read under OpMu.RLock by every in-flight op so the op fails fast on a closed tx.
  • RolledBack: written under OpMu.Lock by Rollback eagerly inside the OpMu region (not in defer); read under OpMu.RLock by every in-flight op.
  • ID, TenantID, SnapshotTime: immutable after [TransactionManager.Begin] returns; safe to read without locks.
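The per-field postures above might look like this in a plugin. This is a simplified sketch under stated assumptions: txState is a local stand-in for TransactionState, errTxClosed is a hypothetical error value, and save, commit, and rollback are illustrative free functions, not SPI signatures.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// txState is a hypothetical stand-in for spi.TransactionState.
type txState struct {
	OpMu       sync.RWMutex
	Buffer     map[string]string // the real Buffer holds *Entity values
	RolledBack bool
	Closed     bool
}

// errTxClosed is a hypothetical error for ops on a finished tx.
var errTxClosed = errors.New("transaction closed")

// save is an in-flight op: it holds OpMu.RLock, reads the closure flags so it
// fails fast on a finished tx, then stages the write. The caller is still
// responsible for not running two saves on the same tx concurrently.
func save(tx *txState, id, val string) error {
	tx.OpMu.RLock()
	defer tx.OpMu.RUnlock()
	if tx.Closed || tx.RolledBack {
		return errTxClosed
	}
	tx.Buffer[id] = val
	return nil
}

// commit is a closure op: it holds OpMu.Lock and writes Closed in a defer so
// every return path, including a failed apply, marks the tx closed.
func commit(tx *txState, apply func(map[string]string) error) error {
	tx.OpMu.Lock()
	defer tx.OpMu.Unlock()
	if tx.Closed || tx.RolledBack {
		return errTxClosed
	}
	defer func() { tx.Closed = true }() // runs before Unlock (LIFO)
	return apply(tx.Buffer)
}

// rollback sets RolledBack eagerly inside the OpMu region and Closed in a defer.
func rollback(tx *txState) error {
	tx.OpMu.Lock()
	defer tx.OpMu.Unlock()
	if tx.Closed {
		return errTxClosed
	}
	defer func() { tx.Closed = true }()
	tx.RolledBack = true
	tx.Buffer = map[string]string{} // discard staged writes
	return nil
}

func main() {
	tx := &txState{Buffer: map[string]string{}}
	fmt.Println(save(tx, "a", "1"))
	fmt.Println(commit(tx, func(map[string]string) error { return nil }))
	fmt.Println(save(tx, "b", "2")) // fails fast: tx is closed
}
```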

Lock order

Plugin implementations acquire locks in this overall order to avoid deadlock:

tx.OpMu  →  factory's per-store mutex  →  manager's per-tx-table mutex

The manager's per-tx-table mutex is also acquired BEFORE tx.OpMu for the brief active-tx-table lookup at the top of every method, then released BEFORE tx.OpMu is taken. So in practice the manager mutex appears at two distinct points in the timeline:

  1. Brief lookup of the tx pointer in the manager's active-tx table. Released immediately. Never held across slow operations.
  2. Optional re-acquisition INSIDE the OpMu region for committedLog / savepoint-table maintenance, while still holding OpMu.

Holding the manager mutex across the tx.OpMu acquisition is a deadlock bug: Commit holds tx.OpMu while waiting on the manager mutex for log maintenance, so any path that holds the manager mutex while waiting on tx.OpMu inverts the order.
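The two points at which the manager mutex appears can be sketched as below. The types and field names (manager, active, committedLog, errNoTx) are assumptions made for illustration; a real Commit would also verify the tenant and apply buffered writes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// txState is a hypothetical stand-in for spi.TransactionState.
type txState struct {
	OpMu   sync.RWMutex
	Closed bool
}

// manager is a hypothetical manager with an active-tx table and a commit log.
type manager struct {
	mu           sync.Mutex
	active       map[string]*txState
	committedLog []string
}

// errNoTx is a hypothetical error for unknown or finished transactions.
var errNoTx = errors.New("transaction not found or already closed")

// lookup is point 1: a brief lookup under the manager mutex, released
// before the caller goes on to take tx.OpMu.
func (m *manager) lookup(txID string) (*txState, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	tx, ok := m.active[txID]
	return tx, ok
}

// Commit never holds the manager mutex while waiting on tx.OpMu.
func (m *manager) Commit(txID string) error {
	tx, ok := m.lookup(txID)
	if !ok {
		return errNoTx
	}

	tx.OpMu.Lock()
	defer tx.OpMu.Unlock()
	if tx.Closed {
		return errNoTx
	}
	defer func() { tx.Closed = true }()

	// Point 2: re-acquire the manager mutex inside the OpMu region for
	// log and table maintenance.
	m.mu.Lock()
	m.committedLog = append(m.committedLog, txID)
	delete(m.active, txID)
	m.mu.Unlock()
	return nil
}

func main() {
	m := &manager{active: map[string]*txState{"tx-1": &txState{}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = m.Commit("tx-1") // only one caller succeeds
		}()
	}
	wg.Wait()
	fmt.Println(len(m.committedLog))
}
```

Because lookup releases the manager mutex before Commit blocks on tx.OpMu, concurrent Commit calls on the same tx serialise on OpMu alone and the losing callers observe Closed.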

Required reading for plugin authors

New methods that touch *TransactionState — on TransactionManager, EntityStore, or any other plugin surface — must declare their OpMu posture in a code comment ("Locking discipline: ..."). See `.claude/rules/tx-state-locking.md` in the cyoda-go-spi repository for the review checklist enforced at code review.

func GetTransaction

func GetTransaction(ctx context.Context) *TransactionState

GetTransaction returns the transaction state from the context, or nil if none.

type TransitionDefinition

type TransitionDefinition struct {
	Name       string                `json:"name"`
	Next       string                `json:"next"`
	Manual     bool                  `json:"manual"`
	Disabled   bool                  `json:"disabled,omitempty"`
	Criterion  json.RawMessage       `json:"criterion,omitempty"`
	Processors []ProcessorDefinition `json:"processors,omitempty"`
}

TransitionDefinition represents a single transition from a state.

type UUIDGenerator

type UUIDGenerator interface {
	NewTimeUUID() [16]byte
}

UUIDGenerator produces identifiers for stored records.

The return type is [16]byte so this package remains stdlib-only. Callers that want the github.com/google/uuid type perform a zero-cost type conversion: uuid.UUID(gen.NewTimeUUID()).

Implementations should produce monotonic, time-ordered IDs (v1 UUIDs or equivalent) so that sorted IDs correspond to insertion order.

type UserContext

type UserContext struct {
	UserID   string
	UserName string
	Tenant   Tenant
	Roles    []string
}

UserContext carries the authenticated user's identity through the request lifecycle.

func GetUserContext

func GetUserContext(ctx context.Context) *UserContext

func MustGetUserContext

func MustGetUserContext(ctx context.Context) *UserContext

type WorkflowDefinition

type WorkflowDefinition struct {
	Version      string                     `json:"version"`
	Name         string                     `json:"name"`
	Description  string                     `json:"desc,omitempty"`
	InitialState string                     `json:"initialState"`
	Active       bool                       `json:"active"`
	Criterion    json.RawMessage            `json:"criterion,omitempty"`
	States       map[string]StateDefinition `json:"states"`
}

WorkflowDefinition represents a complete workflow configuration.

type WorkflowStore

type WorkflowStore interface {
	Save(ctx context.Context, modelRef ModelRef, workflows []WorkflowDefinition) error
	Get(ctx context.Context, modelRef ModelRef) ([]WorkflowDefinition, error)
	Delete(ctx context.Context, modelRef ModelRef) error
}

Directories

Path Synopsis
Package spitest provides a conformance test harness for spi.StoreFactory implementations.
