workflow

package
v0.2.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package workflow provides composable patterns for orchestrating AI-powered pipelines.

The package implements five core workflow patterns:

  • Chain: Sequential execution where steps share mutable state
  • Parallel: Concurrent execution with branch state isolation and aggregation
  • Router: Conditional branching based on predicates or LLM classification
  • Loop: Iterative execution until a condition is met
  • Merge: Conditional joining of multiple steps with fan-in aggregation

All workflow types implement the Step[S] interface, enabling arbitrary nesting and composition. The generic type parameter S is your user-defined state struct.

State Model

Define your own state struct to hold workflow data:

type MyState struct {
    Input    string
    Analysis SentimentResult
    Summary  string
}

State is passed by pointer and mutated in place. After workflow completion, access results directly from your state fields.

Basic Usage

Create a simple chain workflow:

type PipelineState struct {
    Topic   string
    Content string
}

chain := workflow.NewChain("my-chain",
    workflow.NewFuncStep[PipelineState]("setup", func(ctx context.Context, s *PipelineState) error {
        s.Topic = "Go generics"
        return nil
    }),
    workflow.NewPromptStep("generate", client,
        func(s *PipelineState) []gains.Message {
            return []gains.Message{
                {Role: gains.RoleUser, Content: "Write about: " + s.Topic},
            }
        },
        nil, // no schema = plain text
        func(s *PipelineState) *string { return &s.Content },
    ),
)

wf := workflow.New("my-workflow", chain)
state := &PipelineState{}
result, err := wf.Run(ctx, state)
fmt.Println(state.Content) // Access result from state

Structured Output

Use PromptStep with a schema for automatic JSON unmarshaling:

type AnalysisState struct {
    Input    string
    Analysis SentimentResult
}

type SentimentResult struct {
    Sentiment string  `json:"sentiment"`
    Score     float64 `json:"score"`
}

schema := gains.MustSchemaFor[SentimentResult]()

step := workflow.NewPromptStep("analyze", client,
    func(s *AnalysisState) []gains.Message {
        return []gains.Message{{Role: gains.RoleUser, Content: s.Input}}
    },
    schema, // schema provided = JSON unmarshal
    func(s *AnalysisState) *SentimentResult { return &s.Analysis },
)

Parallel Execution

Execute multiple steps concurrently with isolated branch state:

type ResearchState struct {
    Topic       string
    Technical   string
    Business    string
    Combined    string
}

parallel := workflow.NewParallel("research",
    []workflow.Step[ResearchState]{technicalStep, businessStep},
    func(state *ResearchState, branches map[string]*ResearchState, errors map[string]error) error {
        // Each branch ran with a deep copy; merge results back
        for name, branch := range branches {
            if name == "technical" {
                state.Technical = branch.Technical
            } else if name == "business" {
                state.Business = branch.Business
            }
        }
        state.Combined = state.Technical + "\n\n" + state.Business
        return nil
    },
)

Conditional Routing

Route based on state conditions:

type TicketState struct {
    Priority string
    Response string
}

router := workflow.NewRouter("route",
    []workflow.Route[TicketState]{
        {
            Name: "urgent",
            Condition: func(ctx context.Context, s *TicketState) bool {
                return s.Priority == "high"
            },
            Step: urgentHandler,
        },
    },
    normalHandler, // default
)

Or use LLM-based classification:

classifier := workflow.NewClassifierRouter("classify", client,
    func(s *TicketState) []gains.Message {
        return []gains.Message{
            {Role: gains.RoleSystem, Content: "Classify as: billing, technical, general"},
            {Role: gains.RoleUser, Content: s.Ticket},
        }
    },
    map[string]workflow.Step[TicketState]{
        "billing":   billingStep,
        "technical": technicalStep,
        "general":   generalStep,
    },
)

Iterative Loops

Repeat steps until a condition is met:

type EditState struct {
    Draft    string
    Feedback string
}

// Create a writer that revises based on feedback
writer := workflow.NewPromptStep("writer", client,
    func(s *EditState) []gains.Message {
        if s.Feedback == "" {
            return []gains.Message{{Role: gains.RoleUser, Content: "Write a poem"}}
        }
        return []gains.Message{
            {Role: gains.RoleUser, Content: fmt.Sprintf(
                "Revise this:\n%s\n\nFeedback: %s", s.Draft, s.Feedback,
            )},
        }
    },
    nil,
    func(s *EditState) *string { return &s.Draft },
)

// Create an editor that approves or provides feedback
editor := workflow.NewPromptStep("editor", client,
    func(s *EditState) []gains.Message {
        return []gains.Message{{
            Role:    gains.RoleUser,
            Content: "Review this. Reply APPROVED or provide feedback:\n\n" + s.Draft,
        }}
    },
    nil,
    func(s *EditState) *string { return &s.Feedback },
)

// Loop until approved
loop := workflow.NewLoopUntil("refine",
    workflow.NewChain("cycle", writer, editor),
    func(s *EditState) bool {
        return strings.Contains(strings.ToUpper(s.Feedback), "APPROVED")
    },
    workflow.WithMaxIterations(5),
)

Streaming Events

Monitor workflow progress in real-time:

import "github.com/spetersoncode/gains/event"

state := &MyState{}
events := wf.RunStream(ctx, state)
for e := range events {
    switch e.Type {
    case event.StepStart:
        fmt.Printf("Starting: %s\n", e.StepName)
    case event.MessageDelta:
        fmt.Print(e.Delta)
    case event.StepEnd:
        fmt.Printf("Completed: %s\n", e.StepName)
    case event.RunError:
        fmt.Printf("Error: %v\n", e.Error)
    }
}
// Access final results from state
fmt.Println(state.Summary)

Composability

Workflows can be nested since all patterns implement Step[S]:

outer := workflow.NewChain("outer",
    setupStep,
    workflow.NewParallel("inner-parallel", parallelSteps, aggregator),
    workflow.NewRouter("inner-router", routes, defaultStep),
    workflow.NewLoopUntil("inner-loop", refinementStep, condition),
    finalStep,
)

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkflowTimeout indicates the workflow exceeded its timeout.
	ErrWorkflowTimeout = errors.New("workflow: timeout exceeded")

	// ErrWorkflowCancelled indicates the workflow was cancelled.
	ErrWorkflowCancelled = errors.New("workflow: cancelled")

	// ErrNoRouteMatched indicates no route condition was satisfied.
	ErrNoRouteMatched = errors.New("workflow: no route matched")

	// ErrStepNotFound indicates a referenced step does not exist.
	ErrStepNotFound = errors.New("workflow: step not found")

	// ErrMaxIterationsExceeded indicates a loop reached its iteration limit.
	ErrMaxIterationsExceeded = errors.New("workflow: maximum loop iterations exceeded")
)

Functions

func ClassifierSchema

func ClassifierSchema[S any](routes map[string]Step[S]) ai.Option

ClassifierSchema returns a ai.Option that enforces structured output for classification. Use with providers that support JSON schema (OpenAI, Anthropic). Note: May not work with streaming on all providers.

func DeepClone added in v0.2.8

func DeepClone[S any](src *S) (*S, error)

DeepClone creates a deep copy of a struct using JSON serialization. This is safe for concurrent use and handles nested structures. For performance-critical code, implement a custom clone method.

func NewTool added in v0.2.9

func NewTool(runner Runner, opts ...ToolOption) tool.Registration

NewTool creates a tool registration that wraps a workflow.Runner as a callable tool. This enables workflow composition where agents can invoke workflows as sub-tasks.

The tool arguments are passed directly to the workflow as initial state. The workflow runs to completion and the final state or message content is returned.

Example:

type AnalysisState struct {
    Query  string `json:"query" desc:"Analysis query" required:"true"`
    Result string `json:"result"`
}

runner := workflow.NewRunnerJSON[AnalysisState]("analysis", analysisWorkflow)
registry.Add(workflow.NewTool(runner,
    workflow.WithToolDescription("Run complex data analysis"),
))

func NewToolWithSchema added in v0.2.9

func NewToolWithSchema[T any](runner Runner, opts ...ToolOption) tool.Registration

NewToolWithSchema creates a workflow tool with a typed schema. Use this when you want compile-time type safety for workflow inputs.

Example:

type SearchInput struct {
    Query   string `json:"query" desc:"Search query" required:"true"`
    Limit   int    `json:"limit" desc:"Max results"`
}

registry.Add(workflow.NewToolWithSchema[SearchInput](searchRunner,
    workflow.WithToolDescription("Search for information"),
))

Types

type AgentResult added in v0.2.4

type AgentResult struct {
	// Response is the final model response.
	Response *ai.Response
	// Messages is the complete conversation history.
	Messages []ai.Message
	// Steps is the number of agent iterations.
	Steps int
	// Termination indicates why the agent stopped.
	Termination agent.TerminationReason
}

AgentResult contains the structured output from an AgentStep execution.

type AgentStep added in v0.2.4

type AgentStep[S any] struct {
	// contains filtered or unexported fields
}

AgentStep wraps the agent package for autonomous tool-calling within a workflow. It runs an agent loop to completion and stores the final result in state via setter.

func NewAgentStep added in v0.2.4

func NewAgentStep[S any](
	name string,
	chatClient chat.Client,
	registry *tool.Registry,
	prompt PromptFunc[S],
	setter func(*S, *AgentResult),
	agentOpts []agent.Option,
	chatOpts ...ai.Option,
) *AgentStep[S]

NewAgentStep creates a step that runs an autonomous tool-calling agent.

Parameters:

  • name: Unique identifier for the step
  • chatClient: Client supporting ChatStream for the agent
  • registry: Tool registry with registered handlers
  • prompt: Function that builds initial messages from state
  • setter: Function that stores the result in state (nil to skip storage)
  • agentOpts: Options passed to agent.Run/RunStream
  • chatOpts: Options passed to each chat call

Example:

registry := tool.NewRegistry()
tool.RegisterFunc(registry, "search", "Search the web", searchHandler)

step := workflow.NewAgentStep[MyState](
    "research",
    client,
    registry,
    func(s *MyState) []ai.Message {
        return []ai.Message{{
            Role: ai.RoleUser,
            Content: fmt.Sprintf("Research %s and provide a summary", s.Topic),
        }}
    },
    func(s *MyState, r *AgentResult) { s.ResearchResult = r },
    []agent.Option{agent.WithMaxSteps(5)},
    ai.WithModel(model.Claude35Sonnet),
)

func (*AgentStep[S]) Name added in v0.2.4

func (a *AgentStep[S]) Name() string

Name returns the step name.

func (*AgentStep[S]) Run added in v0.2.4

func (a *AgentStep[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the agent to completion.

func (*AgentStep[S]) RunStream added in v0.2.4

func (a *AgentStep[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the agent and emits mapped workflow events.

type Aggregator

type Aggregator[S any] func(state *S, branches map[string]*S, errors map[string]error) error

Aggregator combines results from parallel steps into the shared state. Each branch runs with a deep copy of state; aggregator merges branch states back. The errors map contains any step failures when ContinueOnError is true.

type Chain

type Chain[S any] struct {
	// contains filtered or unexported fields
}

Chain executes steps sequentially, passing state between them.

func NewChain

func NewChain[S any](name string, steps ...Step[S]) *Chain[S]

NewChain creates a sequential workflow.

func (*Chain[S]) Name

func (c *Chain[S]) Name() string

Name returns the chain name.

func (*Chain[S]) Run

func (c *Chain[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes steps sequentially.

func (*Chain[S]) RunStream

func (c *Chain[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes steps sequentially and emits events.

type ClassifierRouter

type ClassifierRouter[S any] struct {
	// contains filtered or unexported fields
}

ClassifierRouter uses an LLM to classify input and route accordingly.

func NewClassifierRouter

func NewClassifierRouter[S any](
	name string,
	c chat.Client,
	prompt PromptFunc[S],
	routes map[string]Step[S],
	opts ...ai.Option,
) *ClassifierRouter[S]

NewClassifierRouter creates a router that uses LLM classification. The LLM response should match one of the route keys (case-insensitive). For more reliable classification, use ClassifierSchema().

func (*ClassifierRouter[S]) Name

func (c *ClassifierRouter[S]) Name() string

Name returns the router name.

func (*ClassifierRouter[S]) Run

func (c *ClassifierRouter[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run classifies input and executes the matching route.

func (*ClassifierRouter[S]) RunStream

func (c *ClassifierRouter[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream classifies input with streaming and executes the matching route.

type Condition

type Condition[S any] func(ctx context.Context, state *S) bool

Condition determines if a route should be taken.

type ErrorHandler

type ErrorHandler func(ctx context.Context, stepName string, err error) error

ErrorHandler is called when a step encounters an error. The handler determines how to proceed:

  • Return nil: Error is suppressed (handled gracefully)
  • Return an error: The returned error is propagated (may be transformed)

When used with ContinueOnError:

  • Handler returns nil + ContinueOnError=true: Continue to next step
  • Handler returns nil + ContinueOnError=false: Stop workflow successfully (no error)
  • Handler returns error: Stop workflow with that error (regardless of ContinueOnError)

type Event

type Event = event.Event

Event is an alias to the unified event type. Workflow events use these event.Type values:

  • event.RunStart, event.RunEnd, event.RunError
  • event.StepStart, event.StepEnd, event.StepSkipped
  • event.MessageStart, event.MessageDelta, event.MessageEnd
  • event.ToolCallStart, event.ToolCallArgs, event.ToolCallEnd, event.ToolCallResult
  • event.ParallelStart, event.ParallelEnd
  • event.RouteSelected, event.LoopIteration
  • event.StateSnapshot, event.StateDelta

type ExitCondition added in v0.2.8

type ExitCondition[S any] func(ctx context.Context, state *S, iteration int) bool

ExitCondition determines when the loop should stop. Return true to EXIT the loop, false to continue iterating. The iteration parameter is 1-indexed (first iteration is 1).

type FuncStep

type FuncStep[S any] struct {
	// contains filtered or unexported fields
}

FuncStep wraps a function as a Step.

func NewFuncStep

func NewFuncStep[S any](name string, fn StepFunc[S]) *FuncStep[S]

NewFuncStep creates a step from a function.

func (*FuncStep[S]) Name

func (f *FuncStep[S]) Name() string

Name returns the step name.

func (*FuncStep[S]) Run

func (f *FuncStep[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the function.

func (*FuncStep[S]) RunStream

func (f *FuncStep[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the function and emits events.

type Loop

type Loop[S any] struct {
	// contains filtered or unexported fields
}

Loop repeatedly executes a step until a condition returns true. Use for iterative refinement workflows where steps need to repeat based on evaluation results stored in state.

func NewLoopN added in v0.2.8

func NewLoopN[S any](name string, step Step[S], n int) *Loop[S]

NewLoopN creates a loop that executes exactly n times.

Example:

loop := NewLoopN[MyState]("retry", step, 3)  // retry 3 times

func NewLoopUntil added in v0.2.3

func NewLoopUntil[S any](
	name string,
	step Step[S],
	predicate func(*S) bool,
	opts ...LoopOption,
) *Loop[S]

NewLoopUntil creates a loop that exits when the predicate returns true. This is the recommended way to create loops for most use cases.

Example:

loop := NewLoopUntil[MyState]("process", step,
    func(s *MyState) bool { return s.Done },
)

func NewLoopWhile added in v0.2.3

func NewLoopWhile[S any](
	name string,
	step Step[S],
	predicate func(*S) bool,
	opts ...LoopOption,
) *Loop[S]

NewLoopWhile creates a loop that continues while the predicate returns true. Exits when the predicate returns false.

Example:

loop := NewLoopWhile[MyState]("retry", step,
    func(s *MyState) bool { return s.NeedsRetry },
)

func NewLoopWithExitCondition added in v0.2.8

func NewLoopWithExitCondition[S any](
	name string,
	step Step[S],
	exitCondition ExitCondition[S],
	opts ...LoopOption,
) *Loop[S]

NewLoopWithExitCondition creates a loop with a custom exit condition. Use this for complex conditions that can't be expressed with the simple helpers.

The exit condition receives the context, state, and current iteration (1-indexed). Return true to exit the loop, false to continue.

Example:

loop := NewLoopWithExitCondition[MyState]("refine", step,
    func(ctx context.Context, s *MyState, iter int) bool {
        return iter >= 5 || s.Quality > 0.95
    },
)

func (*Loop[S]) Name

func (l *Loop[S]) Name() string

Name returns the loop name.

func (*Loop[S]) Run

func (l *Loop[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the step repeatedly until the exit condition returns true.

func (*Loop[S]) RunStream

func (l *Loop[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the step repeatedly and emits events.

type LoopOption

type LoopOption func(*loopConfig)

LoopOption configures a Loop.

func WithMaxIterations

func WithMaxIterations(n int) LoopOption

WithMaxIterations sets the maximum number of loop iterations. Default is 10.

type Option

type Option func(*Options)

Option is a functional option for workflow configuration.

func WithChatOptions

func WithChatOptions(opts ...ai.Option) Option

WithChatOptions passes options to LLM calls.

func WithContinueOnError

func WithContinueOnError(enabled bool) Option

WithContinueOnError allows the workflow to continue after errors.

func WithErrorHandler

func WithErrorHandler(fn ErrorHandler) Option

WithErrorHandler sets a custom error handler.

func WithMaxConcurrency

func WithMaxConcurrency(n int) Option

WithMaxConcurrency limits parallel step execution. A value of 0 means unlimited concurrency.

func WithMaxTokens

func WithMaxTokens(n int) Option

WithMaxTokens is a convenience option to set max tokens for chat calls.

func WithModel

func WithModel(model ai.Model) Option

WithModel is a convenience option to set the model for chat calls.

func WithStepTimeout

func WithStepTimeout(d time.Duration) Option

WithStepTimeout sets the default timeout for each step.

func WithTemperature

func WithTemperature(t float64) Option

WithTemperature is a convenience option to set temperature for chat calls.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets the overall workflow timeout.

type Options

type Options struct {
	// Timeout sets a deadline for the entire workflow.
	Timeout time.Duration

	// StepTimeout sets default timeout for individual steps.
	StepTimeout time.Duration

	// MaxConcurrency limits parallel step execution (0 = unlimited).
	MaxConcurrency int

	// ErrorHandler is called on step errors. See ErrorHandler type for semantics.
	ErrorHandler ErrorHandler

	// ContinueOnError affects behavior when ErrorHandler returns nil (suppresses error).
	// If true, workflow continues to next step. If false, workflow stops successfully.
	// Has no effect when ErrorHandler returns an error (workflow always stops with error).
	ContinueOnError bool

	// ChatOptions are passed to LLM calls within steps.
	ChatOptions []ai.Option
}

Options contains configuration for workflow execution.

func ApplyOptions

func ApplyOptions(opts ...Option) *Options

ApplyOptions applies functional options with defaults.

type Parallel

type Parallel[S any] struct {
	// contains filtered or unexported fields
}

Parallel executes steps concurrently and aggregates results.

func NewParallel

func NewParallel[S any](name string, steps []Step[S], aggregator Aggregator[S]) *Parallel[S]

NewParallel creates a parallel workflow. The aggregator is called with all results after all steps complete. If aggregator is nil, no automatic merging occurs (user handles via aggregator).

func (*Parallel[S]) Name

func (p *Parallel[S]) Name() string

Name returns the parallel workflow name.

func (*Parallel[S]) Run

func (p *Parallel[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes steps concurrently.

func (*Parallel[S]) RunStream

func (p *Parallel[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes steps concurrently and emits events.

type ParallelError

type ParallelError struct {
	Errors map[string]error
}

ParallelError wraps errors from parallel execution.

func (*ParallelError) Error

func (e *ParallelError) Error() string

Error returns a formatted message summarizing the parallel execution failures.

func (*ParallelError) Unwrap

func (e *ParallelError) Unwrap() error

Unwrap returns the first error for errors.Is/As compatibility.

type PromptFunc

type PromptFunc[S any] func(state *S) []ai.Message

PromptFunc generates messages from state for an LLM call.

type PromptStep

type PromptStep[S, T any] struct {
	// contains filtered or unexported fields
}

PromptStep makes a single LLM call and stores the result in a state field. Generic over state type S and field type T.

When schema is non-nil, the response is unmarshaled as JSON into the field. When schema is nil, T must be string and the response is assigned directly. Run() returns error only - results are stored in state via the field getter.

func NewPromptStep

func NewPromptStep[S, T any](
	name string,
	c chat.Client,
	prompt PromptFunc[S],
	schema *ai.ResponseSchema,
	field func(*S) *T,
	opts ...ai.Option,
) *PromptStep[S, T]

NewPromptStep creates a step for a single LLM call. The field getter returns a pointer to where the result should be stored. Type parameters are inferred from the function arguments.

For plain text (schema = nil):

step := NewPromptStep("summarize", client, promptFn, nil,
    func(s *MyState) *string { return &s.Summary },
)

For structured JSON (schema required):

step := NewPromptStep("analyze", client, promptFn, schema,
    func(s *MyState) *Analysis { return &s.Analysis },
)

func (*PromptStep[S, T]) Name

func (p *PromptStep[S, T]) Name() string

Name returns the step name.

func (*PromptStep[S, T]) Run

func (p *PromptStep[S, T]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the LLM call.

func (*PromptStep[S, T]) RunStream

func (p *PromptStep[S, T]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the LLM call with streaming.

type Registry added in v0.2.9

type Registry struct {
	// contains filtered or unexported fields
}

Registry stores and retrieves Runners by name. It provides named workflow dispatch for AG-UI server integration.

func NewRegistry added in v0.2.9

func NewRegistry() *Registry

NewRegistry creates a new workflow registry.

func (*Registry) Get added in v0.2.9

func (r *Registry) Get(name string) Runner

Get retrieves a Runner by name. Returns nil if no runner with the given name exists.

func (*Registry) Has added in v0.2.9

func (r *Registry) Has(name string) bool

Has returns true if a runner with the given name exists.

func (*Registry) Len added in v0.2.9

func (r *Registry) Len() int

Len returns the number of registered runners.

func (*Registry) Names added in v0.2.9

func (r *Registry) Names() []string

Names returns all registered workflow names.

func (*Registry) Register added in v0.2.9

func (r *Registry) Register(runner Runner)

Register adds a Runner to the registry. If a runner with the same name already exists, it is replaced.

func (*Registry) RunStream added in v0.2.9

func (r *Registry) RunStream(ctx context.Context, name string, input any, opts ...Option) <-chan Event

RunStream executes the named workflow and returns an event stream. Returns an error channel if the workflow is not found.

func (*Registry) Unregister added in v0.2.9

func (r *Registry) Unregister(name string)

Unregister removes a Runner from the registry.

type Result

type Result[S any] struct {
	// WorkflowName identifies the workflow.
	WorkflowName string

	// State contains the final state after execution.
	// All step outputs are stored in state fields via setters.
	State *S

	// Termination indicates why execution stopped.
	Termination TerminationReason

	// Error contains any error that caused termination.
	Error error
}

Result represents the final outcome of workflow execution. State contains all output from the workflow - access results via state fields.

type RetryStep added in v0.2.8

type RetryStep[S any] struct {
	// contains filtered or unexported fields
}

RetryStep wraps a step with retry logic. Transient errors are retried with exponential backoff.

func NewRetryStep added in v0.2.8

func NewRetryStep[S any](name string, step Step[S]) *RetryStep[S]

NewRetryStep creates a step that retries on transient errors. Uses the default retry configuration (10 attempts, exponential backoff).

Example:

step := NewRetryStep("fetch-with-retry", fetchStep)

func NewRetryStepWithConfig added in v0.2.8

func NewRetryStepWithConfig[S any](name string, step Step[S], config retry.Config) *RetryStep[S]

NewRetryStepWithConfig creates a step with custom retry configuration.

Example:

cfg := retry.Config{
    MaxAttempts:  3,
    InitialDelay: 500 * time.Millisecond,
    MaxDelay:     5 * time.Second,
    Multiplier:   2.0,
    Jitter:       0.1,
}
step := NewRetryStepWithConfig("fetch", fetchStep, cfg)

func (*RetryStep[S]) Name added in v0.2.8

func (r *RetryStep[S]) Name() string

Name returns the step name.

func (*RetryStep[S]) Run added in v0.2.8

func (r *RetryStep[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the wrapped step with retry logic.

func (*RetryStep[S]) RunStream added in v0.2.8

func (r *RetryStep[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the wrapped step with retry logic and emits events. Retry events are emitted to provide observability into retry attempts.

type Route

type Route[S any] struct {
	Name      string
	Condition Condition[S]
	Step      Step[S]
}

Route represents a conditional path in a router.

type RouteResult added in v0.2.8

type RouteResult struct {
	RouteName      string // Name of the route that was taken
	Classification string // For ClassifierRouter: the LLM classification
}

RouteResult can be stored in state to record which route was taken. Store this via a setter if you need to access route information.

type Router

type Router[S any] struct {
	// contains filtered or unexported fields
}

Router selects and executes a step based on conditions.

func NewRouter

func NewRouter[S any](name string, routes []Route[S], defaultRoute Step[S]) *Router[S]

NewRouter creates a conditional router. Routes are evaluated in order; first match wins. Default route is used if no conditions match (can be nil).

func (*Router[S]) Name

func (r *Router[S]) Name() string

Name returns the router name.

func (*Router[S]) Run

func (r *Router[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run evaluates conditions and executes the matching step.

func (*Router[S]) RunStream

func (r *Router[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream evaluates conditions and streams the matching step's events.

type Runner added in v0.2.9

type Runner interface {
	// Name returns the workflow's unique identifier.
	Name() string

	// RunStream executes the workflow with the given input state and returns an event stream.
	// The input is decoded into the workflow's state type.
	// The state parameter may be nil if no initial state is provided.
	RunStream(ctx context.Context, state any, opts ...Option) <-chan Event
}

Runner is a type-erased interface for executing workflows. It allows workflows with different state types to be stored and executed uniformly without knowing the concrete state type at compile time.

This interface is designed for AG-UI server integration where workflows need to be dispatched by name with untyped input.

func NewRunner added in v0.2.9

func NewRunner[S any](name string, step Step[S], factory func(input any) (*S, error)) Runner

NewRunner creates a Runner from a Step[S] and a state factory.

The factory function receives the untyped input (typically map[string]any from JSON) and should return an initialized state pointer. If input is nil, the factory should return a zero-initialized state.

Example:

type MyState struct {
    Query   string `json:"query"`
    Results []string
}

runner := workflow.NewRunner("search", myWorkflow, func(input any) (*MyState, error) {
    state := &MyState{}
    if input != nil {
        data, _ := json.Marshal(input)
        json.Unmarshal(data, state)
    }
    return state, nil
})

func NewRunnerJSON added in v0.2.9

func NewRunnerJSON[S any](name string, step Step[S]) Runner

NewRunnerJSON creates a Runner that decodes JSON input into the state type. This is a convenience function for the common case of JSON-encoded input.

Example:

type MyState struct {
    Query string `json:"query"`
}

runner := workflow.NewRunnerJSON[MyState]("search", myWorkflow)

type RunnerFunc added in v0.2.9

type RunnerFunc[S any] struct {
	// contains filtered or unexported fields
}

RunnerFunc wraps a Step[S] as a Runner using a state factory function. The factory creates a new state instance and optionally initializes it from input.

func (*RunnerFunc[S]) Name added in v0.2.9

func (r *RunnerFunc[S]) Name() string

Name returns the workflow name.

func (*RunnerFunc[S]) RunStream added in v0.2.9

func (r *RunnerFunc[S]) RunStream(ctx context.Context, input any, opts ...Option) <-chan Event

RunStream executes the workflow and returns an event stream.

type StateEmitter added in v0.2.9

type StateEmitter interface {
	// EmitSnapshot sends a complete state snapshot to the frontend.
	// Use this for initial state or when delta tracking becomes complex.
	EmitSnapshot(state any)

	// EmitDelta sends incremental state changes using JSON Patch (RFC 6902).
	// More efficient than snapshots for small, frequent updates.
	EmitDelta(patches ...event.JSONPatch)
}

StateEmitter allows workflow steps to emit state change notifications for AG-UI shared state synchronization. Steps can send full snapshots or incremental patches to keep the frontend in sync.

func NewChannelEmitter added in v0.2.9

func NewChannelEmitter(ch chan<- Event, stepName string) StateEmitter

NewChannelEmitter creates a StateEmitter that sends events to the given channel. The stepName is included in emitted events for tracing.

func NewNoOpEmitter added in v0.2.9

func NewNoOpEmitter() StateEmitter

NewNoOpEmitter creates a StateEmitter that discards all emissions.

type StatefulFuncStep added in v0.2.9

type StatefulFuncStep[S any] struct {
	// contains filtered or unexported fields
}

StatefulFuncStep wraps a function that can emit state changes. Use this step type when you need to synchronize state with the frontend during step execution (e.g., progress updates, intermediate results).

func NewStatefulFuncStep added in v0.2.9

func NewStatefulFuncStep[S any](name string, fn StatefulStepFunc[S]) *StatefulFuncStep[S]

NewStatefulFuncStep creates a step from a function that can emit state changes.

Example:

step := workflow.NewStatefulFuncStep("process", func(ctx context.Context, state *MyState, emit workflow.StateEmitter) error {
    for i := 0; i < 10; i++ {
        state.Progress = (i + 1) * 10
        emit.EmitDelta(event.Replace("/progress", state.Progress))
    }
    return nil
})

func (*StatefulFuncStep[S]) Name added in v0.2.9

func (f *StatefulFuncStep[S]) Name() string

Name returns the step name.

func (*StatefulFuncStep[S]) Run added in v0.2.9

func (f *StatefulFuncStep[S]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the function with a no-op emitter (state events are discarded).

func (*StatefulFuncStep[S]) RunStream added in v0.2.9

func (f *StatefulFuncStep[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the function and emits events including state changes.

type StatefulStepFunc added in v0.2.9

type StatefulStepFunc[S any] func(ctx context.Context, state *S, emit StateEmitter) error

StatefulStepFunc is a function signature for steps that can emit state changes. The StateEmitter allows sending state snapshots or deltas for AG-UI synchronization.

type Step

type Step[S any] interface {
	// Name returns a unique identifier for the step.
	Name() string

	// Run executes the step and mutates state in place.
	Run(ctx context.Context, state *S, opts ...Option) error

	// RunStream executes the step and returns a channel of events.
	// State is mutated in place during streaming.
	RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event
}

Step represents a single unit of work in a workflow. Steps are generic over the state type S, providing compile-time type safety. State is passed by pointer and mutated in place - the caller retains access to the final state.

type StepError

type StepError struct {
	StepName string
	Err      error
}

StepError wraps errors from step execution.

func (*StepError) Error

func (e *StepError) Error() string

Error returns a formatted error message including the step name.

func (*StepError) Unwrap

func (e *StepError) Unwrap() error

Unwrap returns the underlying error for use with errors.Is and errors.As.

type StepFunc

type StepFunc[S any] func(ctx context.Context, state *S) error

StepFunc is a function signature for simple step implementations.

type TerminationReason

type TerminationReason string

TerminationReason indicates why the workflow stopped.

const (
	// TerminationComplete indicates normal completion.
	TerminationComplete TerminationReason = "complete"

	// TerminationTimeout indicates the deadline was exceeded.
	TerminationTimeout TerminationReason = "timeout"

	// TerminationCancelled indicates context cancellation.
	TerminationCancelled TerminationReason = "cancelled"

	// TerminationError indicates an error occurred.
	TerminationError TerminationReason = "error"
)

type ToolExecutionError added in v0.2.4

type ToolExecutionError struct {
	ToolName string // Name of the tool that failed
	Content  string // Error content returned by the tool
}

ToolExecutionError indicates a tool returned an error result.

func (*ToolExecutionError) Error added in v0.2.4

func (e *ToolExecutionError) Error() string

Error returns a formatted error message.

type ToolOption added in v0.2.9

type ToolOption func(*toolConfig)

ToolOption configures a workflow tool.

func WithToolDescription added in v0.2.9

func WithToolDescription(desc string) ToolOption

WithToolDescription sets a custom description for the workflow tool.

func WithToolResultMapper added in v0.2.9

func WithToolResultMapper(mapper func(events []event.Event) (string, error)) ToolOption

WithToolResultMapper sets a custom function to convert workflow events to a tool result. By default, the mapper returns the last non-error message content.

func WithToolWorkflowOptions added in v0.2.9

func WithToolWorkflowOptions(opts ...Option) ToolOption

WithToolWorkflowOptions passes options through to the workflow.

type ToolStep added in v0.2.4

type ToolStep[S, T any] struct {
	// contains filtered or unexported fields
}

ToolStep executes a single tool with typed arguments. The typed arguments are preserved in the output, allowing downstream steps to access the original structured args.

func NewToolStep added in v0.2.4

func NewToolStep[S, T any](
	name string,
	registry *tool.Registry,
	toolName string,
	argsFunc func(*S) (T, error),
	setter func(*S, *ToolStepOutput[T]),
) *ToolStep[S, T]

NewToolStep creates a step that executes a tool with typed arguments. The setter receives both the typed arguments and the result string.

Parameters:

  • name: Unique identifier for the step
  • registry: Tool registry containing the tool handler
  • toolName: Name of the tool to execute
  • argsFunc: Function that builds typed tool arguments from state
  • setter: Function that stores the output in state

Example:

type SearchArgs struct {
    Query string `json:"query"`
    Limit int    `json:"limit"`
}

step := NewToolStep[MyState, SearchArgs]("search", registry, "web_search",
    func(s *MyState) (SearchArgs, error) {
        return SearchArgs{Query: s.Topic, Limit: 10}, nil
    },
    func(s *MyState, out *ToolStepOutput[SearchArgs]) {
        s.SearchResult = out.Result
        s.LastQuery = out.Args.Query  // Access typed args
    },
)

func (*ToolStep[S, T]) Name added in v0.2.4

func (t *ToolStep[S, T]) Name() string

Name returns the step name.

func (*ToolStep[S, T]) Run added in v0.2.4

func (t *ToolStep[S, T]) Run(ctx context.Context, state *S, opts ...Option) error

Run executes the tool.

func (*ToolStep[S, T]) RunStream added in v0.2.4

func (t *ToolStep[S, T]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the tool and emits events.

type ToolStepOutput added in v0.2.8

type ToolStepOutput[T any] struct {
	Args   T      // Original typed arguments
	Result string // Tool execution result
}

ToolStepOutput holds both the typed arguments and string result from a tool execution.

type Workflow

type Workflow[S any] struct {
	// contains filtered or unexported fields
}

Workflow is the top-level orchestrator that wraps a root step. It provides the primary entry point for workflow execution.

func New

func New[S any](name string, root Step[S]) *Workflow[S]

New creates a new workflow with a root step.

func (*Workflow[S]) Name

func (w *Workflow[S]) Name() string

Name returns the workflow name.

func (*Workflow[S]) Run

func (w *Workflow[S]) Run(ctx context.Context, state *S, opts ...Option) (*Result[S], error)

Run executes the workflow synchronously. State is mutated in place - access results via state fields after completion. The state parameter must not be nil.

func (*Workflow[S]) RunStream

func (w *Workflow[S]) RunStream(ctx context.Context, state *S, opts ...Option) <-chan Event

RunStream executes the workflow and returns an event channel. State is mutated in place during streaming. The state parameter must not be nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL