workflow

package
v1.3.0
Published: Feb 26, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNotConfigured = errors.New("step dependency not configured")

ErrNotConfigured is returned when a step's required dependency (Provider, Registry, Handler) has not been injected. Callers can check for this with errors.Is(err, ErrNotConfigured).
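The errors.Is check mentioned above can be sketched as follows. This is a minimal, self-contained illustration: the sentinel is redeclared locally so the file compiles on its own, and runStep and isNotConfigured are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotConfigured mirrors the documented sentinel so this sketch compiles standalone.
var ErrNotConfigured = errors.New("step dependency not configured")

// runStep is a hypothetical step that fails when its handler was never injected.
func runStep(handler func(string) (string, error), input string) (string, error) {
	if handler == nil {
		// Wrapping with %w keeps the sentinel discoverable through errors.Is.
		return "", fmt.Errorf("step %q: %w", "summarize", ErrNotConfigured)
	}
	return handler(input)
}

// isNotConfigured reports whether err wraps ErrNotConfigured.
func isNotConfigured(err error) bool {
	return errors.Is(err, ErrNotConfigured)
}

func main() {
	_, err := runStep(nil, "hello")
	if isNotConfigured(err) {
		fmt.Println("dependency missing:", err)
	}
}
```

Because errors.Is walks the wrap chain, the check still succeeds when intermediate layers add context with %w.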

Functions

func RegisterChannel

func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])

RegisterChannel registers a channel with the state graph.

func ValidateDAGDefinition

func ValidateDAGDefinition(def *DAGDefinition) error

ValidateDAGDefinition validates a loaded DAGDefinition

func WithWorkflowStreamEmitter added in v1.0.0

func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context

WithWorkflowStreamEmitter stores a WorkflowStreamEmitter in the context.
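Storing a value in a context usually follows the unexported-key pattern sketched below. The definition of WorkflowStreamEmitter is not shown in this section, so Emitter here is a hypothetical stand-in (a plain callback), and withEmitter, emitterFrom, and collectEvents are illustrative names only.

```go
package main

import (
	"context"
	"fmt"
)

// Emitter is a hypothetical stand-in for WorkflowStreamEmitter.
type Emitter func(event string)

// emitterKey is an unexported key type, so other packages cannot collide with it.
type emitterKey struct{}

// withEmitter stores an Emitter in the context.
func withEmitter(ctx context.Context, e Emitter) context.Context {
	return context.WithValue(ctx, emitterKey{}, e)
}

// emitterFrom retrieves the Emitter and reports whether one was set.
func emitterFrom(ctx context.Context) (Emitter, bool) {
	e, ok := ctx.Value(emitterKey{}).(Emitter)
	return e, ok
}

// collectEvents runs a fake step that emits two events and returns them.
func collectEvents() []string {
	var events []string
	ctx := withEmitter(context.Background(), func(ev string) {
		events = append(events, ev)
	})
	if emit, ok := emitterFrom(ctx); ok {
		emit("node_start")
		emit("node_end")
	}
	return events
}

func main() {
	fmt.Println(collectEvents())
}
```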

Types

type AgentAdapter

type AgentAdapter struct {
	// contains filtered or unexported fields
}

AgentAdapter adapts an AgentInterface to the AgentExecutor interface, allowing any agent implementation to be used in workflow steps.

func NewAgentAdapter

func NewAgentAdapter(agent AgentInterface, opts ...AgentAdapterOption) *AgentAdapter

NewAgentAdapter creates an AgentAdapter that implements AgentExecutor.

func (*AgentAdapter) Execute

func (a *AgentAdapter) Execute(ctx context.Context, input any) (any, error)

Execute implements AgentExecutor. It converts the workflow input to a string, calls the agent, and converts the string output back.

func (*AgentAdapter) ID

func (a *AgentAdapter) ID() string

ID implements AgentExecutor.

func (*AgentAdapter) Name

func (a *AgentAdapter) Name() string

Name implements AgentExecutor.

type AgentAdapterOption

type AgentAdapterOption func(*AgentAdapter)

AgentAdapterOption configures an AgentAdapter.

func WithAgentInputMapper

func WithAgentInputMapper(mapper func(any) (string, error)) AgentAdapterOption

WithAgentInputMapper sets a custom function to convert workflow input to agent string input.

func WithAgentOutputMapper

func WithAgentOutputMapper(mapper func(string) (any, error)) AgentAdapterOption

WithAgentOutputMapper sets a custom function to convert agent string output to workflow output.

type AgentExecutor

type AgentExecutor interface {
	types.Executor
	// Name returns the agent's display name.
	Name() string
}

AgentExecutor defines the interface for agent execution in workflows. It embeds types.Executor (the minimal common agent contract) and adds Name() which is required by workflow steps (e.g., AgentStep default naming).

type AgentInterface

type AgentInterface interface {
	// Execute runs the agent with a string prompt and returns a string response.
	Execute(ctx context.Context, input string) (string, error)
	// ID returns the agent's unique identifier.
	ID() string
	// Name returns the agent's display name.
	Name() string
}

AgentInterface is a minimal agent contract defined in the workflow package to avoid importing the agent package (which would cause circular imports). The agent.Agent interface uses (ctx, *Input) -> (*Output, error), but this interface uses plain string I/O for simplicity. Callers can use AgentAdapterOption functions to customize input/output conversion.

Note: This interface intentionally differs from types.Executor in its Execute signature (string -> string vs any -> any). It exists for agents that only support string-based I/O. Use AgentAdapter to bridge an AgentInterface implementation to the AgentExecutor (types.Executor) contract.
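The bridging role of AgentAdapter can be illustrated with a small local sketch. Everything below is a stand-in written for this example: stringAgent copies the AgentInterface method set, adapter is not the real AgentAdapter, and the default mappers (format input with %v, pass output through) are an assumption about reasonable defaults.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// stringAgent mirrors the AgentInterface shape: string in, string out.
type stringAgent interface {
	Execute(ctx context.Context, input string) (string, error)
	ID() string
	Name() string
}

// upperAgent is a toy agent that upper-cases its prompt.
type upperAgent struct{}

func (upperAgent) Execute(_ context.Context, in string) (string, error) {
	return strings.ToUpper(in), nil
}
func (upperAgent) ID() string   { return "upper-1" }
func (upperAgent) Name() string { return "upper" }

// adapter bridges a stringAgent to an any -> any Execute method.
type adapter struct {
	agent stringAgent
	in    func(any) (string, error)
	out   func(string) (any, error)
}

func newAdapter(a stringAgent) *adapter {
	return &adapter{
		agent: a,
		// Assumed defaults: stringify the input, return the reply unchanged.
		in:  func(v any) (string, error) { return fmt.Sprintf("%v", v), nil },
		out: func(s string) (any, error) { return s, nil },
	}
}

func (a *adapter) Execute(ctx context.Context, input any) (any, error) {
	prompt, err := a.in(input)
	if err != nil {
		return nil, fmt.Errorf("map input: %w", err)
	}
	reply, err := a.agent.Execute(ctx, prompt)
	if err != nil {
		return nil, err
	}
	return a.out(reply)
}

// adaptAndRun wraps the toy agent and executes it once.
func adaptAndRun(input any) (any, error) {
	return newAdapter(upperAgent{}).Execute(context.Background(), input)
}

func main() {
	out, err := adaptAndRun("hello")
	fmt.Println(out, err)
}
```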

type AgentRouter

type AgentRouter struct {
	// contains filtered or unexported fields
}

AgentRouter routes tasks to appropriate agents based on criteria.

func NewAgentRouter

func NewAgentRouter(selector func(ctx context.Context, input any, agents map[string]AgentExecutor) (AgentExecutor, error)) *AgentRouter

NewAgentRouter creates a new AgentRouter.

func (*AgentRouter) Execute

func (r *AgentRouter) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface by routing to the appropriate agent.

func (*AgentRouter) Name

func (r *AgentRouter) Name() string

Name implements the Step interface.

func (*AgentRouter) RegisterAgent

func (r *AgentRouter) RegisterAgent(agent AgentExecutor)

RegisterAgent registers an agent with the router.
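A selector with the shape accepted by NewAgentRouter might look like the sketch below. The executor interface is a local stand-in for AgentExecutor, and keying the agents map by agent ID is an assumption; the section does not say how registered agents are keyed.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// executor is a hypothetical stand-in for AgentExecutor.
type executor interface {
	ID() string
	Name() string
	Execute(ctx context.Context, input any) (any, error)
}

// echoAgent reports which agent handled the input.
type echoAgent struct{ id string }

func (e echoAgent) ID() string   { return e.id }
func (e echoAgent) Name() string { return e.id }
func (e echoAgent) Execute(_ context.Context, in any) (any, error) {
	return fmt.Sprintf("%s handled %v", e.id, in), nil
}

// selectByKeyword has the same parameter and result shape as the router's selector.
func selectByKeyword(_ context.Context, input any, agents map[string]executor) (executor, error) {
	text, ok := input.(string)
	if !ok {
		return nil, errors.New("selector expects string input")
	}
	key := "general"
	if strings.Contains(strings.ToLower(text), "invoice") {
		key = "billing"
	}
	agent, found := agents[key]
	if !found {
		return nil, fmt.Errorf("no agent registered for %q", key)
	}
	return agent, nil
}

// route selects an agent for the input and runs it.
func route(input any) (any, error) {
	agents := map[string]executor{
		"billing": echoAgent{id: "billing"},
		"general": echoAgent{id: "general"},
	}
	ctx := context.Background()
	agent, err := selectByKeyword(ctx, input, agents)
	if err != nil {
		return nil, err
	}
	return agent.Execute(ctx, input)
}

func main() {
	fmt.Println(route("Invoice #12 is wrong"))
}
```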

type AgentStep

type AgentStep struct {
	// contains filtered or unexported fields
}

AgentStep wraps an AgentExecutor as a workflow Step. This allows agents to be used as steps in workflow chains.

func NewAgentStep

func NewAgentStep(agent AgentExecutor, opts ...AgentStepOption) *AgentStep

NewAgentStep creates a new AgentStep from an AgentExecutor.

func (*AgentStep) AgentID

func (s *AgentStep) AgentID() string

AgentID returns the underlying agent's ID.

func (*AgentStep) Execute

func (s *AgentStep) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface.

func (*AgentStep) Name

func (s *AgentStep) Name() string

Name implements the Step interface.

type AgentStepOption

type AgentStepOption func(*AgentStep)

AgentStepOption configures an AgentStep.

func WithInputMapper

func WithInputMapper(mapper func(any) (any, error)) AgentStepOption

WithInputMapper sets a function to transform input before agent execution.

func WithOutputMapper

func WithOutputMapper(mapper func(any) (any, error)) AgentStepOption

WithOutputMapper sets a function to transform output after agent execution.

func WithStepName

func WithStepName(name string) AgentStepOption

WithStepName sets a custom name for the step.
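The three AgentStepOption functions above follow the functional-options pattern. The sketch below reproduces that pattern with local stand-ins (step, option, withInputMapper, and so on are hypothetical names); how the real AgentStep wraps errors or orders its mappers is assumed, not confirmed.

```go
package main

import (
	"context"
	"fmt"
)

// step is a hypothetical miniature of AgentStep.
type step struct {
	name   string
	run    func(ctx context.Context, input any) (any, error)
	mapIn  func(any) (any, error)
	mapOut func(any) (any, error)
}

type option func(*step)

func withInputMapper(m func(any) (any, error)) option  { return func(s *step) { s.mapIn = m } }
func withOutputMapper(m func(any) (any, error)) option { return func(s *step) { s.mapOut = m } }
func withName(n string) option                         { return func(s *step) { s.name = n } }

func newStep(run func(ctx context.Context, input any) (any, error), opts ...option) *step {
	s := &step{name: "agent", run: run}
	for _, o := range opts {
		o(s)
	}
	return s
}

// Execute maps the input, runs the wrapped function, then maps the output.
func (s *step) Execute(ctx context.Context, input any) (any, error) {
	if s.mapIn != nil {
		mapped, err := s.mapIn(input)
		if err != nil {
			return nil, fmt.Errorf("%s: input mapper: %w", s.name, err)
		}
		input = mapped
	}
	out, err := s.run(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("%s: %w", s.name, err)
	}
	if s.mapOut != nil {
		return s.mapOut(out)
	}
	return out, nil
}

// runMapped feeds a string through length -> double -> format.
func runMapped() (any, error) {
	double := func(_ context.Context, in any) (any, error) {
		n, ok := in.(int)
		if !ok {
			return nil, fmt.Errorf("want int, got %T", in)
		}
		return n * 2, nil
	}
	s := newStep(double,
		withName("doubler"),
		withInputMapper(func(v any) (any, error) { return len(fmt.Sprint(v)), nil }),
		withOutputMapper(func(v any) (any, error) { return fmt.Sprintf("result=%v", v), nil }),
	)
	return s.Execute(context.Background(), "abcd")
}

func main() {
	fmt.Println(runMapped())
}
```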

type Aggregator

type Aggregator interface {
	// Aggregate combines the task results into a single output.
	Aggregate(ctx context.Context, results []TaskResult) (any, error)
}

Aggregator is the aggregator interface. It combines the results of multiple tasks into the final output.

type AggregatorFunc

type AggregatorFunc func(ctx context.Context, results []TaskResult) (any, error)

AggregatorFunc is the function type for aggregators.

type Annotation

type Annotation[T any] struct {
	Name    string
	Default T
	Reducer Reducer[T]
}

Annotation provides type-safe state definition.

func NewAnnotation

func NewAnnotation[T any](name string, defaultVal T, reducer Reducer[T]) Annotation[T]

NewAnnotation creates a new annotation.

func (Annotation[T]) CreateChannel

func (a Annotation[T]) CreateChannel() *Channel[T]

CreateChannel creates a channel from an annotation.

type ChainWorkflow

type ChainWorkflow struct {
	// contains filtered or unexported fields
}

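ChainWorkflow is a prompt-chaining workflow. It breaks a task into a fixed sequence of steps, each of which processes the output of the previous step.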

func NewChainWorkflow

func NewChainWorkflow(name, description string, steps ...Step) *ChainWorkflow

NewChainWorkflow creates a prompt-chaining workflow.

func (*ChainWorkflow) AddStep

func (w *ChainWorkflow) AddStep(step Step)

AddStep appends a step.

func (*ChainWorkflow) Description

func (w *ChainWorkflow) Description() string

func (*ChainWorkflow) Execute

func (w *ChainWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute runs the prompt chain. It executes the steps in order, passing each step's output as the input to the next step.
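Sequential chaining, where each step consumes the previous step's output, can be sketched as follows. The step interface here is inferred from the Name and Execute methods that other types in this package implement; funcStep and runChain are hypothetical, and stopping at the first error is an assumption about the chain's behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// step is a local stand-in for the package's Step interface.
type step interface {
	Name() string
	Execute(ctx context.Context, input any) (any, error)
}

// funcStep adapts a function to the step interface.
type funcStep struct {
	name string
	fn   func(ctx context.Context, input any) (any, error)
}

func (s funcStep) Name() string { return s.name }
func (s funcStep) Execute(ctx context.Context, input any) (any, error) {
	return s.fn(ctx, input)
}

// runChain executes steps in order, feeding each output into the next step.
func runChain(ctx context.Context, input any, steps ...step) (any, error) {
	current := input
	for i, s := range steps {
		if err := ctx.Err(); err != nil {
			return nil, err // stop early if the caller cancelled
		}
		out, err := s.Execute(ctx, current)
		if err != nil {
			return nil, fmt.Errorf("step %d (%s): %w", i+1, s.Name(), err)
		}
		current = out
	}
	return current, nil
}

// chainDemo trims, upper-cases, and punctuates a string.
func chainDemo(input string) (any, error) {
	trim := funcStep{name: "trim", fn: func(_ context.Context, in any) (any, error) {
		return strings.TrimSpace(fmt.Sprint(in)), nil
	}}
	upper := funcStep{name: "upper", fn: func(_ context.Context, in any) (any, error) {
		return strings.ToUpper(fmt.Sprint(in)), nil
	}}
	exclaim := funcStep{name: "exclaim", fn: func(_ context.Context, in any) (any, error) {
		return fmt.Sprint(in) + "!", nil
	}}
	return runChain(context.Background(), input, trim, upper, exclaim)
}

func main() {
	fmt.Println(chainDemo("  hello "))
}
```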

func (*ChainWorkflow) Name

func (w *ChainWorkflow) Name() string

func (*ChainWorkflow) Steps

func (w *ChainWorkflow) Steps() []Step

Steps returns all steps.

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel represents a state channel with optional reducer.

func GetChannel

func GetChannel[T any](sg *StateGraph, name string) (*Channel[T], error)

GetChannel retrieves a typed channel by name.

func NewChannel

func NewChannel[T any](name string, initial T, opts ...ChannelOption[T]) *Channel[T]

NewChannel creates a new state channel.

func (*Channel[T]) Get

func (c *Channel[T]) Get() T

Get returns the current value.

func (*Channel[T]) GetAny added in v1.0.0

func (c *Channel[T]) GetAny() any

GetAny returns the current value as any, implementing ChannelReader.

func (*Channel[T]) History

func (c *Channel[T]) History() []T

History returns the value history.

func (*Channel[T]) Update

func (c *Channel[T]) Update(update T) T

Update applies an update using the reducer.

func (*Channel[T]) Version

func (c *Channel[T]) Version() uint64

Version returns the current version number.

type ChannelOption

type ChannelOption[T any] func(*Channel[T])

ChannelOption configures a channel.

func WithHistory

func WithHistory[T any](max int) ChannelOption[T]

WithHistory enables history tracking with max entries.

func WithReducer

func WithReducer[T any](r Reducer[T]) ChannelOption[T]

WithReducer sets a custom reducer for the channel.
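How a reducer and bounded history could interact inside a state channel is sketched below. The definition of Reducer[T] is not shown in this section, so the signature func(current, update T) T is an assumption. Recording the previous value on each update and overwriting when no reducer is set are likewise guesses about behavior, and all lower-case names are local stand-ins.

```go
package main

import "fmt"

// reducer merges the current value with an update (assumed signature).
type reducer[T any] func(current, update T) T

type channel[T any] struct {
	name    string
	value   T
	version uint64
	reduce  reducer[T]
	history []T
	maxHist int
}

type channelOption[T any] func(*channel[T])

func withReducer[T any](r reducer[T]) channelOption[T] {
	return func(c *channel[T]) { c.reduce = r }
}

func withHistory[T any](max int) channelOption[T] {
	return func(c *channel[T]) { c.maxHist = max }
}

func newChannel[T any](name string, initial T, opts ...channelOption[T]) *channel[T] {
	c := &channel[T]{name: name, value: initial}
	for _, o := range opts {
		o(c)
	}
	return c
}

// Update records the old value (bounded), applies the reducer, and bumps the version.
func (c *channel[T]) Update(update T) T {
	if c.maxHist > 0 {
		c.history = append(c.history, c.value)
		if len(c.history) > c.maxHist {
			c.history = c.history[len(c.history)-c.maxHist:]
		}
	}
	if c.reduce != nil {
		c.value = c.reduce(c.value, update)
	} else {
		c.value = update // no reducer: last write wins
	}
	c.version++
	return c.value
}

func (c *channel[T]) Get() T          { return c.value }
func (c *channel[T]) Version() uint64 { return c.version }
func (c *channel[T]) History() []T    { return append([]T(nil), c.history...) }

// sumDemo accumulates 1, 2, 3 with a summing reducer and a two-entry history.
func sumDemo() (int, uint64, []int) {
	c := newChannel[int]("total", 0,
		withReducer[int](func(current, update int) int { return current + update }),
		withHistory[int](2),
	)
	for _, n := range []int{1, 2, 3} {
		c.Update(n)
	}
	return c.Get(), c.Version(), c.History()
}

func main() {
	fmt.Println(sumDemo())
}
```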

type ChannelReader added in v1.0.0

type ChannelReader interface {
	GetAny() any
	Version() uint64
}

ChannelReader is a non-generic interface for reading channel values. This is needed because Go's type system does not allow matching generic method signatures like Get() T via interface{ Get() any }.
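The limitation described above can be seen in a short sketch: a generic Get() T never satisfies interface{ Get() any }, so a separate GetAny method is what allows channels of different element types to sit behind one interface. box, reader, and snapshot are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// box is a minimal generic container standing in for Channel[T].
type box[T any] struct {
	value   T
	version uint64
}

// Get returns T, so *box[int] has Get() int, which does NOT match Get() any.
func (b *box[T]) Get() T { return b.value }

// GetAny and Version have non-generic signatures and can be matched by an interface.
func (b *box[T]) GetAny() any     { return b.value }
func (b *box[T]) Version() uint64 { return b.version }

// reader mirrors the ChannelReader method set.
type reader interface {
	GetAny() any
	Version() uint64
}

// snapshot reads heterogeneous channels through the non-generic interface.
func snapshot(readers map[string]reader) map[string]any {
	out := make(map[string]any, len(readers))
	for name, r := range readers {
		out[name] = r.GetAny()
	}
	return out
}

// snapshotDemo mixes an int channel and a string channel in one map.
func snapshotDemo() map[string]any {
	return snapshot(map[string]reader{
		"count": &box[int]{value: 3, version: 1},
		"title": &box[string]{value: "draft", version: 2},
	})
}

func main() {
	fmt.Println(snapshotDemo())
}
```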

type CheckpointDiff

type CheckpointDiff struct {
	Version1       int           `json:"version1"`
	Version2       int           `json:"version2"`
	AddedNodes     []string      `json:"added_nodes"`
	RemovedNodes   []string      `json:"removed_nodes"`
	ChangedNodes   []string      `json:"changed_nodes"`
	TimeDifference time.Duration `json:"time_difference"`
}

CheckpointDiff represents differences between checkpoints.

type CheckpointManager

type CheckpointManager interface {
	SaveCheckpoint(ctx context.Context, checkpoint *EnhancedCheckpoint) error
}

CheckpointManager interface for checkpoint integration

type CheckpointStore

type CheckpointStore interface {
	Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
	Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
	LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
	LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
	ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
	Delete(ctx context.Context, checkpointID string) error
}

CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).

Note: Two CheckpointStore interfaces exist in the project, operating on different types:

  • agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
  • workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)

They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker is a circuit breaker implementation.

func NewCircuitBreaker

func NewCircuitBreaker(
	nodeID string,
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreaker

NewCircuitBreaker creates a circuit breaker.

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() (bool, error)

AllowRequest checks whether a request is allowed through.

func (*CircuitBreaker) GetFailures

func (cb *CircuitBreaker) GetFailures() int

GetFailures returns the current failure count.

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current state.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failure.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a success.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
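	// FailureThreshold is the number of consecutive failures that trips the breaker.
	FailureThreshold int `json:"failure_threshold"`
	// RecoveryTimeout is how long to wait after tripping before attempting recovery.
	RecoveryTimeout Duration `json:"recovery_timeout"`
	// HalfOpenMaxProbes is the number of probe requests allowed in the half-open state.
	HalfOpenMaxProbes int `json:"half_open_max_probes"`
	// SuccessThresholdInHalfOpen is the number of consecutive successes in the half-open state required to recover.
	SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`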
}

CircuitBreakerConfig holds the circuit breaker configuration.
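The four configuration fields suggest a conventional closed, open, half-open state machine. The sketch below is one plausible reading of how they interact, not the package's implementation: the transition rules (any failed probe reopens the breaker, a success in the closed state resets the failure count), the injectable clock, and the String values are all assumptions, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

func (s state) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

type config struct {
	FailureThreshold           int
	RecoveryTimeout            time.Duration
	HalfOpenMaxProbes          int
	SuccessThresholdInHalfOpen int
}

type breaker struct {
	cfg                         config
	st                          state
	failures, successes, probes int
	openedAt                    time.Time
	now                         func() time.Time // injectable clock for deterministic tests
}

// allow reports whether a request may pass, moving open -> half-open after the timeout.
func (b *breaker) allow() bool {
	if b.st == open && b.now().Sub(b.openedAt) >= b.cfg.RecoveryTimeout {
		b.st, b.probes, b.successes = halfOpen, 0, 0
	}
	switch b.st {
	case open:
		return false
	case halfOpen:
		if b.probes >= b.cfg.HalfOpenMaxProbes {
			return false
		}
		b.probes++
	}
	return true
}

func (b *breaker) trip() { b.st, b.openedAt = open, b.now() }

func (b *breaker) recordFailure() {
	if b.st == halfOpen {
		b.trip() // assumed: a failed probe reopens immediately
		return
	}
	b.failures++
	if b.failures >= b.cfg.FailureThreshold {
		b.trip()
	}
}

func (b *breaker) recordSuccess() {
	if b.st == halfOpen {
		b.successes++
		if b.successes >= b.cfg.SuccessThresholdInHalfOpen {
			b.st, b.failures = closed, 0
		}
		return
	}
	b.failures = 0 // the threshold counts consecutive failures
}

// breakerDemo walks one full closed -> open -> half-open -> closed cycle.
func breakerDemo() []string {
	clock := time.Unix(0, 0)
	b := &breaker{
		cfg: config{FailureThreshold: 2, RecoveryTimeout: 10 * time.Second, HalfOpenMaxProbes: 1, SuccessThresholdInHalfOpen: 1},
		now: func() time.Time { return clock },
	}
	var trace []string
	note := func() {
		ok := b.allow()
		trace = append(trace, fmt.Sprintf("%s allow=%t", b.st, ok))
	}
	b.recordFailure()
	b.recordFailure() // threshold reached
	note()
	clock = clock.Add(10 * time.Second) // recovery timeout elapses
	note()
	note() // probe budget already used
	b.recordSuccess()
	note()
	return trace
}

func main() {
	for _, line := range breakerDemo() {
		fmt.Println(line)
	}
}
```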

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns the default circuit breaker configuration.

type CircuitBreakerEvent

type CircuitBreakerEvent struct {
	NodeID    string       `json:"node_id"`
	OldState  CircuitState `json:"old_state"`
	NewState  CircuitState `json:"new_state"`
	Timestamp time.Time    `json:"timestamp"`
	Reason    string       `json:"reason"`
	Failures  int          `json:"failures"`
}

CircuitBreakerEvent is a circuit breaker state-change event.

type CircuitBreakerEventHandler

type CircuitBreakerEventHandler interface {
	OnStateChange(event CircuitBreakerEvent)
}

CircuitBreakerEventHandler is the event handler interface.

type CircuitBreakerRegistry

type CircuitBreakerRegistry struct {
	// contains filtered or unexported fields
}

CircuitBreakerRegistry is a registry that manages the circuit breakers for all nodes.

func NewCircuitBreakerRegistry

func NewCircuitBreakerRegistry(
	config CircuitBreakerConfig,
	eventHandler CircuitBreakerEventHandler,
	logger *zap.Logger,
) *CircuitBreakerRegistry

NewCircuitBreakerRegistry creates a circuit breaker registry.

func (*CircuitBreakerRegistry) GetAllStates

func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState

GetAllStates returns the state of every circuit breaker.

func (*CircuitBreakerRegistry) GetOrCreate

func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker

GetOrCreate returns the circuit breaker for a node, creating it if it does not exist.

func (*CircuitBreakerRegistry) ResetAll

func (r *CircuitBreakerRegistry) ResetAll()

ResetAll resets all circuit breakers.

type CircuitState

type CircuitState int

CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (dependency direction: llm <- workflow, not workflow -> llm).

const (
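	// CircuitClosed is the normal state; requests are allowed through.
	CircuitClosed CircuitState = iota
	// CircuitOpen is the tripped state; all requests are rejected.
	CircuitOpen
	// CircuitHalfOpen is the half-open state; probe requests are allowed.
	CircuitHalfOpen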
)

func (CircuitState) String

func (s CircuitState) String() string

type CodeStep

type CodeStep struct {
	Handler func(ctx context.Context, input any) (any, error)
}

CodeStep executes custom code via an injected Go handler function.

func (*CodeStep) Execute

func (s *CodeStep) Execute(ctx context.Context, input any) (any, error)

func (*CodeStep) Name

func (s *CodeStep) Name() string

type ConditionFunc

type ConditionFunc func(ctx context.Context, input any) (bool, error)

ConditionFunc evaluates a condition and returns true or false

type ConditionalAgentStep

type ConditionalAgentStep struct {
	// contains filtered or unexported fields
}

ConditionalAgentStep executes different agents based on conditions.

func NewConditionalAgentStep

func NewConditionalAgentStep() *ConditionalAgentStep

NewConditionalAgentStep creates a conditional agent step.

func (*ConditionalAgentStep) Default

Default sets the default agent when no conditions match.

func (*ConditionalAgentStep) Execute

func (c *ConditionalAgentStep) Execute(ctx context.Context, input any) (any, error)

Execute implements the Step interface.

func (*ConditionalAgentStep) Name

func (c *ConditionalAgentStep) Name() string

Name implements the Step interface.

func (*ConditionalAgentStep) When

func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep

When adds a condition-agent pair.
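The chained When and Default calls could be evaluated as in the sketch below. The signature of Default is not shown in this section, so the one used here is an assumption, as are the first-match-wins ordering and the error returned when nothing matches. agentFn is a simplified stand-in for AgentExecutor.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// agentFn is a simplified stand-in for AgentExecutor.
type agentFn func(ctx context.Context, input any) (any, error)

type branch struct {
	check func(ctx context.Context, input any) bool
	agent agentFn
}

// conditional is a hypothetical miniature of ConditionalAgentStep.
type conditional struct {
	branches []branch
	fallback agentFn
}

func newConditional() *conditional { return &conditional{} }

// When adds a condition-agent pair and returns the receiver for chaining.
func (c *conditional) When(check func(ctx context.Context, input any) bool, agent agentFn) *conditional {
	c.branches = append(c.branches, branch{check: check, agent: agent})
	return c
}

// Default sets the fallback agent (signature assumed).
func (c *conditional) Default(agent agentFn) *conditional {
	c.fallback = agent
	return c
}

// Execute runs the first agent whose condition matches, else the fallback.
func (c *conditional) Execute(ctx context.Context, input any) (any, error) {
	for _, b := range c.branches {
		if b.check(ctx, input) {
			return b.agent(ctx, input)
		}
	}
	if c.fallback != nil {
		return c.fallback(ctx, input)
	}
	return nil, errors.New("no condition matched and no default agent set")
}

// label builds a toy agent that always answers with s.
func label(s string) agentFn {
	return func(context.Context, any) (any, error) { return s, nil }
}

// classify routes an integer to one of three toy agents.
func classify(n int) (any, error) {
	step := newConditional().
		When(func(_ context.Context, in any) bool { v, ok := in.(int); return ok && v < 0 }, label("negative")).
		When(func(_ context.Context, in any) bool { v, ok := in.(int); return ok && v > 100 }, label("large")).
		Default(label("normal"))
	return step.Execute(context.Background(), n)
}

func main() {
	fmt.Println(classify(-1))
	fmt.Println(classify(500))
	fmt.Println(classify(7))
}
```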

type DAGBuilder

type DAGBuilder struct {
	// contains filtered or unexported fields
}

DAGBuilder provides a fluent API for constructing DAG workflows

func NewDAGBuilder

func NewDAGBuilder(name string) *DAGBuilder

NewDAGBuilder creates a new DAG builder with the given name

func (*DAGBuilder) AddEdge

func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder

AddEdge adds a directed edge from one node to another

func (*DAGBuilder) AddNode

func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder

AddNode adds a node to the graph and returns a NodeBuilder for configuration

func (*DAGBuilder) Build

func (b *DAGBuilder) Build() (*DAGWorkflow, error)

Build validates the DAG and creates a DAGWorkflow

func (*DAGBuilder) SetEntry

func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder

SetEntry sets the entry node for the workflow

func (*DAGBuilder) WithDescription

func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder

WithDescription sets the workflow description

func (*DAGBuilder) WithLogger

func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder

WithLogger sets a custom logger

type DAGDefinition

type DAGDefinition struct {
	// Name is the workflow name
	Name string `json:"name" yaml:"name"`
	// Description describes the workflow
	Description string `json:"description" yaml:"description"`
	// Entry is the ID of the entry node
	Entry string `json:"entry" yaml:"entry"`
	// Nodes contains all node definitions
	Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
	// Metadata stores additional workflow information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

DAGDefinition represents a serializable workflow definition

func FromJSON

func FromJSON(jsonStr string) (*DAGDefinition, error)

FromJSON creates a DAGDefinition from JSON string
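Loading a definition from a JSON string can be sketched with encoding/json and the struct tags listed above. NodeDefinition is not shown in this section, so nodeDef and its id and type fields are hypothetical, and checkEntry is only one example of a check a validator might perform; it is not a description of ValidateDAGDefinition.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// nodeDef is a hypothetical, minimal stand-in for NodeDefinition.
type nodeDef struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

// dagDef copies the documented DAGDefinition fields and JSON tags.
type dagDef struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Entry       string         `json:"entry"`
	Nodes       []nodeDef      `json:"nodes"`
	Metadata    map[string]any `json:"metadata,omitempty"`
}

// fromJSON parses a definition from a JSON string.
func fromJSON(s string) (*dagDef, error) {
	var def dagDef
	if err := json.Unmarshal([]byte(s), &def); err != nil {
		return nil, fmt.Errorf("parse DAG definition: %w", err)
	}
	return &def, nil
}

// checkEntry verifies that the entry ID refers to a declared node.
func checkEntry(def *dagDef) error {
	if def.Entry == "" {
		return errors.New("entry node is required")
	}
	for _, n := range def.Nodes {
		if n.ID == def.Entry {
			return nil
		}
	}
	return fmt.Errorf("entry node %q is not defined", def.Entry)
}

func main() {
	def, err := fromJSON(`{"name":"demo","entry":"start","nodes":[{"id":"start","type":"action"}]}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(def.Name, len(def.Nodes), checkEntry(def))
}
```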

func FromYAML

func FromYAML(yamlStr string) (*DAGDefinition, error)

FromYAML creates a DAGDefinition from YAML string

func LoadFromJSONFile

func LoadFromJSONFile(filename string) (*DAGDefinition, error)

LoadFromJSONFile loads a DAGDefinition from a JSON file

func LoadFromYAMLFile

func LoadFromYAMLFile(filename string) (*DAGDefinition, error)

LoadFromYAMLFile loads a DAGDefinition from a YAML file

func (*DAGDefinition) MarshalJSON

func (d *DAGDefinition) MarshalJSON() ([]byte, error)

MarshalJSON serializes a DAGDefinition to JSON

func (*DAGDefinition) MarshalYAML

func (d *DAGDefinition) MarshalYAML() (any, error)

MarshalYAML serializes a DAGDefinition to YAML

func (*DAGDefinition) SaveToJSONFile

func (d *DAGDefinition) SaveToJSONFile(filename string) error

SaveToJSONFile saves a DAGDefinition to a JSON file

func (*DAGDefinition) SaveToYAMLFile

func (d *DAGDefinition) SaveToYAMLFile(filename string) error

SaveToYAMLFile saves a DAGDefinition to a YAML file

func (*DAGDefinition) ToJSON

func (d *DAGDefinition) ToJSON() (string, error)

ToJSON converts a DAGDefinition to JSON string

func (*DAGDefinition) ToYAML

func (d *DAGDefinition) ToYAML() (string, error)

ToYAML converts a DAGDefinition to YAML string

func (*DAGDefinition) UnmarshalJSON

func (d *DAGDefinition) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes a DAGDefinition from JSON

func (*DAGDefinition) UnmarshalYAML

func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error

UnmarshalYAML deserializes a DAGDefinition from YAML

type DAGExecutor

type DAGExecutor struct {
	// contains filtered or unexported fields
}

DAGExecutor executes DAG workflows with dependency resolution
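Dependency resolution over a DAG is commonly done with a topological ordering. The sketch below uses Kahn's algorithm over an adjacency map of the same shape that DAGGraph.Edges returns; it is a generic illustration under that assumption and does not claim to be how DAGExecutor schedules nodes. topoOrder and topoDemo are hypothetical names, as are the node IDs.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoOrder returns nodes in an order where every node follows its dependencies.
// It reports an error for unknown nodes and for cycles.
func topoOrder(nodes []string, edges map[string][]string) ([]string, error) {
	indegree := make(map[string]int, len(nodes))
	for _, n := range nodes {
		indegree[n] = 0
	}
	for from, tos := range edges {
		if _, ok := indegree[from]; !ok {
			return nil, fmt.Errorf("edge from unknown node %q", from)
		}
		for _, to := range tos {
			if _, ok := indegree[to]; !ok {
				return nil, fmt.Errorf("edge to unknown node %q", to)
			}
			indegree[to]++
		}
	}
	// Start with nodes that have no incoming edges; sort for a stable result.
	var ready []string
	for n, d := range indegree {
		if d == 0 {
			ready = append(ready, n)
		}
	}
	sort.Strings(ready)

	order := make([]string, 0, len(indegree))
	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		order = append(order, n)
		next := append([]string(nil), edges[n]...)
		sort.Strings(next)
		for _, to := range next {
			indegree[to]--
			if indegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}
	if len(order) != len(indegree) {
		return nil, errors.New("graph contains a cycle")
	}
	return order, nil
}

// topoDemo orders a small diamond-shaped workflow.
func topoDemo() ([]string, error) {
	nodes := []string{"fetch", "parse", "summarize", "translate", "report"}
	edges := map[string][]string{
		"fetch":     {"parse"},
		"parse":     {"summarize", "translate"},
		"summarize": {"report"},
		"translate": {"report"},
	}
	return topoOrder(nodes, edges)
}

func main() {
	fmt.Println(topoDemo())
}
```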

func NewDAGExecutor

func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor

NewDAGExecutor creates a new DAG executor

func (*DAGExecutor) Execute

func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)

Execute runs the DAG workflow with dependency resolution. Concurrent Execute calls on the same executor are serialized by an internal mutex (executeMu), which prevents data races on shared execution state.

func (*DAGExecutor) GetCircuitBreakerStates

func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState

GetCircuitBreakerStates returns the state of every circuit breaker.

func (*DAGExecutor) GetExecutionID

func (e *DAGExecutor) GetExecutionID() string

GetExecutionID returns the current execution ID

func (*DAGExecutor) GetHistory

func (e *DAGExecutor) GetHistory() *ExecutionHistory

GetHistory returns the execution history for the current execution

func (*DAGExecutor) GetHistoryStore

func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore

GetHistoryStore returns the history store

func (*DAGExecutor) GetNodeResult

func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*DAGExecutor) SetCircuitBreakerConfig

func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)

SetCircuitBreakerConfig sets the circuit breaker configuration.

func (*DAGExecutor) SetHistoryStore

func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)

SetHistoryStore sets a custom history store

type DAGGraph

type DAGGraph struct {
	// contains filtered or unexported fields
}

DAGGraph represents the workflow structure as a directed acyclic graph

func NewDAGGraph

func NewDAGGraph() *DAGGraph

NewDAGGraph creates a new empty DAG graph

func (*DAGGraph) AddEdge

func (g *DAGGraph) AddEdge(fromID, toID string)

AddEdge adds a directed edge from one node to another

func (*DAGGraph) AddNode

func (g *DAGGraph) AddNode(node *DAGNode)

AddNode adds a node to the graph

func (*DAGGraph) Edges

func (g *DAGGraph) Edges() map[string][]string

Edges returns all edges in the graph

func (*DAGGraph) GetEdges

func (g *DAGGraph) GetEdges(nodeID string) []string

GetEdges retrieves the outgoing edges for a node

func (*DAGGraph) GetEntry

func (g *DAGGraph) GetEntry() string

GetEntry returns the entry node ID

func (*DAGGraph) GetNode

func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)

GetNode retrieves a node by ID

func (*DAGGraph) Nodes

func (g *DAGGraph) Nodes() map[string]*DAGNode

Nodes returns all nodes in the graph

func (*DAGGraph) SetEntry

func (g *DAGGraph) SetEntry(nodeID string)

SetEntry sets the entry node for the graph

type DAGNode

type DAGNode struct {
	// ID is the unique identifier for this node
	ID string
	// Type specifies the node type
	Type NodeType
	// Step is the step to execute (for action nodes)
	Step Step
	// Condition evaluates branching logic (for conditional nodes)
	Condition ConditionFunc
	// LoopConfig defines loop behavior (for loop nodes)
	LoopConfig *LoopConfig
	// SubGraph is a nested workflow (for subgraph nodes)
	SubGraph *DAGGraph
	// ErrorConfig defines error handling behavior
	ErrorConfig *ErrorConfig
	// Metadata stores additional node information
	Metadata map[string]any
}

DAGNode represents a single node in the workflow graph

type DAGWorkflow

type DAGWorkflow struct {
	// contains filtered or unexported fields
}

DAGWorkflow represents a DAG-based workflow

func NewDAGWorkflow

func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow

NewDAGWorkflow creates a new DAG workflow

func (*DAGWorkflow) Description

func (w *DAGWorkflow) Description() string

Description returns the workflow description

func (*DAGWorkflow) Execute

func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute executes the DAG workflow using DAGExecutor

func (*DAGWorkflow) GetMetadata

func (w *DAGWorkflow) GetMetadata(key string) (any, bool)

GetMetadata retrieves a metadata value

func (*DAGWorkflow) Graph

func (w *DAGWorkflow) Graph() *DAGGraph

Graph returns the underlying DAG graph

func (*DAGWorkflow) Name

func (w *DAGWorkflow) Name() string

Name returns the workflow name

func (*DAGWorkflow) SetExecutor

func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)

SetExecutor sets a custom executor for the workflow

func (*DAGWorkflow) SetMetadata

func (w *DAGWorkflow) SetMetadata(key string, value any)

SetMetadata sets a metadata value

func (*DAGWorkflow) ToDAGDefinition

func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition

ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization. Note: this captures only the structure, not the runtime functions (conditions, iterators, steps).

type Duration added in v1.0.0

type Duration struct {
	time.Duration
}

Duration wraps time.Duration with human-readable JSON serialization. JSON output is a string like "30s", "5m", "1h30m" instead of nanoseconds.

func (Duration) MarshalJSON added in v1.0.0

func (d Duration) MarshalJSON() ([]byte, error)

MarshalJSON serializes Duration as a human-readable string.

func (*Duration) UnmarshalJSON added in v1.0.0

func (d *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON deserializes Duration from a string ("30s") or number (nanoseconds).
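A wrapper with this behavior can be sketched as below. The local duration type is a stand-in written for this example. It formats with time.Duration.String, so five minutes renders as "5m0s" here; the package's own formatting may be more compact. encodeSeconds and decodeTimeout are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// duration wraps time.Duration with string-based JSON encoding.
type duration struct{ time.Duration }

// MarshalJSON writes the duration as a quoted string such as "30s".
func (d duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.String())
}

// UnmarshalJSON accepts either a string ("30s") or a number of nanoseconds.
func (d *duration) UnmarshalJSON(b []byte) error {
	var v any
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch x := v.(type) {
	case string:
		parsed, err := time.ParseDuration(x)
		if err != nil {
			return fmt.Errorf("invalid duration %q: %w", x, err)
		}
		d.Duration = parsed
	case float64: // encoding/json decodes JSON numbers into float64
		d.Duration = time.Duration(x)
	default:
		return fmt.Errorf("duration must be a string or number, got %T", v)
	}
	return nil
}

type settings struct {
	Timeout duration `json:"timeout"`
}

// encodeSeconds marshals a settings value with an n-second timeout.
func encodeSeconds(n int) (string, error) {
	b, err := json.Marshal(settings{Timeout: duration{Duration: time.Duration(n) * time.Second}})
	return string(b), err
}

// decodeTimeout parses a settings document and returns its timeout.
func decodeTimeout(doc string) (time.Duration, error) {
	var s settings
	if err := json.Unmarshal([]byte(doc), &s); err != nil {
		return 0, err
	}
	return s.Timeout.Duration, nil
}

func main() {
	fmt.Println(encodeSeconds(30))
	fmt.Println(decodeTimeout(`{"timeout":"1h30m"}`))
	fmt.Println(decodeTimeout(`{"timeout":1500000000}`))
}
```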

type EnhancedCheckpoint

type EnhancedCheckpoint struct {
	ID             string         `json:"id"`
	WorkflowID     string         `json:"workflow_id"`
	ThreadID       string         `json:"thread_id"`
	Version        int            `json:"version"`
	NodeID         string         `json:"node_id"`
	NodeResults    map[string]any `json:"node_results"`
	Variables      map[string]any `json:"variables"`
	PendingNodes   []string       `json:"pending_nodes"`
	CompletedNodes []string       `json:"completed_nodes"`
	Input          any            `json:"input"`
	CreatedAt      time.Time      `json:"created_at"`
	ParentID       string         `json:"parent_id,omitempty"`
	Metadata       map[string]any `json:"metadata,omitempty"`
	Snapshot       *GraphSnapshot `json:"snapshot,omitempty"`
}

EnhancedCheckpoint represents a workflow checkpoint with full state.

type EnhancedCheckpointManager

type EnhancedCheckpointManager struct {
	// contains filtered or unexported fields
}

EnhancedCheckpointManager manages workflow checkpoints with time-travel.

func NewEnhancedCheckpointManager

func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager

NewEnhancedCheckpointManager creates a new checkpoint manager.

func (*EnhancedCheckpointManager) Compare

func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)

Compare compares two checkpoint versions.

func (*EnhancedCheckpointManager) CreateCheckpoint

func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)

CreateCheckpoint creates a checkpoint from current execution state.

func (*EnhancedCheckpointManager) GetHistory

func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

GetHistory returns checkpoint history for time-travel debugging.

func (*EnhancedCheckpointManager) ResumeFromCheckpoint

func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)

ResumeFromCheckpoint resumes workflow execution from a checkpoint.

func (*EnhancedCheckpointManager) Rollback

func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

Rollback rolls back to a specific version.

type ErrorConfig

type ErrorConfig struct {
	// Strategy specifies how to handle errors
	Strategy ErrorStrategy
	// MaxRetries is the maximum number of retry attempts (for retry strategy)
	MaxRetries int
	// RetryDelayMs is the delay between retries in milliseconds
	RetryDelayMs int
	// FallbackValue is the value to use when skipping a failed node
	FallbackValue any
}

ErrorConfig defines error handling behavior for a node

type ErrorDefinition added in v1.0.0

type ErrorDefinition struct {
	// Strategy specifies how to handle errors (fail_fast, skip, retry)
	Strategy string `json:"strategy" yaml:"strategy"`
	// MaxRetries is the maximum number of retry attempts (for retry strategy)
	MaxRetries int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
	// RetryDelayMs is the delay between retries in milliseconds
	RetryDelayMs int `json:"retry_delay_ms,omitempty" yaml:"retry_delay_ms,omitempty"`
	// FallbackValue is the value to use when skipping a failed node
	FallbackValue any `json:"fallback_value,omitempty" yaml:"fallback_value,omitempty"`
}

ErrorDefinition represents a serializable error handling configuration

type ErrorStrategy

type ErrorStrategy string

ErrorStrategy defines how errors should be handled

const (
	// ErrorStrategyFailFast stops execution immediately on error
	ErrorStrategyFailFast ErrorStrategy = "fail_fast"
	// ErrorStrategySkip skips the failed node and continues
	ErrorStrategySkip ErrorStrategy = "skip"
	// ErrorStrategyRetry retries the failed node
	ErrorStrategyRetry ErrorStrategy = "retry"
)
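The three strategies can be read as a small policy wrapper around a node's execution, sketched below with local copies of the documented types. Treating MaxRetries as the number of attempts after the first failure, and waiting RetryDelayMs before every retry, are assumptions; runWithPolicy and policyDemo are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type errorStrategy string

const (
	strategyFailFast errorStrategy = "fail_fast"
	strategySkip     errorStrategy = "skip"
	strategyRetry    errorStrategy = "retry"
)

// errorConfig copies the documented ErrorConfig fields.
type errorConfig struct {
	Strategy      errorStrategy
	MaxRetries    int
	RetryDelayMs  int
	FallbackValue any
}

// runWithPolicy runs fn once and then applies the configured strategy on failure.
func runWithPolicy(ctx context.Context, cfg errorConfig, fn func(context.Context) (any, error)) (any, error) {
	out, err := fn(ctx)
	if err == nil {
		return out, nil
	}
	switch cfg.Strategy {
	case strategySkip:
		// Skip the failed node and continue with the fallback value.
		return cfg.FallbackValue, nil
	case strategyRetry:
		delay := time.Duration(cfg.RetryDelayMs) * time.Millisecond
		for attempt := 1; attempt <= cfg.MaxRetries; attempt++ {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(delay):
			}
			if out, err = fn(ctx); err == nil {
				return out, nil
			}
		}
		return nil, fmt.Errorf("failed after %d retries: %w", cfg.MaxRetries, err)
	default: // fail_fast, and any unknown strategy
		return nil, err
	}
}

// policyDemo runs a function that fails the given number of times before succeeding.
// It returns the output, the number of calls made, and the final error.
func policyDemo(strategy errorStrategy, failures int) (any, int, error) {
	calls := 0
	flaky := func(context.Context) (any, error) {
		calls++
		if calls <= failures {
			return nil, fmt.Errorf("attempt %d failed", calls)
		}
		return "ok", nil
	}
	cfg := errorConfig{Strategy: strategy, MaxRetries: 3, RetryDelayMs: 1, FallbackValue: "fallback"}
	out, err := runWithPolicy(context.Background(), cfg, flaky)
	return out, calls, err
}

func main() {
	fmt.Println(policyDemo(strategyRetry, 2))
	fmt.Println(policyDemo(strategySkip, 5))
	fmt.Println(policyDemo(strategyFailFast, 1))
}
```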

type ExecutionContext

type ExecutionContext struct {
	// WorkflowID identifies the workflow being executed
	WorkflowID string `json:"workflow_id,omitempty"`
	// CurrentNode is the ID of the currently executing node
	CurrentNode string `json:"current_node,omitempty"`
	// NodeResults stores the results of completed nodes
	NodeResults map[string]any `json:"node_results,omitempty"`
	// Variables stores workflow variables
	Variables map[string]any `json:"variables,omitempty"`
	// StartTime is when the workflow execution started
	StartTime time.Time `json:"start_time,omitempty"`
	// LastUpdateTime is when the context was last updated
	LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

ExecutionContext captures the execution state for checkpointing

func NewExecutionContext

func NewExecutionContext(workflowID string) *ExecutionContext

NewExecutionContext creates a new execution context

func (*ExecutionContext) GetNodeResult

func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)

GetNodeResult retrieves the result of a completed node

func (*ExecutionContext) GetVariable

func (ec *ExecutionContext) GetVariable(key string) (any, bool)

GetVariable retrieves a workflow variable

func (*ExecutionContext) SetCurrentNode

func (ec *ExecutionContext) SetCurrentNode(nodeID string)

SetCurrentNode updates the currently executing node

func (*ExecutionContext) SetNodeResult

func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)

SetNodeResult stores the result of a completed node

func (*ExecutionContext) SetVariable

func (ec *ExecutionContext) SetVariable(key string, value any)

SetVariable sets a workflow variable

type ExecutionHistory

type ExecutionHistory struct {
	ExecutionID string           `json:"execution_id"`
	WorkflowID  string           `json:"workflow_id"`
	StartTime   time.Time        `json:"start_time"`
	EndTime     time.Time        `json:"end_time"`
	Duration    time.Duration    `json:"duration"`
	Status      ExecutionStatus  `json:"status"`
	Nodes       []*NodeExecution `json:"nodes"`
	Error       string           `json:"error,omitempty"`
	Metadata    map[string]any   `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

ExecutionHistory records the complete execution path of a workflow

func NewExecutionHistory

func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory

NewExecutionHistory creates a new execution history

func (*ExecutionHistory) Complete

func (h *ExecutionHistory) Complete(err error)

Complete marks the execution as completed

func (*ExecutionHistory) GetNodeByID

func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution

GetNodeByID returns the execution record for a specific node

func (*ExecutionHistory) GetNodes

func (h *ExecutionHistory) GetNodes() []*NodeExecution

GetNodes returns a copy of the node executions

func (*ExecutionHistory) RecordNodeEnd

func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)

RecordNodeEnd records the end of a node execution

func (*ExecutionHistory) RecordNodeStart

func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution

RecordNodeStart records the start of a node execution

type ExecutionHistoryStore

type ExecutionHistoryStore struct {
	// contains filtered or unexported fields
}

ExecutionHistoryStore stores and queries execution histories

func NewExecutionHistoryStore

func NewExecutionHistoryStore() *ExecutionHistoryStore

NewExecutionHistoryStore creates a new execution history store

func (*ExecutionHistoryStore) Get

func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)

Get retrieves an execution history by ID

func (*ExecutionHistoryStore) ListByStatus

func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory

ListByStatus returns executions with a specific status

func (*ExecutionHistoryStore) ListByTimeRange

func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory

ListByTimeRange returns executions within a time range

func (*ExecutionHistoryStore) ListByWorkflow

func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory

ListByWorkflow returns all executions for a workflow

func (*ExecutionHistoryStore) Save

func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)

Save saves an execution history

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the status of an execution

const (
	// ExecutionStatusRunning indicates the execution is in progress
	ExecutionStatusRunning ExecutionStatus = "running"
	// ExecutionStatusCompleted indicates the execution completed successfully
	ExecutionStatusCompleted ExecutionStatus = "completed"
	// ExecutionStatusFailed indicates the execution failed
	ExecutionStatusFailed ExecutionStatus = "failed"
)

type FuncAggregator

type FuncAggregator struct {
	// contains filtered or unexported fields
}

FuncAggregator is a function-based aggregator.

func NewFuncAggregator

func NewFuncAggregator(fn AggregatorFunc) *FuncAggregator

NewFuncAggregator creates a function-based aggregator.

func (*FuncAggregator) Aggregate

func (a *FuncAggregator) Aggregate(ctx context.Context, results []TaskResult) (any, error)

type FuncHandler

type FuncHandler struct {
	// contains filtered or unexported fields
}

FuncHandler is a function-based handler.

func NewFuncHandler

func NewFuncHandler(name string, fn HandlerFunc) *FuncHandler

NewFuncHandler creates a function-based handler.

func (*FuncHandler) Execute added in v1.0.0

func (h *FuncHandler) Execute(ctx context.Context, input any) (any, error)

func (*FuncHandler) Name

func (h *FuncHandler) Name() string

type FuncRouter

type FuncRouter struct {
	// contains filtered or unexported fields
}

FuncRouter is a function-based router.

func NewFuncRouter

func NewFuncRouter(fn RouterFunc) *FuncRouter

NewFuncRouter creates a function-based router.

func (*FuncRouter) Route

func (r *FuncRouter) Route(ctx context.Context, input any) (string, error)

type FuncStep

type FuncStep struct {
	// contains filtered or unexported fields
}

FuncStep is a function-based Step implementation.

func NewFuncStep

func NewFuncStep(name string, fn StepFunc) *FuncStep

NewFuncStep creates a function-based step.

func (*FuncStep) Execute

func (s *FuncStep) Execute(ctx context.Context, input any) (any, error)

func (*FuncStep) Name

func (s *FuncStep) Name() string

type FuncTask

type FuncTask struct {
	// contains filtered or unexported fields
}

FuncTask is a function-based task.

func NewFuncTask

func NewFuncTask(name string, fn TaskFunc) *FuncTask

NewFuncTask creates a function-based task.

func (*FuncTask) Execute

func (t *FuncTask) Execute(ctx context.Context, input any) (any, error)

func (*FuncTask) Name

func (t *FuncTask) Name() string

type GraphSnapshot

type GraphSnapshot struct {
	Nodes     map[string]NodeSnapshot `json:"nodes"`
	Edges     map[string][]string     `json:"edges"`
	EntryNode string                  `json:"entry_node"`
}

GraphSnapshot captures the complete graph state.

type Handler

type Handler interface {
	Runnable
	// Name returns the handler name
	Name() string
}

Handler is the handler interface.

type HandlerFunc

type HandlerFunc func(ctx context.Context, input any) (any, error)

HandlerFunc is the handler function type.

type HumanInputHandler

type HumanInputHandler interface {
	// RequestInput sends a prompt to a human and waits for a response.
	// inputType hints at the expected response format (e.g. "text", "choice").
	// options provides selectable choices when inputType is "choice".
	RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}

HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.

type HumanInputStep

type HumanInputStep struct {
	Prompt  string
	Type    string
	Options []string
	Timeout int
	Handler HumanInputHandler // Optional: inject to enable real HITL
}

HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response. When Handler is nil, it returns a placeholder map (backward compatible).

func (*HumanInputStep) Execute

func (s *HumanInputStep) Execute(ctx context.Context, input any) (any, error)

func (*HumanInputStep) Name

func (s *HumanInputStep) Name() string

type InMemoryCheckpointStore

type InMemoryCheckpointStore struct {
	// contains filtered or unexported fields
}

InMemoryCheckpointStore provides in-memory storage.

func NewInMemoryCheckpointStore

func NewInMemoryCheckpointStore() *InMemoryCheckpointStore

NewInMemoryCheckpointStore creates a new in-memory store.

func (*InMemoryCheckpointStore) Delete

func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error

func (*InMemoryCheckpointStore) ListVersions

func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Load

func (*InMemoryCheckpointStore) LoadLatest

func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) LoadVersion

func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)

func (*InMemoryCheckpointStore) Save

type IteratorFunc

type IteratorFunc func(ctx context.Context, input any) ([]any, error)

IteratorFunc generates a collection of items for iteration

type LLMStep

type LLMStep struct {
	Model       string
	Prompt      string
	Temperature float64
	MaxTokens   int
	Provider    llm.Provider // Optional: inject to enable real LLM calls
}

LLMStep executes an LLM call. When Provider is set, it performs a real LLM completion request. When Provider is nil, it returns a placeholder map (backward compatible).

func (*LLMStep) Execute

func (s *LLMStep) Execute(ctx context.Context, input any) (any, error)

func (*LLMStep) Name

func (s *LLMStep) Name() string

type LoopConfig

type LoopConfig struct {
	// Type specifies the loop type (while, for, foreach)
	Type LoopType
	// MaxIterations limits the maximum number of iterations (0 = unlimited)
	MaxIterations int
	// Condition evaluates whether to continue looping (for while loops)
	Condition ConditionFunc
	// Iterator generates items to iterate over (for foreach loops)
	Iterator IteratorFunc
}

LoopConfig defines loop behavior

type LoopDefinition

type LoopDefinition struct {
	// Type is the loop type (while, for, foreach)
	Type string `json:"type" yaml:"type"`
	// MaxIterations limits the maximum number of iterations
	MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
	// Condition is the condition name (for while loops)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}

LoopDefinition represents a serializable loop configuration

type LoopType

type LoopType string

LoopType defines the type of loop

const (
	// LoopTypeWhile executes while condition is true
	LoopTypeWhile LoopType = "while"
	// LoopTypeFor executes for a fixed number of iterations
	LoopTypeFor LoopType = "for"
	// LoopTypeForEach executes for each item in a collection
	LoopTypeForEach LoopType = "foreach"
)

type NativeAgentAdapter

type NativeAgentAdapter struct {
	// contains filtered or unexported fields
}

NativeAgentAdapter adapts an agent.Agent (with *Input/*Output signatures) to the AgentExecutor interface used by workflow steps.

Input conversion (any -> *agent.Input):

  • *agent.Input: passed through directly
  • string: wrapped as Input.Content
  • map[string]any: Content extracted from "content" key, rest goes to Context
  • other types: converted to string via fmt.Sprintf and set as Content

Output: returns the *agent.Output directly (callers can type-assert).
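
The conversion rules above can be sketched as a type switch. This is a minimal illustration, not the adapter's actual code: Input is a local stand-in for agent.Input that models only the Content and Context fields named above, toInput is a hypothetical helper, the "%v" format verb is an assumption, and leaving a non-string "content" value in Context is also an assumption.

```go
package main

import "fmt"

// Input is a hypothetical stand-in for agent.Input.
type Input struct {
	Content string
	Context map[string]any
}

// toInput applies the four documented conversion rules.
func toInput(v any) *Input {
	switch val := v.(type) {
	case *Input:
		// Already the target type: pass through unchanged.
		return val
	case string:
		return &Input{Content: val}
	case map[string]any:
		in := &Input{Context: make(map[string]any, len(val))}
		for key, item := range val {
			if key == "content" {
				if text, ok := item.(string); ok {
					in.Content = text
					continue
				}
			}
			// Every other entry goes to Context.
			in.Context[key] = item
		}
		return in
	default:
		// Fallback: stringify the value.
		return &Input{Content: fmt.Sprintf("%v", val)}
	}
}

func main() {
	in := toInput(map[string]any{"content": "summarize this", "user": "u1"})
	fmt.Println(in.Content, in.Context)
}
```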

func NewNativeAgentAdapter

func NewNativeAgentAdapter(a agent.Agent) *NativeAgentAdapter

NewNativeAgentAdapter creates an adapter that bridges agent.Agent to AgentExecutor.

func (*NativeAgentAdapter) Execute

func (n *NativeAgentAdapter) Execute(ctx context.Context, input any) (any, error)

Execute implements AgentExecutor. It converts the workflow input to *agent.Input, calls agent.Execute, and returns the *agent.Output.

func (*NativeAgentAdapter) ID

func (n *NativeAgentAdapter) ID() string

ID implements AgentExecutor.

func (*NativeAgentAdapter) Name

func (n *NativeAgentAdapter) Name() string

Name implements AgentExecutor.

type NodeBuilder

type NodeBuilder struct {
	// contains filtered or unexported fields
}

NodeBuilder provides a fluent API for configuring individual nodes

func (*NodeBuilder) Done

func (nb *NodeBuilder) Done() *DAGBuilder

Done completes node configuration and returns to the DAGBuilder

func (*NodeBuilder) WithCondition

func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder

WithCondition sets the condition function for a conditional node

func (*NodeBuilder) WithErrorConfig

func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder

WithErrorConfig sets the error handling configuration for a node

func (*NodeBuilder) WithLoop

func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder

WithLoop sets the loop configuration for a loop node

func (*NodeBuilder) WithMetadata

func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder

WithMetadata sets a metadata value

func (*NodeBuilder) WithOnFalse

func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder

WithOnFalse sets the nodes to execute when condition is false

func (*NodeBuilder) WithOnTrue

func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder

WithOnTrue sets the nodes to execute when condition is true

func (*NodeBuilder) WithStep

func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder

WithStep sets the step for an action node

func (*NodeBuilder) WithSubGraph

func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder

WithSubGraph sets the subgraph for a subgraph node

type NodeConfig

type NodeConfig struct {
	// LLM node config
	Model       string  `json:"model,omitempty"`
	Prompt      string  `json:"prompt,omitempty"`
	Temperature float64 `json:"temperature,omitempty"`
	MaxTokens   int     `json:"max_tokens,omitempty"`

	// Tool node config
	ToolName   string         `json:"tool_name,omitempty"`
	ToolParams map[string]any `json:"tool_params,omitempty"`

	// Condition node config
	Condition  string `json:"condition,omitempty"`
	Expression string `json:"expression,omitempty"`

	// Loop node config
	LoopType      string `json:"loop_type,omitempty"`
	MaxIterations int    `json:"max_iterations,omitempty"`

	// Code node config
	Code     string `json:"code,omitempty"`
	Language string `json:"language,omitempty"`

	// Human input config
	InputPrompt string   `json:"input_prompt,omitempty"`
	InputType   string   `json:"input_type,omitempty"`
	Options     []string `json:"options,omitempty"`
	Timeout     int      `json:"timeout_seconds,omitempty"`

	// Subflow config
	SubflowID string `json:"subflow_id,omitempty"`
}

NodeConfig contains node-specific configuration.

type NodeDefinition

type NodeDefinition struct {
	// ID is the unique node identifier
	ID string `json:"id" yaml:"id"`
	// Type is the node type
	Type string `json:"type" yaml:"type"`
	// Step is the step name (for action nodes)
	Step string `json:"step,omitempty" yaml:"step,omitempty"`
	// Condition is the condition name (for conditional nodes)
	Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
	// Next lists the next nodes to execute (for action nodes)
	Next []string `json:"next,omitempty" yaml:"next,omitempty"`
	// OnTrue lists nodes to execute when condition is true
	OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
	// OnFalse lists nodes to execute when condition is false
	OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
	// Loop defines loop configuration (for loop nodes)
	Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
	// SubGraph defines a nested workflow (for subgraph nodes)
	SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
	// Error defines error handling configuration
	Error *ErrorDefinition `json:"error,omitempty" yaml:"error,omitempty"`
	// Metadata stores additional node information
	Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

NodeDefinition represents a serializable node definition

type NodeExecution

type NodeExecution struct {
	NodeID    string          `json:"node_id"`
	NodeType  NodeType        `json:"node_type"`
	StartTime time.Time       `json:"start_time"`
	EndTime   time.Time       `json:"end_time"`
	Duration  time.Duration   `json:"duration"`
	Status    ExecutionStatus `json:"status"`
	Input     any             `json:"input,omitempty"`
	Output    any             `json:"output,omitempty"`
	Error     string          `json:"error,omitempty"`
}

NodeExecution records the execution of a single node

type NodeOutput

type NodeOutput struct {
	Updates map[string]any `json:"updates"`
	NodeID  string         `json:"node_id"`
}

NodeOutput represents output from a graph node.

type NodeSnapshot

type NodeSnapshot struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	Status   string `json:"status"`
	Input    any    `json:"input,omitempty"`
	Output   any    `json:"output,omitempty"`
	Error    string `json:"error,omitempty"`
	Duration int64  `json:"duration_ms,omitempty"`
}

NodeSnapshot captures a node's state.

type NodeType

type NodeType string

NodeType defines the type of a DAG node

const (
	// NodeTypeAction executes a step
	NodeTypeAction NodeType = "action"
	// NodeTypeCondition performs conditional branching
	NodeTypeCondition NodeType = "condition"
	// NodeTypeLoop performs loop iteration
	NodeTypeLoop NodeType = "loop"
	// NodeTypeParallel executes nodes concurrently
	NodeTypeParallel NodeType = "parallel"
	// NodeTypeSubGraph executes a nested workflow
	NodeTypeSubGraph NodeType = "subgraph"
	// NodeTypeCheckpoint creates a checkpoint
	NodeTypeCheckpoint NodeType = "checkpoint"
)

type ParallelAgentStep

type ParallelAgentStep struct {
	// contains filtered or unexported fields
}

ParallelAgentStep executes multiple agents in parallel.

func NewParallelAgentStep

func NewParallelAgentStep(agents []AgentExecutor, merger func([]any) (any, error)) *ParallelAgentStep

NewParallelAgentStep creates a step that executes agents in parallel.

func (*ParallelAgentStep) Execute

func (p *ParallelAgentStep) Execute(ctx context.Context, input any) (any, error)

Execute runs all agents in parallel and merges results.

func (*ParallelAgentStep) Name

func (p *ParallelAgentStep) Name() string

Name implements the Step interface.

type ParallelWorkflow

type ParallelWorkflow struct {
	// contains filtered or unexported fields
}

ParallelWorkflow is a parallel workflow. It splits the work into multiple subtasks, executes them in parallel, and then aggregates the results.

func NewParallelWorkflow

func NewParallelWorkflow(name, description string, aggregator Aggregator, tasks ...Task) *ParallelWorkflow

NewParallelWorkflow creates a parallel workflow.

func (*ParallelWorkflow) AddTask

func (w *ParallelWorkflow) AddTask(task Task)

AddTask adds a task.

func (*ParallelWorkflow) Description

func (w *ParallelWorkflow) Description() string

func (*ParallelWorkflow) Execute

func (w *ParallelWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute runs the parallel workflow:

  1. Execute all tasks in parallel
  2. Collect all results
  3. Aggregate the results using the aggregator

func (*ParallelWorkflow) Name

func (w *ParallelWorkflow) Name() string

func (*ParallelWorkflow) Tasks

func (w *ParallelWorkflow) Tasks() []Task

Tasks returns all tasks.

type PassthroughStep

type PassthroughStep struct{}

PassthroughStep passes input directly to output.

func (*PassthroughStep) Execute

func (s *PassthroughStep) Execute(ctx context.Context, input any) (any, error)

func (*PassthroughStep) Name

func (s *PassthroughStep) Name() string

type Port

type Port struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	Type string `json:"type"` // string, number, boolean, object, array
}

Port represents an input/output port on a node.

type Position

type Position struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

Position represents a node's position on the visual canvas.

type Reducer

type Reducer[T any] func(current T, update T) T

Reducer defines how to merge state updates from multiple nodes.

func AppendReducer

func AppendReducer[T any]() Reducer[[]T]

AppendReducer appends slices together.

func LastValueReducer

func LastValueReducer[T any]() Reducer[T]

LastValueReducer returns the most recent value (default).

func MaxReducer

func MaxReducer[T ~int | ~int64 | ~float64]() Reducer[T]

MaxReducer keeps the maximum value.

func MergeMapReducer

func MergeMapReducer[K comparable, V any]() Reducer[map[K]V]

MergeMapReducer merges maps, with update values taking precedence.

func SumReducer

func SumReducer[T ~int | ~int64 | ~float64]() Reducer[T]

SumReducer sums numeric values.
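
To make the reducer semantics concrete, the sketch below re-implements the five constructors locally from their one-line descriptions. The lowercase function names are hypothetical and the bodies are assumptions about plausible behavior (for example, whether the real AppendReducer copies or reuses the current slice is not documented here).

```go
package main

import "fmt"

// Reducer mirrors the documented generic type.
type Reducer[T any] func(current T, update T) T

// lastValueReducer discards the current value and keeps the update.
func lastValueReducer[T any]() Reducer[T] {
	return func(_ T, update T) T { return update }
}

// appendReducer concatenates current and update into a fresh slice.
func appendReducer[T any]() Reducer[[]T] {
	return func(current, update []T) []T {
		merged := make([]T, 0, len(current)+len(update))
		merged = append(merged, current...)
		return append(merged, update...)
	}
}

// sumReducer adds the update to the current value.
func sumReducer[T ~int | ~int64 | ~float64]() Reducer[T] {
	return func(current, update T) T { return current + update }
}

// maxReducer keeps the larger of the two values.
func maxReducer[T ~int | ~int64 | ~float64]() Reducer[T] {
	return func(current, update T) T {
		if update > current {
			return update
		}
		return current
	}
}

// mergeMapReducer copies current, then overlays update so update wins on conflicts.
func mergeMapReducer[K comparable, V any]() Reducer[map[K]V] {
	return func(current, update map[K]V) map[K]V {
		merged := make(map[K]V, len(current)+len(update))
		for k, v := range current {
			merged[k] = v
		}
		for k, v := range update {
			merged[k] = v
		}
		return merged
	}
}

func main() {
	fmt.Println(sumReducer[int]()(2, 3))
	fmt.Println(maxReducer[float64]()(1.5, 0.5))
	fmt.Println(appendReducer[string]()([]string{"a"}, []string{"b"}))
	fmt.Println(mergeMapReducer[string, int]()(map[string]int{"a": 1, "b": 2}, map[string]int{"b": 9}))
}
```

A reducer matters when several nodes write to the same channel in one step: the channel folds each update into its current value instead of letting the writes race.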

type Router

type Router interface {
	// Route makes the routing decision and returns the route key
	Route(ctx context.Context, input any) (string, error)
}

Router is the router interface. It decides which handler to use based on the input.

type RouterFunc

type RouterFunc func(ctx context.Context, input any) (string, error)

RouterFunc is the router function type.

type RoutingWorkflow

type RoutingWorkflow struct {
	// contains filtered or unexported fields
}

RoutingWorkflow is a routing workflow. It classifies the input and routes the task to a specialized handler.

func NewRoutingWorkflow

func NewRoutingWorkflow(name, description string, router Router) *RoutingWorkflow

NewRoutingWorkflow creates a routing workflow.

func (*RoutingWorkflow) Description

func (w *RoutingWorkflow) Description() string

func (*RoutingWorkflow) Execute

func (w *RoutingWorkflow) Execute(ctx context.Context, input any) (any, error)

Execute runs the routing workflow:

  1. Use the router to decide the route
  2. Look up the corresponding handler
  3. Execute the handler

func (*RoutingWorkflow) Name

func (w *RoutingWorkflow) Name() string

func (*RoutingWorkflow) RegisterHandler

func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)

RegisterHandler registers a handler.

func (*RoutingWorkflow) Routes

func (w *RoutingWorkflow) Routes() []string

Routes returns all registered routes.

func (*RoutingWorkflow) SetDefaultRoute

func (w *RoutingWorkflow) SetDefaultRoute(route string)

SetDefaultRoute sets the default route.

type Runnable added in v1.0.0

type Runnable interface {
	Execute(ctx context.Context, input any) (any, error)
}

Runnable is the common execution interface shared by Step, Task, and Handler. It represents any unit of work that can be executed with input and produce output.

type StateGraph

type StateGraph struct {
	// contains filtered or unexported fields
}

StateGraph manages multiple channels as a unified state.

func NewStateGraph

func NewStateGraph() *StateGraph

NewStateGraph creates a new state graph.

func (*StateGraph) ApplyNodeOutput

func (sg *StateGraph) ApplyNodeOutput(output NodeOutput) error

ApplyNodeOutput applies a node's output to the state graph.

func (*StateGraph) Snapshot

func (sg *StateGraph) Snapshot() StateSnapshot

Snapshot creates a snapshot of the current state.

type StateSnapshot

type StateSnapshot struct {
	Values   map[string]any    `json:"values"`
	Versions map[string]uint64 `json:"versions"`
}

StateSnapshot captures the current state of all channels.

type Step

type Step interface {
	Runnable
	// Name returns the step name
	Name() string
}

Step is the workflow step interface.

type StepFunc

type StepFunc func(ctx context.Context, input any) (any, error)

StepFunc is the step function type.

type Task

type Task interface {
	Runnable
	// Name returns the task name
	Name() string
}

Task is the parallel task interface.

type TaskFunc

type TaskFunc func(ctx context.Context, input any) (any, error)

TaskFunc is the task function type.

type TaskResult

type TaskResult struct {
	TaskName string
	Result   any
	Error    error
}

TaskResult holds the result of a task.

type Tool

type Tool interface {
	Name() string
	Execute(ctx context.Context, params map[string]any) (any, error)
}

Tool represents an executable tool within a workflow.

type ToolRegistry

type ToolRegistry interface {
	// GetTool returns a Tool by name. Returns nil, false if not found.
	GetTool(name string) (Tool, bool)
	// ExecuteTool looks up and executes a tool in one call.
	ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}

ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.

type ToolStep

type ToolStep struct {
	ToolName string
	Params   map[string]any
	Registry ToolRegistry // Optional: inject to enable real tool execution
}

ToolStep executes a tool call. When Registry is set, it performs a real tool execution. When Registry is nil, it returns a placeholder map (backward compatible).

func (*ToolStep) Execute

func (s *ToolStep) Execute(ctx context.Context, input any) (any, error)

func (*ToolStep) Name

func (s *ToolStep) Name() string

type Variable

type Variable struct {
	Name         string `json:"name"`
	Type         string `json:"type"`
	DefaultValue any    `json:"default_value,omitempty"`
	Description  string `json:"description,omitempty"`
}

Variable represents a workflow variable.

type VisualBuilder

type VisualBuilder struct {
	// contains filtered or unexported fields
}

VisualBuilder builds DAG workflows from visual definitions.

func NewVisualBuilder

func NewVisualBuilder() *VisualBuilder

NewVisualBuilder creates a new visual builder.

func (*VisualBuilder) Build

func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)

Build converts a visual workflow to an executable DAG.

func (*VisualBuilder) RegisterStep

func (b *VisualBuilder) RegisterStep(name string, step Step)

RegisterStep registers a step implementation.

type VisualEdge

type VisualEdge struct {
	ID         string `json:"id"`
	Source     string `json:"source"`
	SourcePort string `json:"source_port,omitempty"`
	Target     string `json:"target"`
	TargetPort string `json:"target_port,omitempty"`
	Label      string `json:"label,omitempty"`
	Condition  string `json:"condition,omitempty"` // For conditional edges
}

VisualEdge represents a connection between nodes.

type VisualNode

type VisualNode struct {
	ID       string         `json:"id"`
	Type     VisualNodeType `json:"type"`
	Label    string         `json:"label"`
	Position Position       `json:"position"`
	Config   NodeConfig     `json:"config"`
	Inputs   []Port         `json:"inputs,omitempty"`
	Outputs  []Port         `json:"outputs,omitempty"`
}

VisualNode represents a node in the visual workflow.

type VisualNodeType

type VisualNodeType string

VisualNodeType defines visual node types.

const (
	VNodeStart     VisualNodeType = "start"
	VNodeEnd       VisualNodeType = "end"
	VNodeLLM       VisualNodeType = "llm"
	VNodeTool      VisualNodeType = "tool"
	VNodeCondition VisualNodeType = "condition"
	VNodeLoop      VisualNodeType = "loop"
	VNodeParallel  VisualNodeType = "parallel"
	VNodeHuman     VisualNodeType = "human_input"
	VNodeCode      VisualNodeType = "code"
	VNodeSubflow   VisualNodeType = "subflow"
)

type VisualWorkflow

type VisualWorkflow struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Version     string         `json:"version"`
	Nodes       []VisualNode   `json:"nodes"`
	Edges       []VisualEdge   `json:"edges"`
	Variables   []Variable     `json:"variables,omitempty"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
}

VisualWorkflow represents a workflow designed in the visual builder.

func Import

func Import(data []byte) (*VisualWorkflow, error)

Import parses a visual workflow from JSON.

func (*VisualWorkflow) Export

func (vw *VisualWorkflow) Export() ([]byte, error)

Export serializes the visual workflow to JSON.

func (*VisualWorkflow) Validate

func (vw *VisualWorkflow) Validate() error

Validate validates the visual workflow.

type Workflow

type Workflow interface {
	Runnable
	// Name returns the workflow name
	Name() string
	// Description returns the workflow description
	Description() string
}

Workflow is the workflow interface. A Workflow is a predefined sequence of steps that provides predictable and consistent execution.

type WorkflowStreamEmitter added in v1.0.0

type WorkflowStreamEmitter func(WorkflowStreamEvent)

WorkflowStreamEmitter is a callback that receives workflow stream events.

type WorkflowStreamEvent added in v1.0.0

type WorkflowStreamEvent struct {
	Type     WorkflowStreamEventType `json:"type"`
	NodeID   string                  `json:"node_id,omitempty"`
	NodeName string                  `json:"node_name,omitempty"`
	Data     any                     `json:"data,omitempty"`
	Error    error                   `json:"-"`
}

WorkflowStreamEvent carries information about a workflow execution event.

type WorkflowStreamEventType added in v1.0.0

type WorkflowStreamEventType string

WorkflowStreamEventType defines the type of workflow stream event.

const (
	// WorkflowEventNodeStart is emitted before a DAG node begins execution.
	WorkflowEventNodeStart WorkflowStreamEventType = "node_start"
	// WorkflowEventNodeComplete is emitted after a DAG node finishes successfully.
	WorkflowEventNodeComplete WorkflowStreamEventType = "node_complete"
	// WorkflowEventNodeError is emitted when a DAG node fails.
	WorkflowEventNodeError WorkflowStreamEventType = "node_error"
	// WorkflowEventStepProgress is emitted for intermediate step progress.
	WorkflowEventStepProgress WorkflowStreamEventType = "step_progress"
	// WorkflowEventToken is emitted for streaming token output from LLM steps.
	WorkflowEventToken WorkflowStreamEventType = "token"
)
