Documentation ¶
Overview ¶
Package step is the determinism boundary. Non-deterministic ops (clock, randomness, LLM, tools) route through this package so the runtime can record and replay them.
Index ¶
- Constants
- Variables
- func CallTool(ctx context.Context, call ToolCall) (json.RawMessage, error)
- func Emit[T any](ctx context.Context, c *Context, kind event.Kind, payload T) error
- func LLMCall(ctx context.Context, req *provider.Request) (resp *provider.Response, err error)
- func NewCallID() string
- func Now(ctx context.Context) time.Time
- func Random(ctx context.Context) uint64
- func ReplayDurationMs(ctx context.Context, fallback int64) int64
- func SideEffect[T any](ctx context.Context, name string, fn func() (T, error)) (T, error)
- func WithContext(parent context.Context, c *Context) context.Context
- type BudgetConfig
- type Config
- type Context
- type MetricsSink
- type MismatchClass
- type MismatchError
- type Mode
- type Registry
- type ToolCall
- type ToolResult
Constants ¶
const DefaultMaxParallelTools = 8
DefaultMaxParallelTools is the fan-out cap used by CallTools when Config.MaxParallelTools is zero.
Variables ¶
var ErrBudgetExceeded = errors.New("step: budget exceeded")
ErrBudgetExceeded is returned by LLMCall when the pre-call input-token estimate exceeds BudgetConfig.MaxInputTokens. The matching BudgetExceeded event is emitted before the error is returned. Callers (typically the agent loop) wrap this into RunFailed.
var ErrInvalidConfig = fmt.Errorf("step: invalid Config")
ErrInvalidConfig is returned by NewContext when cfg lacks a required field (Log, RunID) or sets ModeReplay without Recorded.
var ErrInvalidStream = errors.New("step: invalid provider stream")
ErrInvalidStream is returned when the provider stream violates the chunk state machine.
var ErrMissingRawResponseHash = errors.New("step: provider returned empty RawResponseHash")
ErrMissingRawResponseHash is returned when ChunkEnd lacks a 32-byte hash and RequireRawResponseHash is set.
var ErrReplayMismatch = errors.New("step: replay mismatch")
ErrReplayMismatch is the sentinel every mismatch wraps. Use errors.Is to detect a divergence; errors.As against *MismatchError for structured fields.
var ErrToolNotFound = errors.New("step: tool not found")
ErrToolNotFound is returned by CallTool when the requested tool name is not in the Registry. A ToolCallFailed event with ErrorType="tool" is emitted before the error is returned.
Functions ¶
func CallTool ¶
CallTool invokes the named tool, emitting ToolCallScheduled before and ToolCallCompleted/Failed after.
ErrorType classification:
- "panic" — tool panicked (wraps tool.ErrPanicked)
- "cancelled" — ctx cancelled / deadline exceeded
- "tool" — ErrToolNotFound or any error returned by the tool
Retry (Idempotent+MaxAttempts>1): every attempt emits its own Scheduled+Completed/Failed pair with Attempt:n. Retries only on tool.ErrTransient; ctx errors and ErrToolNotFound are terminal.
Panics if ctx has no step.Context.
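The retry rules above can be sketched as a small loop. This is an illustration only, not the package's implementation: errTransient stands in for tool.ErrTransient, callWithRetry and runDemo are hypothetical names, and event emission and backoff are left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errTransient is a local stand-in for tool.ErrTransient.
var errTransient = errors.New("transient failure")

// callWithRetry runs fn up to maxAttempts times when idempotent is true and
// exactly once otherwise. Only errors matching errTransient are retried; a
// cancelled ctx or any other error ends the loop immediately.
func callWithRetry(ctx context.Context, idempotent bool, maxAttempts int, fn func(attempt int) error) (int, error) {
	if !idempotent || maxAttempts < 1 {
		maxAttempts = 1
	}
	for attempt := 1; ; attempt++ {
		err := fn(attempt)
		retryable := err != nil && errors.Is(err, errTransient) && ctx.Err() == nil
		if !retryable || attempt == maxAttempts {
			return attempt, err
		}
	}
}

// runDemo calls a hypothetical tool that fails transiently twice, then succeeds.
func runDemo(idempotent bool, maxAttempts int) (int, error) {
	flaky := func(attempt int) error {
		if attempt < 3 {
			return fmt.Errorf("attempt %d: %w", attempt, errTransient)
		}
		return nil
	}
	return callWithRetry(context.Background(), idempotent, maxAttempts, flaky)
}

func main() {
	n, err := runDemo(true, 5)
	fmt.Println(n, err) // 3 <nil>
	n, err = runDemo(false, 5)
	fmt.Println(n, err) // 1 attempt 1: transient failure
}
```

In the sketch a non-idempotent call ignores maxAttempts entirely, which matches the documented rule that such calls run exactly once.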
func Emit ¶
Emit writes a typed event payload into the run's log, advancing the hash chain under c.mu. Intended for use by the agent loop (root starling package) which emits RunStarted and terminal events itself — step's own helpers use the unexported emit directly.
kind must match the payload type. Safe for concurrent use.
func LLMCall ¶
LLMCall performs a single streaming completion. Emits TurnStarted → (ReasoningEmitted)* → AssistantMessageCompleted and returns the aggregated Response.
Pre-call input-token budget is enforced against MaxInputTokens; on breach, emits BudgetExceeded and returns ErrBudgetExceeded (no TurnStarted). On mid-stream error or ctx cancellation, no AssistantMessageCompleted is emitted — the agent loop emits the terminal RunFailed / RunCancelled event.
Panics if ctx has no step.Context or the Context has no Provider.
func NewCallID ¶
func NewCallID() string
NewCallID returns a fresh ULID for ToolCall.CallID. Use it when the caller needs to know the ID before invoking CallTool.
func Now ¶
Now returns the current wall-clock time. Live mode reads Context.clockFn and records the value as nanoseconds; replay mode returns the recorded value. In both modes a SideEffectRecorded event is emitted so replay re-appends an identical chain. Panics if ctx has no step.Context, the event log rejects the write, or (in replay) the next recorded event doesn't match.
func Random ¶
Random returns a cryptographically random uint64. Live mode draws from crypto/rand and records as SideEffectRecorded{name:"rand"}; replay returns the recorded value. A SideEffectRecorded event is emitted in both modes. Panics on missing ctx, CSPRNG failure, or replay mismatch.
func ReplayDurationMs ¶
ReplayDurationMs returns the duration_ms value from the event recorded at the next seq position during replay; in live mode it returns fallback unchanged. Wall-clock durations are inherently non-deterministic (live and replay take different amounts of time, especially under the race detector), so any event carrying a DurationMs field must route its value through this helper before emission; otherwise the re-emitted event would not match the recorded one when replay compares them.
Applies to ToolCallCompleted, ToolCallFailed, RunCompleted, RunCancelled, and RunFailed — every event whose payload includes a `duration_ms` CBOR field. The helper decodes only that one field, so it's robust to unrelated schema additions on those payload types.
func SideEffect ¶
SideEffect records arbitrary non-determinism: HTTP, filesystem, anything beyond clock/RNG. Live mode runs fn and records its result under name; replay decodes the recorded value without invoking fn and re-emits a matching SideEffectRecorded event so the replayed chain stays aligned. Errors returned by fn are propagated without being recorded, so replaying such a call re-runs fn. T must be CBOR-serialisable. Panics on missing ctx or replay decode failure.
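The record/replay split can be illustrated with a heavily simplified model. This is a sketch under stated assumptions, not the package's code: recorder, recorded, and sideEffect are hypothetical stand-ins, encoding/json replaces CBOR to stay within the standard library, and errors are returned where the real helper panics or emits events.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type mode int

const (
	live mode = iota
	replay
)

// recorded is one captured side effect (hypothetical shape).
type recorded struct {
	Name  string
	Value json.RawMessage
}

// recorder is a simplified stand-in for the per-run state.
type recorder struct {
	mode mode
	log  []recorded // live: appended to; replay: consumed in order
	next int
}

// sideEffect runs fn in live mode and records its result; in replay mode it
// decodes the next recorded value and never invokes fn.
func sideEffect[T any](r *recorder, name string, fn func() (T, error)) (T, error) {
	var zero T
	if r.mode == replay {
		if r.next >= len(r.log) || r.log[r.next].Name != name {
			return zero, errors.New("replay mismatch")
		}
		var v T
		if err := json.Unmarshal(r.log[r.next].Value, &v); err != nil {
			return zero, err
		}
		r.next++
		return v, nil
	}
	v, err := fn()
	if err != nil {
		return zero, err // errors are propagated, not recorded
	}
	b, err := json.Marshal(v)
	if err != nil {
		return zero, err
	}
	r.log = append(r.log, recorded{name, b})
	return v, nil
}

func main() {
	liveRec := &recorder{mode: live}
	v, _ := sideEffect(liveRec, "fetch-count", func() (int, error) { return 42, nil })

	replayRec := &recorder{mode: replay, log: liveRec.log}
	r, _ := sideEffect(replayRec, "fetch-count", func() (int, error) {
		return 0, errors.New("must not run during replay")
	})
	fmt.Println(v, r) // 42 42
}
```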
Types ¶
type BudgetConfig ¶
BudgetConfig holds the budget caps enforced inside the step package. MaxInputTokens is checked pre-call; MaxOutputTokens and MaxUSD are checked mid-stream after every ChunkUsage. Wall-clock enforcement lives at the agent level (via context.WithDeadline) so it can preempt blocking calls the step layer doesn't control.
Zero on any field disables that axis.
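A minimal sketch of the "zero disables that axis" rule follows. The budget struct, its field types, and the check method names are assumptions for illustration; the real BudgetConfig fields are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errBudgetExceeded is a local stand-in for ErrBudgetExceeded.
var errBudgetExceeded = errors.New("budget exceeded")

// budget mirrors the documented axes; the field types are assumptions.
type budget struct {
	MaxInputTokens  int64
	MaxOutputTokens int64
	MaxUSD          float64
}

// checkInput is the pre-call check. A zero cap disables it.
func (b budget) checkInput(estimatedTokens int64) error {
	if b.MaxInputTokens > 0 && estimatedTokens > b.MaxInputTokens {
		return fmt.Errorf("%w: input tokens %d > %d", errBudgetExceeded, estimatedTokens, b.MaxInputTokens)
	}
	return nil
}

// checkUsage is the mid-stream check, run after each usage update.
// Each axis is skipped when its cap is zero.
func (b budget) checkUsage(outputTokens int64, usd float64) error {
	if b.MaxOutputTokens > 0 && outputTokens > b.MaxOutputTokens {
		return fmt.Errorf("%w: output tokens", errBudgetExceeded)
	}
	if b.MaxUSD > 0 && usd > b.MaxUSD {
		return fmt.Errorf("%w: usd", errBudgetExceeded)
	}
	return nil
}

func main() {
	b := budget{MaxInputTokens: 1000} // output and USD axes disabled
	fmt.Println(b.checkInput(1500) != nil)     // true
	fmt.Println(b.checkUsage(1_000_000, 99.0)) // <nil>
}
```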
type Config ¶
type Config struct {
Log eventlog.EventLog
RunID string
Provider provider.Provider
Tools *Registry
Budget BudgetConfig
// Mode selects live vs replay. Zero value is ModeLive.
Mode Mode
// Recorded is the pre-captured event stream consumed by replay-mode
// non-determinism helpers. Required when Mode == ModeReplay; ignored
// otherwise. NewContext returns ErrInvalidConfig on ModeReplay with a
// nil Recorded; MustNewContext panics.
Recorded []event.Event
// ClockFn overrides the wall-clock source used by step.Now. Defaults
// to time.Now. Tests inject a fake clock; under ModeReplay it is
// never invoked (the recorded value is returned).
ClockFn func() time.Time
// MaxParallelTools caps concurrent tool executions dispatched by
// CallTools. Zero selects the default (8). A value of 1 effectively
// serializes parallel dispatch, useful for debugging. Ignored by
// single-tool CallTool.
MaxParallelTools int
// Logger receives structured records from the step helpers for
// budget trips and tool retries. If nil, the Context falls back to
// a discard handler — step code never panics on a missing logger.
// The agent loop sets this from starling.Config.Logger with run_id
// already bound.
Logger *slog.Logger
// ResumeFromSeq and ResumeFromPrevHash seed the chain cursor for
// resumed runs. Both are zero/nil for a fresh run (the default: the
// first emitted event is seq=1 with empty PrevHash). Set by
// (*Agent).Resume to the last-recorded seq and its event-hash so
// the first event this Context emits extends the existing chain.
//
// ResumeFromSeq is the seq of the last event already in the log;
// the first emit uses seq = ResumeFromSeq + 1. ResumeFromPrevHash
// must equal event.Hash(event.Marshal(lastEvent)).
ResumeFromSeq uint64
ResumeFromPrevHash []byte
// Metrics is an optional observability sink. Nil disables all
// metric recording inside this step.Context. The root starling
// package wires a Prometheus-backed implementation; step exposes
// only the interface so it doesn't pull the client_golang
// dependency into the core loop.
Metrics MetricsSink
// RequireRawResponseHash makes LLMCall fail if the provider's
// ChunkEnd lacks a 32-byte hash.
RequireRawResponseHash bool
// EmitTimeout bounds the per-event Append the step emitter runs
// under context.WithoutCancel. Zero means no timeout (the historical
// behavior). Set this when a hung backend would otherwise block
// terminal/failure-path emits and prevent shutdown.
EmitTimeout time.Duration
}
Config captures the static dependencies a step.Context needs across a run. The agent loop builds one at run start and never mutates it.
Log and RunID are required and validated by NewContext. Provider and Tools are only checked when the corresponding helper is invoked: LLMCall panics if Provider is nil; CallTool returns ErrToolNotFound if Tools is nil. Budget is optional: a zero-valued BudgetConfig disables every budget axis.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is the opaque per-run state attached to every stdlib context.Context inside an agent run. It owns the hash-chain cursor (next seq + previous event hash) so that any step-level emitter sees a consistent view even under concurrent Now/Random/SideEffect calls.
Callers extract it with From and pass it into the non-deterministic helpers in this package. Construction is normally handled by the agent loop (the root starling package); advanced users wiring their own loops may call NewContext directly.
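The hash-chain cursor, and how ResumeFromSeq and ResumeFromPrevHash seed it, can be sketched as follows. Everything here is illustrative: entry, cursor, and newCursor are hypothetical names, and SHA-256 over a naive byte layout is an assumption standing in for event.Hash(event.Marshal(e)).

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sync"
)

type entry struct {
	Seq      uint64
	PrevHash []byte
	Payload  []byte
}

// hash stands in for event.Hash(event.Marshal(e)); the algorithm and
// encoding used here are assumptions.
func (e entry) hash() []byte {
	var seq [8]byte
	binary.BigEndian.PutUint64(seq[:], e.Seq)
	h := sha256.New()
	h.Write(seq[:])
	h.Write(e.PrevHash)
	h.Write(e.Payload)
	return h.Sum(nil)
}

// cursor tracks the last emitted seq and its hash under a mutex so
// concurrent emitters always extend a consistent chain.
type cursor struct {
	mu       sync.Mutex
	lastSeq  uint64
	prevHash []byte
}

// newCursor(0, nil) starts a fresh chain; other values resume one.
func newCursor(resumeFromSeq uint64, resumeFromPrevHash []byte) *cursor {
	return &cursor{lastSeq: resumeFromSeq, prevHash: resumeFromPrevHash}
}

func (c *cursor) emit(payload []byte) entry {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := entry{Seq: c.lastSeq + 1, PrevHash: c.prevHash, Payload: payload}
	c.lastSeq, c.prevHash = e.Seq, e.hash()
	return e
}

func main() {
	fresh := newCursor(0, nil)
	fresh.emit([]byte("a"))
	second := fresh.emit([]byte("b"))

	// Resume from the last recorded event: the next emit extends the chain.
	resumed := newCursor(second.Seq, second.hash())
	third := resumed.emit([]byte("c"))
	fmt.Println(third.Seq, string(third.PrevHash) == string(second.hash())) // 3 true
}
```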
func From ¶
From extracts the Context previously attached via WithContext. When no step.Context has been attached the second return is false and the first is nil — callers that must emit events should treat this as a programmer error.
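The attach/extract pair follows the usual context-key pattern. The sketch below assumes a From signature of (*Context, bool), inferred from the description above, and uses a hypothetical Context with a single exported RunID field; the real Context is opaque.

```go
package main

import (
	"context"
	"fmt"
)

// Context is a hypothetical stand-in; the real type has no exported fields.
type Context struct{ RunID string }

// ctxKey is unexported so no other package can collide with this key.
type ctxKey struct{}

// WithContext attaches c to parent.
func WithContext(parent context.Context, c *Context) context.Context {
	return context.WithValue(parent, ctxKey{}, c)
}

// From returns the attached Context, or (nil, false) when none is attached.
func From(ctx context.Context) (*Context, bool) {
	c, ok := ctx.Value(ctxKey{}).(*Context)
	return c, ok
}

// lookup reports the run ID found on a context, optionally attaching one first.
func lookup(attach bool) (string, bool) {
	ctx := context.Background()
	if attach {
		ctx = WithContext(ctx, &Context{RunID: "run-1"})
	}
	c, ok := From(ctx)
	if !ok {
		return "", false
	}
	return c.RunID, true
}

func main() {
	fmt.Println(lookup(true))  // run-1 true
	fmt.Println(lookup(false)) //  false
}
```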
func MustNewContext ¶
MustNewContext is NewContext without the error return: it panics on invalid config. Useful in test setup and in the agent loop where a bad config is a programmer bug. Production callers should prefer NewContext.
func NewContext ¶
NewContext returns a Context primed to emit the first event (seq=1, prevHash=nil). Log and RunID are required; Provider and Tools are checked lazily by LLMCall and CallTool.
func (*Context) Logger ¶
Logger returns the slog.Logger bound to this run. Never nil: returns a discard logger when the agent was built without one. Intended for the agent loop and downstream tool implementations that want to participate in the run's structured trace.
func (*Context) Metrics ¶
func (c *Context) Metrics() MetricsSink
Metrics returns the MetricsSink this Context records to, or nil when metrics are disabled. Call sites should treat nil as a no-op rather than guarding every method call.
type MetricsSink ¶
type MetricsSink interface {
ObserveProviderCall(model, status string, d time.Duration, promptTokens, completionTokens int64)
ObserveToolCall(toolName, status, errorType string, d time.Duration)
ObserveEventlogAppend(kind, status string, d time.Duration)
ObserveBudgetExceeded(axis string)
}
MetricsSink is the narrow interface step.Context uses to record observability samples. Implementations must be concurrency-safe and must tolerate being called in a tight loop — the eventlog observer runs on every emit.
Implementations may no-op any method; the interface exists only so step doesn't depend on a concrete metrics library.
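As a sketch of a concurrency-safe implementation, the sink below counts two of the four signals and no-ops the rest. The interface is copied locally so the example compiles on its own; countingSink and the status strings passed in main are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// MetricsSink is copied from the documentation so this sketch is self-contained.
type MetricsSink interface {
	ObserveProviderCall(model, status string, d time.Duration, promptTokens, completionTokens int64)
	ObserveToolCall(toolName, status, errorType string, d time.Duration)
	ObserveEventlogAppend(kind, status string, d time.Duration)
	ObserveBudgetExceeded(axis string)
}

// countingSink uses atomic counters, so it is safe for concurrent use and
// cheap enough to call on every emit.
type countingSink struct {
	toolCalls atomic.Int64
	appends   atomic.Int64
}

func (s *countingSink) ObserveProviderCall(string, string, time.Duration, int64, int64) {}
func (s *countingSink) ObserveToolCall(string, string, string, time.Duration) {
	s.toolCalls.Add(1)
}
func (s *countingSink) ObserveEventlogAppend(string, string, time.Duration) {
	s.appends.Add(1)
}
func (s *countingSink) ObserveBudgetExceeded(string) {}

// Compile-time check that countingSink satisfies the interface.
var _ MetricsSink = (*countingSink)(nil)

func main() {
	s := &countingSink{}
	var sink MetricsSink = s
	sink.ObserveToolCall("search", "ok", "", 12*time.Millisecond)
	sink.ObserveEventlogAppend("ToolCallCompleted", "ok", time.Millisecond)
	sink.ObserveEventlogAppend("RunCompleted", "ok", time.Millisecond)
	fmt.Println(s.toolCalls.Load(), s.appends.Load()) // 1 2
}
```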
type MismatchClass ¶
type MismatchClass string
MismatchClass classifies why replay diverged.
const (
MismatchExhausted MismatchClass = "exhausted"
MismatchKind MismatchClass = "kind"
MismatchPayload MismatchClass = "payload"
MismatchTurnID MismatchClass = "turn_id"
)
type MismatchError ¶
type MismatchError struct {
Seq uint64
Kind event.Kind
ExpectedKind event.Kind
Class MismatchClass
Reason string
}
MismatchError carries the structured details of a replay divergence. Its Is method makes errors.Is(err, ErrReplayMismatch) report true.
func (*MismatchError) Error ¶
func (e *MismatchError) Error() string
func (*MismatchError) Is ¶
func (e *MismatchError) Is(target error) bool
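The detection pattern (errors.Is for the sentinel, errors.As for the structured fields) can be sketched with local stand-ins. The types below mirror the documented shapes but omit the event.Kind fields to stay self-contained; the error message format and the classify helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
var ErrReplayMismatch = errors.New("step: replay mismatch")

type MismatchClass string

const MismatchKind MismatchClass = "kind"

type MismatchError struct {
	Seq    uint64
	Class  MismatchClass
	Reason string
}

func (e *MismatchError) Error() string {
	return fmt.Sprintf("step: replay mismatch at seq %d (%s): %s", e.Seq, e.Class, e.Reason)
}

// Is lets errors.Is(err, ErrReplayMismatch) succeed without an Unwrap method.
func (e *MismatchError) Is(target error) bool { return target == ErrReplayMismatch }

// classify returns the mismatch class, or "" when err is not a replay divergence.
func classify(err error) MismatchClass {
	if !errors.Is(err, ErrReplayMismatch) {
		return ""
	}
	var me *MismatchError
	if errors.As(err, &me) {
		return me.Class
	}
	return ""
}

func main() {
	err := fmt.Errorf("replay run: %w", &MismatchError{Seq: 7, Class: MismatchKind, Reason: "unexpected event"})
	fmt.Println(errors.Is(err, ErrReplayMismatch), classify(err)) // true kind
}
```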
type Mode ¶
type Mode uint8
Mode selects between a live run (side effects execute and are recorded) and a replay run (side effects return pre-recorded values without re-running them).
const (
// ModeLive is the default: helpers execute their effect and emit a
// SideEffectRecorded event capturing the result.
ModeLive Mode = iota
// ModeReplay consumes pre-recorded SideEffectRecorded events from
// Config.Recorded in order, returning the stored values instead of
// re-running the effect. Used by the replay verifier.
ModeReplay
)
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps tool names to Tool implementations for a single run. It is the runtime-side lookup used by CallTool; the tool package itself stays free of registry concerns so tool authors can define tools without importing runtime state.
A Registry is immutable after construction — NewRegistry copies its inputs — which means callers can safely share one across goroutines.
func NewRegistry ¶
NewRegistry returns a Registry containing the given tools, keyed by each tool's Name(). Duplicate names cause the later entry to win; callers that care about duplicate detection should check themselves before calling.
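Because the later duplicate silently wins, a caller that cares can check names before constructing the Registry. The helper below is a hypothetical sketch: named, firstDuplicate, and stubTool are illustrative, and the only thing assumed about real tools is the Name() method mentioned above.

```go
package main

import "fmt"

// named is the minimal surface needed for the check.
type named interface{ Name() string }

// firstDuplicate returns the first name that appears more than once.
func firstDuplicate[T named](tools []T) (string, bool) {
	seen := make(map[string]struct{}, len(tools))
	for _, t := range tools {
		n := t.Name()
		if _, dup := seen[n]; dup {
			return n, true
		}
		seen[n] = struct{}{}
	}
	return "", false
}

// stubTool is a placeholder tool used only for this example.
type stubTool string

func (s stubTool) Name() string { return string(s) }

func main() {
	tools := []stubTool{"search", "fetch", "search"}
	if name, dup := firstDuplicate(tools); dup {
		fmt.Printf("duplicate tool name %q\n", name) // duplicate tool name "search"
	}
}
```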
type ToolCall ¶
type ToolCall struct {
CallID string
TurnID string
Name string
Args json.RawMessage
Idempotent bool
MaxAttempts int
Backoff func(attempt int) time.Duration
}
ToolCall describes a single tool invocation. CallID+TurnID link the invocation back to the AssistantMessageCompleted that planned it. Empty CallID mints a ULID; TurnID has no default (missing TurnID is almost always a caller bug).
Retry: Idempotent=true with MaxAttempts>1 retries on transient failures (errors.Is tool.ErrTransient). Each attempt emits a fresh ToolCallScheduled/Completed/Failed{Attempt:n}. Nil Backoff uses an exponential default (100ms base, ×2, cap 10s, 0–25% jitter). Non-idempotent calls run exactly once regardless of MaxAttempts.
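A custom Backoff matching the described default (100ms base, doubling, 10s cap, 0 to 25% jitter) might look like the sketch below. It assumes attempt is 1-based and that jitter is added after the cap is applied; neither detail is confirmed by the text above.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffBase returns the un-jittered delay for a 1-based attempt:
// 100ms, 200ms, 400ms, ... capped at 10s.
func backoffBase(attempt int) time.Duration {
	const base, limit = 100 * time.Millisecond, 10 * time.Second
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= limit {
			return limit
		}
	}
	return d
}

// Backoff adds a random 0-25% jitter on top of backoffBase. Its signature
// matches the ToolCall.Backoff field.
func Backoff(attempt int) time.Duration {
	d := backoffBase(attempt)
	return d + time.Duration(rand.Int63n(int64(d)/4+1))
}

func main() {
	fmt.Println(backoffBase(1), backoffBase(2), backoffBase(8)) // 100ms 200ms 10s
	fmt.Println(Backoff(3) >= 400*time.Millisecond)             // true
}
```

Jitter spreads out retries from concurrent workers so they do not all hit a recovering backend at the same instant.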
type ToolResult ¶
type ToolResult struct {
CallID string
Result json.RawMessage
Err error
}
ToolResult is the outcome of one tool invocation from CallTools. Result is the tool's JSON output (nil on failure). Err is the tool/registry/panic error. One failing tool does not cancel siblings.
func CallTools ¶
func CallTools(ctx context.Context, calls []ToolCall) ([]ToolResult, error)
CallTools dispatches a batch of tool calls, emitting ToolCallScheduled for every call in input order up front, then running the tools concurrently with a semaphore (cap = Config.MaxParallelTools or DefaultMaxParallelTools when zero). Each tool's completion emits ToolCallCompleted or ToolCallFailed as it finishes, so seq numbers reflect actual completion order — that ordering is the committed ground truth the hash chain ratifies.
A failing tool does not cancel siblings; its error surfaces in the matching ToolResult.Err. Callers decide whether to keep going. The returned slice preserves input order (NOT completion order), so callers can correlate results with the calls they supplied.
Retry (per-call Idempotent + MaxAttempts>1) applies inside each worker. Only the attempt-1 Scheduled events are contiguous in the log; retry Scheduleds land interleaved with sibling Completeds by design, because a retry is contingent on the prior attempt's Failed.
Under ModeReplay, CallTools does NOT fan out — it executes tools sequentially in the order their final Completed/Failed events appear in the recording so the re-emitted payloads land at the same seq as the original run. Byte-for-byte divergence surfaces as ErrReplayMismatch from the underlying emit.
Panics if ctx has no step.Context attached.
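The live-mode fan-out (bounded concurrency, input-order results, no sibling cancellation) can be sketched with a buffered channel as the semaphore. This is a simplified illustration rather than the package's code: runAll, result, and errBoom are hypothetical, and event emission, retries, and replay ordering are omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errBoom = errors.New("boom")

type result struct {
	Value string
	Err   error
}

// runAll executes calls with at most maxParallel running at once. Results
// are stored by input index, so the returned slice preserves input order
// regardless of completion order. One failure does not stop the others.
func runAll(calls []func() (string, error), maxParallel int) []result {
	if maxParallel <= 0 {
		maxParallel = 8 // mirrors DefaultMaxParallelTools
	}
	results := make([]result, len(calls))
	sem := make(chan struct{}, maxParallel)
	var wg sync.WaitGroup
	for i, call := range calls {
		wg.Add(1)
		go func(i int, call func() (string, error)) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			v, err := call()
			results[i] = result{v, err} // each goroutine owns one index
		}(i, call)
	}
	wg.Wait()
	return results
}

func main() {
	calls := []func() (string, error){
		func() (string, error) { return "a", nil },
		func() (string, error) { return "", errBoom },
		func() (string, error) { return "c", nil },
	}
	for i, r := range runAll(calls, 2) {
		fmt.Printf("%d %q %v\n", i, r.Value, r.Err)
	}
}
```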