Documentation
¶
Index ¶
- Variables
- func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])
- func ValidateDAGDefinition(def *DAGDefinition) error
- func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context
- type AgentAdapter
- type AgentAdapterOption
- type AgentExecutor
- type AgentInterface
- type AgentRouter
- type AgentStep
- type AgentStepOption
- type Aggregator
- type AggregatorFunc
- type Annotation
- type ChainWorkflow
- type Channel
- type ChannelOption
- type ChannelReader
- type CheckpointDiff
- type CheckpointManager
- type CheckpointStore
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerEvent
- type CircuitBreakerEventHandler
- type CircuitBreakerRegistry
- type CircuitState
- type CodeStep
- type ConditionFunc
- type ConditionalAgentStep
- func (c *ConditionalAgentStep) Default(agent AgentExecutor) *ConditionalAgentStep
- func (c *ConditionalAgentStep) Execute(ctx context.Context, input any) (any, error)
- func (c *ConditionalAgentStep) Name() string
- func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep
- type DAGBuilder
- func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
- func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
- func (b *DAGBuilder) Build() (*DAGWorkflow, error)
- func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
- func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
- func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
- type DAGDefinition
- func (d *DAGDefinition) MarshalJSON() ([]byte, error)
- func (d *DAGDefinition) MarshalYAML() (any, error)
- func (d *DAGDefinition) SaveToJSONFile(filename string) error
- func (d *DAGDefinition) SaveToYAMLFile(filename string) error
- func (d *DAGDefinition) ToJSON() (string, error)
- func (d *DAGDefinition) ToYAML() (string, error)
- func (d *DAGDefinition) UnmarshalJSON(data []byte) error
- func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
- type DAGExecutor
- func (e *DAGExecutor) Execute(ctx context.Context, graph *DAGGraph, input any) (any, error)
- func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
- func (e *DAGExecutor) GetExecutionID() string
- func (e *DAGExecutor) GetHistory() *ExecutionHistory
- func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
- func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
- func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
- func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
- type DAGGraph
- func (g *DAGGraph) AddEdge(fromID, toID string)
- func (g *DAGGraph) AddNode(node *DAGNode)
- func (g *DAGGraph) Edges() map[string][]string
- func (g *DAGGraph) GetEdges(nodeID string) []string
- func (g *DAGGraph) GetEntry() string
- func (g *DAGGraph) GetNode(nodeID string) (*DAGNode, bool)
- func (g *DAGGraph) Nodes() map[string]*DAGNode
- func (g *DAGGraph) SetEntry(nodeID string)
- type DAGNode
- type DAGWorkflow
- func (w *DAGWorkflow) Description() string
- func (w *DAGWorkflow) Execute(ctx context.Context, input any) (any, error)
- func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
- func (w *DAGWorkflow) Graph() *DAGGraph
- func (w *DAGWorkflow) Name() string
- func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
- func (w *DAGWorkflow) SetMetadata(key string, value any)
- func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
- type Duration
- type EnhancedCheckpoint
- type EnhancedCheckpointManager
- func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
- func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, ...) (*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
- func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- type ErrorConfig
- type ErrorDefinition
- type ErrorStrategy
- type ExecutionContext
- func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
- func (ec *ExecutionContext) GetVariable(key string) (any, bool)
- func (ec *ExecutionContext) SetCurrentNode(nodeID string)
- func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
- func (ec *ExecutionContext) SetVariable(key string, value any)
- type ExecutionHistory
- func (h *ExecutionHistory) Complete(err error)
- func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
- func (h *ExecutionHistory) GetNodes() []*NodeExecution
- func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
- func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
- type ExecutionHistoryStore
- func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
- func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
- func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
- func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
- type ExecutionStatus
- type FuncAggregator
- type FuncHandler
- type FuncRouter
- type FuncStep
- type FuncTask
- type GraphSnapshot
- type Handler
- type HandlerFunc
- type HumanInputHandler
- type HumanInputStep
- type InMemoryCheckpointStore
- func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
- func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
- func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
- type IteratorFunc
- type LLMStep
- type LoopConfig
- type LoopDefinition
- type LoopType
- type NativeAgentAdapter
- type NodeBuilder
- func (nb *NodeBuilder) Done() *DAGBuilder
- func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
- func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
- func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
- func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
- func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
- func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
- func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
- type NodeConfig
- type NodeDefinition
- type NodeExecution
- type NodeOutput
- type NodeSnapshot
- type NodeType
- type ParallelAgentStep
- type ParallelWorkflow
- type PassthroughStep
- type Port
- type Position
- type Reducer
- type Router
- type RouterFunc
- type RoutingWorkflow
- func (w *RoutingWorkflow) Description() string
- func (w *RoutingWorkflow) Execute(ctx context.Context, input any) (any, error)
- func (w *RoutingWorkflow) Name() string
- func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)
- func (w *RoutingWorkflow) Routes() []string
- func (w *RoutingWorkflow) SetDefaultRoute(route string)
- type Runnable
- type StateGraph
- type StateSnapshot
- type Step
- type StepFunc
- type Task
- type TaskFunc
- type TaskResult
- type Tool
- type ToolRegistry
- type ToolStep
- type Variable
- type VisualBuilder
- type VisualEdge
- type VisualNode
- type VisualNodeType
- type VisualWorkflow
- type Workflow
- type WorkflowStreamEmitter
- type WorkflowStreamEvent
- type WorkflowStreamEventType
Constants ¶
This section is empty.
Variables ¶
var ErrNotConfigured = errors.New("step dependency not configured")
ErrNotConfigured is returned when a step's required dependency (Provider, Registry, Handler) has not been injected. Callers can check for this with errors.Is(err, ErrNotConfigured).
Functions ¶
func RegisterChannel ¶
func RegisterChannel[T any](sg *StateGraph, channel *Channel[T])
RegisterChannel registers a channel with the state graph.
func ValidateDAGDefinition ¶
func ValidateDAGDefinition(def *DAGDefinition) error
ValidateDAGDefinition validates a loaded DAGDefinition
func WithWorkflowStreamEmitter ¶ added in v1.0.0
func WithWorkflowStreamEmitter(ctx context.Context, emitter WorkflowStreamEmitter) context.Context
WithWorkflowStreamEmitter stores a WorkflowStreamEmitter in the context.
Types ¶
type AgentAdapter ¶
type AgentAdapter struct {
// contains filtered or unexported fields
}
AgentAdapter adapts an AgentInterface to the AgentExecutor interface, allowing any agent implementation to be used in workflow steps.
func NewAgentAdapter ¶
func NewAgentAdapter(agent AgentInterface, opts ...AgentAdapterOption) *AgentAdapter
NewAgentAdapter creates an AgentAdapter that implements AgentExecutor.
type AgentAdapterOption ¶
type AgentAdapterOption func(*AgentAdapter)
AgentAdapterOption configures an AgentAdapter.
func WithAgentInputMapper ¶
func WithAgentInputMapper(mapper func(any) (string, error)) AgentAdapterOption
WithAgentInputMapper sets a custom function to convert workflow input to agent string input.
func WithAgentOutputMapper ¶
func WithAgentOutputMapper(mapper func(string) (any, error)) AgentAdapterOption
WithAgentOutputMapper sets a custom function to convert agent string output to workflow output.
type AgentExecutor ¶
type AgentExecutor interface {
types.Executor
// Name returns the agent's display name.
Name() string
}
AgentExecutor defines the interface for agent execution in workflows. It embeds types.Executor (the minimal common agent contract) and adds Name() which is required by workflow steps (e.g., AgentStep default naming).
type AgentInterface ¶
type AgentInterface interface {
// Execute runs the agent with a string prompt and returns a string response.
Execute(ctx context.Context, input string) (string, error)
// ID returns the agent's unique identifier.
ID() string
// Name returns the agent's display name.
Name() string
}
AgentInterface is a minimal agent contract defined in the workflow package to avoid importing the agent package (which would cause circular imports). The agent.Agent interface uses (ctx, *Input) -> (*Output, error), but this interface uses plain string I/O for simplicity. Callers can use AgentAdapterOption functions to customize input/output conversion.
Note: This interface intentionally differs from types.Executor in its Execute signature (string -> string vs any -> any). It exists for agents that only support string-based I/O. Use AgentAdapter to bridge an AgentInterface implementation to the AgentExecutor (types.Executor) contract.
type AgentRouter ¶
type AgentRouter struct {
// contains filtered or unexported fields
}
AgentRouter routes tasks to appropriate agents based on criteria.
func NewAgentRouter ¶
func NewAgentRouter(selector func(ctx context.Context, input any, agents map[string]AgentExecutor) (AgentExecutor, error)) *AgentRouter
NewAgentRouter creates a new AgentRouter.
func (*AgentRouter) Execute ¶
Execute implements the Step interface by routing to the appropriate agent.
func (*AgentRouter) RegisterAgent ¶
func (r *AgentRouter) RegisterAgent(agent AgentExecutor)
RegisterAgent registers an agent with the router.
type AgentStep ¶
type AgentStep struct {
// contains filtered or unexported fields
}
AgentStep wraps an AgentExecutor as a workflow Step. This allows agents to be used as steps in workflow chains.
func NewAgentStep ¶
func NewAgentStep(agent AgentExecutor, opts ...AgentStepOption) *AgentStep
NewAgentStep creates a new AgentStep from an AgentExecutor.
type AgentStepOption ¶
type AgentStepOption func(*AgentStep)
AgentStepOption configures an AgentStep.
func WithInputMapper ¶
func WithInputMapper(mapper func(any) (any, error)) AgentStepOption
WithInputMapper sets a function to transform input before agent execution.
func WithOutputMapper ¶
func WithOutputMapper(mapper func(any) (any, error)) AgentStepOption
WithOutputMapper sets a function to transform output after agent execution.
func WithStepName ¶
func WithStepName(name string) AgentStepOption
WithStepName sets a custom name for the step.
type Aggregator ¶
type Aggregator interface {
// Aggregate aggregates the task results.
Aggregate(ctx context.Context, results []TaskResult) (any, error)
}
Aggregator is the aggregator interface. It combines the results of multiple tasks into the final output.
type AggregatorFunc ¶
type AggregatorFunc func(ctx context.Context, results []TaskResult) (any, error)
AggregatorFunc is the aggregator function type.
type Annotation ¶
Annotation provides type-safe state definition.
func NewAnnotation ¶
func NewAnnotation[T any](name string, defaultVal T, reducer Reducer[T]) Annotation[T]
NewAnnotation creates a new annotation.
func (Annotation[T]) CreateChannel ¶
func (a Annotation[T]) CreateChannel() *Channel[T]
CreateChannel creates a channel from an annotation.
type ChainWorkflow ¶
type ChainWorkflow struct {
// contains filtered or unexported fields
}
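ChainWorkflow is a prompt-chaining workflow. It breaks a task down into a fixed sequence of steps, where each step processes the output of the previous step.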
func NewChainWorkflow ¶
func NewChainWorkflow(name, description string, steps ...Step) *ChainWorkflow
NewChainWorkflow creates a prompt-chaining workflow.
func (*ChainWorkflow) Description ¶
func (w *ChainWorkflow) Description() string
func (*ChainWorkflow) Name ¶
func (w *ChainWorkflow) Name() string
type Channel ¶
type Channel[T any] struct {
// contains filtered or unexported fields
}
Channel represents a state channel with optional reducer.
func GetChannel ¶
func GetChannel[T any](sg *StateGraph, name string) (*Channel[T], error)
GetChannel retrieves a typed channel by name.
func NewChannel ¶
func NewChannel[T any](name string, initial T, opts ...ChannelOption[T]) *Channel[T]
NewChannel creates a new state channel.
func (*Channel[T]) GetAny ¶ added in v1.0.0
GetAny returns the current value as any, implementing ChannelReader.
type ChannelOption ¶
ChannelOption configures a channel.
func WithHistory ¶
func WithHistory[T any](max int) ChannelOption[T]
WithHistory enables history tracking with max entries.
func WithReducer ¶
func WithReducer[T any](r Reducer[T]) ChannelOption[T]
WithReducer sets a custom reducer for the channel.
type ChannelReader ¶ added in v1.0.0
ChannelReader is a non-generic interface for reading channel values. This is needed because Go's type system does not allow matching generic method signatures like Get() T via interface{ Get() any }.
type CheckpointDiff ¶
type CheckpointDiff struct {
Version1 int `json:"version1"`
Version2 int `json:"version2"`
AddedNodes []string `json:"added_nodes"`
RemovedNodes []string `json:"removed_nodes"`
ChangedNodes []string `json:"changed_nodes"`
TimeDifference time.Duration `json:"time_difference"`
}
CheckpointDiff represents differences between checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
SaveCheckpoint(ctx context.Context, checkpoint *EnhancedCheckpoint) error
}
CheckpointManager is the interface used for checkpoint integration.
type CheckpointStore ¶
type CheckpointStore interface {
Save(ctx context.Context, checkpoint *EnhancedCheckpoint) error
Load(ctx context.Context, checkpointID string) (*EnhancedCheckpoint, error)
LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
Delete(ctx context.Context, checkpointID string) error
}
CheckpointStore defines storage interface for enhanced checkpoints (Workflow layer).
Note: Two CheckpointStore interfaces exist in the project, operating on different types:
- agent.CheckpointStore — operates on *agent.Checkpoint (agent state, List/DeleteThread/Rollback)
- workflow.CheckpointStore (this) — operates on *workflow.EnhancedCheckpoint (DAG node results, time-travel)
They cannot be unified because the checkpoint structs have different fields (agent state vs DAG execution state).
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker is the circuit breaker implementation.
func NewCircuitBreaker ¶
func NewCircuitBreaker(nodeID string, config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger) *CircuitBreaker
NewCircuitBreaker creates a circuit breaker.
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() (bool, error)
AllowRequest checks whether a request is allowed to pass through.
func (*CircuitBreaker) GetFailures ¶
func (cb *CircuitBreaker) GetFailures() int
GetFailures returns the current failure count.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold is the number of consecutive failures that trips the circuit breaker.
FailureThreshold int `json:"failure_threshold"`
// RecoveryTimeout is how long to wait for recovery after the breaker has tripped.
RecoveryTimeout Duration `json:"recovery_timeout"`
// HalfOpenMaxProbes is the number of probe requests allowed in the half-open state.
HalfOpenMaxProbes int `json:"half_open_max_probes"`
// SuccessThresholdInHalfOpen is the number of consecutive successes required in the half-open state before recovering.
SuccessThresholdInHalfOpen int `json:"success_threshold_in_half_open"`
}
CircuitBreakerConfig holds the circuit breaker configuration.
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig returns the default circuit breaker configuration.
type CircuitBreakerEvent ¶
type CircuitBreakerEvent struct {
NodeID string `json:"node_id"`
OldState CircuitState `json:"old_state"`
NewState CircuitState `json:"new_state"`
Timestamp time.Time `json:"timestamp"`
Reason string `json:"reason"`
Failures int `json:"failures"`
}
CircuitBreakerEvent describes a circuit breaker state-change event.
type CircuitBreakerEventHandler ¶
type CircuitBreakerEventHandler interface {
OnStateChange(event CircuitBreakerEvent)
}
CircuitBreakerEventHandler is the event handler interface.
type CircuitBreakerRegistry ¶
type CircuitBreakerRegistry struct {
// contains filtered or unexported fields
}
CircuitBreakerRegistry is the circuit breaker registry; it manages the circuit breakers of all nodes.
func NewCircuitBreakerRegistry ¶
func NewCircuitBreakerRegistry(config CircuitBreakerConfig, eventHandler CircuitBreakerEventHandler, logger *zap.Logger) *CircuitBreakerRegistry
NewCircuitBreakerRegistry creates a circuit breaker registry.
func (*CircuitBreakerRegistry) GetAllStates ¶
func (r *CircuitBreakerRegistry) GetAllStates() map[string]CircuitState
GetAllStates returns the states of all circuit breakers.
func (*CircuitBreakerRegistry) GetOrCreate ¶
func (r *CircuitBreakerRegistry) GetOrCreate(nodeID string) *CircuitBreaker
GetOrCreate gets or creates the circuit breaker for a node.
func (*CircuitBreakerRegistry) ResetAll ¶
func (r *CircuitBreakerRegistry) ResetAll()
ResetAll resets all circuit breakers.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of a circuit breaker. This is an independent definition equivalent to circuitbreaker.State in llm/circuitbreaker/. The workflow package maintains its own copy to avoid depending on the llm package (dependency direction: llm <- workflow, not workflow -> llm).
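const (
// CircuitClosed is the normal state; requests are allowed through.
CircuitClosed CircuitState = iota
// CircuitOpen is the tripped state; all requests are rejected.
CircuitOpen
// CircuitHalfOpen is the half-open state; probe requests are allowed.
CircuitHalfOpen
)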
func (CircuitState) String ¶
func (s CircuitState) String() string
type ConditionFunc ¶
ConditionFunc evaluates a condition and returns true or false
type ConditionalAgentStep ¶
type ConditionalAgentStep struct {
// contains filtered or unexported fields
}
ConditionalAgentStep executes different agents based on conditions.
func NewConditionalAgentStep ¶
func NewConditionalAgentStep() *ConditionalAgentStep
NewConditionalAgentStep creates a conditional agent step.
func (*ConditionalAgentStep) Default ¶
func (c *ConditionalAgentStep) Default(agent AgentExecutor) *ConditionalAgentStep
Default sets the default agent when no conditions match.
func (*ConditionalAgentStep) Name ¶
func (c *ConditionalAgentStep) Name() string
Name implements the Step interface.
func (*ConditionalAgentStep) When ¶
func (c *ConditionalAgentStep) When(check func(ctx context.Context, input any) bool, agent AgentExecutor) *ConditionalAgentStep
When adds a condition-agent pair.
type DAGBuilder ¶
type DAGBuilder struct {
// contains filtered or unexported fields
}
DAGBuilder provides a fluent API for constructing DAG workflows
func NewDAGBuilder ¶
func NewDAGBuilder(name string) *DAGBuilder
NewDAGBuilder creates a new DAG builder with the given name
func (*DAGBuilder) AddEdge ¶
func (b *DAGBuilder) AddEdge(from, to string) *DAGBuilder
AddEdge adds a directed edge from one node to another
func (*DAGBuilder) AddNode ¶
func (b *DAGBuilder) AddNode(id string, nodeType NodeType) *NodeBuilder
AddNode adds a node to the graph and returns a NodeBuilder for configuration
func (*DAGBuilder) Build ¶
func (b *DAGBuilder) Build() (*DAGWorkflow, error)
Build validates the DAG and creates a DAGWorkflow
func (*DAGBuilder) SetEntry ¶
func (b *DAGBuilder) SetEntry(nodeID string) *DAGBuilder
SetEntry sets the entry node for the workflow
func (*DAGBuilder) WithDescription ¶
func (b *DAGBuilder) WithDescription(desc string) *DAGBuilder
WithDescription sets the workflow description
func (*DAGBuilder) WithLogger ¶
func (b *DAGBuilder) WithLogger(logger *zap.Logger) *DAGBuilder
WithLogger sets a custom logger
type DAGDefinition ¶
type DAGDefinition struct {
// Name is the workflow name
Name string `json:"name" yaml:"name"`
// Description describes the workflow
Description string `json:"description" yaml:"description"`
// Entry is the ID of the entry node
Entry string `json:"entry" yaml:"entry"`
// Nodes contains all node definitions
Nodes []NodeDefinition `json:"nodes" yaml:"nodes"`
// Metadata stores additional workflow information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
DAGDefinition represents a serializable workflow definition
func FromJSON ¶
func FromJSON(jsonStr string) (*DAGDefinition, error)
FromJSON creates a DAGDefinition from JSON string
func FromYAML ¶
func FromYAML(yamlStr string) (*DAGDefinition, error)
FromYAML creates a DAGDefinition from YAML string
func LoadFromJSONFile ¶
func LoadFromJSONFile(filename string) (*DAGDefinition, error)
LoadFromJSONFile loads a DAGDefinition from a JSON file
func LoadFromYAMLFile ¶
func LoadFromYAMLFile(filename string) (*DAGDefinition, error)
LoadFromYAMLFile loads a DAGDefinition from a YAML file
func (*DAGDefinition) MarshalJSON ¶
func (d *DAGDefinition) MarshalJSON() ([]byte, error)
MarshalJSON serializes a DAGDefinition to JSON
func (*DAGDefinition) MarshalYAML ¶
func (d *DAGDefinition) MarshalYAML() (any, error)
MarshalYAML serializes a DAGDefinition to YAML
func (*DAGDefinition) SaveToJSONFile ¶
func (d *DAGDefinition) SaveToJSONFile(filename string) error
SaveToJSONFile saves a DAGDefinition to a JSON file
func (*DAGDefinition) SaveToYAMLFile ¶
func (d *DAGDefinition) SaveToYAMLFile(filename string) error
SaveToYAMLFile saves a DAGDefinition to a YAML file
func (*DAGDefinition) ToJSON ¶
func (d *DAGDefinition) ToJSON() (string, error)
ToJSON converts a DAGDefinition to JSON string
func (*DAGDefinition) ToYAML ¶
func (d *DAGDefinition) ToYAML() (string, error)
ToYAML converts a DAGDefinition to YAML string
func (*DAGDefinition) UnmarshalJSON ¶
func (d *DAGDefinition) UnmarshalJSON(data []byte) error
UnmarshalJSON deserializes a DAGDefinition from JSON
func (*DAGDefinition) UnmarshalYAML ¶
func (d *DAGDefinition) UnmarshalYAML(node *yaml.Node) error
UnmarshalYAML deserializes a DAGDefinition from YAML
type DAGExecutor ¶
type DAGExecutor struct {
// contains filtered or unexported fields
}
DAGExecutor executes DAG workflows with dependency resolution
func NewDAGExecutor ¶
func NewDAGExecutor(checkpointMgr CheckpointManager, logger *zap.Logger) *DAGExecutor
NewDAGExecutor creates a new DAG executor
func (*DAGExecutor) Execute ¶
Execute runs the DAG workflow with dependency resolution. Bug fix (P0): executeMu ensures that concurrent Execute() calls on the same executor are serialized, preventing data races on shared execution state.
func (*DAGExecutor) GetCircuitBreakerStates ¶
func (e *DAGExecutor) GetCircuitBreakerStates() map[string]CircuitState
GetCircuitBreakerStates returns the states of all circuit breakers.
func (*DAGExecutor) GetExecutionID ¶
func (e *DAGExecutor) GetExecutionID() string
GetExecutionID returns the current execution ID
func (*DAGExecutor) GetHistory ¶
func (e *DAGExecutor) GetHistory() *ExecutionHistory
GetHistory returns the execution history for the current execution
func (*DAGExecutor) GetHistoryStore ¶
func (e *DAGExecutor) GetHistoryStore() *ExecutionHistoryStore
GetHistoryStore returns the history store
func (*DAGExecutor) GetNodeResult ¶
func (e *DAGExecutor) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*DAGExecutor) SetCircuitBreakerConfig ¶
func (e *DAGExecutor) SetCircuitBreakerConfig(config CircuitBreakerConfig, handler CircuitBreakerEventHandler)
SetCircuitBreakerConfig sets the circuit breaker configuration.
func (*DAGExecutor) SetHistoryStore ¶
func (e *DAGExecutor) SetHistoryStore(store *ExecutionHistoryStore)
SetHistoryStore sets a custom history store
type DAGGraph ¶
type DAGGraph struct {
// contains filtered or unexported fields
}
DAGGraph represents the workflow structure as a directed acyclic graph
type DAGNode ¶
type DAGNode struct {
// ID is the unique identifier for this node
ID string
// Type specifies the node type
Type NodeType
// Step is the step to execute (for action nodes)
Step Step
// Condition evaluates branching logic (for conditional nodes)
Condition ConditionFunc
// LoopConfig defines loop behavior (for loop nodes)
LoopConfig *LoopConfig
// SubGraph is a nested workflow (for subgraph nodes)
SubGraph *DAGGraph
// ErrorConfig defines error handling behavior
ErrorConfig *ErrorConfig
// Metadata stores additional node information
Metadata map[string]any
}
DAGNode represents a single node in the workflow graph
type DAGWorkflow ¶
type DAGWorkflow struct {
// contains filtered or unexported fields
}
DAGWorkflow represents a DAG-based workflow
func NewDAGWorkflow ¶
func NewDAGWorkflow(name, description string, graph *DAGGraph) *DAGWorkflow
NewDAGWorkflow creates a new DAG workflow
func (*DAGWorkflow) Description ¶
func (w *DAGWorkflow) Description() string
Description returns the workflow description
func (*DAGWorkflow) GetMetadata ¶
func (w *DAGWorkflow) GetMetadata(key string) (any, bool)
GetMetadata retrieves a metadata value
func (*DAGWorkflow) Graph ¶
func (w *DAGWorkflow) Graph() *DAGGraph
Graph returns the underlying DAG graph
func (*DAGWorkflow) SetExecutor ¶
func (w *DAGWorkflow) SetExecutor(executor *DAGExecutor)
SetExecutor sets a custom executor for the workflow
func (*DAGWorkflow) SetMetadata ¶
func (w *DAGWorkflow) SetMetadata(key string, value any)
SetMetadata sets a metadata value
func (*DAGWorkflow) ToDAGDefinition ¶
func (w *DAGWorkflow) ToDAGDefinition() *DAGDefinition
ToDAGDefinition converts a DAGWorkflow to a DAGDefinition for serialization. Note: this only captures the structure, not the runtime functions (conditions, iterators, steps).
type Duration ¶ added in v1.0.0
Duration wraps time.Duration with human-readable JSON serialization. JSON output is a string like "30s", "5m", "1h30m" instead of nanoseconds.
func (Duration) MarshalJSON ¶ added in v1.0.0
MarshalJSON serializes Duration as a human-readable string.
func (*Duration) UnmarshalJSON ¶ added in v1.0.0
UnmarshalJSON deserializes Duration from a string ("30s") or number (nanoseconds).
type EnhancedCheckpoint ¶
type EnhancedCheckpoint struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
ThreadID string `json:"thread_id"`
Version int `json:"version"`
NodeID string `json:"node_id"`
NodeResults map[string]any `json:"node_results"`
Variables map[string]any `json:"variables"`
PendingNodes []string `json:"pending_nodes"`
CompletedNodes []string `json:"completed_nodes"`
Input any `json:"input"`
CreatedAt time.Time `json:"created_at"`
ParentID string `json:"parent_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Snapshot *GraphSnapshot `json:"snapshot,omitempty"`
}
EnhancedCheckpoint represents a workflow checkpoint with full state.
type EnhancedCheckpointManager ¶
type EnhancedCheckpointManager struct {
// contains filtered or unexported fields
}
EnhancedCheckpointManager manages workflow checkpoints with time-travel.
func NewEnhancedCheckpointManager ¶
func NewEnhancedCheckpointManager(store CheckpointStore, logger *zap.Logger) *EnhancedCheckpointManager
NewEnhancedCheckpointManager creates a new checkpoint manager.
func (*EnhancedCheckpointManager) Compare ¶
func (m *EnhancedCheckpointManager) Compare(ctx context.Context, threadID string, v1, v2 int) (*CheckpointDiff, error)
Compare compares two checkpoint versions.
func (*EnhancedCheckpointManager) CreateCheckpoint ¶
func (m *EnhancedCheckpointManager) CreateCheckpoint(ctx context.Context, executor *DAGExecutor, graph *DAGGraph, threadID string, input any) (*EnhancedCheckpoint, error)
CreateCheckpoint creates a checkpoint from current execution state.
func (*EnhancedCheckpointManager) GetHistory ¶
func (m *EnhancedCheckpointManager) GetHistory(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
GetHistory returns checkpoint history for time-travel debugging.
func (*EnhancedCheckpointManager) ResumeFromCheckpoint ¶
func (m *EnhancedCheckpointManager) ResumeFromCheckpoint(ctx context.Context, checkpointID string, graph *DAGGraph) (*DAGExecutor, error)
ResumeFromCheckpoint resumes workflow execution from a checkpoint.
func (*EnhancedCheckpointManager) Rollback ¶
func (m *EnhancedCheckpointManager) Rollback(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
Rollback rolls back to a specific version.
type ErrorConfig ¶
type ErrorConfig struct {
// Strategy specifies how to handle errors
Strategy ErrorStrategy
// MaxRetries is the maximum number of retry attempts (for retry strategy)
MaxRetries int
// RetryDelayMs is the delay between retries in milliseconds
RetryDelayMs int
// FallbackValue is the value to use when skipping a failed node
FallbackValue any
}
ErrorConfig defines error handling behavior for a node
type ErrorDefinition ¶ added in v1.0.0
type ErrorDefinition struct {
// Strategy specifies how to handle errors (fail_fast, skip, retry)
Strategy string `json:"strategy" yaml:"strategy"`
// MaxRetries is the maximum number of retry attempts (for retry strategy)
MaxRetries int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
// RetryDelayMs is the delay between retries in milliseconds
RetryDelayMs int `json:"retry_delay_ms,omitempty" yaml:"retry_delay_ms,omitempty"`
// FallbackValue is the value to use when skipping a failed node
FallbackValue any `json:"fallback_value,omitempty" yaml:"fallback_value,omitempty"`
}
ErrorDefinition represents a serializable error handling configuration
type ErrorStrategy ¶
type ErrorStrategy string
ErrorStrategy defines how errors should be handled
const (
// ErrorStrategyFailFast stops execution immediately on error
ErrorStrategyFailFast ErrorStrategy = "fail_fast"
// ErrorStrategySkip skips the failed node and continues
ErrorStrategySkip ErrorStrategy = "skip"
// ErrorStrategyRetry retries the failed node
ErrorStrategyRetry ErrorStrategy = "retry"
)
type ExecutionContext ¶
type ExecutionContext struct {
// WorkflowID identifies the workflow being executed
WorkflowID string `json:"workflow_id,omitempty"`
// CurrentNode is the ID of the currently executing node
CurrentNode string `json:"current_node,omitempty"`
// NodeResults stores the results of completed nodes
NodeResults map[string]any `json:"node_results,omitempty"`
// Variables stores workflow variables
Variables map[string]any `json:"variables,omitempty"`
// StartTime is when the workflow execution started
StartTime time.Time `json:"start_time,omitempty"`
// LastUpdateTime is when the context was last updated
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}
ExecutionContext captures the execution state for checkpointing
func NewExecutionContext ¶
func NewExecutionContext(workflowID string) *ExecutionContext
NewExecutionContext creates a new execution context
func (*ExecutionContext) GetNodeResult ¶
func (ec *ExecutionContext) GetNodeResult(nodeID string) (any, bool)
GetNodeResult retrieves the result of a completed node
func (*ExecutionContext) GetVariable ¶
func (ec *ExecutionContext) GetVariable(key string) (any, bool)
GetVariable retrieves a workflow variable
func (*ExecutionContext) SetCurrentNode ¶
func (ec *ExecutionContext) SetCurrentNode(nodeID string)
SetCurrentNode updates the currently executing node
func (*ExecutionContext) SetNodeResult ¶
func (ec *ExecutionContext) SetNodeResult(nodeID string, result any)
SetNodeResult stores the result of a completed node
func (*ExecutionContext) SetVariable ¶
func (ec *ExecutionContext) SetVariable(key string, value any)
SetVariable sets a workflow variable
type ExecutionHistory ¶
type ExecutionHistory struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Nodes []*NodeExecution `json:"nodes"`
Error string `json:"error,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
// contains filtered or unexported fields
}
ExecutionHistory records the complete execution path of a workflow
func NewExecutionHistory ¶
func NewExecutionHistory(executionID, workflowID string) *ExecutionHistory
NewExecutionHistory creates a new execution history
func (*ExecutionHistory) Complete ¶
func (h *ExecutionHistory) Complete(err error)
Complete marks the execution as completed
func (*ExecutionHistory) GetNodeByID ¶
func (h *ExecutionHistory) GetNodeByID(nodeID string) *NodeExecution
GetNodeByID returns the execution record for a specific node
func (*ExecutionHistory) GetNodes ¶
func (h *ExecutionHistory) GetNodes() []*NodeExecution
GetNodes returns a copy of the node executions
func (*ExecutionHistory) RecordNodeEnd ¶
func (h *ExecutionHistory) RecordNodeEnd(node *NodeExecution, output any, err error)
RecordNodeEnd records the end of a node execution
func (*ExecutionHistory) RecordNodeStart ¶
func (h *ExecutionHistory) RecordNodeStart(nodeID string, nodeType NodeType, input any) *NodeExecution
RecordNodeStart records the start of a node execution
type ExecutionHistoryStore ¶
type ExecutionHistoryStore struct {
// contains filtered or unexported fields
}
ExecutionHistoryStore stores and queries execution histories
func NewExecutionHistoryStore ¶
func NewExecutionHistoryStore() *ExecutionHistoryStore
NewExecutionHistoryStore creates a new execution history store
func (*ExecutionHistoryStore) Get ¶
func (s *ExecutionHistoryStore) Get(executionID string) (*ExecutionHistory, bool)
Get retrieves an execution history by ID
func (*ExecutionHistoryStore) ListByStatus ¶
func (s *ExecutionHistoryStore) ListByStatus(status ExecutionStatus) []*ExecutionHistory
ListByStatus returns executions with a specific status
func (*ExecutionHistoryStore) ListByTimeRange ¶
func (s *ExecutionHistoryStore) ListByTimeRange(start, end time.Time) []*ExecutionHistory
ListByTimeRange returns executions within a time range
func (*ExecutionHistoryStore) ListByWorkflow ¶
func (s *ExecutionHistoryStore) ListByWorkflow(workflowID string) []*ExecutionHistory
ListByWorkflow returns all executions for a workflow
func (*ExecutionHistoryStore) Save ¶
func (s *ExecutionHistoryStore) Save(history *ExecutionHistory)
Save saves an execution history
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the status of an execution
const (
// ExecutionStatusRunning indicates the execution is in progress
ExecutionStatusRunning ExecutionStatus = "running"
// ExecutionStatusCompleted indicates the execution completed successfully
ExecutionStatusCompleted ExecutionStatus = "completed"
// ExecutionStatusFailed indicates the execution failed
ExecutionStatusFailed ExecutionStatus = "failed"
)
type FuncAggregator ¶
type FuncAggregator struct {
// contains filtered or unexported fields
}
FuncAggregator is a function-based aggregator.
func NewFuncAggregator ¶
func NewFuncAggregator(fn AggregatorFunc) *FuncAggregator
NewFuncAggregator creates a function-based aggregator.
func (*FuncAggregator) Aggregate ¶
func (a *FuncAggregator) Aggregate(ctx context.Context, results []TaskResult) (any, error)
type FuncHandler ¶
type FuncHandler struct {
// contains filtered or unexported fields
}
FuncHandler is a function-based handler.
func NewFuncHandler ¶
func NewFuncHandler(name string, fn HandlerFunc) *FuncHandler
NewFuncHandler creates a function-based handler.
func (*FuncHandler) Name ¶
func (h *FuncHandler) Name() string
type FuncRouter ¶
type FuncRouter struct {
// contains filtered or unexported fields
}
FuncRouter is a function-based router.
type GraphSnapshot ¶
type GraphSnapshot struct {
Nodes map[string]NodeSnapshot `json:"nodes"`
Edges map[string][]string `json:"edges"`
EntryNode string `json:"entry_node"`
}
GraphSnapshot captures the complete graph state.
type HandlerFunc ¶
HandlerFunc is the handler function type.
type HumanInputHandler ¶
type HumanInputHandler interface {
// RequestInput sends a prompt to a human and waits for a response.
// inputType hints at the expected response format (e.g. "text", "choice").
// options provides selectable choices when inputType is "choice".
RequestInput(ctx context.Context, prompt string, inputType string, options []string) (any, error)
}
HumanInputHandler abstracts human-in-the-loop interaction for workflow steps. Implement this interface to bridge workflow with your HITL management layer.
type HumanInputStep ¶
type HumanInputStep struct {
Prompt string
Type string
Options []string
Timeout int
Handler HumanInputHandler // Optional: inject to enable real HITL
}
HumanInputStep waits for human input. When Handler is set, it sends a request to the HITL handler and waits for a response. When Handler is nil, it returns a placeholder map (backward compatible).
func (*HumanInputStep) Name ¶
func (s *HumanInputStep) Name() string
type InMemoryCheckpointStore ¶
type InMemoryCheckpointStore struct {
// contains filtered or unexported fields
}
InMemoryCheckpointStore provides in-memory storage.
func NewInMemoryCheckpointStore ¶
func NewInMemoryCheckpointStore() *InMemoryCheckpointStore
NewInMemoryCheckpointStore creates a new in-memory store.
func (*InMemoryCheckpointStore) Delete ¶
func (s *InMemoryCheckpointStore) Delete(ctx context.Context, id string) error
func (*InMemoryCheckpointStore) ListVersions ¶
func (s *InMemoryCheckpointStore) ListVersions(ctx context.Context, threadID string) ([]*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Load ¶
func (s *InMemoryCheckpointStore) Load(ctx context.Context, id string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadLatest ¶
func (s *InMemoryCheckpointStore) LoadLatest(ctx context.Context, threadID string) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) LoadVersion ¶
func (s *InMemoryCheckpointStore) LoadVersion(ctx context.Context, threadID string, version int) (*EnhancedCheckpoint, error)
func (*InMemoryCheckpointStore) Save ¶
func (s *InMemoryCheckpointStore) Save(ctx context.Context, cp *EnhancedCheckpoint) error
type IteratorFunc ¶
IteratorFunc generates a collection of items for iteration
type LLMStep ¶
type LLMStep struct {
Model string
Prompt string
Temperature float64
MaxTokens int
Provider llm.Provider // Optional: inject to enable real LLM calls
}
LLMStep executes an LLM call. When Provider is set, it performs a real LLM completion request. When Provider is nil, it returns a placeholder map (backward compatible).
type LoopConfig ¶
type LoopConfig struct {
// Type specifies the loop type (while, for, foreach)
Type LoopType
// MaxIterations limits the maximum number of iterations (0 = unlimited)
MaxIterations int
// Condition evaluates whether to continue looping (for while loops)
Condition ConditionFunc
// Iterator generates items to iterate over (for foreach loops)
Iterator IteratorFunc
}
LoopConfig defines loop behavior
type LoopDefinition ¶
type LoopDefinition struct {
// Type is the loop type (while, for, foreach)
Type string `json:"type" yaml:"type"`
// MaxIterations limits the maximum number of iterations
MaxIterations int `json:"max_iterations" yaml:"max_iterations"`
// Condition is the condition name (for while loops)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
}
LoopDefinition represents a serializable loop configuration
type NativeAgentAdapter ¶
type NativeAgentAdapter struct {
// contains filtered or unexported fields
}
NativeAgentAdapter adapts an agent.Agent (with *Input/*Output signatures) to the AgentExecutor interface used by workflow steps.
Input conversion (any -> *agent.Input):
- *agent.Input: passed through directly
- string: wrapped as Input.Content
- map[string]any: Content extracted from "content" key, rest goes to Context
- other types: converted to string via fmt.Sprintf and set as Content
Output: returns the *agent.Output directly (callers can type-assert).
func NewNativeAgentAdapter ¶
func NewNativeAgentAdapter(a agent.Agent) *NativeAgentAdapter
NewNativeAgentAdapter creates an adapter that bridges agent.Agent to AgentExecutor.
func (*NativeAgentAdapter) Execute ¶
Execute implements AgentExecutor. It converts the workflow input to *agent.Input, calls agent.Execute, and returns the *agent.Output.
func (*NativeAgentAdapter) ID ¶
func (n *NativeAgentAdapter) ID() string
ID implements AgentExecutor.
func (*NativeAgentAdapter) Name ¶
func (n *NativeAgentAdapter) Name() string
Name implements AgentExecutor.
type NodeBuilder ¶
type NodeBuilder struct {
// contains filtered or unexported fields
}
NodeBuilder provides a fluent API for configuring individual nodes
func (*NodeBuilder) Done ¶
func (nb *NodeBuilder) Done() *DAGBuilder
Done completes node configuration and returns to the DAGBuilder
func (*NodeBuilder) WithCondition ¶
func (nb *NodeBuilder) WithCondition(cond ConditionFunc) *NodeBuilder
WithCondition sets the condition function for a conditional node
func (*NodeBuilder) WithErrorConfig ¶
func (nb *NodeBuilder) WithErrorConfig(config ErrorConfig) *NodeBuilder
WithErrorConfig sets the error handling configuration for a node
func (*NodeBuilder) WithLoop ¶
func (nb *NodeBuilder) WithLoop(config LoopConfig) *NodeBuilder
WithLoop sets the loop configuration for a loop node
func (*NodeBuilder) WithMetadata ¶
func (nb *NodeBuilder) WithMetadata(key string, value any) *NodeBuilder
WithMetadata sets a metadata value
func (*NodeBuilder) WithOnFalse ¶
func (nb *NodeBuilder) WithOnFalse(nodeIDs ...string) *NodeBuilder
WithOnFalse sets the nodes to execute when condition is false
func (*NodeBuilder) WithOnTrue ¶
func (nb *NodeBuilder) WithOnTrue(nodeIDs ...string) *NodeBuilder
WithOnTrue sets the nodes to execute when condition is true
func (*NodeBuilder) WithStep ¶
func (nb *NodeBuilder) WithStep(step Step) *NodeBuilder
WithStep sets the step for an action node
func (*NodeBuilder) WithSubGraph ¶
func (nb *NodeBuilder) WithSubGraph(subGraph *DAGGraph) *NodeBuilder
WithSubGraph sets the subgraph for a subgraph node
type NodeConfig ¶
type NodeConfig struct {
// LLM node config
Model string `json:"model,omitempty"`
Prompt string `json:"prompt,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
// Tool node config
ToolName string `json:"tool_name,omitempty"`
ToolParams map[string]any `json:"tool_params,omitempty"`
// Condition node config
Condition string `json:"condition,omitempty"`
Expression string `json:"expression,omitempty"`
// Loop node config
LoopType string `json:"loop_type,omitempty"`
MaxIterations int `json:"max_iterations,omitempty"`
// Code node config
Code string `json:"code,omitempty"`
Language string `json:"language,omitempty"`
// Human input config
InputPrompt string `json:"input_prompt,omitempty"`
InputType string `json:"input_type,omitempty"`
Options []string `json:"options,omitempty"`
Timeout int `json:"timeout_seconds,omitempty"`
// Subflow config
SubflowID string `json:"subflow_id,omitempty"`
}
NodeConfig contains node-specific configuration.
type NodeDefinition ¶
type NodeDefinition struct {
// ID is the unique node identifier
ID string `json:"id" yaml:"id"`
// Type is the node type
Type string `json:"type" yaml:"type"`
// Step is the step name (for action nodes)
Step string `json:"step,omitempty" yaml:"step,omitempty"`
// Condition is the condition name (for conditional nodes)
Condition string `json:"condition,omitempty" yaml:"condition,omitempty"`
// Next lists the next nodes to execute (for action nodes)
Next []string `json:"next,omitempty" yaml:"next,omitempty"`
// OnTrue lists nodes to execute when condition is true
OnTrue []string `json:"on_true,omitempty" yaml:"on_true,omitempty"`
// OnFalse lists nodes to execute when condition is false
OnFalse []string `json:"on_false,omitempty" yaml:"on_false,omitempty"`
// Loop defines loop configuration (for loop nodes)
Loop *LoopDefinition `json:"loop,omitempty" yaml:"loop,omitempty"`
// SubGraph defines a nested workflow (for subgraph nodes)
SubGraph *DAGDefinition `json:"subgraph,omitempty" yaml:"subgraph,omitempty"`
// Error defines error handling configuration
Error *ErrorDefinition `json:"error,omitempty" yaml:"error,omitempty"`
// Metadata stores additional node information
Metadata map[string]any `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
NodeDefinition represents a serializable node definition
type NodeExecution ¶
type NodeExecution struct {
NodeID string `json:"node_id"`
NodeType NodeType `json:"node_type"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
Status ExecutionStatus `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
}
NodeExecution records the execution of a single node
type NodeOutput ¶
NodeOutput represents output from a graph node.
type NodeSnapshot ¶
type NodeSnapshot struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Input any `json:"input,omitempty"`
Output any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Duration int64 `json:"duration_ms,omitempty"`
}
NodeSnapshot captures a node's state.
type NodeType ¶
type NodeType string
NodeType defines the type of a DAG node
const (
// NodeTypeAction executes a step
NodeTypeAction NodeType = "action"
// NodeTypeCondition performs conditional branching
NodeTypeCondition NodeType = "condition"
// NodeTypeLoop performs loop iteration
NodeTypeLoop NodeType = "loop"
// NodeTypeParallel executes nodes concurrently
NodeTypeParallel NodeType = "parallel"
// NodeTypeSubGraph executes a nested workflow
NodeTypeSubGraph NodeType = "subgraph"
// NodeTypeCheckpoint creates a checkpoint
NodeTypeCheckpoint NodeType = "checkpoint"
)
type ParallelAgentStep ¶
type ParallelAgentStep struct {
// contains filtered or unexported fields
}
ParallelAgentStep executes multiple agents in parallel.
func NewParallelAgentStep ¶
func NewParallelAgentStep(agents []AgentExecutor, merger func([]any) (any, error)) *ParallelAgentStep
NewParallelAgentStep creates a step that executes agents in parallel.
func (*ParallelAgentStep) Name ¶
func (p *ParallelAgentStep) Name() string
Name implements the Step interface.
type ParallelWorkflow ¶
type ParallelWorkflow struct {
// contains filtered or unexported fields
}
ParallelWorkflow is a parallel workflow. It splits a task into multiple subtasks, executes them in parallel, and then aggregates the results.
func NewParallelWorkflow ¶
func NewParallelWorkflow(name, description string, aggregator Aggregator, tasks ...Task) *ParallelWorkflow
NewParallelWorkflow creates a parallel workflow.
func (*ParallelWorkflow) Description ¶
func (w *ParallelWorkflow) Description() string
func (*ParallelWorkflow) Name ¶
func (w *ParallelWorkflow) Name() string
type PassthroughStep ¶
type PassthroughStep struct{}
PassthroughStep passes input directly to output.
func (*PassthroughStep) Name ¶
func (s *PassthroughStep) Name() string
type Port ¶
type Port struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // string, number, boolean, object, array
}
Port represents an input/output port on a node.
type Reducer ¶
type Reducer[T any] func(current T, update T) T
Reducer defines how to merge state updates from multiple nodes.
func AppendReducer ¶
AppendReducer appends slices together.
func LastValueReducer ¶
LastValueReducer returns the most recent value (default).
func MaxReducer ¶
MaxReducer keeps the maximum value.
func MergeMapReducer ¶
func MergeMapReducer[K comparable, V any]() Reducer[map[K]V]
MergeMapReducer merges maps, with update values taking precedence.
type RouterFunc ¶
RouterFunc is the router function type.
type RoutingWorkflow ¶
type RoutingWorkflow struct {
// contains filtered or unexported fields
}
RoutingWorkflow is a routing workflow. It classifies the input and routes the task to a specialized handler.
func NewRoutingWorkflow ¶
func NewRoutingWorkflow(name, description string, router Router) *RoutingWorkflow
NewRoutingWorkflow creates a routing workflow.
func (*RoutingWorkflow) Description ¶
func (w *RoutingWorkflow) Description() string
func (*RoutingWorkflow) Name ¶
func (w *RoutingWorkflow) Name() string
func (*RoutingWorkflow) RegisterHandler ¶
func (w *RoutingWorkflow) RegisterHandler(route string, handler Handler)
RegisterHandler registers a handler.
func (*RoutingWorkflow) SetDefaultRoute ¶
func (w *RoutingWorkflow) SetDefaultRoute(route string)
SetDefaultRoute sets the default route.
type Runnable ¶ added in v1.0.0
Runnable is the common execution interface shared by Step, Task, and Handler. It represents any unit of work that can be executed with input and produce output.
type StateGraph ¶
type StateGraph struct {
// contains filtered or unexported fields
}
StateGraph manages multiple channels as a unified state.
func (*StateGraph) ApplyNodeOutput ¶
func (sg *StateGraph) ApplyNodeOutput(output NodeOutput) error
ApplyNodeOutput applies a node's output to the state graph.
func (*StateGraph) Snapshot ¶
func (sg *StateGraph) Snapshot() StateSnapshot
Snapshot creates a snapshot of the current state.
type StateSnapshot ¶
type StateSnapshot struct {
Values map[string]any `json:"values"`
Versions map[string]uint64 `json:"versions"`
}
StateSnapshot captures the current state of all channels.
type Tool ¶
type Tool interface {
Name() string
Execute(ctx context.Context, params map[string]any) (any, error)
}
Tool represents an executable tool within a workflow.
type ToolRegistry ¶
type ToolRegistry interface {
// GetTool returns a Tool by name. Returns nil, false if not found.
GetTool(name string) (Tool, bool)
// ExecuteTool looks up and executes a tool in one call.
ExecuteTool(ctx context.Context, name string, params map[string]any) (any, error)
}
ToolRegistry abstracts tool lookup and execution for workflow steps. Implement this interface to bridge workflow with your tool management layer.
type ToolStep ¶
type ToolStep struct {
ToolName string
Params map[string]any
Registry ToolRegistry // Optional: inject to enable real tool execution
}
ToolStep executes a tool call. When Registry is set, it performs a real tool execution. When Registry is nil, it returns a placeholder map (backward compatible).
type Variable ¶
type Variable struct {
Name string `json:"name"`
Type string `json:"type"`
DefaultValue any `json:"default_value,omitempty"`
Description string `json:"description,omitempty"`
}
Variable represents a workflow variable.
type VisualBuilder ¶
type VisualBuilder struct {
// contains filtered or unexported fields
}
VisualBuilder builds DAG workflows from visual definitions.
func NewVisualBuilder ¶
func NewVisualBuilder() *VisualBuilder
NewVisualBuilder creates a new visual builder.
func (*VisualBuilder) Build ¶
func (b *VisualBuilder) Build(vw *VisualWorkflow) (*DAGWorkflow, error)
Build converts a visual workflow to executable DAG.
func (*VisualBuilder) RegisterStep ¶
func (b *VisualBuilder) RegisterStep(name string, step Step)
RegisterStep registers a step implementation.
type VisualEdge ¶
type VisualEdge struct {
ID string `json:"id"`
Source string `json:"source"`
SourcePort string `json:"source_port,omitempty"`
Target string `json:"target"`
TargetPort string `json:"target_port,omitempty"`
Label string `json:"label,omitempty"`
Condition string `json:"condition,omitempty"` // For conditional edges
}
VisualEdge represents a connection between nodes.
type VisualNode ¶
type VisualNode struct {
ID string `json:"id"`
Type VisualNodeType `json:"type"`
Label string `json:"label"`
Position Position `json:"position"`
Config NodeConfig `json:"config"`
Inputs []Port `json:"inputs,omitempty"`
Outputs []Port `json:"outputs,omitempty"`
}
VisualNode represents a node in the visual workflow.
type VisualNodeType ¶
type VisualNodeType string
VisualNodeType defines visual node types.
const (
VNodeStart VisualNodeType = "start"
VNodeEnd VisualNodeType = "end"
VNodeLLM VisualNodeType = "llm"
VNodeTool VisualNodeType = "tool"
VNodeCondition VisualNodeType = "condition"
VNodeLoop VisualNodeType = "loop"
VNodeParallel VisualNodeType = "parallel"
VNodeHuman VisualNodeType = "human_input"
VNodeCode VisualNodeType = "code"
VNodeSubflow VisualNodeType = "subflow"
)
type VisualWorkflow ¶
type VisualWorkflow struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
Nodes []VisualNode `json:"nodes"`
Edges []VisualEdge `json:"edges"`
Variables []Variable `json:"variables,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
VisualWorkflow represents a workflow designed in visual builder.
func Import ¶
func Import(data []byte) (*VisualWorkflow, error)
Import imports visual workflow from JSON.
func (*VisualWorkflow) Export ¶
func (vw *VisualWorkflow) Export() ([]byte, error)
Export exports visual workflow to JSON.
func (*VisualWorkflow) Validate ¶
func (vw *VisualWorkflow) Validate() error
Validate validates the visual workflow.
type Workflow ¶
type Workflow interface {
Runnable
// Name returns the workflow name
Name() string
// Description returns the workflow description
Description() string
}
Workflow is the workflow interface. A Workflow is a predefined sequence of steps that provides predictable and consistent execution.
type WorkflowStreamEmitter ¶ added in v1.0.0
type WorkflowStreamEmitter func(WorkflowStreamEvent)
WorkflowStreamEmitter is a callback that receives workflow stream events.
type WorkflowStreamEvent ¶ added in v1.0.0
type WorkflowStreamEvent struct {
Type WorkflowStreamEventType `json:"type"`
NodeID string `json:"node_id,omitempty"`
NodeName string `json:"node_name,omitempty"`
Data any `json:"data,omitempty"`
Error error `json:"-"`
}
WorkflowStreamEvent carries information about a workflow execution event.
type WorkflowStreamEventType ¶ added in v1.0.0
type WorkflowStreamEventType string
WorkflowStreamEventType defines the type of workflow stream event.
const (
// WorkflowEventNodeStart is emitted before a DAG node begins execution.
WorkflowEventNodeStart WorkflowStreamEventType = "node_start"
// WorkflowEventNodeComplete is emitted after a DAG node finishes successfully.
WorkflowEventNodeComplete WorkflowStreamEventType = "node_complete"
// WorkflowEventNodeError is emitted when a DAG node fails.
WorkflowEventNodeError WorkflowStreamEventType = "node_error"
// WorkflowEventStepProgress is emitted for intermediate step progress.
WorkflowEventStepProgress WorkflowStreamEventType = "step_progress"
// WorkflowEventToken is emitted for streaming token output from LLM steps.
WorkflowEventToken WorkflowStreamEventType = "token"
)