workflow

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package workflow provides composable patterns for orchestrating AI-powered pipelines.

The package implements four core workflow patterns:

  • Chain: Sequential execution where output flows to the next step
  • Parallel: Concurrent execution with result aggregation
  • Router: Conditional branching based on predicates or LLM classification
  • Loop: Iterative execution until a condition is met

All workflow types implement the Step interface, enabling arbitrary nesting and composition of patterns.

Basic Usage

Create a simple chain workflow:

chain := workflow.NewChain("my-chain",
	workflow.NewFuncStep("setup", func(ctx context.Context, state *workflow.State) error {
		state.Set("input", "Hello, World!")
		return nil
	}),
	workflow.NewPromptStep("process", provider,
		func(s *workflow.State) []ai.Message {
			return []ai.Message{
				{Role: ai.RoleUser, Content: s.GetString("input")},
			}
		},
		"output",
	),
)

wf := workflow.New("my-workflow", chain)
result, err := wf.Run(context.Background(), workflow.NewState(nil))

Parallel Execution

Execute multiple steps concurrently:

parallel := workflow.NewParallel("research", steps,
	func(state *workflow.State, results map[string]*workflow.StepResult) error {
		// Aggregate results
		var combined string
		for name, result := range results {
			combined += fmt.Sprintf("%s: %v\n", name, result.Output)
		}
		state.Set("combined", combined)
		return nil
	},
)

Conditional Routing

Route based on state conditions:

router := workflow.NewRouter("route",
	[]workflow.Route{
		{
			Name: "high-priority",
			Condition: func(ctx context.Context, s *workflow.State) bool {
				return s.GetString("priority") == "high"
			},
			Step: highPriorityStep,
		},
	},
	defaultStep,
)

Or use LLM-based classification:

classifier := workflow.NewClassifierRouter("classify", provider,
	func(s *workflow.State) []ai.Message {
		return []ai.Message{
			{Role: ai.RoleSystem, Content: "Classify as: billing, technical, general"},
			{Role: ai.RoleUser, Content: s.GetString("ticket")},
		}
	},
	map[string]workflow.Step{
		"billing":   billingStep,
		"technical": technicalStep,
		"general":   generalStep,
	},
)

Iterative Loops

Repeat steps until a condition is met:

// Create a content creator that reads feedback on subsequent iterations
creator := workflow.NewPromptStep("creator", provider,
	func(s *workflow.State) []ai.Message {
		feedback := s.GetString("feedback")
		if feedback == "" {
			return []ai.Message{{Role: ai.RoleUser, Content: "Write a blog post about Go"}}
		}
		return []ai.Message{
			{Role: ai.RoleUser, Content: "Write a blog post about Go"},
			{Role: ai.RoleAssistant, Content: s.GetString("draft")},
			{Role: ai.RoleUser, Content: "Revise based on: " + feedback},
		}
	},
	"draft",
)

// Create an editor that approves or provides feedback
editor := workflow.NewPromptStep("editor", provider,
	func(s *workflow.State) []ai.Message {
		return []ai.Message{{
			Role: ai.RoleUser,
			Content: "Review this draft. Reply APPROVED or provide feedback:\n\n" +
				s.GetString("draft"),
		}}
	},
	"feedback",
)

// Combine into a chain and loop until approved
reviewCycle := workflow.NewChain("review-cycle", creator, editor)
loop := workflow.NewLoop("content-loop", reviewCycle,
	func(ctx context.Context, s *workflow.State) bool {
		return strings.Contains(s.GetString("feedback"), "APPROVED")
	},
	workflow.WithMaxIterations(5),
)

Streaming Events

Monitor workflow progress in real-time:

events := wf.RunStream(ctx, state)
for event := range events {
	switch event.Type {
	case workflow.EventStepStart:
		fmt.Printf("Starting: %s\n", event.StepName)
	case workflow.EventStreamDelta:
		fmt.Print(event.Delta)
	case workflow.EventStepComplete:
		fmt.Printf("Completed: %s\n", event.StepName)
	case workflow.EventError:
		fmt.Printf("Error: %v\n", event.Error)
	}
}

Composability

Workflows can be nested since all patterns implement Step:

outer := workflow.NewChain("outer",
	setupStep,
	workflow.NewParallel("inner-parallel", parallelSteps, nil),
	workflow.NewRouter("inner-router", routes, nil),
	workflow.NewLoop("inner-loop", refinementStep, condition),
	finalStep,
)

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkflowTimeout indicates the workflow exceeded its timeout.
	ErrWorkflowTimeout = errors.New("workflow: timeout exceeded")

	// ErrWorkflowCancelled indicates the workflow was cancelled.
	ErrWorkflowCancelled = errors.New("workflow: cancelled")

	// ErrNoRouteMatched indicates no route condition was satisfied.
	ErrNoRouteMatched = errors.New("workflow: no route matched")

	// ErrStepNotFound indicates a referenced step does not exist.
	ErrStepNotFound = errors.New("workflow: step not found")

	// ErrMaxIterationsExceeded indicates a loop reached its iteration limit.
	ErrMaxIterationsExceeded = errors.New("workflow: maximum loop iterations exceeded")
)

Functions

func ClassifierSchema

func ClassifierSchema(routes map[string]Step) ai.Option

ClassifierSchema returns an ai.Option that enforces structured output for classification. Use with providers that support JSON schema (OpenAI, Anthropic). Note: May not work with streaming on all providers.

func Delete added in v0.2.3

func Delete[T any](s *State, key Key[T])

Delete removes a typed key from state.

func Get added in v0.2.3

func Get[T any](s *State, key Key[T]) (T, bool)

Get retrieves a value from state using a typed key. Returns the typed value and true if the key exists and the type matches. Returns the zero value and false if the key is missing or the type mismatches.

Example:

analysis, ok := workflow.Get(state, KeyAnalysis)
if ok {
    fmt.Println(analysis.Sentiment)
}

func GetOr added in v0.2.3

func GetOr[T any](s *State, key Key[T], defaultVal T) T

GetOr retrieves a value from state using a typed key. Returns defaultVal if the key is missing or the type mismatches.

Example:

retries := workflow.GetOr(state, KeyRetryCount, 0)

func Has added in v0.2.3

func Has[T any](s *State, key Key[T]) bool

Has returns true if the typed key exists in state.

func MustGet added in v0.2.1

func MustGet[T any](s *State, key Key[T]) T

MustGet retrieves a value from state using a typed key. Panics if the key is missing or the value cannot be asserted to type T. Use when you are certain the key exists with the correct type.

Example:

analysis := workflow.MustGet(state, KeyAnalysis)
fmt.Println(analysis.Sentiment)

func Set added in v0.2.3

func Set[T any](s *State, key Key[T], value T)

Set stores a value in state using a typed key. The compiler enforces that value has the correct type for the key.

Example:

workflow.Set(state, KeyAnalysis, &AnalysisResult{...})

func SetIfAbsent added in v0.2.3

func SetIfAbsent[T any](s *State, key Key[T], value T) bool

SetIfAbsent stores a value only if the key does not exist. Returns true if the value was set, false if the key already existed.

func Update added in v0.2.3

func Update[T any](s *State, key Key[T], fn func(T) T) T

Update applies a function to the current value and stores the result. If the key doesn't exist, applies the function to the zero value. Returns the new value.

Types

type Aggregator

type Aggregator func(state *State, results map[string]*StepResult) error

Aggregator combines results from parallel steps into the shared state.

type Chain

type Chain struct {
	// contains filtered or unexported fields
}

Chain executes steps sequentially, passing state between them.

func NewChain

func NewChain(name string, steps ...Step) *Chain

NewChain creates a sequential workflow.

func (*Chain) Name

func (c *Chain) Name() string

Name returns the chain name.

func (*Chain) Run

func (c *Chain) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes steps sequentially.

func (*Chain) RunStream

func (c *Chain) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes steps sequentially and emits events.

type ChatClient

type ChatClient interface {
	Chat(ctx context.Context, messages []ai.Message, opts ...ai.Option) (*ai.Response, error)
	ChatStream(ctx context.Context, messages []ai.Message, opts ...ai.Option) (<-chan ai.StreamEvent, error)
}

ChatClient is the interface for chat capabilities needed by workflow steps.

type ClassifierRouter

type ClassifierRouter struct {
	// contains filtered or unexported fields
}

ClassifierRouter uses an LLM to classify input and route accordingly.

func NewClassifierRouter

func NewClassifierRouter(
	name string,
	c ChatClient,
	prompt PromptFunc,
	routes map[string]Step,
	opts ...ai.Option,
) *ClassifierRouter

NewClassifierRouter creates a router that uses LLM classification. The LLM response should match one of the route keys (case-insensitive). For more reliable classification, use ClassifierSchema().

func (*ClassifierRouter) ClassificationKey added in v0.2.3

func (c *ClassifierRouter) ClassificationKey() Key[string]

ClassificationKey returns a typed key for the raw classification result. The key name follows the pattern "{routerName}_classification".

func (*ClassifierRouter) Name

func (c *ClassifierRouter) Name() string

Name returns the router name.

func (*ClassifierRouter) RouteKey added in v0.2.3

func (c *ClassifierRouter) RouteKey() Key[string]

RouteKey returns a typed key for the selected route name. The key name follows the pattern "{routerName}_route".

func (*ClassifierRouter) Run

func (c *ClassifierRouter) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run classifies input and executes the matching route.

func (*ClassifierRouter) RunStream

func (c *ClassifierRouter) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream classifies input with streaming and executes the matching route.

type Condition

type Condition func(ctx context.Context, state *State) bool

Condition determines if a route should be taken.

func ConditionEquals added in v0.2.3

func ConditionEquals[T comparable](key Key[T], value T) Condition

ConditionEquals returns a condition that matches when the key equals value.

func ConditionMatches added in v0.2.3

func ConditionMatches[T any](key Key[T], pred func(T) bool) Condition

ConditionMatches returns a condition using a predicate function on the key value.

func ConditionSet added in v0.2.3

func ConditionSet[T any](key Key[T]) Condition

ConditionSet returns a condition that matches when the key exists in state.

type ErrorHandler

type ErrorHandler func(ctx context.Context, stepName string, err error) error

ErrorHandler is called when a step encounters an error. Return nil to suppress the error, or return an error to propagate it.

type Event

type Event struct {
	// Type identifies the kind of event.
	Type EventType

	// StepName identifies the step that produced this event.
	StepName string

	// Delta contains streaming content for EventStreamDelta.
	Delta string

	// Result contains step result for EventStepComplete.
	Result *StepResult

	// ToolCall contains tool call info for EventToolCall.
	ToolCall *ai.ToolCall

	// ParallelResults contains results from parallel execution.
	ParallelResults map[string]*StepResult

	// RouteName identifies the selected route for EventRouteSelected.
	RouteName string

	// Iteration is the current loop iteration (1-indexed) for EventLoopIteration.
	Iteration int

	// Error contains the error for EventError.
	Error error

	// Message contains additional context.
	Message string

	// Timestamp is when the event occurred.
	Timestamp time.Time
}

Event represents an observable occurrence during workflow execution.

type EventType

type EventType string

EventType identifies the kind of event occurring during workflow execution.

const (
	// EventWorkflowStart fires when the workflow begins.
	EventWorkflowStart EventType = "workflow_start"

	// EventWorkflowComplete fires when the workflow finishes.
	EventWorkflowComplete EventType = "workflow_complete"

	// EventStepStart fires when a step begins execution.
	EventStepStart EventType = "step_start"

	// EventStepComplete fires when a step finishes successfully.
	EventStepComplete EventType = "step_complete"

	// EventStepSkipped fires when a step is skipped (e.g., routing).
	EventStepSkipped EventType = "step_skipped"

	// EventStreamDelta fires for streaming content from LLM.
	EventStreamDelta EventType = "stream_delta"

	// EventToolCall fires when a tool is called within a step.
	EventToolCall EventType = "tool_call"

	// EventParallelStart fires when parallel execution begins.
	EventParallelStart EventType = "parallel_start"

	// EventParallelComplete fires when all parallel branches complete.
	EventParallelComplete EventType = "parallel_complete"

	// EventRouteSelected fires when a route is chosen.
	EventRouteSelected EventType = "route_selected"

	// EventLoopIteration fires at the start of each loop iteration.
	EventLoopIteration EventType = "loop_iteration"

	// EventError fires when an error occurs.
	EventError EventType = "error"
)

type FuncStep

type FuncStep struct {
	// contains filtered or unexported fields
}

FuncStep wraps a function as a Step.

func NewFuncStep

func NewFuncStep(name string, fn StepFunc) *FuncStep

NewFuncStep creates a step from a function.

func (*FuncStep) Name

func (f *FuncStep) Name() string

Name returns the step name.

func (*FuncStep) Run

func (f *FuncStep) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes the function.

func (*FuncStep) RunStream

func (f *FuncStep) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes the function and emits events.

type Key added in v0.2.3

type Key[T any] struct {
	// contains filtered or unexported fields
}

Key represents a typed state key that associates a name with type T. Keys provide compile-time type safety for workflow state access.

Define keys as package-level variables for reuse:

var (
    KeyAnalysis = workflow.NewKey[*AnalysisResult]("analysis")
    KeyProjects = workflow.NewKey[[]*ProjectConfig]("projects")
    KeyApproved = workflow.NewKey[bool]("approved")
)

func BoolKey added in v0.2.3

func BoolKey(name string) Key[bool]

BoolKey creates a Key[bool] with the given name.

func FloatKey added in v0.2.3

func FloatKey(name string) Key[float64]

FloatKey creates a Key[float64] with the given name.

func IntKey added in v0.2.3

func IntKey(name string) Key[int]

IntKey creates a Key[int] with the given name.

func NewKey added in v0.2.3

func NewKey[T any](name string) Key[T]

NewKey creates a typed key with the given name. The type parameter T specifies the type of values stored under this key.

func StringKey added in v0.2.3

func StringKey(name string) Key[string]

StringKey creates a Key[string] with the given name.

func (Key[T]) Name added in v0.2.3

func (k Key[T]) Name() string

Name returns the string name of the key. This is the underlying key used in state storage.

func (Key[T]) String added in v0.2.3

func (k Key[T]) String() string

String implements fmt.Stringer for debugging.

type Loop

type Loop struct {
	// contains filtered or unexported fields
}

Loop repeatedly executes a step until a condition returns true. Use for iterative refinement workflows where steps need to repeat based on evaluation results stored in state.

func NewLoop

func NewLoop(name string, step Step, condition LoopCondition, opts ...LoopOption) *Loop

NewLoop creates a loop that executes step until condition returns true. The condition is checked after each iteration. If the condition never returns true, the loop terminates after maxIterations (default 10) and returns ErrMaxIterationsExceeded.

The loop stores the current iteration count in state under "{name}_iteration".

func NewLoopUntil added in v0.2.3

func NewLoopUntil(name string, step Step, key string, value any, opts ...LoopOption) *Loop

NewLoopUntil creates a loop that exits when state[key] equals the target value. This is a convenience wrapper for common "exit when key equals X" patterns.

func NewLoopUntilKey added in v0.2.3

func NewLoopUntilKey[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop

NewLoopUntilKey creates a loop that exits when the typed key equals the target value. This provides compile-time type safety compared to NewLoopUntil.

func NewLoopUntilKeySet added in v0.2.3

func NewLoopUntilKeySet[T any](name string, step Step, key Key[T], opts ...LoopOption) *Loop

NewLoopUntilKeySet creates a loop that exits when the typed key has a truthy value.

func NewLoopUntilSet added in v0.2.3

func NewLoopUntilSet(name string, step Step, key string, opts ...LoopOption) *Loop

NewLoopUntilSet creates a loop that exits when state[key] is "truthy". A value is truthy if it exists and is non-nil, non-zero, non-empty.

func NewLoopWhile added in v0.2.3

func NewLoopWhile(name string, step Step, key string, value any, opts ...LoopOption) *Loop

NewLoopWhile creates a loop that continues while state[key] equals the target value. The loop exits when the key no longer equals the value (or is unset).

func NewLoopWhileKey added in v0.2.3

func NewLoopWhileKey[T comparable](name string, step Step, key Key[T], value T, opts ...LoopOption) *Loop

NewLoopWhileKey creates a loop that continues while the typed key equals the target value. The loop exits when the key no longer equals the value (or is unset).

func (*Loop) IterationKey added in v0.2.3

func (l *Loop) IterationKey() Key[int]

IterationKey returns a typed key for the current iteration count. The key name follows the pattern "{loopName}_iteration".

func (*Loop) Name

func (l *Loop) Name() string

Name returns the loop name.

func (*Loop) Run

func (l *Loop) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes the step repeatedly until the condition returns true.

func (*Loop) RunStream

func (l *Loop) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes the step repeatedly and emits events.

type LoopCondition

type LoopCondition func(ctx context.Context, state *State) bool

LoopCondition evaluates state to determine if the loop should exit. Return true to exit the loop, false to continue iterating.

type LoopOption

type LoopOption func(*Loop)

LoopOption configures a Loop.

func WithMaxIterations

func WithMaxIterations(n int) LoopOption

WithMaxIterations sets the maximum number of loop iterations. Default is 10.

type MemoryAdapter

type MemoryAdapter = store.MemoryAdapter

MemoryAdapter is an in-memory implementation of StateAdapter.

func NewMemoryAdapter

func NewMemoryAdapter() *MemoryAdapter

NewMemoryAdapter creates a new in-memory adapter for State.

type Option

type Option func(*Options)

Option is a functional option for workflow configuration.

func WithChatOptions

func WithChatOptions(opts ...ai.Option) Option

WithChatOptions passes options to LLM calls.

func WithContinueOnError

func WithContinueOnError(enabled bool) Option

WithContinueOnError allows the workflow to continue after errors.

func WithErrorHandler

func WithErrorHandler(fn ErrorHandler) Option

WithErrorHandler sets a custom error handler.

func WithMaxConcurrency

func WithMaxConcurrency(n int) Option

WithMaxConcurrency limits parallel step execution. A value of 0 means unlimited concurrency.

func WithMaxTokens

func WithMaxTokens(n int) Option

WithMaxTokens is a convenience option to set max tokens for chat calls.

func WithModel

func WithModel(model ai.Model) Option

WithModel is a convenience option to set the model for chat calls.

func WithOnStepComplete

func WithOnStepComplete(fn StepCallback) Option

WithOnStepComplete sets a callback for step completion.

func WithStepTimeout

func WithStepTimeout(d time.Duration) Option

WithStepTimeout sets the default timeout for each step.

func WithTemperature

func WithTemperature(t float64) Option

WithTemperature is a convenience option to set temperature for chat calls.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets the overall workflow timeout.

type Options

type Options struct {
	// Timeout sets a deadline for the entire workflow.
	Timeout time.Duration

	// StepTimeout sets default timeout for individual steps.
	StepTimeout time.Duration

	// MaxConcurrency limits parallel step execution (0 = unlimited).
	MaxConcurrency int

	// ErrorHandler is called on step errors.
	ErrorHandler ErrorHandler

	// OnStepComplete is called after each step.
	OnStepComplete StepCallback

	// ContinueOnError allows workflow to continue after step errors.
	ContinueOnError bool

	// ChatOptions are passed to LLM calls within steps.
	ChatOptions []ai.Option
}

Options contains configuration for workflow execution.

func ApplyOptions

func ApplyOptions(opts ...Option) *Options

ApplyOptions applies functional options with defaults.

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel executes steps concurrently and aggregates results.

func NewParallel

func NewParallel(name string, steps []Step, aggregator Aggregator) *Parallel

NewParallel creates a parallel workflow. The aggregator is called with all results after all steps complete. If aggregator is nil, each branch's state changes are merged back.

func (*Parallel) Name

func (p *Parallel) Name() string

Name returns the parallel workflow name.

func (*Parallel) Run

func (p *Parallel) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes steps concurrently.

func (*Parallel) RunStream

func (p *Parallel) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes steps concurrently and emits events.

type ParallelError

type ParallelError struct {
	Errors map[string]error
}

ParallelError wraps errors from parallel execution.

func (*ParallelError) Error

func (e *ParallelError) Error() string

Error returns a formatted message summarizing the parallel execution failures.

func (*ParallelError) Unwrap

func (e *ParallelError) Unwrap() error

Unwrap returns the first error for errors.Is/As compatibility.

type PromptFunc

type PromptFunc func(state *State) []ai.Message

PromptFunc generates messages from state for an LLM call.

type PromptStep

type PromptStep struct {
	// contains filtered or unexported fields
}

PromptStep makes a single LLM call with a dynamic prompt.

func NewPromptStep

func NewPromptStep(name string, c ChatClient, prompt PromptFunc, outputKey string, opts ...ai.Option) *PromptStep

NewPromptStep creates a step for a single LLM call. The prompt function generates messages from current state. If outputKey is non-empty, the response content is stored in state under that key.

func (*PromptStep) Name

func (p *PromptStep) Name() string

Name returns the step name.

func (*PromptStep) Run

func (p *PromptStep) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes the LLM call.

func (*PromptStep) RunStream

func (p *PromptStep) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes the LLM call with streaming.

type Result

type Result struct {
	// WorkflowName identifies the workflow.
	WorkflowName string

	// State contains the final state after execution.
	State *State

	// Output is the primary output from the workflow.
	Output any

	// Usage aggregates token usage across all steps.
	Usage ai.Usage

	// Termination indicates why execution stopped.
	Termination TerminationReason

	// Error contains any error that caused termination.
	Error error
}

Result represents the final outcome of workflow execution.

type Route

type Route struct {
	Name      string
	Condition Condition
	Step      Step
}

Route represents a conditional path in a router.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router selects and executes a step based on conditions.

func NewRouter

func NewRouter(name string, routes []Route, defaultRoute Step) *Router

NewRouter creates a conditional router. Routes are evaluated in order; first match wins. Default route is used if no conditions match (can be nil).

func (*Router) Name

func (r *Router) Name() string

Name returns the router name.

func (*Router) RouteKey added in v0.2.3

func (r *Router) RouteKey() Key[string]

RouteKey returns a typed key for the selected route name. The key name follows the pattern "{routerName}_route".

func (*Router) Run

func (r *Router) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run evaluates conditions and executes the matching step.

func (*Router) RunStream

func (r *Router) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream evaluates conditions and streams the matching step's events.

type State

type State = store.Store

State provides thread-safe key-value state management for workflows. This is the primary state container passed through workflow steps.

State supports reading and writing values of any type:

state := workflow.NewState(nil)
state.Set("count", 42)
state.Set("name", "example")

count := state.GetInt("count")   // 42
name := state.GetString("name")  // "example"

func NewState

func NewState(adapter StateAdapter) *State

NewState creates a new State with the given adapter. If adapter is nil, a default in-memory adapter is used.

func NewStateFrom

func NewStateFrom(data map[string]any) *State

NewStateFrom creates a new State initialized with the given data.

type StateAdapter

type StateAdapter = store.Adapter

StateAdapter defines the interface for persistence backends. Implementations must be thread-safe.

type Step

type Step interface {
	// Name returns a unique identifier for the step.
	Name() string

	// Run executes the step and returns the result.
	Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

	// RunStream executes the step and returns a channel of events.
	RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event
}

Step represents a single unit of work in a workflow. Steps can be functions, LLM calls, or nested workflows.

type StepCallback

type StepCallback func(ctx context.Context, result *StepResult)

StepCallback is called after each step completes.

type StepError

type StepError struct {
	StepName string
	Err      error
}

StepError wraps errors from step execution.

func (*StepError) Error

func (e *StepError) Error() string

Error returns a formatted error message including the step name.

func (*StepError) Unwrap

func (e *StepError) Unwrap() error

Unwrap returns the underlying error for use with errors.Is and errors.As.

type StepFunc

type StepFunc func(ctx context.Context, state *State) error

StepFunc is a function signature for simple step implementations.

type StepResult

type StepResult struct {
	// StepName identifies which step produced this result.
	StepName string

	// Output is the primary output value (optional).
	Output any

	// Response contains the LLM response if the step used one.
	Response *ai.Response

	// Usage aggregates token usage if applicable.
	Usage ai.Usage

	// Metadata holds step-specific metadata.
	Metadata map[string]any
}

StepResult contains the output of a step execution.

type TerminationReason

type TerminationReason string

TerminationReason indicates why the workflow stopped.

const (
	// TerminationComplete indicates normal completion.
	TerminationComplete TerminationReason = "complete"

	// TerminationTimeout indicates the deadline was exceeded.
	TerminationTimeout TerminationReason = "timeout"

	// TerminationCancelled indicates context cancellation.
	TerminationCancelled TerminationReason = "cancelled"

	// TerminationError indicates an error occurred.
	TerminationError TerminationReason = "error"
)

type TypedPromptStep added in v0.2.1

type TypedPromptStep[T any] struct {
	// contains filtered or unexported fields
}

TypedPromptStep makes an LLM call with structured output that is automatically unmarshaled into type T and stored in state.

func NewTypedPromptStep added in v0.2.1

func NewTypedPromptStep[T any](
	name string,
	c ChatClient,
	prompt PromptFunc,
	schema ai.ResponseSchema,
	outputKey string,
	opts ...ai.Option,
) *TypedPromptStep[T]

NewTypedPromptStep creates a step that returns structured output of type T. The schema parameter defines the JSON schema for the response. The unmarshaled *T is stored in state under outputKey.

Example:

type Analysis struct {
    Sentiment  string   `json:"sentiment"`
    Keywords   []string `json:"keywords"`
}

analysisSchema := ai.ResponseSchema{
    Name: "analysis",
    Schema: schema.Object().
        Field("sentiment", schema.String().Required()).
        Field("keywords", schema.Array(schema.String()).Required()).
        MustBuild(),
}

step := workflow.NewTypedPromptStep[Analysis](
    "analyze",
    client,
    func(s *workflow.State) []ai.Message {
        return []ai.Message{{Role: ai.RoleUser, Content: s.GetString("text")}}
    },
    analysisSchema,
    "analysis",
    ai.WithModel(model.Claude35Sonnet),
)

// After execution, state.Get("analysis") returns *Analysis

func NewTypedPromptStepWithKey added in v0.2.3

func NewTypedPromptStepWithKey[T any](
	name string,
	c ChatClient,
	prompt PromptFunc,
	schema ai.ResponseSchema,
	outputKey Key[*T],
	opts ...ai.Option,
) *TypedPromptStep[T]

NewTypedPromptStepWithKey creates a step that stores output using a typed key. This provides stronger type guarantees than the string-based version. The key type must be a pointer (*T) since TypedPromptStep stores pointers.

Example:

var KeyAnalysis = workflow.NewKey[*SentimentAnalysis]("analysis")

step := workflow.NewTypedPromptStepWithKey(
    "analyze",
    client,
    promptFunc,
    sentimentSchema,
    KeyAnalysis,
)

func (*TypedPromptStep[T]) Name added in v0.2.1

func (p *TypedPromptStep[T]) Name() string

Name returns the step name.

func (*TypedPromptStep[T]) Run added in v0.2.1

func (p *TypedPromptStep[T]) Run(ctx context.Context, state *State, opts ...Option) (*StepResult, error)

Run executes the LLM call and unmarshals the response into T.

func (*TypedPromptStep[T]) RunStream added in v0.2.1

func (p *TypedPromptStep[T]) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes the LLM call with streaming and unmarshals the final response.

type UnmarshalError added in v0.2.1

type UnmarshalError struct {
	StepName   string // Name of the step that failed
	Content    string // Raw response content for debugging
	TargetType string // The type we tried to unmarshal into
	Err        error  // The underlying unmarshal error
}

UnmarshalError indicates that an LLM response could not be unmarshaled into the expected type.

func (*UnmarshalError) Error added in v0.2.1

func (e *UnmarshalError) Error() string

Error returns a formatted error message.

func (*UnmarshalError) Unwrap added in v0.2.1

func (e *UnmarshalError) Unwrap() error

Unwrap returns the underlying error for use with errors.Is and errors.As.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow is the top-level orchestrator that wraps a root step. It provides the primary entry point for workflow execution.

func New

func New(name string, root Step) *Workflow

New creates a new workflow with a root step.

func (*Workflow) Name

func (w *Workflow) Name() string

Name returns the workflow name.

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context, state *State, opts ...Option) (*Result, error)

Run executes the workflow synchronously.

func (*Workflow) RunStream

func (w *Workflow) RunStream(ctx context.Context, state *State, opts ...Option) <-chan Event

RunStream executes the workflow and returns an event channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL