Documentation ¶
Overview ¶
Package engine defines the foundational primitives shared by every local execution engine in FlowCraft (e.g. sdk/graph DAG executor, future script-based or native-Go executors).
Position in the layering ¶
engine sits below sdk/agent and below any concrete engine implementation. It is deliberately ignorant of agent-level concepts: there is no Agent, no Memory, no Request/Result, no chat-specific Var conventions in this package. Anything that knows about "agents", "messages" or "memory" belongs in sdk/agent or higher.
Allowed dependencies:
- sdk/event (for Envelope only; engine does NOT use Bus)
- sdk/errdefs (for the interrupted-error classification)
- sdk/model (for Message in Board's typed channels and Part in user-prompt payloads)
- standard library
engine MUST NOT import sdk/agent, sdk/agent/strategy, sdk/graph, sdk/script, sdk/history, sdk/recall, sdk/llm, sdk/tool, sdk/workflow.
The contract at a glance ¶
An engine receives three things at run time:
Execute(ctx, run Run, host Host, board *Board) (*Board, error)
- run — read-only metadata (ID, Attributes, Deps);
- host — capabilities the engine may invoke (Publish events, listen for Interrupts, AskUser, Checkpoint, ReportUsage);
- board — shared blackboard the engine mutates as it runs.
The Host interface is a *composition* of small interfaces:
type Host interface {
	Publisher     // Publish(ctx, env) error
	Interrupter   // Interrupts() <-chan Interrupt
	UserPrompter  // AskUser(ctx, prompt) (UserReply, error)
	Checkpointer  // Checkpoint(ctx, cp) error
	UsageReporter // ReportUsage(ctx, usage)
}
Downstream code (graph nodes, tools, …) should depend on the smallest interface it actually needs (Publisher alone is the common case) rather than the full Host. This keeps node signatures honest about their requirements.
What lives here ¶
Board / BoardSnapshot / Cloneable — shared blackboard state and typed message channels. Any engine that wants a key/value store and ordered message lists reuses these.
Run — per-execution input bundle (ID, Attributes, Deps, ResumeFrom) as a plain data struct. Setting Run.ResumeFrom is how the host requests a resume; the engine interprets the opaque [Checkpoint.Step] / [Checkpoint.Payload] it produced earlier.
Host and the five small interfaces it composes — the surface the engine uses to interact with its host runtime.
Interrupt + Cause + InterruptedError — cooperative-stop primitive. Engines select on Host.Interrupts(); they convert a received Interrupt into an error via Interrupted, which satisfies errdefs.IsInterrupted and carries the Cause for the host to inspect via errors.As.
UserPrompt / UserReply — engine-agnostic, multi-modal (model.Part) prompt/response payloads for input-required steps.
Checkpoint + CheckpointStore — engine-agnostic persistence contract for resumable execution. Each engine decides what its own Step / Payload look like.
Engine — uniform Execute interface (and EngineFunc adapter) so the agent layer can drive any engine through a single shape.
NoopHost / NoopCheckpointStore — zero-cost stand-ins for tests and embedded scenarios.
Subject schema (subjects.go) — the cross-engine event-routing convention every implementation MUST follow when publishing run lifecycle, step lifecycle, and stream-delta envelopes. Public Subject* / Pattern* builders, the StreamDeltaPayload decode contract, and SanitiseID live here so consumers (voice, SSE bridges, dashboards) can route on subject without importing any concrete engine.
Resume primitives (resume.go) — the optional Resumer capability interface, ResumeContext (per-attempt metadata threaded through ctx) and the LoadAndResume helper that wires a CheckpointStore to an Engine in one call. See "Resume usage" below for examples.
Stream-delta emit helpers (stream_emit.go) — EmitStreamToken / EmitStreamToolCall / EmitStreamToolResult / EmitStreamDelta let ANY node (not just LLM nodes) publish in-flight increments without re-implementing the envelope construction + header-stamping boilerplate. Custom long-running nodes (RAG loaders, batch transformers, externally-driven tool wrappers) can surface progress on the same SubjectStreamDelta channel LLM nodes use, so consumer code stays uniform regardless of which node generated the increment.
Capability description (capabilities.go) — the optional Describer interface lets engines advertise SupportsResume, EmitsUserPrompt, EmitsCheckpoint and RequiredDepNames. Hosts read via CapabilitiesOf (works on Describer + non-Describer engines via zero-value default). WithCapabilities wraps an EngineFunc with a Describer adapter so closure-style engines can declare their capabilities without becoming named structs.
Dependency naming (depname/) — string constants for conventional Dependencies keys (depname.LLMClient, depname.ToolRegistry, …). Engines that publish RequiredDepNames in their Capabilities should reference these constants so hosts and engines agree on the wire vocabulary.
Host-on-context plumbing (host_ctx.go) — WithHost / HostFromContext let engines hand the Host to extension points whose signatures don't carry it (canonically: sdk/tool's Tool.Execute). Built-in tools that need host capabilities (e.g. sdk/tool/builtin/askuser) recover the host this way without each engine inventing its own ctx-key convention.
What does NOT live here ¶
- StreamCallback / StreamEvent — replaced by Publisher + event.Envelope.
- Memory / MemorySession — that is a sdk/history + sdk/recall concern at the agent layer.
- Strategy / Runnable / Disposition / ResumeToken — those are agent ↔ engine adapter contracts and live in sdk/agent and sdk/agent/strategy.
- Engine kind enumeration — engine does not reserve a "type" namespace or list which engines exist; routing on subject is the only cross-engine identification mechanism.
Resume usage ¶
Hosts that want to drive resumable execution typically reach for LoadAndResume. It loads the most recent checkpoint, validates it against the engine's Resumer (if implemented), populates [Run.ResumeFrom], threads a ResumeContext onto ctx and finally calls Execute:
board, err := engine.LoadAndResume(ctx, eng, host, store,
	engine.Run{ID: runID}, nil,
	engine.WithResumeSignal("crash"),
	engine.WithResumeAttempt(2),
	engine.WithFreshStartAllowed(false),
)
Engines opt in to resume validation by implementing Resumer:
func (e *myEngine) CanResume(cp engine.Checkpoint) error {
	if cp.Payload == nil {
		return errdefs.Validation(errors.New("missing payload"))
	}
	return nil
}
Engines (and observers / middleware) read replay metadata from ctx inside Execute:
if rc, ok := engine.ResumeContextFromContext(ctx); ok && rc.Attempt > 1 {
	// mark telemetry: this is a replay
}
The contract intentionally separates WHERE (engine-defined [Run.ResumeFrom]) from WHY (host-defined ResumeContext); engines that do not implement Resumer still honour the resume contract described on [Run.ResumeFrom].
Capability discovery ¶
Hosts that need to know what an engine can do BEFORE invoking Execute (e.g. agent.Run preflight, dashboard rendering, vessel admission) call CapabilitiesOf:
caps := engine.CapabilitiesOf(eng)
if !caps.SupportsResume && run.ResumeFrom != nil {
	return errdefs.NotAvailablef("engine cannot resume")
}
Engines opt in by implementing Describer:
func (e *myEngine) Capabilities() engine.Capabilities {
	return engine.Capabilities{
		SupportsResume:   true,
		EmitsCheckpoint:  true,
		RequiredDepNames: []string{depname.LLMClient},
	}
}
Closure-style engines wrap with WithCapabilities:
return engine.WithCapabilities(
	engine.EngineFunc(func(...) (...) { ... }),
	engine.Capabilities{EmitsCheckpoint: true},
), nil
Capabilities are declarative — the matching behaviour MUST exist. Tests should pin the pairing (see graph runner's HonestlyReportsCurrentBehaviour test for the pattern).
Index ¶
- Constants
- func EmitStreamDelta(ctx context.Context, pub Publisher, runID, stepActor string, ...) error
- func EmitStreamToken(ctx context.Context, pub Publisher, runID, stepActor, content string) error
- func EmitStreamToolCall(ctx context.Context, pub Publisher, runID, stepActor, id, name string, ...) error
- func EmitStreamToolResult(ctx context.Context, pub Publisher, ...) error
- func GetDep[T any](d *Dependencies, key any) (T, error)
- func GetTyped[T any](b *Board, key string) (T, bool)
- func Interrupted(intr Interrupt) error
- func IsResumable(eng Engine) bool
- func IsStreamDelta(s event.Subject) bool
- func MergeInterrupts(ctx context.Context, sources ...<-chan Interrupt) <-chan Interrupt
- func MustGetDep[T any](d *Dependencies, key any) T
- func PatternAllRuns() event.Pattern
- func PatternRun(runID string) event.Pattern
- func PatternRunSteps(runID string) event.Pattern
- func PatternRunStream(runID string) event.Pattern
- func SanitiseID(id string) string
- func SubjectRunEnd(runID string) event.Subject
- func SubjectRunStart(runID string) event.Subject
- func SubjectStepComplete(runID, stepActor string) event.Subject
- func SubjectStepError(runID, stepActor string) event.Subject
- func SubjectStepStart(runID, stepActor string) event.Subject
- func SubjectStreamDelta(runID, stepActor string) event.Subject
- func SuggestCheckpoint(e Engine) error
- func WithHost(ctx context.Context, h Host) context.Context
- func WithResumeContext(ctx context.Context, rc ResumeContext) context.Context
- type Board
- func (b *Board) AppendChannelMessage(name string, msg model.Message)
- func (b *Board) AppendSliceVar(key string, value any) error
- func (b *Board) Channel(name string) []model.Message
- func (b *Board) ChannelsCopy() map[string][]model.Message
- func (b *Board) GetVar(key string) (any, bool)
- func (b *Board) GetVarString(key string) string
- func (b *Board) RestoreFrom(snap *BoardSnapshot)
- func (b *Board) SetChannel(name string, msgs []model.Message)
- func (b *Board) SetVar(key string, value any)
- func (b *Board) Snapshot() *BoardSnapshot
- func (b *Board) UpdateSliceVarItem(key string, match func(any) bool, update func(any) any)
- func (b *Board) Vars() map[string]any
- type BoardSnapshot
- type Capabilities
- type Cause
- type Checkpoint
- type CheckpointDeleter
- type CheckpointLister
- type CheckpointStore
- type CheckpointSuggester
- type Checkpointer
- type Cloneable
- type Dependencies
- type Describer
- type Engine
- type EngineFunc
- type Host
- type HostFuncs
- func (h HostFuncs) AskUser(ctx context.Context, prompt UserPrompt) (UserReply, error)
- func (h HostFuncs) Checkpoint(ctx context.Context, cp Checkpoint) error
- func (h HostFuncs) Interrupts() <-chan Interrupt
- func (h HostFuncs) Publish(ctx context.Context, env event.Envelope) error
- func (h HostFuncs) ReportUsage(ctx context.Context, usage model.TokenUsage) error
- type HostMiddleware
- type Interrupt
- type InterruptedError
- type Interrupter
- type LoadAndResumeOption
- type NoopCheckpointStore
- type NoopHost
- func (NoopHost) AskUser(context.Context, UserPrompt) (UserReply, error)
- func (NoopHost) Checkpoint(context.Context, Checkpoint) error
- func (NoopHost) Interrupts() <-chan Interrupt
- func (NoopHost) Publish(context.Context, event.Envelope) error
- func (NoopHost) ReportUsage(context.Context, model.TokenUsage) error
- type Publisher
- type ResumeContext
- type Resumer
- type Run
- type StreamDeltaPayload
- type StreamDeltaType
- type StreamRouter
- type StreamRouterOption
- type StreamSink
- type StreamSinkFunc
- type UsageReporter
- type UserPrompt
- type UserPrompter
- type UserReply
Constants ¶
const MainChannel = "__main_channel"
MainChannel is the default message channel key.
Channels are an engine-level primitive: they let nodes/steps share ordered message sequences without going through Vars. Convention-level keys for "the chat transcript", "the answer", etc. belong to the agent layer; this package only provides the channel mechanism.
Naming rule: any board key (var or channel) reserved by the engine itself uses the "__" prefix. User-domain code MUST NOT introduce channel or var names beginning with "__"; doing so risks colliding with a future engine-managed slot. Existing reserved names besides MainChannel include graph-level vars VarInterruptedNode and VarToolCalls (see sdk/graph).
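A host-side guard for this naming rule can be sketched as a trivial prefix check (a hypothetical helper, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

// isReservedBoardKey reports whether a var/channel name falls in
// the engine-reserved "__" namespace described above.
func isReservedBoardKey(name string) bool {
	return strings.HasPrefix(name, "__")
}

func main() {
	fmt.Println(isReservedBoardKey("__main_channel")) // true
	fmt.Println(isReservedBoardKey("transcript"))     // false
}
```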
const SubjectPrefix = "engine.run."
SubjectPrefix is the fixed root every engine envelope subject MUST start with. Exposed as a constant so consumers can check strings.HasPrefix without re-deriving it.
Variables ¶
This section is empty.
Functions ¶
func EmitStreamDelta ¶ added in v0.2.7
func EmitStreamDelta(ctx context.Context, pub Publisher, runID, stepActor string, payload StreamDeltaPayload) error
EmitStreamDelta is the low-level form of the EmitStreamX helpers. Custom nodes that need to set fields outside the type-specific helpers (e.g. a forward-compatible Type the SDK does not yet ship a helper for) build the payload themselves and pass it here. Required per-Type fields are validated to mirror the contract enforced by DecodeStreamDelta on the consumer side, so a malformed delta is caught at publish time instead of silently flowing to subscribers.
stepActor follows the contract documented at the top of subjects.go: it MUST start with the executing agent.id (so [PatternRunAgentStream] can fan-in by agent) and MAY append an engine-private suffix (graph runner: ".node.<nodeID>"; vessel inline: ".iter<N>"). Both runID and stepActor are sanitised by SanitiseID so caller-supplied values cannot fragment the resulting subject.
The envelope is stamped with HeaderRunID. The agent identifier is derived from the stepActor segment ahead of any optional ".node." / ".iter" suffix and goes onto HeaderAgentID (and the legacy HeaderActorID via event.Envelope.SetAgentID dual-write). For header-routed subscribers that key off the node id, HeaderNodeID is populated whenever stepActor carries the graph runner's "<agent>.node.<nodeID>" form, so subject routing and header routing stay aligned.
Publish errors are returned to the caller (unlike the executor's fire-and-forget convention) so node authors can decide whether to retry or surface the failure; in practice most callers just discard the error because stream deltas are observability, not control flow.
func EmitStreamToken ¶ added in v0.2.7
EmitStreamToken publishes one assistant-token delta on the canonical stream subject. See EmitStreamDelta for the stepActor format requirement.
Use this from any node that produces incremental textual output — for example a custom RAG retriever streaming its working notes, or a post-processing node turning structured data into prose. content may be empty (callers that want "still alive" heartbeats should typically mark them differently); empty content is published as-is so the helper stays predictable.
func EmitStreamToolCall ¶ added in v0.2.7
func EmitStreamToolCall(ctx context.Context, pub Publisher, runID, stepActor, id, name string, args any) error
EmitStreamToolCall publishes one tool-call delta. id and name are required (consumers correlate the eventual tool_result by ID); args is the tool input the model produced and may be either a JSON string or an already-decoded map / slice — both are valid per the StreamDeltaPayload contract. See EmitStreamDelta for the stepActor format requirement.
The helper validates the required fields up-front and returns a descriptive error instead of publishing a malformed envelope; callers that already validated upstream can ignore the error safely.
func EmitStreamToolResult ¶ added in v0.2.7
func EmitStreamToolResult(ctx context.Context, pub Publisher, runID, stepActor, toolCallID, name, content string, isError, cancelled bool) error
EmitStreamToolResult publishes one tool-result delta. toolCallID and content are required (toolCallID pairs the result with the originating tool_call); name is recommended so consumers can render the result without a separate dispatch table. isError marks unsuccessful results; cancelled marks synthesised cancellations emitted when the round was interrupted before the call dispatched. See EmitStreamDelta for the stepActor format requirement.
func GetDep ¶
func GetDep[T any](d *Dependencies, key any) (T, error)
GetDep is a generic helper that retrieves a typed dependency. It returns an error when the key is missing or when the stored value is not assignable to T, so callers can surface configuration mistakes early instead of failing with a nil-pointer panic deep inside an engine.
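The shape of such a helper can be sketched as follows. This is a hypothetical local reimplementation assuming Dependencies wraps a map keyed by any; the real type's internals may differ:

```go
package main

import "fmt"

// Dependencies stands in for the package's dep container,
// assumed here to be an any-keyed map.
type Dependencies struct{ m map[any]any }

// getDep mirrors the documented contract: error on a missing key
// or on a value that is not assignable to T.
func getDep[T any](d *Dependencies, key any) (T, error) {
	var zero T
	if d == nil {
		return zero, fmt.Errorf("nil dependencies")
	}
	v, ok := d.m[key]
	if !ok {
		return zero, fmt.Errorf("dependency %v not registered", key)
	}
	t, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("dependency %v is %T, want %T", key, v, zero)
	}
	return t, nil
}

func main() {
	deps := &Dependencies{m: map[any]any{"llm": "fake-client"}}
	s, err := getDep[string](deps, "llm")
	fmt.Println(s, err)
	_, err = getDep[int](deps, "llm") // wrong type: surfaces as an error
	fmt.Println(err != nil)
}
```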
func GetTyped ¶
GetTyped retrieves a typed value from the Board's vars. It is a generic helper rather than a method because Go does not allow type parameters on methods.
func Interrupted ¶
Interrupted wraps an Interrupt as an error that satisfies errdefs.IsInterrupted. The recommended usage from an engine is:
case intr := <-h.Interrupts():
	return engine.Interrupted(intr)
Hosts inspecting the result use the standard errdefs / errors.As idiom:
if errdefs.IsInterrupted(err) {
	var ie engine.InterruptedError
	if errors.As(err, &ie) {
		switch ie.Cause {
		case engine.CauseUserInput: ...
		}
	}
}
A zero-value Interrupt still produces a well-formed error so callers don't need to special-case CauseUnknown.
func IsResumable ¶ added in v0.2.9
IsResumable reports whether eng implements Resumer. Use the two-value variant when you want the typed handle:
if r, ok := engine.AsResumer(eng); ok { _ = r.CanResume(cp) }
func IsStreamDelta ¶
IsStreamDelta reports whether s is a stream-delta subject. Cheap (string-only) so consumers can filter envelopes before the more expensive payload decode.
Implementation note: matches subjects shaped like "engine.run.<runID>.stream.<stepActor>.delta" — i.e. the prefix is SubjectPrefix, contains ".stream." and ends with ".delta".
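That documented matching rule can be re-derived in a few lines; this sketch is a standalone reimplementation for illustration, not the package's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

const subjectPrefix = "engine.run."

// isStreamDelta re-derives the documented string test: the fixed
// prefix, a ".stream." segment, and a ".delta" suffix.
func isStreamDelta(s string) bool {
	return strings.HasPrefix(s, subjectPrefix) &&
		strings.Contains(s, ".stream.") &&
		strings.HasSuffix(s, ".delta")
}

func main() {
	fmt.Println(isStreamDelta("engine.run.r1.stream.agent.delta")) // true
	fmt.Println(isStreamDelta("engine.run.r1.step.agent.start"))   // false
}
```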
func MergeInterrupts ¶ added in v0.2.5
MergeInterrupts fans in N independent interrupt channels into a single output channel — the natural shape sandbox / pod hosts need to combine "user cancel", "SIGTERM", "budget exceeded", "graceful stop", … into the one channel an Engine reads from [Host.Interrupts].
Behaviour:
- The returned channel is closed once ctx is cancelled OR every non-nil source channel has been closed. Engines reading from it therefore see EOF when "everyone is done", matching how a single cooperative-interrupt channel behaves today.
- Nil source channels are skipped silently — matches the documented "nil means never fires" semantic in [Interrupter.Interrupts] and keeps callers from having to filter their slice.
- When zero non-nil sources are supplied the returned channel is a never-fires channel that is closed when ctx is cancelled. This keeps the helper total — engines selecting on it stay correct even in the trivial case.
- Order of forwarded interrupts is the natural runtime arrival order across the source channels. No de-duplication: a host that fires "shutdown" through two distinct sources will surface both.
Forwarding goroutines exit promptly when ctx is cancelled, so the helper is safe to use in long-lived hosts that get re-created across reloads.
func MustGetDep ¶
func MustGetDep[T any](d *Dependencies, key any) T
MustGetDep is like GetDep but panics on error. Use it only inside engine internals where a missing dependency is a programming bug (e.g. a node referenced a dep that the host did not register).
func PatternAllRuns ¶
PatternAllRuns returns the wildcard pattern matching every event from every run.
engine.run.>
func PatternRun ¶
PatternRun returns the wildcard pattern matching every event of one run, regardless of engine implementation or engine-private extension.
engine.run.<runID>.>
func PatternRunSteps ¶
PatternRunSteps returns the wildcard pattern matching every step lifecycle event (start / complete / error and any engine-private step.* extension such as graph runner's "skipped") of one run.
engine.run.<runID>.step.>
func PatternRunStream ¶
PatternRunStream returns the wildcard pattern matching every stream delta of one run. Use this when you want LLM token / tool deltas but not the step lifecycle events.
Agent-level fan-in ("only agent X's events in this run") is NOT available as a NATS wildcard because the stepActor segment is collapsed into one token by SanitiseID (see the file header). Consumers route by agent through the event.HeaderAgentID envelope header instead — subscribe with PatternRun(runID) and filter on env.AgentID() in the consumer.
engine.run.<runID>.stream.>
func SanitiseID ¶
SanitiseID escapes characters that would corrupt an event.Subject when the input is concatenated into one. event.Subject segments are separated by '.', and '*' / '>' are reserved by event.Pattern for wildcards; any of these in a runID / stepActor would either fragment the subject or turn it into an unintended pattern. SanitiseID replaces each occurrence with '_'.
Empty input becomes "_" so the resulting subject keeps a constant segment count even when the engine forgot to mint an id.
Engines are expected to call SanitiseID on every user-supplied fragment they splice into a subject. The Subject* / Pattern* builders in this file already do so for their parameters; engine implementations only need it when constructing private extensions of their own.
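The documented escaping behaviour amounts to the following (a standalone re-derivation for illustration, not the package's actual source):

```go
package main

import (
	"fmt"
	"strings"
)

// sanitiseID re-derives the documented escaping: '.', '*' and '>'
// each become '_', and empty input becomes "_" so the resulting
// subject keeps a constant segment count.
func sanitiseID(id string) string {
	if id == "" {
		return "_"
	}
	return strings.NewReplacer(".", "_", "*", "_", ">", "_").Replace(id)
}

func main() {
	fmt.Println(sanitiseID("agent.node.7")) // agent_node_7
	fmt.Println(sanitiseID(""))             // _
}
```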
func SubjectRunEnd ¶
SubjectRunEnd returns the subject every engine MUST publish exactly once when [Engine.Execute] returns, regardless of outcome (clean completion, interrupt, cancel, failure).
engine.run.<runID>.end
func SubjectRunStart ¶
SubjectRunStart returns the subject every engine MUST publish exactly once when [Engine.Execute] begins.
engine.run.<runID>.start
func SubjectStepComplete ¶
SubjectStepComplete returns the subject every engine MUST publish when one step finishes successfully. See SubjectStepStart for the stepActor format.
engine.run.<runID>.step.<stepActor>.complete
func SubjectStepError ¶
SubjectStepError returns the subject every engine MUST publish when one step fails (i.e. when it would normally cause Execute to return a non-nil non-interrupt error). See SubjectStepStart for the stepActor format.
engine.run.<runID>.step.<stepActor>.error
func SubjectStepStart ¶
SubjectStepStart returns the subject every engine MUST publish when it begins executing one step. stepActor identifies the unit of work; it MUST start with the executing agent.id (so consumers can reconstruct the agent identity from the subject when the envelope header is unavailable). See the file header for the per-engine suffix conventions (graph: ".node.<id>"; vessel inline: ".iter<N>") and for why agent-level NATS wildcard fan-in goes through the event.HeaderAgentID header instead of the subject.
engine.run.<runID>.step.<stepActor>.start
func SubjectStreamDelta ¶
SubjectStreamDelta returns the subject every engine MUST use when emitting an in-flight increment from the step identified by stepActor — the canonical example is one LLM token, one dispatched tool call, or one tool result. See SubjectStepStart for the stepActor format.
Payload MUST decode to a StreamDeltaPayload; see its docs for the per-Type field requirements.
engine.run.<runID>.stream.<stepActor>.delta
func SuggestCheckpoint ¶ added in v0.2.5
SuggestCheckpoint asks the engine for a voluntary checkpoint when the engine implements CheckpointSuggester; otherwise it is a no-op. Returns the engine's error directly so the host can log / retry; nil is returned both for "engine does not support suggestion" and for "engine accepted the suggestion".
Walks any WithCapabilities-style wrappers via Unwrap so a suggester wrapped to advertise capabilities still surfaces.
func WithHost ¶ added in v0.3.4
WithHost returns a derived context carrying h. Engines that dispatch to extension points which were not designed to receive the Host directly (sdk/tool's Tool.Execute signature, custom plugin callbacks, …) call WithHost before invoking those extensions so the extension can recover the Host via HostFromContext.
The intended consumer pattern:
// engine side: just before invoking the tool registry
ctx = engine.WithHost(ctx, host)
results := reg.ExecuteAll(ctx, calls)

// tool side
host, ok := engine.HostFromContext(ctx)
if ok {
	reply, err := host.AskUser(ctx, prompt)
}
nil h is allowed and a no-op (returns ctx unchanged) so callers can plumb a possibly-nil host without conditional branches.
Engines MUST NOT use the host stashed here as a substitute for the host argument they receive in [Engine.Execute] — the argument is the contract; the context-carried copy is purely a transport for downstream extensions that lack a Host parameter.
func WithResumeContext ¶ added in v0.2.9
func WithResumeContext(ctx context.Context, rc ResumeContext) context.Context
WithResumeContext returns a derived context carrying rc. Engines that publish telemetry attribs (attempt count, resume signal) pull from here instead of plumbing extra parameters.
Types ¶
type Board ¶
type Board struct {
	// contains filtered or unexported fields
}
Board is the engine execution blackboard: typed message channels plus untyped control vars, both protected by the same mutex.
Thread safety: every public method takes a mutex. Concurrent reads use RLock; mutations use Lock. The contract matches the original graph.Board it replaces — callers that previously held graph.Board across goroutines do not need to add any new locking.
Board is intentionally ignorant of agent concepts. It does not know what "messages", "answer" or "run id" mean; those names are established by callers. Per-execution metadata (ID, Attributes, Deps) belongs on Run, not here.
func LoadAndResume ¶ added in v0.2.9
func LoadAndResume(
	ctx context.Context,
	eng Engine,
	host Host,
	store CheckpointStore,
	run Run,
	board *Board,
	opts ...LoadAndResumeOption,
) (*Board, error)
LoadAndResume is the canonical host-side helper for "either continue the existing run or start fresh". It:
- Loads the most recent checkpoint for run.ID from store.
- If a checkpoint exists, validates it against the engine's Resumer (if implemented) — invalid checkpoints surface immediately rather than after a partial Execute.
- Populates run.ResumeFrom and threads a ResumeContext onto ctx so the engine, observers and middleware see consistent replay metadata.
- Calls eng.Execute and returns its result.
store may be nil; that is treated as "no checkpoints persisted" and is equivalent to a fresh-start with WithFreshStartAllowed honoured. board is the bootstrap board used on fresh starts and when the engine wishes to keep the host-supplied initial state (the engine itself decides whether to override with ResumeFrom.Board per the [Run.ResumeFrom] contract).
LoadAndResume is intentionally a one-shot helper, not a retry loop: a supervisor that wants exponential backoff or budget enforcement composes LoadAndResume with its own retry policy.
func NewBoard ¶
func NewBoard() *Board
NewBoard creates an empty Board with an initialised main channel so callers can Board.AppendChannelMessage without a nil-check.
func RestoreBoard ¶
func RestoreBoard(snap *BoardSnapshot) *Board
RestoreBoard reconstructs a Board from a snapshot. Passing nil returns a fresh empty board so resume code can use this unconditionally.
func (*Board) AppendChannelMessage ¶
AppendChannelMessage appends a message to a channel, creating the channel on demand.
func (*Board) AppendSliceVar ¶
AppendSliceVar atomically appends a value to a slice stored in a board variable. It returns an error if the existing value is not a []any (instead of silently overwriting), so callers cannot lose data by typo'ing a key already used for a non-slice value.
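The create-or-error contract can be sketched like this (a stripped-down local stand-in for Board's vars side, shown to make the "error instead of overwrite" behaviour concrete):

```go
package main

import (
	"fmt"
	"sync"
)

// board stands in for the vars side of engine.Board.
type board struct {
	mu   sync.Mutex
	vars map[string]any
}

// appendSliceVar mirrors the documented contract: create the slice
// on first use, error (rather than silently overwrite) when the key
// already holds a non-slice value.
func (b *board) appendSliceVar(key string, value any) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	existing, ok := b.vars[key]
	if !ok {
		b.vars[key] = []any{value}
		return nil
	}
	s, ok := existing.([]any)
	if !ok {
		return fmt.Errorf("var %q holds %T, not []any", key, existing)
	}
	b.vars[key] = append(s, value)
	return nil
}

func main() {
	b := &board{vars: map[string]any{"answer": "42"}}
	_ = b.appendSliceVar("calls", "first")
	err := b.appendSliceVar("answer", "second") // collides with a string var
	fmt.Println(err != nil)
}
```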
func (*Board) Channel ¶
Channel returns a copy of messages for the given channel. An empty or missing channel returns a nil slice (not a zero-length slice) so callers can use len() == 0 uniformly.
func (*Board) ChannelsCopy ¶
ChannelsCopy returns a deep copy of all channel message lists. Used by parallel branch execution to give each branch an independent view that can later be merged.
func (*Board) GetVarString ¶
GetVarString retrieves a board variable as a string. It returns "" when the key is missing or the stored value is not a string.
func (*Board) RestoreFrom ¶
func (b *Board) RestoreFrom(snap *BoardSnapshot)
RestoreFrom overwrites this board from a snapshot. Used by retry / rollback paths inside an executor: the executor takes a snapshot before a risky step, then restores the same Board (preserving its identity for nodes that captured a pointer to it) on failure.
func (*Board) SetChannel ¶
SetChannel replaces the entire message list for a channel. The input slice is copied; later mutations by the caller do not affect the Board.
func (*Board) Snapshot ¶
func (b *Board) Snapshot() *BoardSnapshot
Snapshot returns a serialisable copy of the current state. Vars are deep-copied so the snapshot is safe to retain after further Board mutations; values implementing Cloneable are duplicated through their Clone method, otherwise reflection is used.
func (*Board) UpdateSliceVarItem ¶
UpdateSliceVarItem finds and updates the first matching item in a slice variable. Missing keys and non-slice values are silently ignored — the typical use is "update the entry I just appended", and the caller has already verified the slice exists.
type BoardSnapshot ¶
type BoardSnapshot struct {
	Vars     map[string]any             `json:"vars"`
	Channels map[string][]model.Message `json:"channels,omitempty"`
}
BoardSnapshot is a serialisable representation of board state, used for resume / checkpoint flows. It carries no live mutex and is safe to JSON-encode.
type Capabilities ¶ added in v0.2.5
type Capabilities struct {
	// SupportsResume reports whether Execute can honour
	// Run.ResumeFrom. Engines that always return errdefs.NotAvailable
	// for a non-nil ResumeFrom MUST leave this false; pods enforcing
	// RestartPolicy=Always need the true case to recover mid-run state
	// without losing partial work.
	SupportsResume bool

	// EmitsUserPrompt reports whether the engine may call
	// Host.AskUser during Execute. Pods deploying in headless / batch
	// mode use this to refuse engines that would block waiting for a
	// reply that nobody can supply.
	EmitsUserPrompt bool

	// EmitsCheckpoint reports whether the engine writes Checkpoints
	// during Execute (independently of SupportsResume — an engine
	// can write checkpoints that only an external tool can replay).
	// Pods use this to decide whether to attach a CheckpointStore.
	EmitsCheckpoint bool

	// RequiredDepNames lists the named dependencies the engine
	// expects in Run.Deps. Names follow the convention enumerated
	// in the [sdk/engine/depname] package — engines reference the
	// constants there rather than open-coding strings so a typo
	// becomes a compile-time error. Hosts populate Dependencies
	// under the same names the engine declares here.
	//
	// Pods, agent.Run pre-flight, and the vessel build path
	// iterate this list and reject the spec / run when a required
	// dep is absent — surfacing wiring mistakes before any
	// engine.Execute call. Empty when the engine has no required
	// deps.
	//
	// This is a *named* declaration deliberately. The underlying
	// Dependencies map is keyed by any so engines cannot
	// meaningfully expose their concrete typed keys to a generic
	// pod controller; the string convention is the only thing
	// dashboards / admin APIs can serialise.
	//
	// [sdk/engine/depname]: https://pkg.go.dev/github.com/GizClaw/flowcraft/sdk/engine/depname
	RequiredDepNames []string
}
Capabilities describes the optional features an Engine implementation declares to its host. The pod / agent layer reads these to:
- validate a PodSpec at Apply time (e.g. RestartPolicy=Always requires SupportsResume=true, otherwise the spec is rejected before any work starts);
- decide which hooks to wire (configure CheckpointStore only when EmitsCheckpoint is true);
- refuse to run an engine that needs user prompts in a headless deployment (EmitsUserPrompt=true on a host without an interactive channel becomes a fail-fast).
All fields default to "do not claim the capability" (zero value). Engines that do not satisfy the Describer interface are treated as the zero Capabilities — the most conservative assumption.
func CapabilitiesOf ¶ added in v0.2.5
func CapabilitiesOf(e Engine) Capabilities
CapabilitiesOf returns the engine's declared capabilities, or the zero value when the engine does not implement Describer. Hosts SHOULD use this helper rather than ad-hoc type assertions so the "missing = zero" convention stays uniform.
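The "missing = zero" convention is easy to mirror in miniature. The sketch below uses simplified local stand-ins for Capabilities, Engine and Describer (the real types live in this package and carry more fields); the type-assertion-with-zero-fallback pattern is the point:

```go
package main

import "fmt"

// Local mirrors of the engine types, for illustration only.
type Capabilities struct {
	SupportsResume  bool
	EmitsCheckpoint bool
}

type Engine interface {
	Execute() error // real signature carries ctx, Run, Host, Board
}

type Describer interface {
	Capabilities() Capabilities
}

// capabilitiesOf mirrors engine.CapabilitiesOf: one type assertion,
// with the zero value as the conservative fallback.
func capabilitiesOf(e Engine) Capabilities {
	if d, ok := e.(Describer); ok {
		return d.Capabilities()
	}
	return Capabilities{} // no Describer = claim nothing
}

// resumableEngine advertises its features via Describer.
type resumableEngine struct{}

func (resumableEngine) Execute() error { return nil }
func (resumableEngine) Capabilities() Capabilities {
	return Capabilities{SupportsResume: true, EmitsCheckpoint: true}
}

// plainEngine implements only Engine, so it claims nothing.
type plainEngine struct{}

func (plainEngine) Execute() error { return nil }

func main() {
	fmt.Println(capabilitiesOf(resumableEngine{})) // {true true}
	fmt.Println(capabilitiesOf(plainEngine{}))     // {false false}
}
```

Hosts that use the helper everywhere never have to special-case engines that predate a given capability field.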
type Cause ¶
type Cause string
Cause classifies why a run was asked to stop. The agent layer maps these onto its higher-level commit/discard policy (e.g. discard the partial output on user_input, commit it on host_shutdown).
Engines should NEVER branch on Cause for control flow — Cause is metadata for the host, not a directive for the engine. The engine's only correct response to any cause is "stop cleanly and return".
const (
	// CauseUnknown is the zero value. Hosts should avoid sending it;
	// it exists so a zero-value Interrupt is recognisable.
	CauseUnknown Cause = ""
	// CauseUserCancel is a user-initiated cancel ("stop talking",
	// "abort this turn"). Output is typically discarded.
	CauseUserCancel Cause = "user_cancel"
	// CauseUserInput is a barge-in: the user spoke / typed and the
	// agent should yield to fresh input. Output is typically discarded.
	CauseUserInput Cause = "user_input"
	// CauseHostShutdown is a graceful shutdown from the host.
	// Output should typically be committed if any was produced.
	CauseHostShutdown Cause = "host_shutdown"
	// CauseCustom carries a host-defined cause in [Interrupt.Detail].
	CauseCustom Cause = "custom"
)
type Checkpoint ¶
type Checkpoint struct {
// ExecID identifies the engine execution this checkpoint belongs
// to. MUST equal the producing [Run.ID].
ExecID string `json:"exec_id"`
// Step is an opaque, engine-defined marker that locates "where"
// the run is. For graph it is typically the next node id; for a
// script engine it might be a continuation id. The host treats
// this as opaque bytes.
Step string `json:"step,omitempty"`
// Iteration is an optional monotonic counter for engines that
// loop (graph re-entry counter, scheduler tick, …). Zero is fine
// when the engine doesn't track iterations.
Iteration int `json:"iteration,omitempty"`
// Board is the Board state at the boundary. Always non-nil.
Board *BoardSnapshot `json:"board"`
// Payload is engine-specific extra state the engine wants to
// persist alongside the Board. Treated as opaque JSON by the
// store; the producing engine is the only consumer that knows
// how to decode it.
Payload json.RawMessage `json:"payload,omitempty"`
// Attributes mirrors [Run.Attributes] at the time the checkpoint
// was produced (run id at the agent layer, tenant, graph id, …).
// Stores may use these for indexing/lookup.
Attributes map[string]string `json:"attributes,omitempty"`
// Timestamp is the wall-clock time the engine produced the
// checkpoint. Hosts may overwrite when they actually persist.
Timestamp time.Time `json:"timestamp"`
// OriginalStartedAt is the wall-clock time the original (fresh)
// run started. Stays constant across resumes so dashboards
// computing total wall time (e.g. SLO budget burn) don't reset
// every time a host re-loads the checkpoint and resumes.
//
// Engines SHOULD copy this from [ResumeContext.StartedAt] (when
// resuming) or from the time they began the fresh run (when
// producing the first checkpoint). Hosts driving resume via
// [LoadAndResume] thread the value through automatically — the
// helper reads OriginalStartedAt off the loaded checkpoint and
// stamps it back on the next ResumeContext.
//
// Zero time means "not recorded" (engines that ship before this
// field was added, or that don't track wall time). Consumers
// should fall back to Timestamp in that case.
OriginalStartedAt time.Time `json:"original_started_at,omitempty"`
// SpecVersion identifies the engine's spec / definition version
// at the time the checkpoint was produced. The format is
// engine-defined: graph runner uses the [GraphMeta.Version]
// string; a script engine could store a content hash; vessel
// composes the spec document version.
//
// CanResume implementations compare this against the engine's
// current version: a mismatch means the underlying spec has
// drifted (graph re-edited, script reloaded, vessel reapplied
// with new agent definition) and silently resuming would replay
// against semantics the original run never saw. Engines that
// detect drift surface errdefs.NotAvailable from CanResume so
// the host can either fall back to a fresh run or surface the
// mismatch to the operator.
//
// Empty means "no version recorded" — older checkpoints, or
// engines that have no concept of versioned spec. CanResume
// MUST treat empty as "skip drift check" rather than "always
// fail" so old checkpoints stay loadable.
SpecVersion string `json:"spec_version,omitempty"`
}
Checkpoint is the engine-agnostic persistence record produced at a safe boundary during execution. Each engine decides what its own step marker / payload looks like; this struct only owns the common envelope shape.
Engines populate Checkpoint and hand it to [Checkpointer.Checkpoint] (the host method). The host is responsible for writing it durably; engines must not assume the call has persisted anything.
type CheckpointDeleter ¶
CheckpointDeleter optionally extends CheckpointStore with the ability to delete a single execution's checkpoints. Used by the host when a run completes successfully and its checkpoints are no longer needed.
type CheckpointLister ¶
CheckpointLister optionally extends CheckpointStore with the ability to enumerate persisted exec ids. Stores that support listing satisfy this interface; agent-level resume / dashboard code can type-assert to it.
type CheckpointStore ¶
type CheckpointStore interface {
Save(ctx context.Context, cp Checkpoint) error
Load(ctx context.Context, execID string) (*Checkpoint, error)
}
CheckpointStore is the host-side persistence contract. The host's [Checkpointer.Checkpoint] implementation typically delegates to a CheckpointStore. The interface is intentionally narrow: Save persists; Load returns the most-recent persisted record for the given exec id, or (nil, nil) if absent. All methods must be safe for concurrent use.
type CheckpointSuggester ¶ added in v0.2.5
type CheckpointSuggester interface {
SuggestCheckpoint() error
}
CheckpointSuggester is the optional engine-side interface a host uses to ask the engine to write a Checkpoint at the next safe boundary — typically before a voluntary restart, scale-down, or pod reschedule.
Semantics (advisory, not synchronous):
- The engine SHOULD call its host's Checkpointer at the next point in execution where Checkpoint.Step is well-defined. It is NOT obligated to interrupt itself; SuggestCheckpoint returns immediately with no guarantee that the checkpoint has been written by the time it returns.
- The host typically pairs SuggestCheckpoint with a follow-up Interrupt after a grace period: "save what you can, then stop".
- Engines that have no notion of a step boundary (purely memory-resident, sub-second runs) MAY treat this as a no-op.
- Calling SuggestCheckpoint on an engine that does not implement this interface is prevented by the type system; hosts use the SuggestCheckpoint helper (below), which type-asserts and no-ops on engines that do not satisfy the interface.
type Checkpointer ¶
type Checkpointer interface {
Checkpoint(ctx context.Context, cp Checkpoint) error
}
Checkpointer persists a checkpoint at a safe boundary the engine has reached. The host decides whether to actually write; engines must not assume durability.
Hosts without configured checkpointing should make this a no-op (return nil) rather than an error so engines can call it unconditionally.
type Cloneable ¶
type Cloneable interface {
Clone() any
}
Cloneable may be implemented by values stored in Board vars to provide a type-safe deep copy instead of the reflection fallback used by Board.Snapshot and RestoreBoard.
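A board value holding reference types (slices, maps) is the typical candidate, since a shallow copy would share the backing storage between snapshots. A sketch with a hypothetical ToolTrace value:

```go
package main

import "fmt"

// Cloneable mirrors the engine interface: a type-safe deep-copy hook.
type Cloneable interface {
	Clone() any
}

// ToolTrace is a hypothetical board value holding a slice, which a
// shallow copy would share between snapshots.
type ToolTrace struct {
	Calls []string
}

func (t ToolTrace) Clone() any {
	calls := make([]string, len(t.Calls))
	copy(calls, t.Calls) // copy the backing array, not just the header
	return ToolTrace{Calls: calls}
}

func main() {
	orig := ToolTrace{Calls: []string{"search"}}
	snap := orig.Clone().(ToolTrace)
	orig.Calls[0] = "mutated"  // does not leak into the snapshot
	fmt.Println(snap.Calls[0]) // search
}
```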
type Dependencies ¶
type Dependencies struct {
// contains filtered or unexported fields
}
Dependencies is a typed dependency-injection container that an engine host (typically the agent runtime) populates at build time and that engine implementations consume at run time.
The container is keyed by an opaque any (use a typed key constant per dependency, never a bare string) to keep the lookup explicit and to discourage stringly-typed coupling. Concurrent reads after Build are safe; mutations through Set/Remove acquire a write lock so a host can adjust the container at any time.
This replaces graph.Dependencies / workflow.ToolDeps and intentionally has no engine-specific knowledge.
func NewDependencies ¶
func NewDependencies() *Dependencies
NewDependencies creates an empty dependency container.
func (*Dependencies) Clone ¶ added in v0.3.4
func (d *Dependencies) Clone() *Dependencies
Clone returns a new Dependencies with a shallow copy of d's items. Stored values are NOT deep-copied — callers that mutate a stored value via the clone WILL also mutate the original (this matches the standard "container clone" semantics: shape is independent, payload is shared).
Cloning is the canonical way for callers that own a base Dependencies (e.g. an agent.Agent's per-run wiring) to derive a per-call container with extra entries (e.g. agent.Run promoting agent.Agent.Tools into [depname.ToolAllowedNames]) without poisoning the caller's container.
Cloning a nil receiver returns nil so callers can chain Clone without a guard. The returned container is independently lockable.
func (*Dependencies) Get ¶
func (d *Dependencies) Get(key any) (any, bool)
Get returns the dependency for the given key, untyped.
func (*Dependencies) Has ¶
func (d *Dependencies) Has(key any) bool
Has reports whether the given key is present.
func (*Dependencies) Remove ¶
func (d *Dependencies) Remove(key any)
Remove deletes a dependency. Missing keys are a no-op.
func (*Dependencies) Set ¶
func (d *Dependencies) Set(key, value any)
Set stores a dependency under the given key. Overwrites any existing value for the same key.
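The typed-key convention the container is built around can be sketched with a minimal local mirror of Dependencies (map plus RWMutex; the real type lives in this package). An unexported key struct type makes cross-package key collisions impossible in a way a bare string cannot; the LLMClient names here are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// Minimal local mirror of engine.Dependencies, for illustration.
type Dependencies struct {
	mu    sync.RWMutex
	items map[any]any
}

func NewDependencies() *Dependencies { return &Dependencies{items: map[any]any{}} }

func (d *Dependencies) Set(key, value any) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.items[key] = value
}

func (d *Dependencies) Get(key any) (any, bool) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	v, ok := d.items[key]
	return v, ok
}

// The typed-key convention: an unexported key type means no other
// package can construct a colliding key, unlike a bare string.
type llmClientKey struct{}

type LLMClient interface{ Complete(prompt string) string }

type echoClient struct{}

func (echoClient) Complete(p string) string { return "echo: " + p }

func main() {
	// Host side: populate at build time.
	deps := NewDependencies()
	deps.Set(llmClientKey{}, echoClient{})

	// Engine side: untyped Get plus a checked assertion at run time.
	v, ok := deps.Get(llmClientKey{})
	if !ok {
		panic("llm client not wired")
	}
	fmt.Println(v.(LLMClient).Complete("hi")) // echo: hi
}
```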
type Describer ¶ added in v0.2.5
type Describer interface {
Capabilities() Capabilities
}
Describer is the optional interface an Engine implementation satisfies to advertise its Capabilities. Hosts that need to gate behaviour on capabilities type-assert on Engine; engines that do not implement Describer are treated as having the zero Capabilities (no features claimed) — the safe default.
Kept as a separate optional interface (not folded into Engine) because most engines need only Execute and the SDK contract MUST stay easy to satisfy with a one-method type.
type Engine ¶
type Engine interface {
Execute(ctx context.Context, run Run, host Host, board *Board) (*Board, error)
}
Engine is the deliberately thin contract every local execution engine satisfies so the agent layer can drive it through a uniform shape. Concrete engines (sdk/graph DAG executor, future script-based engines, …) usually expose richer APIs in addition to this method.
Contract:
Execute MUST run to completion, until interrupted, or until ctx is cancelled.
On clean completion, return the final Board (often the same pointer as the input — engines mutate in place by default) and a nil error.
On cooperative interrupt (the host sent an Interrupt through host.Interrupts()), return the (partial) Board together with the result of Interrupted. The error then satisfies errdefs.IsInterrupted(err) and can be destructured into an InterruptedError for the cause.
On ctx cancellation, return the (partial) Board and ctx.Err().
On any other failure, return the (partial) Board together with a domain error (preferably classified via errdefs). Returning a non-nil board on error lets the host decide whether to commit / discard / persist.
When run.ResumeFrom is non-nil, Execute resumes from that checkpoint instead of starting fresh. See [Run.ResumeFrom] for the resume contract; engines that do not support resume MUST return an errdefs.NotAvailable-classified error rather than silently restarting.
Engines MUST NOT close any host-owned channel and MUST NOT mutate run.Attributes or run.ResumeFrom.
func WithCapabilities ¶ added in v0.3.4
func WithCapabilities(eng Engine, caps Capabilities) Engine
WithCapabilities wraps eng so it advertises caps via the Describer interface. Use it when the underlying engine cannot add a Capabilities() method itself — typically EngineFunc-based adapters where the engine value is a function type and methods cannot carry extra state.
Example:
eng := engine.WithCapabilities(
engine.EngineFunc(func(...) (...) { ... }),
engine.Capabilities{
EmitsCheckpoint: true,
RequiredDepNames: []string{depname.LLMClient},
})
Wrapping is a no-op view: the wrapper forwards every call to the underlying engine and also satisfies any optional interface the underlying engine satisfies (Resumer / CheckpointSuggester) via type-assertion delegation in the per-feature helpers (AsResumer / SuggestCheckpoint). The capability claim is the only behaviour the wrapper adds.
type EngineFunc ¶
EngineFunc adapts a plain function to the Engine interface. Useful for test doubles and trivial engines.
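The adapter is the standard "function type with a method" Go idiom. A self-contained sketch with simplified local signatures (the real Execute carries ctx, Run and Host as well):

```go
package main

import "fmt"

// Minimal local mirrors; the real signatures carry ctx, Run, Host, Board.
type Board struct{ Vars map[string]string }

type Engine interface {
	Execute(board *Board) (*Board, error)
}

// EngineFunc adapts a plain function to the Engine interface,
// mirroring the adapter in the engine package.
type EngineFunc func(board *Board) (*Board, error)

func (f EngineFunc) Execute(board *Board) (*Board, error) { return f(board) }

func main() {
	// A one-line test double: no struct type needed.
	var eng Engine = EngineFunc(func(b *Board) (*Board, error) {
		b.Vars["status"] = "done"
		return b, nil
	})
	out, _ := eng.Execute(&Board{Vars: map[string]string{}})
	fmt.Println(out.Vars["status"]) // done
}
```

This is the same shape as http.HandlerFunc, and it is why EngineFunc values cannot carry extra methods: WithCapabilities exists to bolt a Capabilities claim onto them from outside.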
type Host ¶
type Host interface {
Publisher
Interrupter
UserPrompter
Checkpointer
UsageReporter
}
Host is the contract a runtime exposes to a running engine.
Host is intentionally a *composition* of small, single-method interfaces — Publisher, Interrupter, UserPrompter, Checkpointer, UsageReporter. The composition exists to keep [Engine.Execute] readable; downstream code (graph nodes, tools, …) should depend on the smallest interface it actually needs:
// A pure-mapping node only emits events:
func (n *MapNode) Execute(ctx, board, pub engine.Publisher) error
// A streaming LLM node also wants the interrupt channel:
func (n *LLMNode) Execute(ctx, board,
pub engine.Publisher, intr engine.Interrupter) error
Host implementations must be safe for concurrent use. The engine may invoke any method from any goroutine.
func ComposeHost ¶ added in v0.2.5
func ComposeHost(base Host, mws ...HostMiddleware) Host
ComposeHost returns base wrapped by every middleware in mws, in declaration order (first = outermost). Returns base unchanged when mws is empty.
func HostFromContext ¶ added in v0.3.4
HostFromContext returns the Host attached to ctx by a previous WithHost call, plus an "ok" flag. The ok=false branch means either no engine wired the host into ctx (the extension is running outside an engine) or the caller passed a nil Host.
Extensions that require host capabilities should treat ok=false as a usage error and surface it via the extension's own error contract (e.g. ask_user surfaces errdefs.NotAvailable so LLMs see "I cannot prompt the user" instead of crashing).
type HostFuncs ¶ added in v0.2.5
type HostFuncs struct {
Inner Host
PublishFn func(ctx context.Context, env event.Envelope) error
InterruptsFn func() <-chan Interrupt
AskUserFn func(ctx context.Context, prompt UserPrompt) (UserReply, error)
CheckpointFn func(ctx context.Context, cp Checkpoint) error
ReportUsageFn func(ctx context.Context, usage model.TokenUsage) error
}
HostFuncs is the func-field adapter that lets a middleware decorate just the Host methods it cares about while delegating the rest to an underlying Host. Construct one with the inner host as Inner and override only the func fields you need:
wrapped := engine.HostFuncs{
Inner: base,
ReportUsageFn: func(ctx context.Context, u model.TokenUsage) error {
// budget enforcement here
return base.ReportUsage(ctx, u)
},
}
Every nil func field falls through to Inner so partial decorators stay short. Inner MUST be non-nil; a nil Inner is a programming bug and triggers a panic at the first delegated call.
func (HostFuncs) Checkpoint ¶ added in v0.2.5
func (h HostFuncs) Checkpoint(ctx context.Context, cp Checkpoint) error
Checkpoint routes through CheckpointFn or Inner.
func (HostFuncs) Interrupts ¶ added in v0.2.5
Interrupts routes through InterruptsFn or Inner.
func (HostFuncs) ReportUsage ¶ added in v0.2.5
ReportUsage routes through ReportUsageFn or Inner.
type HostMiddleware ¶ added in v0.2.5
HostMiddleware decorates a Host with policy / observability / resource-management behaviour. The pod / agent layer typically stacks several middlewares (audit → rate-limit → budget → secret-resolve) around a base Host built from a PodSpec; this type and the ComposeHost helper exist so the stack can be declared as a slice instead of N levels of struct embedding.
Convention:
- Middleware ordering matches the slice order: ComposeHost(base, A, B, C) returns A(B(C(base))). The first middleware in the slice is the OUTERMOST wrapper and therefore runs first when an engine calls a Host method. This matches how HTTP middleware stacks are normally declared.
- Each middleware MUST return a Host value that delegates unchanged for any sub-interface it does not specifically decorate. The HostFuncs adapter is provided to make this easy: zero-value func fields delegate to the wrapped Host.
- Middlewares are invoked from any goroutine — implementations must be safe for concurrent use, mirroring the Host contract.
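The ordering convention can be demonstrated with a toy one-method Host, assuming the documented "first in the slice = outermost wrapper" rule. Everything here is a local stand-in; only the composition order is the point:

```go
package main

import "fmt"

// A toy Host with one method, enough to show composition order.
type Host interface{ Publish(msg string) string }

type HostMiddleware func(Host) Host

type base struct{}

func (base) Publish(msg string) string { return msg }

// composeHost mirrors the documented ComposeHost convention: the
// first middleware in the slice becomes the outermost wrapper, so
// we wrap from the end of the slice inward.
func composeHost(h Host, mws ...HostMiddleware) Host {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

type tagged struct {
	inner Host
	tag   string
}

func (t tagged) Publish(msg string) string {
	// Outer wrappers run first, so their tag is appended first.
	return t.inner.Publish(msg + " " + t.tag)
}

func tag(s string) HostMiddleware {
	return func(h Host) Host { return tagged{inner: h, tag: s} }
}

func main() {
	h := composeHost(base{}, tag("A"), tag("B"), tag("C"))
	// The call enters A first, then B, then C, then the base host.
	fmt.Println(h.Publish("msg")) // msg A B C
}
```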
func TracingMiddleware ¶ added in v0.2.9
func TracingMiddleware() HostMiddleware
TracingMiddleware returns a HostMiddleware that wraps every Host method call in an OTel span. It is the default observability layer for engines that emit envelopes, checkpoint, or talk back to users — making the cross-process surface visible in any trace UI without per-engine instrumentation.
Spans created (one per Host call):
- engine.host.publish — attribs: messaging.destination (subject), event.id, event.run_id (from envelope headers when present)
- engine.host.checkpoint — attribs: checkpoint.run_id, checkpoint.node_id, checkpoint.seq
- engine.host.ask_user — attribs: prompt.kind
- engine.host.report_usage — attribs: usage.model, usage.input_tokens, usage.output_tokens
Errors set the span status to Error and record the error.
Span lifetimes are tight (the wrapped call) so this middleware does NOT decorate Interrupts() — that returns a long-lived channel and a span around it would either be a single point-in-time event (uninteresting) or last for the entire run (better modeled by the engine's own outer span).
Compose it with other middlewares using ComposeHost; place it near the OUTER end of the stack so its spans wrap the work done by inner middlewares (rate limiting, budget enforcement, etc.).
type Interrupt ¶
Interrupt is the value the host sends through Host.Interrupts() to ask the running engine to stop. It is also the payload of the Interrupted error so the host can introspect why.
Interrupt is a plain value — copy it freely.
type InterruptedError ¶
type InterruptedError = interruptedErr
InterruptedError is the concrete error type returned by Interrupted. It is exported so hosts can use errors.As to destructure it; the preferred way to produce one is Interrupted, not direct construction.
It implements the errdefs interrupted marker, so errdefs.IsInterrupted returns true for any error wrapping (or equal to) one of these.
type Interrupter ¶
type Interrupter interface {
Interrupts() <-chan Interrupt
}
Interrupter exposes the host's cooperative-interrupt channel.
Engines select on the returned channel between steps:
select {
case intr := <-h.Interrupts():
return engine.Interrupted(intr)
case <-ctx.Done():
return ctx.Err()
default:
}
A nil channel means "no cooperative interrupt available"; engines should treat it as "never fires" — receiving on nil blocks forever, which is the correct semantic.
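The nil-channel semantic can be verified in a few lines. This self-contained sketch (channel names are illustrative) shows that a receive case on a nil channel never fires inside a select, so the other cases proceed normally:

```go
package main

import "fmt"

func main() {
	// nil: the host offers no cooperative interrupt channel.
	var interrupts <-chan struct{}

	// Stand-in for "the current step finished"; buffered so the
	// receive below is immediately ready.
	done := make(chan struct{}, 1)
	done <- struct{}{}

	// Receiving on a nil channel blocks forever, so inside a select
	// the nil case is simply never chosen.
	select {
	case <-interrupts:
		fmt.Println("interrupted") // unreachable with a nil channel
	case <-done:
		fmt.Println("step completed")
	}
}
```

This is why engines can select on h.Interrupts() unconditionally instead of guarding every select with a nil check.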
type LoadAndResumeOption ¶ added in v0.2.9
type LoadAndResumeOption func(*loadAndResumeOpts)
LoadAndResumeOption tunes LoadAndResume behaviour.
func WithFreshStartAllowed ¶ added in v0.2.9
func WithFreshStartAllowed(allowed bool) LoadAndResumeOption
WithFreshStartAllowed controls behaviour when the store has no checkpoint for run.ID. Default true — execute fresh. Pass false to require an existing checkpoint and surface errdefs.NotFound when none is present (useful for "resume only" admin commands).
func WithResumeAttempt ¶ added in v0.2.9
func WithResumeAttempt(n int) LoadAndResumeOption
WithResumeAttempt sets [ResumeContext.Attempt]. The default of 1 is correct on fresh runs; supervisors implementing retry loops should bump this for each re-attempt.
func WithResumeSignal ¶ added in v0.2.9
func WithResumeSignal(s string) LoadAndResumeOption
WithResumeSignal sets [ResumeContext.Signal] for the about-to-run execution. Defaults to "manual".
func WithResumeStartedAt ¶ added in v0.2.9
func WithResumeStartedAt(t time.Time) LoadAndResumeOption
WithResumeStartedAt sets [ResumeContext.StartedAt]. Default is time.Now() at the moment LoadAndResume is invoked. Pass an earlier timestamp when continuing a long-running run so dashboard "total wall time" remains accurate.
type NoopCheckpointStore ¶
type NoopCheckpointStore struct{}
NoopCheckpointStore drops every checkpoint and reports no state. Use as a default when checkpointing is not configured.
func (NoopCheckpointStore) Load ¶
func (NoopCheckpointStore) Load(context.Context, string) (*Checkpoint, error)
Load satisfies CheckpointStore.
func (NoopCheckpointStore) Save ¶
func (NoopCheckpointStore) Save(context.Context, Checkpoint) error
Save satisfies CheckpointStore.
type NoopHost ¶
type NoopHost struct{}
NoopHost is a zero-cost Host implementation that discards events, never reports interrupts, refuses user prompts, and skips checkpoints. It is meant for tests and embedded scenarios where an engine is invoked outside any real runtime.
func (NoopHost) AskUser ¶
AskUser returns errdefs.NotAvailable so engines can detect that user interaction is unsupported in this host.
func (NoopHost) Checkpoint ¶
func (NoopHost) Checkpoint(context.Context, Checkpoint) error
Checkpoint discards the checkpoint.
func (NoopHost) Interrupts ¶
Interrupts returns nil so engines that select on it block forever on that case (i.e. interrupts never fire under NoopHost).
func (NoopHost) ReportUsage ¶
ReportUsage discards the usage report. NoopHost has no budget so always returns nil — engines never see BudgetExceeded under it.
type Publisher ¶
Publisher emits a single event envelope.
Subject schema is NOT owned by this package: the host decides what the routing keys look like. Engines simply produce envelopes whose subject they construct from whatever convention their host has agreed with the consumer side.
Publish errors MUST NOT cause the producing engine to fail the run by default; the engine should log/record and continue. Backpressure or transport failures are an observability concern, not a control-flow signal.
type ResumeContext ¶ added in v0.2.9
type ResumeContext struct {
// Attempt is 1-based: the very first attempt (fresh run, no
// resume) is 1; the first re-execution after a checkpoint is
// 2; and so on. Hosts that limit retry budget read this field.
Attempt int
// StartedAt is the wall-clock time the original [Run] began.
// Stays constant across resumes so dashboards can compute
// total wall time (e.g. SLO budget burn).
StartedAt time.Time
// Signal identifies the trigger that produced THIS resume.
// Convention (extensible — engines/middleware MUST treat
// unknown values as opaque):
//
// - "manual" — operator clicked Resume / CLI
// - "interrupt-recovery" — host re-executed after a host.Interrupts() Stop
// - "schedule" — cron / queue replayer
// - "crash" — supervisor restarted after process exit
// - "retry" — automated retry on classified failure
Signal string
// CheckpointAt is when the checkpoint that fuels this resume
// was originally produced (the engine's view of "paused at").
// Empty time.Time on fresh starts.
CheckpointAt time.Time
}
ResumeContext is the auxiliary metadata about the *current* resume attempt. Engines, observers and middleware read it from context.Context to differentiate fresh starts from resumes, count attempts in retry loops, and surface "this is replay" indicators in trace UIs.
ResumeContext is distinct from [Run.ResumeFrom]: the checkpoint describes WHERE the engine should pick up; ResumeContext describes WHY the resume is happening (who triggered it, which attempt this is, how long the original run has been alive). Keep them separate so hosts can drive replay + retry semantics without mutating the checkpoint payload.
func ResumeContextFromContext ¶ added in v0.2.9
func ResumeContextFromContext(ctx context.Context) (ResumeContext, bool)
ResumeContextFromContext returns the ResumeContext attached to ctx, plus an "ok" flag. The ok=false branch means the engine is running a fresh start (or the host did not bother to populate the context — engines should treat both cases identically).
type Resumer ¶ added in v0.2.9
type Resumer interface {
// CanResume returns nil if the given checkpoint is resumable by
// this engine, or a classified error explaining why not:
//
// - errdefs.Validation: checkpoint shape is wrong (engine kind
// mismatch, missing required Payload fields, ExecID
// conflict).
// - errdefs.NotAvailable: engine recognises the checkpoint but
// cannot resume it (incompatible engine version, removed
// node type, etc.).
//
// Implementations MUST be cheap (no I/O, no LLM calls); the
// probe runs synchronously on the host's resume path.
CanResume(cp Checkpoint) error
}
Resumer is the optional capability an Engine may advertise to signal it can drive [Run.ResumeFrom] and to short-circuit obvious mismatches *before* the host spins up an execution.
The interface is intentionally minimal — a single CanResume probe — so engines opt in cheaply and hosts can build resume tooling (admin UIs, CLI commands, supervised retry loops) without a full dry-run.
Engines that do NOT implement Resumer remain fully spec-compliant: the [Run.ResumeFrom] contract still applies and they MUST return an errdefs.NotAvailable-classified error if asked to resume. The helpers below (IsResumable, LoadAndResume) treat the absence of Resumer as "engine handles resume opaquely; trust Execute to surface the right error".
func AsResumer ¶ added in v0.2.9
AsResumer is the typed counterpart of IsResumable. It walks any WithCapabilities-style wrappers via Unwrap (errors.As-style) so a Resumer wrapped to advertise additional capabilities still surfaces correctly.
type Run ¶
type Run struct {
// ID is a unique identifier for this engine execution. Engines
// use it as a correlation key in telemetry and may include it in
// any subjects/headers their host's subject schema requires.
//
// The host generates ID and is responsible for keeping it stable
// across resume / checkpoint cycles.
ID string
// ParentRunID identifies the parent engine.Run when this Run was
// dispatched by another agent (multi-agent call chains). Empty
// for top-level runs. Promoted to a typed field — separate from
// Attributes — so loop / depth detection at the pod layer can
// rely on the contract rather than hoping every engine remembers
// to populate the same string attribute key.
ParentRunID string
// Attributes carries arbitrary host-supplied metadata that should
// flow into telemetry spans and event headers (tenant id, agent
// id, parent span links, engine kind, …).
//
// Convention: keys MUST use the constants in sdk/telemetry
// (`telemetry.AttrTenantID`, `telemetry.AttrAgentID`,
// `telemetry.AttrEngineKind`, …) so cross-package consumers
// (dashboards, log queries) can filter without per-package
// translation rules. There is intentionally no dedicated typed
// field for those values — the typed slot is reserved for fields
// the engine contract itself depends on (currently ParentRunID
// for loop detection, ResumeFrom for recovery).
Attributes map[string]string
// Deps is the typed dependency container the host has populated
// for this run (LLM clients, tool registries, retrievers, …).
// May be nil if the engine needs no dependencies; engines that
// look up dependencies should use [GetDep] which handles nil.
Deps *Dependencies
// ResumeFrom, when non-nil, instructs the engine to continue
// execution from the provided checkpoint instead of starting a
// fresh run. The engine is the sole interpreter of
// [Checkpoint.Step] and [Checkpoint.Payload]; the host treats
// them as opaque.
//
// Contract:
//
// - When ResumeFrom is non-nil the engine SHOULD prefer
// ResumeFrom.Board over the board parameter passed to
// [Engine.Execute]; passing both is allowed but the
// checkpoint's board takes precedence as it represents the
// state at the boundary the run paused on.
//
// - When ResumeFrom.ExecID differs from [Run.ID] the engine MUST
// return an errdefs.Validation-classified error: forking a
// run requires a fresh execution id, not a resume.
//
// - Engines that do not support resume MUST return an
// errdefs.NotAvailable-classified error when they observe a
// non-nil ResumeFrom rather than silently restarting from
// scratch.
//
// Hosts that drive resumption typically [CheckpointStore.Load]
// the most recent checkpoint, set ResumeFrom, and call
// [Engine.Execute] again with the same Run.ID.
ResumeFrom *Checkpoint
}
Run is the per-execution input bundle an engine receives alongside the host. It is a plain data struct — no methods, no builder, no hidden state — assembled once by the host and passed to [Engine.Execute] read-only.
All fields are conceptually immutable for the duration of the run. Engines may read freely; they MUST NOT mutate the maps in place nor mutate the referenced ResumeFrom checkpoint.
type StreamDeltaPayload ¶
type StreamDeltaPayload struct {
// Type discriminates the payload variant. See StreamDeltaType
// constants for the standard values.
Type StreamDeltaType `json:"type"`
// Content carries the assistant text for "token" and the tool
// output (typically already serialised) for "tool_result".
Content string `json:"content,omitempty"`
// ID is the tool-call identifier the model assigned. Set on
// "tool_call" only; for "tool_result" use ToolCallID instead.
ID string `json:"id,omitempty"`
// Name is the tool name. Set on "tool_call"; recommended on
// "tool_result" so consumers can correlate without a separate
// dispatch table.
Name string `json:"name,omitempty"`
// Arguments is the tool input the model produced. Engines MAY
// pass it as either a string (raw JSON) or an already-decoded
// map / slice — consumers should accept both.
Arguments any `json:"arguments,omitempty"`
// ToolCallID pairs a "tool_result" with the originating
// "tool_call". Required on tool_result.
ToolCallID string `json:"tool_call_id,omitempty"`
// IsError reports whether the tool dispatch returned an error
// payload (vs. a successful result). Set on "tool_result".
IsError bool `json:"is_error,omitempty"`
// Cancelled reports whether this tool_result is a synthesised
// cancellation (the call was never dispatched because the round
// was interrupted). Set on "tool_result" only.
Cancelled bool `json:"cancelled,omitempty"`
}
StreamDeltaPayload is the canonical decoded shape of a SubjectStreamDelta envelope's payload.
Engines MUST emit payloads that JSON-decode into this struct; the runtime constraint is checked by DecodeStreamDelta. Engines MAY add fields beyond what this struct lists — the JSON decoder is permissive on unknowns — but consumers SHOULD only rely on the fields documented here.
Per-Type field requirements:
Type         Required             Recommended
------------ -------------------- --------------------
token        Content              —
tool_call    ID, Name             Arguments
tool_result  ToolCallID, Content  Name, IsError, Cancelled
func DecodeStreamDelta ¶
func DecodeStreamDelta(env event.Envelope) (StreamDeltaPayload, error)
DecodeStreamDelta extracts the payload of a stream-delta envelope. It returns an error when the envelope payload is empty or does not JSON-decode into StreamDeltaPayload. It does NOT verify the subject; callers may pre-filter with IsStreamDelta when iterating a mixed stream.
type StreamDeltaType ¶
type StreamDeltaType string
StreamDeltaType enumerates the kinds of in-flight increments a stream envelope can carry. Engines MUST set [StreamDeltaPayload.Type] to one of these values; consumers SHOULD treat unknown values as forward-compatible additions and skip them.
const (
	// StreamDeltaToken is one piece of generated assistant text.
	// Required field: Content.
	StreamDeltaToken StreamDeltaType = "token"
	// StreamDeltaToolCall is one tool invocation the model just
	// requested. Required fields: ID, Name. Recommended: Arguments.
	StreamDeltaToolCall StreamDeltaType = "tool_call"
	// StreamDeltaToolResult is the outcome of one tool invocation —
	// either the actual result, or a synthesised cancellation when
	// the round was interrupted before the call dispatched.
	// Required fields: ToolCallID, Content. Recommended: Name,
	// IsError, Cancelled.
	StreamDeltaToolResult StreamDeltaType = "tool_result"
)
type StreamRouter ¶ added in v0.2.9
type StreamRouter struct {
// contains filtered or unexported fields
}
StreamRouter forwards stream deltas (and optionally the rest of a run's lifecycle events) from one event.Bus to a dynamic set of sinks. It owns one subscription per run and tears it down automatically when the run's `engine.run.<id>.end` envelope is observed, so callers do not have to thread cleanup through their transport layer.
Typical use inside an HTTP handler that streams an SSE response:
router := engine.NewStreamRouter(bus,
engine.WithStreamSinkErrorHandler(func(id string, err error) {
log.Warn("sink error", "sink", id, "err", err)
}),
)
defer router.Close()
stop, err := router.Attach(runID, "sse-"+reqID, sseSink)
if err != nil { ... }
defer stop() // detaches when the request body closes
Multiple sinks may be attached to the same runID concurrently; each receives every delta. Attaching to a runID that has not yet produced events is fine — the underlying subscription is created lazily on first Attach, so the router observes events emitted after the call.
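The fan-out shape the router implements can be reduced to a sketch: a mutable set of sinks behind a mutex, with per-delta delivery that never lets one failing sink abort the others. Everything named here is illustrative; the real StreamRouter additionally subscribes lazily on an event.Bus and tears down on the run-end envelope.

```go
package main

import (
	"fmt"
	"sync"
)

type sink func(delta string) error

// fanout is a much-reduced stand-in for one run's delivery state.
type fanout struct {
	mu    sync.Mutex
	sinks map[string]sink // sinkID -> sink
}

func newFanout() *fanout { return &fanout{sinks: map[string]sink{}} }

// attach registers a sink and returns a deferred-friendly detach func,
// mirroring the Attach contract.
func (f *fanout) attach(sinkID string, s sink) (detach func()) {
	f.mu.Lock()
	f.sinks[sinkID] = s
	f.mu.Unlock()
	return func() {
		f.mu.Lock()
		delete(f.sinks, sinkID)
		f.mu.Unlock()
	}
}

// deliver fans one delta out to every attached sink; a sink error is
// reported but does not abort delivery to the other sinks.
func (f *fanout) deliver(delta string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for id, s := range f.sinks {
		if err := s(delta); err != nil {
			fmt.Println("sink error:", id, err) // stand-in for the error handler
		}
	}
}

func main() {
	f := newFanout()
	var got []string
	stop := f.attach("sse-1", func(d string) error { got = append(got, d); return nil })
	f.deliver("Hel")
	f.deliver("lo")
	stop()
	f.deliver("!") // no sinks attached any more; dropped
	fmt.Println(got)
}
```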
func NewStreamRouter ¶ added in v0.2.9
func NewStreamRouter(bus event.Bus, opts ...StreamRouterOption) *StreamRouter
NewStreamRouter constructs a router bound to bus. The router holds a single root context whose cancellation tears down every active subscription on [Close]. nil bus is rejected.
func (*StreamRouter) Attach ¶ added in v0.2.9
func (r *StreamRouter) Attach(runID, sinkID string, sink StreamSink) (detach func(), err error)
Attach registers sink for runID, returning a detach function the caller MUST invoke when the transport closes (deferred-friendly). sinkID identifies the attachment in error reports; pick something stable so logs can be correlated.
Returns an error if the router has been Close-d. Re-attaching a previously-detached sinkID is allowed.
func (*StreamRouter) Close ¶ added in v0.2.9
func (r *StreamRouter) Close() error
Close tears down every active fanout and waits for their loops to drain. Subsequent Attach calls return an error. Close is idempotent.
type StreamRouterOption ¶ added in v0.2.9
type StreamRouterOption func(*streamRouterOpts)
StreamRouterOption tunes NewStreamRouter behaviour.
func WithStreamBufferSize ¶ added in v0.2.9
func WithStreamBufferSize(n int) StreamRouterOption
WithStreamBufferSize sets the buffer size of the underlying subscription. The default of 256 is ample for typical token streams without consuming much memory. Pass it to NewStreamRouter like any other option; for additional subscription tuning, see WithStreamSubOptions.
func WithStreamIncludeAllRunEvents ¶ added in v0.2.9
func WithStreamIncludeAllRunEvents() StreamRouterOption
WithStreamIncludeAllRunEvents switches the router to subscribe against PatternRun (everything for the run) instead of just PatternRunStream. Useful for transports that mirror the full event log (run.start / step.complete / etc.) rather than just stream deltas. When enabled, sinks receive the raw envelope but the decoded delta is the zero value for non-stream events; consumers should branch on IsStreamDelta before reading delta fields.
func WithStreamSinkErrorHandler ¶ added in v0.2.9
func WithStreamSinkErrorHandler(fn func(sinkID string, err error)) StreamRouterOption
WithStreamSinkErrorHandler installs a callback invoked once per sink-returned error. Defaults to a no-op (errors silently dropped) — observability of sink failures is the caller's responsibility.
func WithStreamSubOptions ¶ added in v0.2.9
func WithStreamSubOptions(opts ...event.SubOption) StreamRouterOption
WithStreamSubOptions appends opaque event.SubOption values to the router's bus subscription (e.g. WithBackpressure, custom predicates). The router prepends WithBufferSize from WithStreamBufferSize so callers cannot accidentally clobber the configured buffer.
type StreamSink ¶ added in v0.2.9
type StreamSink interface {
OnDelta(ctx context.Context, env event.Envelope, delta StreamDeltaPayload) error
}
StreamSink is the consumer-side counterpart of the EmitStream* helpers. A sink receives one decoded StreamDeltaPayload at a time along with its source envelope (for headers / trace ids / raw subject access) and forwards it to whatever transport the caller cares about — SSE, WebSocket, WebRTC datachannel, log file, metrics counter, etc.
Implementations:
- MUST be safe for concurrent OnDelta calls; StreamRouter fans out to multiple sinks from a single goroutine, but a custom consumer may call the same sink from many.
- SHOULD return errors only for unrecoverable failures (closed transport, broken pipe). Returned errors propagate to the router's per-sink error log; they do NOT abort delivery to other sinks attached to the same run.
- MUST NOT block longer than the transport's natural backoff; long-running work belongs in a worker goroutine that the sink drains into.
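The "must not block" bullet is usually satisfied by a buffering sink: OnDelta only enqueues, and a worker goroutine drains the queue into the slow transport. All names below are illustrative, not part of the engine API, and dropping on a full buffer is just one possible policy.

```go
package main

import (
	"fmt"
	"time"
)

// bufferedSink decouples delta delivery from the slow transport write.
type bufferedSink struct {
	ch chan string
}

func newBufferedSink(slowWrite func(string)) *bufferedSink {
	s := &bufferedSink{ch: make(chan string, 256)}
	go func() {
		// Slow transport work happens off the router's goroutine.
		for d := range s.ch {
			slowWrite(d)
		}
	}()
	return s
}

// OnDelta returns immediately; when the buffer is full the delta is
// dropped with an error (one policy; blocking briefly is another).
func (s *bufferedSink) OnDelta(delta string) error {
	select {
	case s.ch <- delta:
		return nil
	default:
		return fmt.Errorf("sink buffer full, dropped %q", delta)
	}
}

func main() {
	done := make(chan string, 3)
	s := newBufferedSink(func(d string) {
		time.Sleep(time.Millisecond) // simulate a slow transport
		done <- d
	})
	s.OnDelta("a")
	s.OnDelta("b")
	fmt.Println(<-done, <-done)
}
```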
StreamSinkFunc is the canonical func adapter.
type StreamSinkFunc ¶ added in v0.2.9
type StreamSinkFunc func(ctx context.Context, env event.Envelope, delta StreamDeltaPayload) error
StreamSinkFunc is a func adapter for StreamSink. Use it to inline a sink without declaring a named type:
router.Attach(runID, "sse-"+reqID, engine.StreamSinkFunc(
func(ctx context.Context, env event.Envelope, d engine.StreamDeltaPayload) error {
return sse.WriteJSON(ctx, d)
}))
func (StreamSinkFunc) OnDelta ¶ added in v0.2.9
func (f StreamSinkFunc) OnDelta(ctx context.Context, env event.Envelope, delta StreamDeltaPayload) error
OnDelta implements StreamSink.
type UsageReporter ¶
type UsageReporter interface {
ReportUsage(ctx context.Context, usage model.TokenUsage) error
}
UsageReporter accepts the incremental LLM token-usage reports that an engine observes during a run. Each call adds delta usage; the host is responsible for accumulation, billing, and downstream telemetry.
Engines should call ReportUsage once per LLM invocation that returns usage metadata (typically, streaming nodes call it on completion with the per-call totals). Reporting SHOULD be best-effort in the face of observability failures — a slow exporter must not block forward progress.
Budget enforcement contract:
- The host MAY return errdefs.BudgetExceeded (or any error classified by errdefs.IsBudgetExceeded) to signal that the accumulated usage has crossed a configured budget and the next LLM call would not be authorised.
- Engines that observe such an error MUST stop performing further LLM-cost-incurring work in this run and return the error from Execute (typically wrapped). Continuing would defeat the budget.
- Any other non-nil error is observability-only — engines SHOULD log/swallow and continue, matching the pre-budget contract.
Hosts without billing or budget enforcement return nil unconditionally (see NoopHost.ReportUsage).
type UserPrompt ¶
type UserPrompt struct {
Parts []model.Part
Schema []byte
Source string
Metadata map[string]string
}
UserPrompt describes what the engine is asking the host to relay to the end user. It deliberately stays one level below "chat message":
- Parts carries the multi-modal payload (text, image, audio, file, structured data) using model.Part — the same building block that model.Message uses, minus the chat-specific Role.
- Schema is an optional structured-input hint (JSON-schema-shaped bytes) for cases where the host wants to render a form or validate the response.
- Source identifies the engine step that produced the prompt; useful for trace correlation and resume.
- Metadata is free-form host-passed-through metadata.
Why []model.Part rather than model.Message: a Message also carries Role (system/user/assistant/tool), which is a chat-layer concept the engine has no business naming. Parts give us full multi-modality (image, audio, file, data) without tying the engine to chat semantics — the agent layer wraps Parts back into a Message with the right Role on its way out, and unwraps user-supplied Parts on the way in.
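The Parts/Message boundary described above can be sketched with minimal stand-ins for model.Part and model.Message: the engine emits role-less Parts, and the agent layer attaches a Role on the way out and strips it again on the way in.

```go
package main

import "fmt"

// Part is a minimal stand-in for model.Part (the real type also covers
// image, audio, file, and structured data).
type Part struct {
	Text string
}

// Message is a minimal stand-in for model.Message; Role is the
// chat-layer concept the engine never names.
type Message struct {
	Role  string
	Parts []Part
}

// wrapForChat is what the agent layer does on the way out.
func wrapForChat(parts []Part) Message {
	return Message{Role: "assistant", Parts: parts}
}

// unwrapFromChat is the inverse on the way in.
func unwrapFromChat(m Message) []Part {
	return m.Parts
}

func main() {
	prompt := []Part{{Text: "Which region should we deploy to?"}}
	msg := wrapForChat(prompt)
	fmt.Println(msg.Role, len(unwrapFromChat(msg)))
}
```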
type UserPrompter ¶
type UserPrompter interface {
AskUser(ctx context.Context, prompt UserPrompt) (UserReply, error)
}
UserPrompter lets an engine ask the host to prompt the end user (chat input, voice DTMF, structured form, …) and block until the reply arrives.
Hosts that don't expose user interaction should return an errdefs.NotAvailable-classified error. Engines that get such an error from a step that strictly needs user input should propagate it so the host can decide whether to fail or fall back.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| depname | Package depname enumerates the conventional string identifiers engines use when declaring [engine.Capabilities.RequiredDepNames] and when looking values up in [engine.Dependencies]. |
| enginetest | Package enginetest provides reusable contract-test machinery for the interfaces declared in sdk/engine — engine.Engine and engine.Host today, more if the engine package grows. |