Documentation
¶
Overview ¶
Package workflow provides composable patterns for orchestrating AI-powered pipelines.
The package implements four core workflow patterns:
- Chain: Sequential execution where output flows to the next step
- Parallel: Concurrent execution with result aggregation
- Router: Conditional branching based on predicates or LLM classification
- Loop: Iterative execution until a condition is met
All workflow types implement the Step interface, enabling arbitrary nesting and composition of patterns.
Basic Usage ¶
Create a simple chain workflow:
chain := workflow.NewChain("my-chain",
workflow.NewFuncStep("setup", func(ctx context.Context, state *workflow.State) error {
state.Set("input", "Hello, World!")
return nil
}),
workflow.NewPromptStep("process", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{
{Role: gains.RoleUser, Content: s.GetString("input")},
}
},
"output",
),
)
wf := workflow.New("my-workflow", chain)
result, err := wf.Run(context.Background(), workflow.NewState())
Parallel Execution ¶
Execute multiple steps concurrently:
parallel := workflow.NewParallel("research", steps,
func(state *workflow.State, results map[string]*workflow.StepResult, errors map[string]error) error {
// Aggregate results (errors contains any failed steps when ContinueOnError=true)
var combined string
for name, result := range results {
combined += fmt.Sprintf("%s: %v\n", name, result.Output)
}
state.Set("combined", combined)
return nil
},
)
Access branch state values with typed helpers:
var KeyAnalysis = workflow.NewKey[*AnalysisResult]("analysis")
parallel := workflow.NewParallel("analyze", steps,
func(state *workflow.State, results map[string]*workflow.StepResult, errors map[string]error) error {
for name, result := range results {
// Type-safe access to branch state
analysis := workflow.MustGetFromBranch(result, KeyAnalysis)
fmt.Printf("%s score: %d\n", name, analysis.Score)
}
return nil
},
)
Conditional Routing ¶
Route based on state conditions:
router := workflow.NewRouter("route",
[]workflow.Route{
{
Name: "high-priority",
Condition: func(ctx context.Context, s *workflow.State) bool {
return s.GetString("priority") == "high"
},
Step: highPriorityStep,
},
},
defaultStep,
)
Or use LLM-based classification:
classifier := workflow.NewClassifierRouter("classify", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{
{Role: gains.RoleSystem, Content: "Classify as: billing, technical, general"},
{Role: gains.RoleUser, Content: s.GetString("ticket")},
}
},
map[string]workflow.Step{
"billing": billingStep,
"technical": technicalStep,
"general": generalStep,
},
)
Iterative Loops ¶
Repeat steps until a condition is met:
// Create a content creator that reads feedback on subsequent iterations
creator := workflow.NewPromptStep("creator", provider,
func(s *workflow.State) []gains.Message {
feedback := s.GetString("feedback")
if feedback == "" {
return []gains.Message{{Role: gains.RoleUser, Content: "Write a blog post about Go"}}
}
return []gains.Message{
{Role: gains.RoleUser, Content: "Write a blog post about Go"},
{Role: gains.RoleAssistant, Content: s.GetString("draft")},
{Role: gains.RoleUser, Content: "Revise based on: " + feedback},
}
},
workflow.WithOutputKey("draft"),
)
// Create an editor that approves or provides feedback
editor := workflow.NewPromptStep("editor", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{{
Role: gains.RoleUser,
Content: "Review this draft. Reply APPROVED or provide feedback:\n\n" +
s.GetString("draft"),
}}
},
workflow.WithOutputKey("feedback"),
)
// Combine into a chain and loop until approved
reviewCycle := workflow.NewChain("review-cycle", creator, editor)
loop := workflow.NewLoop("content-loop", reviewCycle,
func(ctx context.Context, s *workflow.State) bool {
return strings.Contains(s.GetString("feedback"), "APPROVED")
},
workflow.WithMaxIterations(5),
)
Streaming Events ¶
Monitor workflow progress in real-time:
import "github.com/spetersoncode/gains/event"
events := wf.RunStream(ctx, state)
for e := range events {
switch e.Type {
case event.StepStart:
fmt.Printf("Starting: %s\n", e.StepName)
case event.MessageDelta:
fmt.Print(e.Delta)
case event.StepEnd:
fmt.Printf("Completed: %s\n", e.StepName)
case event.RunError:
fmt.Printf("Error: %v\n", e.Error)
}
}
Composability ¶
Workflows can be nested since all patterns implement Step:
outer := workflow.NewChain("outer",
setupStep,
workflow.NewParallel("inner-parallel", parallelSteps, nil),
workflow.NewRouter("inner-router", routes, nil),
workflow.NewLoop("inner-loop", refinementStep, condition),
finalStep,
)
Index ¶
- Variables
- func ClassifierSchema(routes map[string]Step) ai.Option
- func Delete[T any](s *State, key Key[T])
- func Get[T any](s *State, key Key[T]) (T, bool)
- func GetFromBranch[T any](result *StepResult, key Key[T]) (T, bool)
- func GetOr[T any](s *State, key Key[T], defaultVal T) T
- func Has[T any](s *State, key Key[T]) bool
- func MustGet[T any](s *State, key Key[T]) T
- func MustGetFromBranch[T any](result *StepResult, key Key[T]) T
- func Set[T any](s *State, key Key[T], value T)
- func SetIfAbsent[T any](s *State, key Key[T], value T) bool
- func StateInto(state *State, structPtr any)
- func Update[T any](s *State, key Key[T], fn func(T) T) T
- type AgentResult
- type AgentStep
- type Aggregator
- type Chain
- type ClassifierRouter
- func (c *ClassifierRouter) ClassificationKey() Key[string]
- func (c *ClassifierRouter) Name() string
- func (c *ClassifierRouter) RouteKey() Key[string]
- func (c *ClassifierRouter) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
- func (c *ClassifierRouter) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event
- type Condition
- type ErrorHandler
- type Event
- type FuncStep
- type Key
- type Loop
- func NewLoop(name string, step Step, condition LoopCondition, opts ...LoopOption) *Loop
- func NewLoopUntil[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop
- func NewLoopUntilSet[T any](name string, step Step, key Key[T], opts ...LoopOption) *Loop
- func NewLoopWhile[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop
- type LoopCondition
- type LoopOption
- type MemoryAdapter
- type Option
- func WithChatOptions(opts ...ai.Option) Option
- func WithContinueOnError(enabled bool) Option
- func WithErrorHandler(fn ErrorHandler) Option
- func WithMaxConcurrency(n int) Option
- func WithMaxTokens(n int) Option
- func WithModel(model ai.Model) Option
- func WithOnStepComplete(fn StepCallback) Option
- func WithStepTimeout(d time.Duration) Option
- func WithTemperature(t float64) Option
- func WithTimeout(d time.Duration) Option
- type Options
- type Parallel
- type ParallelError
- type PromptFunc
- type PromptStep
- type Result
- type Route
- type Router
- type State
- type StateAdapter
- type Step
- type StepCallback
- type StepError
- type StepFunc
- type StepResult
- type TerminationReason
- type ToolExecutionError
- type ToolStep
- type TypedPromptStep
- type Workflow
Constants ¶
This section is empty.
Variables ¶
var ( // ErrWorkflowTimeout indicates the workflow exceeded its timeout. ErrWorkflowTimeout = errors.New("workflow: timeout exceeded") // ErrWorkflowCancelled indicates the workflow was cancelled. ErrWorkflowCancelled = errors.New("workflow: cancelled") // ErrNoRouteMatched indicates no route condition was satisfied. ErrNoRouteMatched = errors.New("workflow: no route matched") // ErrStepNotFound indicates a referenced step does not exist. ErrStepNotFound = errors.New("workflow: step not found") // ErrMaxIterationsExceeded indicates a loop reached its iteration limit. ErrMaxIterationsExceeded = errors.New("workflow: maximum loop iterations exceeded") )
Functions ¶
func ClassifierSchema ¶
ClassifierSchema returns a ai.Option that enforces structured output for classification. Use with providers that support JSON schema (OpenAI, Anthropic). Note: May not work with streaming on all providers.
func Get ¶ added in v0.2.3
Get retrieves a value from state using a typed key. Returns the typed value and true if the key exists and the type matches. Returns the zero value and false if the key is missing or type mismatches.
Example:
analysis, ok := workflow.Get(state, KeyAnalysis)
if ok {
fmt.Println(analysis.Sentiment)
}
func GetFromBranch ¶ added in v0.2.5
func GetFromBranch[T any](result *StepResult, key Key[T]) (T, bool)
GetFromBranch extracts a typed value from a branch's state within StepResult. This is a convenience helper for parallel workflow aggregators. Returns the typed value and true if the key exists, or zero value and false otherwise.
Example:
aggregator := func(state *State, results map[string]*StepResult, errors map[string]error) error {
for name, result := range results {
if riff, ok := workflow.GetFromBranch(result, KeyRiff); ok {
fmt.Println("Branch", name, "produced:", riff)
}
}
}
func GetOr ¶ added in v0.2.3
GetOr retrieves a value from state using a typed key. Returns defaultVal if the key is missing or the type mismatches.
Example:
retries := workflow.GetOr(state, KeyRetryCount, 0)
func MustGet ¶ added in v0.2.1
MustGet retrieves a value from state using a typed key. Panics if the key is missing or the value cannot be asserted to type T. Use when you are certain the key exists with the correct type.
Example:
analysis := workflow.MustGet(state, KeyAnalysis) fmt.Println(analysis.Sentiment)
func MustGetFromBranch ¶ added in v0.2.5
func MustGetFromBranch[T any](result *StepResult, key Key[T]) T
MustGetFromBranch extracts a typed value from a branch's state within StepResult. Panics if the branch state is missing, the key doesn't exist, or the type mismatches. Use when you are certain the branch produced the expected key.
Example:
aggregator := func(state *State, results map[string]*StepResult, errors map[string]error) error {
for name, result := range results {
riff := workflow.MustGetFromBranch(result, KeyRiff)
fmt.Println("Branch", name, "produced:", riff)
}
}
func Set ¶ added in v0.2.3
Set stores a value in state using a typed key. The compiler enforces that value has the correct type for the key.
Example:
workflow.Set(state, KeyAnalysis, &AnalysisResult{...})
func SetIfAbsent ¶ added in v0.2.3
SetIfAbsent stores a value only if the key does not exist. Returns true if the value was set, false if the key already existed.
func StateInto ¶ added in v0.2.7
StateInto copies state values into a struct's fields. Field names are matched using json tags, falling back to field name in snake_case. Only exported fields with matching state keys are populated.
Example:
var result MyState workflow.StateInto(state, &result) fmt.Println(result.UserID) // "abc123"
Types ¶
type AgentResult ¶ added in v0.2.4
type AgentResult struct {
// Response is the final model response.
Response *ai.Response
// Messages is the complete conversation history.
Messages []ai.Message
// Steps is the number of agent iterations.
Steps int
// Termination indicates why the agent stopped.
Termination agent.TerminationReason
}
AgentResult contains the structured output from an AgentStep execution.
type AgentStep ¶ added in v0.2.4
type AgentStep struct {
// contains filtered or unexported fields
}
AgentStep wraps the agent package for autonomous tool-calling within a workflow. It runs an agent loop to completion and stores the final result in state.
func NewAgentStep ¶ added in v0.2.4
func NewAgentStep( name string, chatClient chat.Client, registry *tool.Registry, prompt PromptFunc, outputKey string, agentOpts []agent.Option, chatOpts ...ai.Option, ) *AgentStep
NewAgentStep creates a step that runs an autonomous tool-calling agent.
Parameters:
- name: Unique identifier for the step
- chatClient: Client supporting ChatStream for the agent
- registry: Tool registry with registered handlers
- prompt: Function that builds initial messages from state
- outputKey: State key for storing agent result (empty to skip storage)
- agentOpts: Options passed to agent.Run/RunStream
- chatOpts: Options passed to each chat call
Example:
registry := tool.NewRegistry()
tool.RegisterFunc(registry, "search", "Search the web", searchHandler)
step := workflow.NewAgentStep(
"research",
client,
registry,
func(s *workflow.State) []ai.Message {
topic := s.GetString("topic")
return []ai.Message{{
Role: ai.RoleUser,
Content: fmt.Sprintf("Research %s and provide a summary", topic),
}}
},
"research_result",
[]agent.Option{agent.WithMaxSteps(5)},
ai.WithModel(model.Claude35Sonnet),
)
func (*AgentStep) OutputKey ¶ added in v0.2.4
func (a *AgentStep) OutputKey() Key[*AgentResult]
OutputKey returns a typed key for accessing the AgentResult in state. The key name is the step's outputKey.
type Aggregator ¶
Aggregator combines results from parallel steps into the shared state. The errors map contains any step failures when ContinueOnError is true, giving full visibility into which steps succeeded and which failed.
func CollectInto ¶ added in v0.2.5
func CollectInto[T any](inputKey Key[T], outputKey Key[[]T]) Aggregator
CollectInto returns an aggregator that collects values from all branches into a slice. Each branch should set the inputKey; all values are collected into outputKey as []T. Order of collected values is non-deterministic due to concurrent execution.
func CollectMap ¶ added in v0.2.5
func CollectMap[T any](inputKey Key[T], outputKey Key[map[string]T]) Aggregator
CollectMap returns an aggregator that collects values from all branches into a map. The map keys are the step names, values are the typed values from each branch.
func MergeAll ¶ added in v0.2.5
func MergeAll() Aggregator
MergeAll returns an aggregator that merges all keys from all branch states. This is the default behavior when no aggregator is provided to NewParallel.
func MergeKeys ¶ added in v0.2.5
func MergeKeys(keys ...string) Aggregator
MergeKeys returns an aggregator that merges only the specified keys from branch states. Keys not in the list are ignored. Later branches overwrite earlier ones for the same key.
func MergeTypedKey ¶ added in v0.2.5
func MergeTypedKey[T any](key Key[T]) Aggregator
MergeTypedKey returns an aggregator that merges a single typed key from branch states. The last branch to set the key wins.
type Chain ¶
type Chain struct {
// contains filtered or unexported fields
}
Chain executes steps sequentially, passing state between them.
type ClassifierRouter ¶
type ClassifierRouter struct {
// contains filtered or unexported fields
}
ClassifierRouter uses an LLM to classify input and route accordingly.
func NewClassifierRouter ¶
func NewClassifierRouter( name string, c chat.Client, prompt PromptFunc, routes map[string]Step, opts ...ai.Option, ) *ClassifierRouter
NewClassifierRouter creates a router that uses LLM classification. The LLM response should match one of the route keys (case-insensitive). For more reliable classification, use ClassifierSchema().
func (*ClassifierRouter) ClassificationKey ¶ added in v0.2.3
func (c *ClassifierRouter) ClassificationKey() Key[string]
ClassificationKey returns a typed key for the raw classification result. The key name follows the pattern "{routerName}_classification".
func (*ClassifierRouter) Name ¶
func (c *ClassifierRouter) Name() string
Name returns the router name.
func (*ClassifierRouter) RouteKey ¶ added in v0.2.3
func (c *ClassifierRouter) RouteKey() Key[string]
RouteKey returns a typed key for the selected route name. The key name follows the pattern "{routerName}_route".
func (*ClassifierRouter) Run ¶
func (c *ClassifierRouter) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
Run classifies input and executes the matching route.
type Condition ¶
Condition determines if a route should be taken.
func ConditionEquals ¶ added in v0.2.3
func ConditionEquals[T comparable](key Key[T], value T) Condition
ConditionEquals returns a condition that matches when the key equals value.
func ConditionMatches ¶ added in v0.2.3
ConditionMatches returns a condition using a predicate function on the key value.
func ConditionSet ¶ added in v0.2.3
ConditionSet returns a condition that matches when the key exists in state.
type ErrorHandler ¶
ErrorHandler is called when a step encounters an error. Return nil to suppress the error, or return an error to propagate it.
type Event ¶
Event is an alias to the unified event type. Workflow events use these event.Type values:
- event.RunStart, event.RunEnd, event.RunError
- event.StepStart, event.StepEnd, event.StepSkipped
- event.MessageStart, event.MessageDelta, event.MessageEnd
- event.ToolCallStart, event.ToolCallArgs, event.ToolCallEnd, event.ToolCallResult
- event.ParallelStart, event.ParallelEnd
- event.RouteSelected, event.LoopIteration
type FuncStep ¶
type FuncStep struct {
// contains filtered or unexported fields
}
FuncStep wraps a function as a Step.
func NewFuncStep ¶
NewFuncStep creates a step from a function.
type Key ¶ added in v0.2.3
type Key[T any] struct { // contains filtered or unexported fields }
Key represents a typed state key that associates a name with type T. Keys provide compile-time type safety for workflow state access.
Define keys as package-level variables for reuse:
var (
KeyAnalysis = workflow.NewKey[*AnalysisResult]("analysis")
KeyProjects = workflow.NewKey[[]*ProjectConfig]("projects")
KeyApproved = workflow.NewKey[bool]("approved")
)
func NewKey ¶ added in v0.2.3
NewKey creates a typed key with the given name. The type parameter T specifies the type of values stored under this key.
type Loop ¶
type Loop struct {
// contains filtered or unexported fields
}
Loop repeatedly executes a step until a condition returns true. Use for iterative refinement workflows where steps need to repeat based on evaluation results stored in state.
func NewLoop ¶
func NewLoop(name string, step Step, condition LoopCondition, opts ...LoopOption) *Loop
NewLoop creates a loop that executes step until condition returns true. The condition is checked after each iteration. If the condition never returns true, the loop terminates after maxIterations (default 10) and returns ErrMaxIterationsExceeded.
The loop stores the current iteration count in state under "{name}_iteration".
func NewLoopUntil ¶ added in v0.2.3
func NewLoopUntil[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop
NewLoopUntil creates a loop that exits when the typed key equals the target value.
func NewLoopUntilSet ¶ added in v0.2.3
NewLoopUntilSet creates a loop that exits when the typed key has a truthy value.
func NewLoopWhile ¶ added in v0.2.3
func NewLoopWhile[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop
NewLoopWhile creates a loop that continues while the typed key equals the target value. The loop exits when the key no longer equals the value (or is unset).
func (*Loop) IterationKey ¶ added in v0.2.3
IterationKey returns a typed key for the current iteration count. The key name follows the pattern "{loopName}_iteration".
type LoopCondition ¶
LoopCondition evaluates state to determine if the loop should exit. Return true to exit the loop, false to continue iterating.
type LoopOption ¶
type LoopOption func(*Loop)
LoopOption configures a Loop.
func WithMaxIterations ¶
func WithMaxIterations(n int) LoopOption
WithMaxIterations sets the maximum number of loop iterations. Default is 10.
type MemoryAdapter ¶
type MemoryAdapter = store.MemoryAdapter
MemoryAdapter is an in-memory implementation of StateAdapter.
func NewMemoryAdapter ¶
func NewMemoryAdapter() *MemoryAdapter
NewMemoryAdapter creates a new in-memory adapter for State.
type Option ¶
type Option func(*Options)
Option is a functional option for workflow configuration.
func WithChatOptions ¶
WithChatOptions passes options to LLM calls.
func WithContinueOnError ¶
WithContinueOnError allows the workflow to continue after errors.
func WithErrorHandler ¶
func WithErrorHandler(fn ErrorHandler) Option
WithErrorHandler sets a custom error handler.
func WithMaxConcurrency ¶
WithMaxConcurrency limits parallel step execution. A value of 0 means unlimited concurrency.
func WithMaxTokens ¶
WithMaxTokens is a convenience option to set max tokens for chat calls.
func WithOnStepComplete ¶
func WithOnStepComplete(fn StepCallback) Option
WithOnStepComplete sets a callback for step completion.
func WithStepTimeout ¶
WithStepTimeout sets the default timeout for each step.
func WithTemperature ¶
WithTemperature is a convenience option to set temperature for chat calls.
func WithTimeout ¶
WithTimeout sets the overall workflow timeout.
type Options ¶
type Options struct {
// Timeout sets a deadline for the entire workflow.
Timeout time.Duration
// StepTimeout sets default timeout for individual steps.
StepTimeout time.Duration
// MaxConcurrency limits parallel step execution (0 = unlimited).
MaxConcurrency int
// ErrorHandler is called on step errors.
ErrorHandler ErrorHandler
// OnStepComplete is called after each step.
OnStepComplete StepCallback
// ContinueOnError allows workflow to continue after step errors.
ContinueOnError bool
// ChatOptions are passed to LLM calls within steps.
ChatOptions []ai.Option
}
Options contains configuration for workflow execution.
func ApplyOptions ¶
ApplyOptions applies functional options with defaults.
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel executes steps concurrently and aggregates results.
func NewParallel ¶
func NewParallel(name string, steps []Step, aggregator Aggregator) *Parallel
NewParallel creates a parallel workflow. The aggregator is called with all results after all steps complete. If aggregator is nil, each branch's state changes are merged back.
type ParallelError ¶
ParallelError wraps errors from parallel execution.
func (*ParallelError) Error ¶
func (e *ParallelError) Error() string
Error returns a formatted message summarizing the parallel execution failures.
func (*ParallelError) Unwrap ¶
func (e *ParallelError) Unwrap() error
Unwrap returns the first error for errors.Is/As compatibility.
type PromptFunc ¶
PromptFunc generates messages from state for an LLM call.
type PromptStep ¶
type PromptStep struct {
// contains filtered or unexported fields
}
PromptStep makes a single LLM call with a dynamic prompt.
func NewPromptStep ¶
func NewPromptStep(name string, c chat.Client, prompt PromptFunc, outputKey string, opts ...ai.Option) *PromptStep
NewPromptStep creates a step for a single LLM call. The prompt function generates messages from current state. If outputKey is non-empty, the response content is stored in state under that key.
func (*PromptStep) Run ¶
func (p *PromptStep) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
Run executes the LLM call.
type Result ¶
type Result struct {
// WorkflowName identifies the workflow.
WorkflowName string
// State contains the final state after execution.
State *State
// Output is the primary output from the workflow.
Output any
// Usage aggregates token usage across all steps.
Usage ai.Usage
// Termination indicates why execution stopped.
Termination TerminationReason
// Error contains any error that caused termination.
Error error
}
Result represents the final outcome of workflow execution.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router selects and executes a step based on conditions.
func NewRouter ¶
NewRouter creates a conditional router. Routes are evaluated in order; first match wins. Default route is used if no conditions match (can be nil).
func (*Router) RouteKey ¶ added in v0.2.3
RouteKey returns a typed key for the selected route name. The key name follows the pattern "{routerName}_route".
type State ¶
State provides thread-safe key-value state management for workflows. This is the primary state container passed through workflow steps.
For type-safe state access, use the typed Key[T] API:
var KeyCount = workflow.NewKey[int]("count")
var KeyName = workflow.NewKey[string]("name")
workflow.Set(state, KeyCount, 42)
workflow.Set(state, KeyName, "example")
count, ok := workflow.Get(state, KeyCount) // 42, true
name := workflow.MustGet(state, KeyName) // "example"
The typed API provides compile-time type checking. See Key, Get, Set, MustGet, and GetOr for the full typed API.
func GetBranchState ¶ added in v0.2.5
func GetBranchState(result *StepResult) (*State, bool)
GetBranchState extracts the branch state from a StepResult. In parallel workflows, each branch runs with a cloned state stored in metadata. Returns the branch state and true if found, or nil and false otherwise.
Example:
aggregator := func(state *State, results map[string]*StepResult, errors map[string]error) error {
for name, result := range results {
if branchState, ok := workflow.GetBranchState(result); ok {
// Access branch-specific state
}
}
}
func NewState ¶
func NewState(adapter StateAdapter) *State
NewState creates a new State with the given adapter. If adapter is nil, a default in-memory adapter is used.
func NewStateFrom ¶
NewStateFrom creates a new State initialized with the given data.
func NewStateFromStruct ¶ added in v0.2.7
NewStateFromStruct creates a new State initialized from a struct's fields. Field names are derived from json tags, falling back to the field name in snake_case. Zero values are skipped unless the field has `state:"include_zero"` tag.
Example:
type MyState struct {
UserID string `json:"user_id"`
Count int `json:"count"`
Optional string `json:"optional,omitempty"`
}
state := workflow.NewStateFromStruct(&MyState{
UserID: "abc123",
Count: 42,
})
// State now contains: {"user_id": "abc123", "count": 42}
The struct pointer can be nil, in which case an empty state is returned.
type StateAdapter ¶
StateAdapter defines the interface for persistence backends. Implementations must be thread-safe.
type Step ¶
type Step interface {
// Name returns a unique identifier for the step.
Name() string
// Run executes the step and returns the result.
Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
// RunStream executes the step and returns a channel of events.
RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event
}
Step represents a single unit of work in a workflow. Steps can be functions, LLM calls, or nested workflows.
type StepCallback ¶
type StepCallback func(ctx context.Context, result *StepResult)
StepCallback is called after each step completes.
type StepError ¶
StepError wraps errors from step execution.
type StepResult ¶
type StepResult struct {
// StepName identifies which step produced this result.
StepName string
// Output is the primary output value (optional).
Output any
// Response contains the LLM response if the step used one.
Response *ai.Response
// Usage aggregates token usage if applicable.
Usage ai.Usage
// Metadata holds step-specific metadata.
Metadata map[string]any
}
StepResult contains the output of a step execution.
type TerminationReason ¶
type TerminationReason string
TerminationReason indicates why the workflow stopped.
const ( // TerminationComplete indicates normal completion. TerminationComplete TerminationReason = "complete" // TerminationTimeout indicates the deadline was exceeded. TerminationTimeout TerminationReason = "timeout" // TerminationCancelled indicates context cancellation. TerminationCancelled TerminationReason = "cancelled" // TerminationError indicates an error occurred. TerminationError TerminationReason = "error" )
type ToolExecutionError ¶ added in v0.2.4
type ToolExecutionError struct {
ToolName string // Name of the tool that failed
Content string // Error content returned by the tool
}
ToolExecutionError indicates a tool returned an error result.
func (*ToolExecutionError) Error ¶ added in v0.2.4
func (e *ToolExecutionError) Error() string
Error returns a formatted error message.
type ToolStep ¶ added in v0.2.4
type ToolStep[T any] struct { // contains filtered or unexported fields }
ToolStep executes a single tool with typed arguments. Use for direct tool invocations where the arguments are known.
func NewToolStep ¶ added in v0.2.4
func NewToolStep[T any]( name string, registry *tool.Registry, toolName string, argsFunc func(state *State) (T, error), outputKey string, ) *ToolStep[T]
NewToolStep creates a step that executes a tool with typed arguments. The type T should match the tool's expected argument structure.
Parameters:
- name: Unique identifier for the step
- registry: Tool registry containing the tool handler
- toolName: Name of the tool to execute
- argsFunc: Function that builds tool arguments from state
- outputKey: State key for storing tool result (empty to skip storage)
Example:
type SearchArgs struct {
Query string `json:"query"`
Limit int `json:"limit"`
}
step := workflow.NewToolStep(
"search",
registry,
"web_search",
func(s *workflow.State) (SearchArgs, error) {
return SearchArgs{
Query: s.GetString("search_query"),
Limit: 10,
}, nil
},
"search_results",
)
type TypedPromptStep ¶ added in v0.2.1
type TypedPromptStep[T any] struct { // contains filtered or unexported fields }
TypedPromptStep makes an LLM call with structured output that is automatically unmarshaled into type T and stored in state.
func NewTypedPromptStep ¶ added in v0.2.1
func NewTypedPromptStep[T any]( name string, c chat.Client, prompt PromptFunc, schema ai.ResponseSchema, outputKey string, opts ...ai.Option, ) *TypedPromptStep[T]
NewTypedPromptStep creates a step that returns structured output of type T. The schema parameter defines the JSON schema for the response. The unmarshaled *T is stored in state under outputKey.
Example:
type Analysis struct {
Sentiment string `json:"sentiment"`
Keywords []string `json:"keywords"`
}
analysisSchema := ai.ResponseSchema{
Name: "analysis",
Schema: schema.Object().
Field("sentiment", schema.String().Required()).
Field("keywords", schema.Array(schema.String()).Required()).
MustBuild(),
}
step := workflow.NewTypedPromptStep[Analysis](
"analyze",
client,
func(s *State) []ai.Message {
return []ai.Message{{Role: ai.RoleUser, Content: s.GetString("text")}}
},
analysisSchema,
"analysis",
ai.WithModel(model.Claude35Sonnet),
)
// After execution, state.Get("analysis") returns *Analysis
func (*TypedPromptStep[T]) Name ¶ added in v0.2.1
func (p *TypedPromptStep[T]) Name() string
Name returns the step name.
func (*TypedPromptStep[T]) Run ¶ added in v0.2.1
func (p *TypedPromptStep[T]) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
Run executes the LLM call and unmarshals the response into T.