step

package
v0.1.0-beta.2
Published: May 7, 2026 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Overview

Package step is the determinism boundary. Non-deterministic ops (clock, randomness, LLM, tools) route through this package so the runtime can record and replay them.

Constants

const DefaultMaxParallelTools = 8

DefaultMaxParallelTools is the fan-out cap used by CallTools when Config.MaxParallelTools is zero.

Variables

var ErrBudgetExceeded = errors.New("step: budget exceeded")

ErrBudgetExceeded is returned by LLMCall when the pre-call input-token estimate exceeds BudgetConfig.MaxInputTokens. The matching BudgetExceeded event is emitted before the error is returned. Callers (typically the agent loop) wrap this into RunFailed.

var ErrInvalidConfig = fmt.Errorf("step: invalid Config")

ErrInvalidConfig is returned by NewContext when cfg lacks a required field (Log, RunID) or sets ModeReplay without Recorded.

var ErrInvalidStream = errors.New("step: invalid provider stream")

ErrInvalidStream is returned when the provider stream violates the chunk state machine.

var ErrMissingRawResponseHash = errors.New("step: provider returned empty RawResponseHash")

ErrMissingRawResponseHash is returned when ChunkEnd lacks a 32-byte hash and RequireRawResponseHash is set.

var ErrReplayMismatch = errors.New("step: replay mismatch")

ErrReplayMismatch is the sentinel every mismatch wraps. Use errors.Is to detect a divergence; errors.As against *MismatchError for structured fields.
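
The errors.Is / errors.As pattern described above can be sketched as follows. This is a minimal illustration, not the package's code: ErrReplayMismatch and MismatchError are re-declared locally as trimmed stand-ins so the example compiles without the real module, and describe is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for step.ErrReplayMismatch and step.MismatchError
// (fields trimmed), declared here so the sketch is self-contained.
var ErrReplayMismatch = errors.New("step: replay mismatch")

type MismatchError struct {
	Seq    uint64
	Class  string
	Reason string
}

func (e *MismatchError) Error() string {
	return fmt.Sprintf("step: replay mismatch at seq %d (%s): %s", e.Seq, e.Class, e.Reason)
}

// Is lets errors.Is(err, ErrReplayMismatch) match any *MismatchError.
func (e *MismatchError) Is(target error) bool { return target == ErrReplayMismatch }

// describe (hypothetical) shows how a caller might branch on a replay failure.
func describe(err error) string {
	var me *MismatchError
	switch {
	case errors.As(err, &me):
		return fmt.Sprintf("diverged at seq %d: class=%s", me.Seq, me.Class)
	case errors.Is(err, ErrReplayMismatch):
		return "diverged (no structured details)"
	default:
		return "not a replay mismatch"
	}
}

func main() {
	err := fmt.Errorf("replay run: %w", &MismatchError{Seq: 7, Class: "payload", Reason: "bytes differ"})
	fmt.Println(errors.Is(err, ErrReplayMismatch))
	fmt.Println(describe(err))
}
```

Checking errors.As first gives access to the structured fields when they exist; errors.Is remains the cheap yes/no test.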

var ErrToolNotFound = errors.New("step: tool not found")

ErrToolNotFound is returned by CallTool when the requested tool name is not in the Registry. A ToolCallFailed event with ErrorType="tool" is emitted before the error is returned.

Functions

func CallTool

func CallTool(ctx context.Context, call ToolCall) (json.RawMessage, error)

CallTool invokes the named tool, emitting ToolCallScheduled before and ToolCallCompleted/Failed after.

ErrorType classification:

  • "panic" — tool panicked (wraps tool.ErrPanicked)
  • "cancelled" — ctx cancelled / deadline exceeded
  • "tool" — ErrToolNotFound or any error returned by the tool

Retry (Idempotent+MaxAttempts>1): every attempt emits its own Scheduled+Completed/Failed pair with Attempt:n. Retries only on tool.ErrTransient; ctx errors and ErrToolNotFound are terminal.

Panics if ctx has no step.Context.
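
The ErrorType classification listed above could be expressed roughly as follows. This is a sketch under assumptions: errPanicked is a local stand-in for tool.ErrPanicked, classify is a hypothetical name, and the order in which the real CallTool checks these conditions is not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errPanicked is a local stand-in for tool.ErrPanicked.
var errPanicked = errors.New("tool: panicked")

// Sample errors used by main below.
var (
	errWrappedPanic = fmt.Errorf("call search: %w", errPanicked)
	errTimedOut     = fmt.Errorf("call search: %w", context.DeadlineExceeded)
	errPlain        = errors.New("search: bad query")
)

// classify (hypothetical) maps an error to one of the three ErrorType
// strings. The check order is an assumption of this sketch.
func classify(err error) string {
	switch {
	case errors.Is(err, errPanicked):
		return "panic"
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return "cancelled"
	default:
		// Covers ErrToolNotFound and any other error returned by the tool.
		return "tool"
	}
}

func main() {
	for _, err := range []error{errWrappedPanic, errTimedOut, errPlain} {
		fmt.Printf("%v -> %s\n", err, classify(err))
	}
}
```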

func Emit

func Emit[T any](ctx context.Context, c *Context, kind event.Kind, payload T) error

Emit writes a typed event payload into the run's log, advancing the hash chain under c.mu. Intended for use by the agent loop (root starling package) which emits RunStarted and terminal events itself — step's own helpers use the unexported emit directly.

kind must match the payload type. Safe for concurrent use.

func LLMCall

func LLMCall(ctx context.Context, req *provider.Request) (resp *provider.Response, err error)

LLMCall performs a single streaming completion. Emits TurnStarted → (ReasoningEmitted)* → AssistantMessageCompleted and returns the aggregated Response.

Pre-call input-token budget is enforced against MaxInputTokens; on breach, emits BudgetExceeded and returns ErrBudgetExceeded (no TurnStarted). On mid-stream error or ctx cancellation, no AssistantMessageCompleted is emitted — the agent loop emits the terminal RunFailed / RunCancelled event.

Panics if ctx has no step.Context or the Context has no Provider.

func NewCallID

func NewCallID() string

NewCallID returns a fresh ULID for ToolCall.CallID. Use it when the caller needs to know the ID before invoking CallTool.

func Now

func Now(ctx context.Context) time.Time

Now returns the current wall-clock time. Live mode reads Context.clockFn and records the value as nanoseconds; replay mode returns the recorded value. In both modes a SideEffectRecorded event is emitted so replay re-appends an identical chain. Panics if ctx has no step.Context, the event log rejects the write, or (in replay) the next recorded event doesn't match.

func Random

func Random(ctx context.Context) uint64

Random returns a cryptographically random uint64. Live mode draws from crypto/rand and records as SideEffectRecorded{name:"rand"}; replay returns the recorded value. A SideEffectRecorded event is emitted in both modes. Panics on missing ctx, CSPRNG failure, or replay mismatch.

func ReplayDurationMs

func ReplayDurationMs(ctx context.Context, fallback int64) int64

ReplayDurationMs returns the duration_ms value from the event recorded at the next seq position during replay; in live mode it returns fallback unchanged. Wall-clock durations are inherently non-deterministic (live and replay take different amounts of time, especially under the race detector), so any event carrying a DurationMs field must route its value through this helper before emission; otherwise the replayed payload would differ from the recorded one and the emit would fail with a replay mismatch.

Applies to ToolCallCompleted, ToolCallFailed, RunCompleted, RunCancelled, and RunFailed — every event whose payload includes a `duration_ms` CBOR field. The helper decodes only that one field, so it's robust to unrelated schema additions on those payload types.

func SideEffect

func SideEffect[T any](ctx context.Context, name string, fn func() (T, error)) (T, error)

SideEffect records arbitrary non-determinism: HTTP, filesystem, anything beyond clock/RNG. Live mode runs fn and records its result under name; replay decodes the recorded value without invoking fn and re-emits a matching SideEffectRecorded event so the replayed chain stays aligned. If fn returns an error, the error is propagated and nothing is recorded, so replay has no value for that call and runs fn again. T must be CBOR-serialisable. Panics on missing ctx or replay decode failure.
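
The record-then-replay idea behind SideEffect can be illustrated with a toy version. Everything below is an assumption made for illustration: recorder, entry and sideEffect are hypothetical names, JSON replaces CBOR so that only the standard library is needed, a mismatch returns an error instead of panicking, and the error-on-replay path is not modelled.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// entry is one recorded side effect.
type entry struct {
	Name  string
	Value json.RawMessage
}

// recorder is a toy stand-in for the event log plus the replay cursor.
type recorder struct {
	replay  bool
	entries []entry // recorded side effects, in emission order
	next    int     // replay cursor into entries
}

var errMismatch = errors.New("replay mismatch")

// sideEffect runs fn in live mode and records its result under name.
// In replay mode it decodes the recorded value and never calls fn.
func sideEffect[T any](r *recorder, name string, fn func() (T, error)) (T, error) {
	var zero T
	if r.replay {
		if r.next >= len(r.entries) || r.entries[r.next].Name != name {
			return zero, errMismatch
		}
		var v T
		if err := json.Unmarshal(r.entries[r.next].Value, &v); err != nil {
			return zero, err
		}
		r.next++
		return v, nil
	}
	v, err := fn()
	if err != nil {
		return zero, err // propagated, not recorded
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return zero, err
	}
	r.entries = append(r.entries, entry{Name: name, Value: raw})
	return v, nil
}

func main() {
	live := &recorder{}
	v, _ := sideEffect(live, "fetch", func() (int, error) { return 42, nil })

	calls := 0
	rep := &recorder{replay: true, entries: live.entries}
	w, _ := sideEffect(rep, "fetch", func() (int, error) { calls++; return 0, nil })
	fmt.Println(v, w, calls) // prints "42 42 0": replay never invoked fn
}
```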

func WithContext

func WithContext(parent context.Context, c *Context) context.Context

WithContext returns a derived context.Context carrying c.

Types

type BudgetConfig

type BudgetConfig struct {
	MaxInputTokens  int64
	MaxOutputTokens int64
	MaxUSD          float64
}

BudgetConfig holds the budget caps enforced inside the step package. MaxInputTokens is checked pre-call; MaxOutputTokens and MaxUSD are checked mid-stream after every ChunkUsage. Wall-clock enforcement lives at the agent level (via context.WithDeadline) so it can preempt blocking calls the step layer doesn't control.

Zero on any field disables that axis.
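
As a rough illustration of the "zero disables that axis" rule, the check could look like the sketch below. BudgetConfig is copied from the declaration above; exceededAxis and the axis strings it returns are hypothetical, and the real package performs the input check pre-call and the other two mid-stream rather than in one function.

```go
package main

import "fmt"

// BudgetConfig mirrors the struct documented above.
type BudgetConfig struct {
	MaxInputTokens  int64
	MaxOutputTokens int64
	MaxUSD          float64
}

// exceededAxis (hypothetical) returns the first axis whose usage is over
// its cap, or "" when nothing is exceeded. A zero cap disables the axis.
// The axis names are invented for this sketch.
func exceededAxis(b BudgetConfig, inTok, outTok int64, usd float64) string {
	switch {
	case b.MaxInputTokens > 0 && inTok > b.MaxInputTokens:
		return "input_tokens"
	case b.MaxOutputTokens > 0 && outTok > b.MaxOutputTokens:
		return "output_tokens"
	case b.MaxUSD > 0 && usd > b.MaxUSD:
		return "usd"
	}
	return ""
}

func main() {
	b := BudgetConfig{MaxInputTokens: 1000} // output and USD axes disabled
	fmt.Println(exceededAxis(b, 1500, 0, 0))
	fmt.Println(exceededAxis(b, 10, 1_000_000, 99.0) == "")
}
```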

type Config

type Config struct {
	Log      eventlog.EventLog
	RunID    string
	Provider provider.Provider
	Tools    *Registry
	Budget   BudgetConfig

	// Mode selects live vs replay. Zero value is ModeLive.
	Mode Mode

	// Recorded is the pre-captured event stream consumed by replay-mode
	// non-determinism helpers. Required when Mode == ModeReplay; ignored
	// otherwise. NewContext returns ErrInvalidConfig on ModeReplay with
	// nil Recorded (MustNewContext panics instead).
	Recorded []event.Event

	// ClockFn overrides the wall-clock source used by step.Now. Defaults
	// to time.Now. Tests inject a fake clock; under ModeReplay it is
	// never invoked (the recorded value is returned).
	ClockFn func() time.Time

	// MaxParallelTools caps concurrent tool executions dispatched by
	// CallTools. Zero selects the default (8). A value of 1 effectively
	// serializes parallel dispatch, useful for debugging. Ignored by
	// single-tool CallTool.
	MaxParallelTools int

	// Logger receives structured records from the step helpers for
	// budget trips and tool retries. If nil, the Context falls back to
	// a discard handler — step code never panics on a missing logger.
	// The agent loop sets this from starling.Config.Logger with run_id
	// already bound.
	Logger *slog.Logger

	// ResumeFromSeq and ResumeFromPrevHash seed the chain cursor for
	// resumed runs. Both are zero/nil for a fresh run (the default: the
	// first emitted event is seq=1 with empty PrevHash). Set by
	// (*Agent).Resume to the last-recorded seq and its event-hash so
	// the first event this Context emits extends the existing chain.
	//
	// ResumeFromSeq is the seq of the last event already in the log;
	// the first emit uses seq = ResumeFromSeq + 1. ResumeFromPrevHash
	// must equal event.Hash(event.Marshal(lastEvent)).
	ResumeFromSeq      uint64
	ResumeFromPrevHash []byte

	// Metrics is an optional observability sink. Nil disables all
	// metric recording inside this step.Context. The root starling
	// package wires a Prometheus-backed implementation; step exposes
	// only the interface so it doesn't pull the client_golang
	// dependency into the core loop.
	Metrics MetricsSink

	// RequireRawResponseHash makes LLMCall fail if the provider's
	// ChunkEnd lacks a 32-byte hash.
	RequireRawResponseHash bool

	// EmitTimeout bounds the per-event Append the step emitter runs
	// under context.WithoutCancel. Zero means no timeout (the historical
	// behavior). Set this when a hung backend would otherwise block
	// terminal/failure-path emits and prevent shutdown.
	EmitTimeout time.Duration
}

Config captures the static dependencies a step.Context needs across a run. The agent loop builds one at run start and never mutates it.

Log and RunID are required and validated by NewContext. Provider and Tools are only checked when the corresponding helper is invoked: LLMCall panics if Provider is nil; CallTool returns ErrToolNotFound if Tools is nil. Budget is optional: a zero value on any field disables that axis, so a zero-valued BudgetConfig enforces no caps.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context is the opaque per-run state attached to every stdlib context.Context inside an agent run. It owns the hash-chain cursor (next seq + previous event hash) so that any step-level emitter sees a consistent view even under concurrent Now/Random/SideEffect calls.

Callers extract it with From and pass it into the non-deterministic helpers in this package. Construction is normally handled by the agent loop (the root starling package); advanced users wiring their own loops may call NewContext directly.
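
The hash-chain cursor (next seq plus previous event hash) can be pictured with the toy chain below. It is only a sketch: chain, record and hashRecord are hypothetical names, and SHA-256 over a hand-rolled encoding stands in for event.Hash(event.Marshal(...)), whose actual algorithm and encoding are not described on this page.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sync"
)

// record is a toy event: its position, the hash of its predecessor, and a payload.
type record struct {
	Seq      uint64
	PrevHash []byte
	Payload  string
}

// chain holds the cursor; the mutex keeps concurrent emitters consistent.
type chain struct {
	mu       sync.Mutex
	lastSeq  uint64 // seq of the last emitted record (0 for a fresh chain)
	prevHash []byte // hash of the last emitted record (nil for a fresh chain)
}

// hashRecord is an assumed stand-in for the real event hash.
func hashRecord(r record) []byte {
	h := sha256.New()
	var seq [8]byte
	binary.BigEndian.PutUint64(seq[:], r.Seq)
	h.Write(seq[:])
	h.Write(r.PrevHash)
	h.Write([]byte(r.Payload))
	return h.Sum(nil)
}

// emit appends one record and advances the cursor under the lock.
func (c *chain) emit(payload string) record {
	c.mu.Lock()
	defer c.mu.Unlock()
	r := record{Seq: c.lastSeq + 1, PrevHash: c.prevHash, Payload: payload}
	c.lastSeq = r.Seq
	c.prevHash = hashRecord(r)
	return r
}

func main() {
	fresh := &chain{}
	a := fresh.emit("RunStarted")
	b := fresh.emit("TurnStarted")
	fmt.Println(a.Seq, len(a.PrevHash), b.Seq, len(b.PrevHash)) // prints "1 0 2 32"

	// Resuming seeds the cursor from the last recorded event, in the
	// spirit of ResumeFromSeq and ResumeFromPrevHash.
	resumed := &chain{lastSeq: b.Seq, prevHash: hashRecord(b)}
	fmt.Println(resumed.emit("TurnStarted").Seq) // prints "3"
}
```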

func From

func From(ctx context.Context) (*Context, bool)

From extracts the Context previously attached via WithContext. When no step.Context has been attached the second return is false and the first is nil — callers that must emit events should treat this as a programmer error.
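
The WithContext / From pair follows the usual context-value idiom. The sketch below shows that idiom with hypothetical local names (runState, ctxKey, withState, from, runIDOf); it makes no claim about how step stores its Context internally.

```go
package main

import (
	"context"
	"fmt"
)

// runState is a toy stand-in for the per-run state.
type runState struct{ runID string }

// ctxKey is an unexported key type, so other packages cannot collide with it.
type ctxKey struct{}

// withState returns a derived context carrying s.
func withState(parent context.Context, s *runState) context.Context {
	return context.WithValue(parent, ctxKey{}, s)
}

// from returns (nil, false) when no state has been attached.
func from(ctx context.Context) (*runState, bool) {
	s, ok := ctx.Value(ctxKey{}).(*runState)
	return s, ok
}

// runIDOf builds a context, optionally attaches state, and looks it up.
func runIDOf(attach bool) (string, bool) {
	ctx := context.Background()
	if attach {
		ctx = withState(ctx, &runState{runID: "run-1"})
	}
	s, ok := from(ctx)
	if !ok {
		return "", false
	}
	return s.runID, true
}

func main() {
	fmt.Println(runIDOf(true))
	fmt.Println(runIDOf(false))
}
```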

func MustNewContext

func MustNewContext(cfg Config) *Context

MustNewContext is NewContext without the error return: it panics on invalid config. Useful in test setup and in the agent loop where a bad config is a programmer bug. Production callers should prefer NewContext.

func NewContext

func NewContext(cfg Config) (*Context, error)

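NewContext validates cfg and returns a Context, or ErrInvalidConfig when validation fails. For a fresh run the Context is primed to emit the first event (seq=1, prevHash=nil); when Config.ResumeFromSeq and Config.ResumeFromPrevHash are set, the first emit extends the existing chain at seq=ResumeFromSeq+1. Log and RunID are required; Provider and Tools are checked lazily by LLMCall and CallTool.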

func (*Context) Logger

func (c *Context) Logger() *slog.Logger

Logger returns the slog.Logger bound to this run. Never nil: returns a discard logger when the agent was built without one. Intended for the agent loop and downstream tool implementations that want to participate in the run's structured trace.

func (*Context) Metrics

func (c *Context) Metrics() MetricsSink

Metrics returns the MetricsSink this Context records to, or nil when metrics are disabled. Call sites should treat nil as a no-op rather than guarding every method call.

func (*Context) RunID

func (c *Context) RunID() string

RunID returns the run identifier associated with the context. Useful for tools / tests that need to correlate external state with the current run.

type MetricsSink

type MetricsSink interface {
	ObserveProviderCall(model, status string, d time.Duration, promptTokens, completionTokens int64)
	ObserveToolCall(toolName, status, errorType string, d time.Duration)
	ObserveEventlogAppend(kind, status string, d time.Duration)
	ObserveBudgetExceeded(axis string)
}

MetricsSink is the narrow interface step.Context uses to record observability samples. Implementations must be concurrency-safe and must tolerate being called in a tight loop — the eventlog observer runs on every emit.

Implementations may no-op any method; the interface exists only so step doesn't depend on a concrete metrics library.

type MismatchClass

type MismatchClass string

MismatchClass classifies why replay diverged.

const (
	MismatchExhausted MismatchClass = "exhausted"
	MismatchKind      MismatchClass = "kind"
	MismatchPayload   MismatchClass = "payload"
	MismatchTurnID    MismatchClass = "turn_id"
)

type MismatchError

type MismatchError struct {
	Seq          uint64
	Kind         event.Kind
	ExpectedKind event.Kind
	Class        MismatchClass
	Reason       string
}

MismatchError carries the structured details of a replay divergence. Its Is method matches ErrReplayMismatch, so errors.Is(err, ErrReplayMismatch) reports true for any *MismatchError.

func (*MismatchError) Error

func (e *MismatchError) Error() string

func (*MismatchError) Is

func (e *MismatchError) Is(target error) bool

type Mode

type Mode uint8

Mode selects between a live run (side effects execute and are recorded) and a replay run (side effects return pre-recorded values without re-running them).

const (
	// ModeLive is the default: helpers execute their effect and emit a
	// SideEffectRecorded event capturing the result.
	ModeLive Mode = iota

	// ModeReplay consumes pre-recorded SideEffectRecorded events from
	// Config.Recorded in order, returning the stored values instead of
	// re-running the effect. Used by the replay verifier.
	ModeReplay
)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps tool names to Tool implementations for a single run. It is the runtime-side lookup used by CallTool; the tool package itself stays free of registry concerns so tool authors can define tools without importing runtime state.

A Registry is immutable after construction — NewRegistry copies its inputs — which means callers can safely share one across goroutines.

func NewRegistry

func NewRegistry(tools ...tool.Tool) *Registry

NewRegistry returns a Registry containing the given tools, keyed by each tool's Name(). Duplicate names cause the later entry to win; callers that care about duplicate detection should check themselves before calling.

func (*Registry) Get

func (r *Registry) Get(name string) (tool.Tool, bool)

Get returns the Tool registered under name, or (nil, false) if not found.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns the registered tool names in alphabetical order. The slice is a fresh copy; callers may mutate it.

Deterministic ordering matters for RunStarted.ToolRegistryHash: the hash is computed over ToolSchemas listed in the order Names returns.
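
A minimal sketch of why sorting matters: Go map iteration order is unspecified, so a stable listing has to be sorted explicitly. sortedNames is a hypothetical helper, and sort.Strings orders byte-wise, which is assumed here to be what "alphabetical" means.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedNames (hypothetical) returns the map keys as a fresh, sorted slice.
// Callers may mutate the result without affecting the map.
func sortedNames(tools map[string]struct{}) []string {
	names := make([]string, 0, len(tools))
	for name := range tools {
		names = append(names, name)
	}
	sort.Strings(names) // byte-wise lexicographic order
	return names
}

func main() {
	tools := map[string]struct{}{"search": {}, "calc": {}, "fetch": {}}
	fmt.Println(sortedNames(tools)) // prints "[calc fetch search]"
}
```

Anything hashed over this listing is then independent of map iteration order.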

type ToolCall

type ToolCall struct {
	CallID      string
	TurnID      string
	Name        string
	Args        json.RawMessage
	Idempotent  bool
	MaxAttempts int
	Backoff     func(attempt int) time.Duration
}

ToolCall describes a single tool invocation. CallID+TurnID link the invocation back to the AssistantMessageCompleted that planned it. Empty CallID mints a ULID; TurnID has no default (missing TurnID is almost always a caller bug).

Retry: Idempotent=true with MaxAttempts>1 retries on transient failures (errors.Is tool.ErrTransient). Each attempt emits a fresh ToolCallScheduled/Completed/Failed{Attempt:n}. Nil Backoff uses an exponential default (100ms base, ×2, cap 10s, 0–25% jitter). Non-idempotent calls run exactly once regardless of MaxAttempts.
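
The default backoff parameters quoted above (100ms base, ×2, cap 10s, 0–25% jitter) could be realised along these lines. This is a sketch, not the package's implementation: backoff is a hypothetical function, attempt 1 is assumed to map to the base delay, and jitter is assumed to be added on top of the capped delay.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff (hypothetical) returns the delay for a 1-based attempt number.
// jitter is a fraction in [0, 1) that scales the 0-25% jitter band.
func backoff(attempt int, jitter float64) time.Duration {
	const (
		base     = 100 * time.Millisecond
		maxDelay = 10 * time.Second
	)
	d := base
	for i := 1; i < attempt && d < maxDelay; i++ {
		d *= 2 // double per attempt until the cap is reached
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d + time.Duration(float64(d)*0.25*jitter)
}

func main() {
	for attempt := 1; attempt <= 9; attempt++ {
		fmt.Println(attempt, backoff(attempt, 0), backoff(attempt, rand.Float64()))
	}
}
```

To plug a policy like this into ToolCall.Backoff, which takes only the attempt number, a caller would wrap it in a closure such as `func(n int) time.Duration { return backoff(n, rand.Float64()) }`.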

type ToolResult

type ToolResult struct {
	CallID string
	Result json.RawMessage
	Err    error
}

ToolResult is the outcome of one tool invocation from CallTools. Result is the tool's JSON output (nil on failure). Err is the tool/registry/panic error. One failing tool does not cancel siblings.

func CallTools

func CallTools(ctx context.Context, calls []ToolCall) ([]ToolResult, error)

CallTools dispatches a batch of tool calls, emitting ToolCallScheduled for every call in input order up front, then running the tools concurrently with a semaphore (cap = Config.MaxParallelTools or DefaultMaxParallelTools when zero). Each tool's completion emits ToolCallCompleted or ToolCallFailed as it finishes, so seq numbers reflect actual completion order — that ordering is the committed ground truth the hash chain ratifies.

A failing tool does not cancel siblings; its error surfaces in the matching ToolResult.Err. Callers decide whether to keep going. The returned slice preserves input order (NOT completion order), so callers can correlate results with the calls they supplied.

Retry (per-call Idempotent + MaxAttempts>1) applies inside each worker. Only the attempt-1 Scheduled events are contiguous in the log; retry Scheduleds land interleaved with sibling Completeds by design, because a retry is contingent on the prior attempt's Failed.

Under ModeReplay, CallTools does NOT fan out — it executes tools sequentially in the order their final Completed/Failed events appear in the recording so the re-emitted payloads land at the same seq as the original run. Byte-for-byte divergence surfaces as ErrReplayMismatch from the underlying emit.

Panics if ctx has no step.Context attached.
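
The live-mode fan-out described above (bounded concurrency, results in input order, failures isolated per call) can be sketched with a buffered-channel semaphore. The names runAll, result and errBoom are hypothetical, and event emission, retries and replay are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result is a toy counterpart of ToolResult.
type result struct {
	Out string
	Err error
}

var errBoom = errors.New("boom")

// runAll (hypothetical) runs jobs with at most limit in flight and returns
// results in input order. A failing job does not stop the others.
func runAll(limit int, jobs []func() (string, error)) []result {
	if limit <= 0 {
		limit = 8 // mirrors DefaultMaxParallelTools
	}
	results := make([]result, len(jobs))
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit jobs are already running
		go func(i int, job func() (string, error)) {
			defer wg.Done()
			defer func() { <-sem }()
			out, err := job()
			// Each goroutine writes only its own index, so no lock is needed.
			results[i] = result{Out: out, Err: err}
		}(i, job)
	}
	wg.Wait()
	return results
}

func main() {
	res := runAll(2, []func() (string, error){
		func() (string, error) { return "a", nil },
		func() (string, error) { return "", errBoom },
		func() (string, error) { return "c", nil },
	})
	for i, r := range res {
		fmt.Println(i, r.Out, r.Err)
	}
}
```

Writing each result into its input slot is what keeps the returned slice in input order even though completion order varies.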
