Documentation
¶
Overview ¶
Package workflow provides composable patterns for orchestrating AI-powered pipelines.
The package implements four core workflow patterns:
- Chain: Sequential execution where output flows to the next step
- Parallel: Concurrent execution with result aggregation
- Router: Conditional branching based on predicates or LLM classification
- Loop: Iterative execution until a condition is met
All workflow types implement the Step interface, enabling arbitrary nesting and composition of patterns.
Basic Usage ¶
Create a simple chain workflow:
chain := workflow.NewChain("my-chain",
workflow.NewFuncStep("setup", func(ctx context.Context, state *workflow.State) error {
state.Set("input", "Hello, World!")
return nil
}),
workflow.NewPromptStep("process", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{
{Role: gains.RoleUser, Content: s.GetString("input")},
}
},
"output",
),
)
wf := workflow.New("my-workflow", chain)
result, err := wf.Run(context.Background(), workflow.NewState(nil))
Parallel Execution ¶
Execute multiple steps concurrently:
parallel := workflow.NewParallel("research", steps,
func(state *workflow.State, results map[string]*workflow.StepResult) error {
// Aggregate results
var combined string
for name, result := range results {
combined += fmt.Sprintf("%s: %v\n", name, result.Output)
}
state.Set("combined", combined)
return nil
},
)
Conditional Routing ¶
Route based on state conditions:
router := workflow.NewRouter("route",
[]workflow.Route{
{
Name: "high-priority",
Condition: func(ctx context.Context, s *workflow.State) bool {
return s.GetString("priority") == "high"
},
Step: highPriorityStep,
},
},
defaultStep,
)
Or use LLM-based classification:
classifier := workflow.NewClassifierRouter("classify", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{
{Role: gains.RoleSystem, Content: "Classify as: billing, technical, general"},
{Role: gains.RoleUser, Content: s.GetString("ticket")},
}
},
map[string]workflow.Step{
"billing": billingStep,
"technical": technicalStep,
"general": generalStep,
},
)
Iterative Loops ¶
Repeat steps until a condition is met:
// Create a content creator that reads feedback on subsequent iterations
creator := workflow.NewPromptStep("creator", provider,
func(s *workflow.State) []gains.Message {
feedback := s.GetString("feedback")
if feedback == "" {
return []gains.Message{{Role: gains.RoleUser, Content: "Write a blog post about Go"}}
}
return []gains.Message{
{Role: gains.RoleUser, Content: "Write a blog post about Go"},
{Role: gains.RoleAssistant, Content: s.GetString("draft")},
{Role: gains.RoleUser, Content: "Revise based on: " + feedback},
}
},
"draft",
)
// Create an editor that approves or provides feedback
editor := workflow.NewPromptStep("editor", provider,
func(s *workflow.State) []gains.Message {
return []gains.Message{{
Role: gains.RoleUser,
Content: "Review this draft. Reply APPROVED or provide feedback:\n\n" +
s.GetString("draft"),
}}
},
"feedback",
)
// Combine into a chain and loop until approved
reviewCycle := workflow.NewChain("review-cycle", creator, editor)
loop := workflow.NewLoop("content-loop", reviewCycle,
func(ctx context.Context, s *workflow.State) bool {
return strings.Contains(s.GetString("feedback"), "APPROVED")
},
workflow.WithMaxIterations(5),
)
Streaming Events ¶
Monitor workflow progress in real-time:
events := wf.RunStream(ctx, state)
for event := range events {
switch event.Type {
case workflow.EventStepStart:
fmt.Printf("Starting: %s\n", event.StepName)
case workflow.EventStreamDelta:
fmt.Print(event.Delta)
case workflow.EventStepComplete:
fmt.Printf("Completed: %s\n", event.StepName)
case workflow.EventError:
fmt.Printf("Error: %v\n", event.Error)
}
}
Composability ¶
Workflows can be nested since all patterns implement Step:
outer := workflow.NewChain("outer",
setupStep,
workflow.NewParallel("inner-parallel", parallelSteps, nil),
workflow.NewRouter("inner-router", routes, nil),
workflow.NewLoop("inner-loop", refinementStep, condition),
finalStep,
)
Index ¶
- Variables
- func ClassifierSchema(routes map[string]Step) ai.Option
- type Aggregator
- type Chain
- type ChatClient
- type ClassifierRouter
- type Condition
- type ErrorHandler
- type Event
- type EventType
- type FuncStep
- type Loop
- type LoopCondition
- type LoopOption
- type MemoryAdapter
- type Option
- func WithChatOptions(opts ...ai.Option) Option
- func WithContinueOnError(enabled bool) Option
- func WithErrorHandler(fn ErrorHandler) Option
- func WithMaxConcurrency(n int) Option
- func WithMaxTokens(n int) Option
- func WithModel(model ai.Model) Option
- func WithOnStepComplete(fn StepCallback) Option
- func WithStepTimeout(d time.Duration) Option
- func WithTemperature(t float64) Option
- func WithTimeout(d time.Duration) Option
- type Options
- type Parallel
- type ParallelError
- type PromptFunc
- type PromptStep
- type Result
- type Route
- type Router
- type State
- type StateAdapter
- type Step
- type StepCallback
- type StepError
- type StepFunc
- type StepResult
- type TerminationReason
- type Workflow
Constants ¶
This section is empty.
Variables ¶
var (
// ErrWorkflowTimeout indicates the workflow exceeded its timeout.
ErrWorkflowTimeout = errors.New("workflow: timeout exceeded")
// ErrWorkflowCancelled indicates the workflow was cancelled.
ErrWorkflowCancelled = errors.New("workflow: cancelled")
// ErrNoRouteMatched indicates no route condition was satisfied.
ErrNoRouteMatched = errors.New("workflow: no route matched")
// ErrStepNotFound indicates a referenced step does not exist.
ErrStepNotFound = errors.New("workflow: step not found")
// ErrMaxIterationsExceeded indicates a loop reached its iteration limit.
ErrMaxIterationsExceeded = errors.New("workflow: maximum loop iterations exceeded")
)
Functions ¶
Types ¶
type Aggregator ¶
type Aggregator func(state *State, results map[string]*StepResult) error
Aggregator combines results from parallel steps into the shared state.
type Chain ¶
type Chain struct {
// contains filtered or unexported fields
}
Chain executes steps sequentially, passing state between them.
type ChatClient ¶
type ChatClient interface {
Chat(ctx context.Context, messages []ai.Message, opts ...ai.Option) (*ai.Response, error)
ChatStream(ctx context.Context, messages []ai.Message, opts ...ai.Option) (<-chan ai.StreamEvent, error)
}
ChatClient is the interface for chat capabilities needed by workflow steps.
type ClassifierRouter ¶
type ClassifierRouter struct {
// contains filtered or unexported fields
}
ClassifierRouter uses an LLM to classify input and route accordingly.
func NewClassifierRouter ¶
func NewClassifierRouter( name string, c ChatClient, prompt PromptFunc, routes map[string]Step, opts ...ai.Option, ) *ClassifierRouter
NewClassifierRouter creates a router that uses LLM classification. The LLM response should match one of the route keys (case-insensitive). For more reliable classification, use ClassifierSchema().
func (*ClassifierRouter) Name ¶
func (c *ClassifierRouter) Name() string
Name returns the router name.
func (*ClassifierRouter) Run ¶
func (c *ClassifierRouter) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
Run classifies input and executes the matching route.
type ErrorHandler ¶
ErrorHandler is called when a step encounters an error. Return nil to suppress the error, or return an error to propagate it.
type Event ¶
type Event struct {
// Type identifies the kind of event.
Type EventType
// StepName identifies the step that produced this event.
StepName string
// Delta contains streaming content for EventStreamDelta.
Delta string
// Result contains step result for EventStepComplete.
Result *StepResult
// ToolCall contains tool call info for EventToolCall.
ToolCall *ai.ToolCall
// ParallelResults contains results from parallel execution.
ParallelResults map[string]*StepResult
// RouteName identifies the selected route for EventRouteSelected.
RouteName string
// Iteration is the current loop iteration (1-indexed) for EventLoopIteration.
Iteration int
// Error contains the error for EventError.
Error error
// Message contains additional context.
Message string
// Timestamp is when the event occurred.
Timestamp time.Time
}
Event represents an observable occurrence during workflow execution.
type EventType ¶
type EventType string
EventType identifies the kind of event occurring during workflow execution.
const (
// EventWorkflowStart fires when the workflow begins.
EventWorkflowStart EventType = "workflow_start"
// EventWorkflowComplete fires when the workflow finishes.
EventWorkflowComplete EventType = "workflow_complete"
// EventStepStart fires when a step begins execution.
EventStepStart EventType = "step_start"
// EventStepComplete fires when a step finishes successfully.
EventStepComplete EventType = "step_complete"
// EventStepSkipped fires when a step is skipped (e.g., routing).
EventStepSkipped EventType = "step_skipped"
// EventStreamDelta fires for streaming content from LLM.
EventStreamDelta EventType = "stream_delta"
// EventToolCall fires when a tool is called within a step.
EventToolCall EventType = "tool_call"
// EventParallelStart fires when parallel execution begins.
EventParallelStart EventType = "parallel_start"
// EventParallelComplete fires when all parallel branches complete.
EventParallelComplete EventType = "parallel_complete"
// EventRouteSelected fires when a route is chosen.
EventRouteSelected EventType = "route_selected"
// EventLoopIteration fires at the start of each loop iteration.
EventLoopIteration EventType = "loop_iteration"
// EventError fires when an error occurs.
EventError EventType = "error"
)
type FuncStep ¶
type FuncStep struct {
// contains filtered or unexported fields
}
FuncStep wraps a function as a Step.
func NewFuncStep ¶
NewFuncStep creates a step from a function.
type Loop ¶
type Loop struct {
// contains filtered or unexported fields
}
Loop repeatedly executes a step until a condition returns true. Use for iterative refinement workflows where steps need to repeat based on evaluation results stored in state.
func NewLoop ¶
func NewLoop(name string, step Step, condition LoopCondition, opts ...LoopOption) *Loop
NewLoop creates a loop that executes step until condition returns true. The condition is checked after each iteration. If the condition never returns true, the loop terminates after maxIterations (default 10) and returns ErrMaxIterationsExceeded.
The loop stores the current iteration count in state under "{name}_iteration".
type LoopCondition ¶
LoopCondition evaluates state to determine if the loop should exit. Return true to exit the loop, false to continue iterating.
type LoopOption ¶
type LoopOption func(*Loop)
LoopOption configures a Loop.
func WithMaxIterations ¶
func WithMaxIterations(n int) LoopOption
WithMaxIterations sets the maximum number of loop iterations. Default is 10.
type MemoryAdapter ¶
type MemoryAdapter = store.MemoryAdapter
MemoryAdapter is an in-memory implementation of StateAdapter.
func NewMemoryAdapter ¶
func NewMemoryAdapter() *MemoryAdapter
NewMemoryAdapter creates a new in-memory adapter for State.
type Option ¶
type Option func(*Options)
Option is a functional option for workflow configuration.
func WithChatOptions ¶
WithChatOptions passes options to LLM calls.
func WithContinueOnError ¶
WithContinueOnError allows the workflow to continue after errors.
func WithErrorHandler ¶
func WithErrorHandler(fn ErrorHandler) Option
WithErrorHandler sets a custom error handler.
func WithMaxConcurrency ¶
WithMaxConcurrency limits parallel step execution. A value of 0 means unlimited concurrency.
func WithMaxTokens ¶
WithMaxTokens is a convenience option to set max tokens for chat calls.
func WithOnStepComplete ¶
func WithOnStepComplete(fn StepCallback) Option
WithOnStepComplete sets a callback for step completion.
func WithStepTimeout ¶
WithStepTimeout sets the default timeout for each step.
func WithTemperature ¶
WithTemperature is a convenience option to set temperature for chat calls.
func WithTimeout ¶
WithTimeout sets the overall workflow timeout.
type Options ¶
type Options struct {
// Timeout sets a deadline for the entire workflow.
Timeout time.Duration
// StepTimeout sets default timeout for individual steps.
StepTimeout time.Duration
// MaxConcurrency limits parallel step execution (0 = unlimited).
MaxConcurrency int
// ErrorHandler is called on step errors.
ErrorHandler ErrorHandler
// OnStepComplete is called after each step.
OnStepComplete StepCallback
// ContinueOnError allows workflow to continue after step errors.
ContinueOnError bool
// ChatOptions are passed to LLM calls within steps.
ChatOptions []ai.Option
}
Options contains configuration for workflow execution.
func ApplyOptions ¶
ApplyOptions applies functional options with defaults.
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel executes steps concurrently and aggregates results.
func NewParallel ¶
func NewParallel(name string, steps []Step, aggregator Aggregator) *Parallel
NewParallel creates a parallel workflow. The aggregator is called with all results after all steps complete. If aggregator is nil, each branch's state changes are merged back.
type ParallelError ¶
ParallelError wraps errors from parallel execution.
func (*ParallelError) Error ¶
func (e *ParallelError) Error() string
Error returns a formatted message summarizing the parallel execution failures.
func (*ParallelError) Unwrap ¶
func (e *ParallelError) Unwrap() error
Unwrap returns the first error for errors.Is/As compatibility.
type PromptFunc ¶
PromptFunc generates messages from state for an LLM call.
type PromptStep ¶
type PromptStep struct {
// contains filtered or unexported fields
}
PromptStep makes a single LLM call with a dynamic prompt.
func NewPromptStep ¶
func NewPromptStep(name string, c ChatClient, prompt PromptFunc, outputKey string, opts ...ai.Option) *PromptStep
NewPromptStep creates a step for a single LLM call. The prompt function generates messages from current state. If outputKey is non-empty, the response content is stored in state under that key.
func (*PromptStep) Run ¶
func (p *PromptStep) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
Run executes the LLM call.
type Result ¶
type Result struct {
// WorkflowName identifies the workflow.
WorkflowName string
// State contains the final state after execution.
State *State
// Output is the primary output from the workflow.
Output any
// Usage aggregates token usage across all steps.
Usage ai.Usage
// Termination indicates why execution stopped.
Termination TerminationReason
// Error contains any error that caused termination.
Error error
}
Result represents the final outcome of workflow execution.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router selects and executes a step based on conditions.
func NewRouter ¶
NewRouter creates a conditional router. Routes are evaluated in order; first match wins. Default route is used if no conditions match (can be nil).
type State ¶
State provides thread-safe key-value state management for workflows. This is the primary state container passed through workflow steps.
State supports reading and writing values of any type:
state := workflow.NewState(nil)
state.Set("count", 42)
state.Set("name", "example")
count := state.GetInt("count") // 42
name := state.GetString("name") // "example"
func NewState ¶
func NewState(adapter StateAdapter) *State
NewState creates a new State with the given adapter. If adapter is nil, a default in-memory adapter is used.
func NewStateFrom ¶
NewStateFrom creates a new State initialized with the given data.
type StateAdapter ¶
StateAdapter defines the interface for persistence backends. Implementations must be thread-safe.
type Step ¶
type Step interface {
// Name returns a unique identifier for the step.
Name() string
// Run executes the step and returns the result.
Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)
// RunStream executes the step and returns a channel of events.
RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event
}
Step represents a single unit of work in a workflow. Steps can be functions, LLM calls, or nested workflows.
type StepCallback ¶
type StepCallback func(ctx context.Context, result *StepResult)
StepCallback is called after each step completes.
type StepError ¶
StepError wraps errors from step execution.
type StepResult ¶
type StepResult struct {
// StepName identifies which step produced this result.
StepName string
// Output is the primary output value (optional).
Output any
// Response contains the LLM response if the step used one.
Response *ai.Response
// Usage aggregates token usage if applicable.
Usage ai.Usage
// Metadata holds step-specific metadata.
Metadata map[string]any
}
StepResult contains the output of a step execution.
type TerminationReason ¶
type TerminationReason string
TerminationReason indicates why the workflow stopped.
const (
// TerminationComplete indicates normal completion.
TerminationComplete TerminationReason = "complete"
// TerminationTimeout indicates the deadline was exceeded.
TerminationTimeout TerminationReason = "timeout"
// TerminationCancelled indicates context cancellation.
TerminationCancelled TerminationReason = "cancelled"
// TerminationError indicates an error occurred.
TerminationError TerminationReason = "error"
)