Documentation
¶
Overview ¶
Package continuation provides durable, immutable, event-sourced storage for suspended opstack runs ("continuations").
The model (see internal docs/todo-continuation-storage.md):
- Documents are immutable — never rewritten.
- Completion writes are append-only or write-once at known keys.
- Run/stage state is DERIVED by reading which docs exist.
- The only coordination primitive is atomic create-if-absent.
The object store is the source of truth; there is no SQLite for run state. A DB, if ever added, is an optional performance index behind this same interface.
Index ¶
- Constants
- Variables
- func HashToken(token string) string
- func MintToken() (token, hash string, err error)
- func NewOpContinuationID() (string, error)
- func ParseResumeTraceRID(rid string) (runID string, ok bool)
- func Register(name string, c Constructor)
- func ResumeTraceRID(runID, stage string) string
- type Constructor
- type DeferredOpCreated
- type DeferredOpSpec
- type Meta
- type OpContinuationLookup
- type OpManifestEntry
- type OpRecordSpec
- type OpTerminal
- type PendingJoin
- type Ref
- type ResumeRef
- type RunCreated
- type Runs
- func (r *Runs) AppendEvent(ctx context.Context, runID, kind string, fields map[string]any) error
- func (r *Runs) ClaimResume(ctx context.Context, runID, stage string) (won bool, err error)
- func (r *Runs) CreateDeferredOp(ctx context.Context, runID string, sp DeferredOpSpec) error
- func (r *Runs) CreateOpRecords(ctx context.Context, runID, stage string, specs []OpRecordSpec) error
- func (r *Runs) CreateRun(ctx context.Context, ...) (runID, rcid string, err error)
- func (r *Runs) CurrentStage(ctx context.Context, runID string) (StageSuspended, bool, error)
- func (r *Runs) FailRun(ctx context.Context, runID, reason string) error
- func (r *Runs) FailStage(ctx context.Context, runID, stage, reason string) error
- func (r *Runs) Get(ctx context.Context, key string) ([]byte, error)
- func (r *Runs) IsJoined(ctx context.Context, runID, opc string) (bool, error)
- func (r *Runs) ListPendingJoins(ctx context.Context, runID string) ([]PendingJoin, error)
- func (r *Runs) ListRunIDs(ctx context.Context) ([]string, error)
- func (r *Runs) MarkJoined(ctx context.Context, runID, opc string) (won bool, err error)
- func (r *Runs) PurgeRun(ctx context.Context, runID string) error
- func (r *Runs) ReadDeferredSuspendedAt(ctx context.Context, runID, opc string) (stage string, exists bool, err error)
- func (r *Runs) ReadDeferredTerminal(ctx context.Context, runID, opc string) (OpTerminal, error)
- func (r *Runs) ReadOpTerminal(ctx context.Context, runID, stage string, ordinal int, op string) (OpTerminal, error)
- func (r *Runs) ReadOpstackSnapshot(ctx context.Context, runID string) ([]byte, error)
- func (r *Runs) ReadResult(ctx context.Context, runID string) ([]byte, bool, error)
- func (r *Runs) ReadResumeClaim(ctx context.Context, runID, stage string) (claimedAt time.Time, exists bool, err error)
- func (r *Runs) ReadRunCreated(ctx context.Context, runID string) (RunCreated, error)
- func (r *Runs) ReadStageSuspended(ctx context.Context, runID, stage string) (StageSuspended, error)
- func (r *Runs) ReadTraceLinks(ctx context.Context, runID string) (TraceLinks, error)
- func (r *Runs) RecordAccepted(ctx context.Context, runID, stage string, ordinal int, op, workerJobID string) error
- func (r *Runs) RecordDeferredTerminal(ctx context.Context, runID, opc, status string, payload []byte) (recorded bool, err error)
- func (r *Runs) RecordTerminal(ctx context.Context, runID, stage string, ordinal int, op, status string, ...) (recorded bool, err error)
- func (r *Runs) ResolveOpContinuation(ctx context.Context, opc string) (OpContinuationLookup, error)
- func (r *Runs) ResolveRequestContinuation(ctx context.Context, rid string) (string, error)
- func (r *Runs) ResolveRunContinuation(ctx context.Context, rcid string) (string, error)
- func (r *Runs) RunState(ctx context.Context, runID string) (string, error)
- func (r *Runs) SetDeferredSuspendedAt(ctx context.Context, runID, opc, stage string) error
- func (r *Runs) StageState(ctx context.Context, runID, stage string, manifest []OpManifestEntry) (string, error)
- func (r *Runs) SuspendStage(ctx context.Context, runID, stage, scopeEnvelope, stackVersionID string, ...) error
- func (r *Runs) WriteOpstackSnapshot(ctx context.Context, runID string, snapshot []byte) error
- func (r *Runs) WriteResult(ctx context.Context, runID string, finalEnvelope []byte) error
- type StageSuspended
- type Store
- type StoreConfig
- type TraceLinks
Constants ¶
const (
	StateWaiting   = "waiting"
	StateResumable = "resumable"
	StateCompleted = "completed"
	StateFailed    = "failed"
)
State values.
Variables ¶
var (
	// ErrExists is returned by Create when the key already exists. This is
	// the load-bearing signal for idempotency: a duplicate write that hits
	// ErrExists is a harmless no-op, and a single successful Create out of
	// a race is the sole "winner" (resume-claim, op-terminal).
	ErrExists = errors.New("continuation: key already exists")
	// ErrNotFound is returned by Get when the key does not exist.
	ErrNotFound = errors.New("continuation: key not found")
)
Functions ¶
func MintToken ¶
MintToken returns a single-use, 256-bit bearer token together with the hex-encoded SHA-256 hash of that token. The token is handed to the worker out of band (not in the body); only the hash is stored. Because this is a machine-to-machine credential, high-entropy random data (not the human-transcribable word secret) is the right primitive.
func NewOpContinuationID ¶
NewOpContinuationID mints a worker-facing per-async-op handle.
func ParseResumeTraceRID ¶
ParseResumeTraceRID extracts runID from a ResumeTraceRID. It returns ok=false for any rid not of that shape (e.g. a normal request rid).
func Register ¶
func Register(name string, c Constructor)
Register adds a backend constructor. Called from a backend package's init(); the chassis activates a backend with a blank import.
func ResumeTraceRID ¶
ResumeTraceRID is the trace RID for resuming `stage` of `runID`. It embeds the stage so a multi-stage run produces one distinct, linkable trace per resumed stage (not a single colliding `resume-<runID>`). runID is "run_"+base58 (no '-'), so the first '-' after it unambiguously separates runID from the sanitized stage — see ParseResumeTraceRID. The stage is sanitized to [A-Za-z0-9_-] so the whole RID stays within the admin trace-id allowlist (validRID) and is a safe trace dir name.
Types ¶
type Constructor ¶
type Constructor func(StoreConfig) (Store, error)
Constructor builds a Store from resolved config.
type DeferredOpCreated ¶
type DeferredOpCreated struct {
OpContinuationID string `json:"op_continuation_id"`
Op string `json:"op"`
Ordinal int `json:"ordinal"`
JoinAtScope int `json:"join_at_scope"`
Dest string `json:"dest,omitempty"`
DispatchStage string `json:"dispatch_stage"`
InputKey string `json:"input_key"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
DeferredOpCreated is the immutable record written when a deferred op is dispatched (no suspend). It captures everything the join needs later.
type DeferredOpSpec ¶
type DeferredOpSpec struct {
OpContinuationID string
Op string
Ordinal int
JoinAtScope int
Dest string
DispatchStage string
DispatchStack string
TokenHash string
Input []byte
ExpiresAt time.Time
}
DeferredOpSpec is the input to CreateDeferredOp.
type Meta ¶
type Meta struct {
ContentType string `json:"content_type,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
Meta is optional object metadata.
type OpContinuationLookup ¶
type OpContinuationLookup struct {
RunID string `json:"run_id"`
Stage string `json:"stage"`
Op string `json:"op"`
Ordinal int `json:"ordinal"`
TokenHash string `json:"token_hash"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Deferred marks an op-continuation whose terminal is filed under an
// opc-keyed location (deferred join), not a fixed stage. The callback
// handler routes a Deferred lookup to RecordDeferredTerminal and never
// uses Stage (which is empty for deferred ops — the join scope is
// resolved dynamically at run time). See chassis/continuation/deferred.go.
Deferred bool `json:"deferred,omitempty"`
}
type OpManifestEntry ¶
type OpManifestEntry struct {
Ordinal int `json:"ordinal"`
Op string `json:"op"`
Async bool `json:"async"`
// OpContinuationID, when set, marks a manifest entry as a deferred join:
// its terminal lives under the opc-keyed deferred location rather than
// this stage's (ordinal, op) key. Resume reads it via ReadDeferredTerminal.
// Empty for ordinary same-scope ops (the existing path).
OpContinuationID string `json:"op_continuation_id,omitempty"`
}
type OpRecordSpec ¶
type OpRecordSpec struct {
Ordinal int
Op string
Async bool
OpContinuationID string
TokenHash string
Input []byte
ExpiresAt time.Time
}
OpRecordSpec is one op's durable record, written before any dispatch.
type OpTerminal ¶
type PendingJoin ¶
type PendingJoin struct {
OpContinuationID string `json:"op_continuation_id"`
Op string `json:"op"`
Ordinal int `json:"ordinal"`
JoinAtScope int `json:"join_at_scope"`
Dest string `json:"dest,omitempty"`
DispatchStack string `json:"dispatch_stack,omitempty"`
}
PendingJoin is the in-flight-obligation record: one per outstanding deferred op on a run. Listed by the join check (and the sweeper) to find joins that must resolve at a scope >= JoinAtScope. Carries only what the join needs; the full record is DeferredOpCreated.
DispatchStack is the stack the op was dispatched in. JoinAtScope is interpreted purely in that stack's scope numbering (scope numbers are not comparable across stacks), so the floor check only fires when the run is still in DispatchStack; a cross-stack transition force-resolves instead (internal docs/todo-deferred-join.md, single-stack constraint).
type Ref ¶
type Ref struct {
Store string `json:"store"` // backend name: "file" (s3 reserved)
Key string `json:"key"` // logical key, "/"-separated
Size int64 `json:"size,omitempty"` // bytes written
}
Ref is a portable reference to a stored object.
type RunCreated ¶
type RunCreated struct {
RunID string `json:"run_id"`
RunContinuationID string `json:"run_continuation_id"`
TenantID string `json:"tenant_id,omitempty"`
Stack string `json:"stack"`
StackVersionID string `json:"stack_version_id,omitempty"`
FirstStage string `json:"first_stage"`
// OriginRID is the trace rid of the request that suspended into this
// run — the back-pointer that lets admin-ui link the resume trace to
// the originating request (and vice-versa).
OriginRID string `json:"origin_rid,omitempty"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
}
type Runs ¶
type Runs struct {
// contains filtered or unexported fields
}
Runs is the domain wrapper over a Store. Every method is expressed in terms of immutable docs + create-if-absent. There is no update path: run/stage status is DERIVED from which docs exist (see StageState / RunState).
func (*Runs) AppendEvent ¶
AppendEvent writes a distinct append-only audit object. Key carries a timestamp + random suffix so concurrent events never collide.
func (*Runs) ClaimResume ¶
ClaimResume create-if-absents the SINGLE deterministic resume-claim key for (run, stage). Exactly one caller wins (won=true); every other caller loses and gets won=false.
func (*Runs) CreateDeferredOp ¶
CreateDeferredOp writes, create-if-absent: the input blob, the deferred-created doc, the op-continuation lookup (marked Deferred so the callback records here), and the pending-join record. Idempotent — a re-entered dispatch of the same opc is a no-op.
func (*Runs) CreateOpRecords ¶
func (r *Runs) CreateOpRecords(ctx context.Context, runID, stage string, specs []OpRecordSpec) error
CreateOpRecords writes, for every op: input blob + op-created doc; and for each async op the op-continuation lookup. All create-if-absent and idempotent (ErrExists swallowed) so a re-entered suspend is safe.
func (*Runs) CreateRun ¶
func (r *Runs) CreateRun(ctx context.Context, tenantID, stack, stackVersionID, firstStage, originRID string, expires time.Time) (runID, rcid string, err error)
CreateRun mints internal + client identities and writes the immutable run-created doc plus the run-continuation lookup. Fresh ids ⇒ no collision; ErrExists would be a genuine fault, so it is not swallowed.
func (*Runs) CurrentStage ¶
CurrentStage returns the latest suspended stage (max SuspendedAt) and its loaded doc. Used by the client GET to derive run state without any stored status.
func (*Runs) FailStage ¶
FailStage records a stage that cannot complete/advance (failed sibling op, or stack-version mismatch — no specific op failed). Not an op-terminal.
func (*Runs) ListPendingJoins ¶
ListPendingJoins returns every outstanding deferred-join record on a run. Used by the per-boundary join check and the sweeper. Order is not guaranteed; callers sort by Ordinal where determinism matters.
func (*Runs) ListRunIDs ¶
ListRunIDs enumerates every run by walking the runs/ prefix and taking the distinct runID segment. Object-store walk, no index — the dumb chassis keeps run truth in the store, not a DB table.
func (*Runs) MarkJoined ¶
MarkJoined records, create-if-absent, that a deferred op's result has been merged into the run. Returns won=true for the single writer that created the marker. The pending-join doc lingers (immutability), so the per-boundary join check calls this to merge a given opc EXACTLY ONCE: a second boundary (or a resume racing the in-request merge) sees won=false and skips the re-merge.
func (*Runs) PurgeRun ¶
PurgeRun deletes every doc of a dead (terminal + past-retention) run. Lookup docs go FIRST so a concurrent resolve can never land on a half-deleted run; then the run dir. Delete is idempotent, so a partial previous purge is safely completed on the next pass. This is the only place continuation docs are deleted — it destroys a finished run, it never mutates a live one.
func (*Runs) ReadDeferredSuspendedAt ¶
func (r *Runs) ReadDeferredSuspendedAt(ctx context.Context, runID, opc string) (stage string, exists bool, err error)
ReadDeferredSuspendedAt returns the join stage the run suspended at for this op, or exists=false if the run hasn't suspended on it (still in-request — the in-request join will pick up the terminal itself).
func (*Runs) ReadDeferredTerminal ¶
ReadDeferredTerminal loads a deferred op's terminal doc. ErrNotFound (passed through) means the op hasn't completed yet — the join checker reads this to decide merge-vs-suspend.
func (*Runs) ReadOpTerminal ¶
func (r *Runs) ReadOpTerminal(ctx context.Context, runID, stage string, ordinal int, op string) (OpTerminal, error)
ReadOpTerminal loads an op's terminal doc (status + blob keys).
func (*Runs) ReadOpstackSnapshot ¶
ReadOpstackSnapshot returns the run's frozen opstack snapshot. ErrNotFound (passed through) means no snapshot — the caller falls back to the live opstack (back-compat for pre-snapshot runs / unversioned _sys).
func (*Runs) ReadResult ¶
ReadResult returns the final envelope and true if the run completed.
func (*Runs) ReadResumeClaim ¶
func (r *Runs) ReadResumeClaim(ctx context.Context, runID, stage string) (claimedAt time.Time, exists bool, err error)
ReadResumeClaim reports when a stage's resume was claimed. exists=false (nil error) when no resumer has claimed it yet. A claim that has sat far longer than any legitimate resume means the resumer crashed mid-resume.
func (*Runs) ReadRunCreated ¶
ReadRunCreated returns the immutable run-created doc (rcid, stack, origin rid, …) for a runID.
func (*Runs) ReadStageSuspended ¶
ReadStageSuspended loads the per-stage manifest + scope-entry envelope.
func (*Runs) ReadTraceLinks ¶
ReadTraceLinks assembles the trace linkage for a run: rcid + origin rid (from run-created) and a stable, deterministic resume RID per suspended stage (from the stage-suspended docs). Used by the admin trace-detail handler to render inline cross-links.
func (*Runs) RecordAccepted ¶
func (r *Runs) RecordAccepted(ctx context.Context, runID, stage string, ordinal int, op, workerJobID string) error
RecordAccepted records a worker's 202 ack (informational; not required for terminal derivation).
func (*Runs) RecordDeferredTerminal ¶
func (r *Runs) RecordDeferredTerminal(ctx context.Context, runID, opc, status string, payload []byte) (recorded bool, err error)
RecordDeferredTerminal records a deferred op's result at its opc-keyed location. First terminal (success OR failure) wins; a duplicate/late callback gets ErrExists on the terminal doc ⇒ recorded=false, a harmless no-op. Mirrors RecordTerminal. status must be "completed" or "failed".
func (*Runs) RecordTerminal ¶
func (r *Runs) RecordTerminal(ctx context.Context, runID, stage string, ordinal int, op, status string, payload []byte) (recorded bool, err error)
RecordTerminal writes the op's result blob then the create-if-absent op-terminal doc. The first terminal (success OR failure) wins; a duplicate/late callback gets ErrExists on op-terminal ⇒ recorded=false and is a harmless no-op. status must be "completed" or "failed".
func (*Runs) ResolveOpContinuation ¶
func (*Runs) ResolveRequestContinuation ¶
ResolveRequestContinuation maps an originating request rid → runID (ErrNotFound when the request did not suspend into a continuation).
func (*Runs) ResolveRunContinuation ¶
func (*Runs) SetDeferredSuspendedAt ¶
SetDeferredSuspendedAt records (create-if-absent) the join scope a run suspended at while waiting for a deferred op. The worker callback reads this to learn which dynamic stage to ClaimResume — without it the callback can't find the suspend (the join scope isn't known at dispatch).
func (*Runs) StageState ¶
func (r *Runs) StageState(ctx context.Context, runID, stage string, manifest []OpManifestEntry) (string, error)
StageState derives the state of one stage from doc existence, in the fixed precedence order. manifest is that stage's op manifest.
func (*Runs) SuspendStage ¶
func (r *Runs) SuspendStage(ctx context.Context, runID, stage, scopeEnvelope, stackVersionID string, manifest []OpManifestEntry) error
SuspendStage writes the per-stage immutable manifest + scope-entry envelope. Idempotent: a re-entered suspend for the same stage is a no-op (ErrExists swallowed).
func (*Runs) WriteOpstackSnapshot ¶
WriteOpstackSnapshot stores the resolved opstack the run was suspended against. Create-if-absent: only the first suspend of a run writes it; later re-suspends of the same multi-stage run reuse it (immutable, so a later txco apply cannot change what this run resumes against).
type StageSuspended ¶
type Store ¶
type Store interface {
// Create writes data at key iff key does not already exist. Returns
// ErrExists (and writes nothing) when the key is present. The write is
// atomic: a reader never observes a partial object.
Create(ctx context.Context, key string, r io.Reader, meta Meta) (Ref, error)
// Get returns the full object bytes. ErrNotFound if absent. Bytes are
// fully buffered — continuation docs and op JSON are bounded.
Get(ctx context.Context, key string) ([]byte, Meta, error)
// Exists reports whether key is present. Cheaper than Get for the
// existence checks that derive run/stage state.
Exists(ctx context.Context, key string) (bool, error)
// List returns the keys present under prefix (recursively).
List(ctx context.Context, prefix string) ([]string, error)
// Delete removes key. Absent key is not an error (idempotent).
Delete(ctx context.Context, key string) error
// Name is the backend identity recorded in Ref.Store.
Name() string
}
Store is the continuation object store. Create is create-if-absent ONLY: there is deliberately no overwrite and no compare-and-swap. Immutability plus create-if-absent is the entire coordination model.
type StoreConfig ¶
type StoreConfig struct {
FileDir string
// Reserved for an out-of-tree S3-compatible backend (enterprise seam).
S3Bucket string
S3Prefix string
}
StoreConfig carries backend-selecting options resolved from chassis config. Only the file backend is wired in open core; the S3 fields are reserved so an out-of-tree backend can register itself without changing this signature.
type TraceLinks ¶
type TraceLinks struct {
RunID string `json:"run_id"`
RunContinuationID string `json:"run_continuation_id,omitempty"`
OriginRID string `json:"origin_rid,omitempty"`
Resumes []ResumeRef `json:"resumes,omitempty"`
}
TraceLinks is the cross-navigation data for a continuation's traces: the originating request rid and one resume trace per suspended stage.