workflow

package
v0.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Router lifecycle event names (started / completed / failed).
// NOTE(review): presumably the values carried in RouterEvent.Type — confirm.
const (
	RouterEventStarted   = "router_started"
	RouterEventCompleted = "router_completed"
	RouterEventFailed    = "router_failed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentEventType

// AgentEventType is the type of an Agent event.
type AgentEventType string

AgentEventType Agent 事件类型

// Declared AgentEventType values.
const (
	AgentEventStart         AgentEventType = "agent_start"
	AgentEventWorkflowStart AgentEventType = "workflow_start"
	AgentEventWorkflowEvent AgentEventType = "workflow_event"
	AgentEventResponse      AgentEventType = "agent_response"
	AgentEventComplete      AgentEventType = "agent_complete"
	AgentEventError         AgentEventType = "agent_error"
)

type AgentFactory

// AgentFactory is the Agent factory interface.
type AgentFactory interface {
	// CreateAgent returns a workflow.Agent for the given reference and
	// configuration map, or an error.
	CreateAgent(ctx context.Context, ref *AgentRef, config map[string]interface{}) (workflow.Agent, error)
}

AgentFactory Agent工厂接口

type AgentRef

// AgentRef is a reference to an Agent.
type AgentRef struct {
	ID       string                 `json:"id"`
	Template string                 `json:"template"`
	Config   map[string]interface{} `json:"config,omitempty"`
	Inputs   map[string]string      `json:"inputs,omitempty"`  // input mapping
	Outputs  map[string]string      `json:"outputs,omitempty"` // output mapping
}

AgentRef Agent引用

type AgentStep

type AgentStep struct {
	// contains filtered or unexported fields
}

func NewAgentStep

func NewAgentStep(name string, agent *agent.Agent) *AgentStep

func (*AgentStep) Config

func (s *AgentStep) Config() *StepConfig

func (*AgentStep) Description

func (s *AgentStep) Description() string

func (*AgentStep) Execute

func (s *AgentStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*AgentStep) ID

func (s *AgentStep) ID() string

func (*AgentStep) Name

func (s *AgentStep) Name() string

func (*AgentStep) Type

func (s *AgentStep) Type() StepType

func (*AgentStep) WithDescription

func (s *AgentStep) WithDescription(desc string) *AgentStep

func (*AgentStep) WithTimeout

func (s *AgentStep) WithTimeout(timeout time.Duration) *AgentStep

type AgentStreamEvent

// AgentStreamEvent is an Agent streaming event.
type AgentStreamEvent struct {
	Type      AgentEventType
	Timestamp time.Time
	Data      map[string]interface{}
	Error     error
}

AgentStreamEvent Agent 流式事件

type AsyncBranch

// AsyncBranch is an asynchronous branch.
type AsyncBranch struct {
	ID        string
	Name      string
	Status    AsyncBranchStatus
	CreatedAt time.Time
	StartedAt time.Time
	UpdatedAt time.Time
	Result    interface{}
	Error     error
	Metadata  map[string]interface{}
}

AsyncBranch 异步分支

type AsyncBranchStatus

// AsyncBranchStatus is the status of an asynchronous branch.
type AsyncBranchStatus string

AsyncBranchStatus 异步分支状态

// Declared AsyncBranchStatus values.
const (
	AsyncBranchStatusPending   AsyncBranchStatus = "pending"
	AsyncBranchStatusRunning   AsyncBranchStatus = "running"
	AsyncBranchStatusCompleted AsyncBranchStatus = "completed"
	AsyncBranchStatusFailed    AsyncBranchStatus = "failed"
	AsyncBranchStatusCancelled AsyncBranchStatus = "cancelled"
)

type AsyncMetrics

// AsyncMetrics holds asynchronous-execution metrics.
type AsyncMetrics struct {
	TotalBranches     int           `json:"total_branches"`
	ActiveBranches    int           `json:"active_branches"`
	CompletedBranches int           `json:"completed_branches"`
	TotalRequests     int64         `json:"total_requests"`
	// NOTE(review): time.Duration has no JSON marshaler of its own, so with
	// encoding/json this is emitted as an integer nanosecond count.
	AverageLatency    time.Duration `json:"average_latency"`
	MaxConcurrent     int           `json:"max_concurrent"`
}

AsyncMetrics 异步指标

type AsyncParallelAgent

type AsyncParallelAgent struct {
	// contains filtered or unexported fields
}

AsyncParallelAgent 异步并行Agent

func NewAsyncParallelAgent

func NewAsyncParallelAgent(name string, joinType JoinType, timeout time.Duration) *AsyncParallelAgent

NewAsyncParallelAgent 创建异步并行Agent

func (*AsyncParallelAgent) AddBranch

func (a *AsyncParallelAgent) AddBranch(branch *AsyncBranch)

AddBranch 动态添加分支

func (*AsyncParallelAgent) Execute

func (a *AsyncParallelAgent) Execute(ctx context.Context, message types.Message, yield func(*session.Event, error) bool) error

Execute 执行异步并行工作流

func (*AsyncParallelAgent) GetAsyncMetrics

func (a *AsyncParallelAgent) GetAsyncMetrics() *AsyncMetrics

GetAsyncMetrics 获取异步指标

type BackoffType

// BackoffType is a backoff strategy.
type BackoffType string

BackoffType 退避策略

// Declared BackoffType values.
const (
	BackoffTypeFixed       BackoffType = "fixed"       // fixed delay
	BackoffTypeLinear      BackoffType = "linear"      // linear growth
	BackoffTypeExponential BackoffType = "exponential" // exponential backoff
)

type BranchCondition

// BranchCondition is a branch condition.
type BranchCondition struct {
	Name      string                 `json:"name"`
	Condition string                 `json:"condition"` // condition expression
	Agent     *AgentRef              `json:"agent"`     // branch Agent
	Weight    int                    `json:"weight"`    // weight (used for probabilistic selection)
	Priority  int                    `json:"priority"`  // priority
	Metadata  map[string]interface{} `json:"metadata"`
}

BranchCondition 分支条件

type BranchEvaluationResult

// BranchEvaluationResult is the result of evaluating a branch.
type BranchEvaluationResult struct {
	Name      string                 `json:"name"`
	Condition string                 `json:"condition"`
	Agent     *AgentRef              `json:"agent"`
	Matched   bool                   `json:"matched"`
	// NOTE(review): encoding/json marshals an error value by its exported
	// fields, so most errors (e.g. from errors.New) serialize as {} and the
	// message is lost — confirm whether this field is meant to be serialized.
	Error     error                  `json:"error,omitempty"`
	Duration  time.Duration          `json:"duration"`
	Metadata  map[string]interface{} `json:"metadata"`
}

BranchEvaluationResult 分支评估结果

type ConditionDef

// ConditionDef is a condition definition.
type ConditionDef struct {
	Type   ConditionType   `json:"type"`             // and, or, not, custom
	Rules  []ConditionRule `json:"rules"`            // condition rules
	Custom string          `json:"custom,omitempty"` // custom condition expression
}

ConditionDef 条件定义

type ConditionLevel

// ConditionLevel is one level in a condition hierarchy.
type ConditionLevel struct {
	Name       string                 `json:"name"`
	Conditions []BranchCondition      `json:"conditions"`
	Level      int                    `json:"level"`
	Else       *ConditionLevel        `json:"else,omitempty"` // else branch
	Metadata   map[string]interface{} `json:"metadata"`
}

ConditionLevel 条件层级

type ConditionRule

// ConditionRule is a single condition rule.
type ConditionRule struct {
	Variable string      `json:"variable"` // variable path, e.g. "input.score"
	Operator string      `json:"operator"` // eq, ne, gt, gte, lt, lte, in, nin, contains, regex
	Value    interface{} `json:"value"`    // value to compare against
}

ConditionRule 条件规则

type ConditionStep

type ConditionStep struct {
	// contains filtered or unexported fields
}

func NewConditionStep

func NewConditionStep(name string, condition func(*StepInput) bool, ifTrue, ifFalse Step) *ConditionStep

func (*ConditionStep) Config

func (s *ConditionStep) Config() *StepConfig

func (*ConditionStep) Description

func (s *ConditionStep) Description() string

func (*ConditionStep) Execute

func (s *ConditionStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*ConditionStep) ID

func (s *ConditionStep) ID() string

func (*ConditionStep) Name

func (s *ConditionStep) Name() string

func (*ConditionStep) Type

func (s *ConditionStep) Type() StepType

type ConditionType

// ConditionType is the type of a condition.
type ConditionType string

ConditionType 条件类型

// Declared ConditionType values.
const (
	ConditionTypeAnd    ConditionType = "and"
	ConditionTypeOr     ConditionType = "or"
	ConditionTypeNot    ConditionType = "not"
	ConditionTypeCustom ConditionType = "custom"
)

type ConditionalAgent

type ConditionalAgent struct {
	// contains filtered or unexported fields
}

ConditionalAgent 条件分支Agent

func NewConditionalAgent

func NewConditionalAgent(config ConditionalConfig) (*ConditionalAgent, error)

NewConditionalAgent 创建条件Agent

func (*ConditionalAgent) Execute

func (c *ConditionalAgent) Execute(ctx context.Context, message string) *stream.Reader[*session.Event]

Execute 执行条件分支

func (*ConditionalAgent) Name

func (c *ConditionalAgent) Name() string

Name 返回Agent名称

type ConditionalConfig

// ConditionalConfig is the configuration of a conditional Agent.
type ConditionalConfig struct {
	Name        string                 `json:"name"`
	Conditions  []BranchCondition      `json:"conditions"`
	Default     *AgentRef              `json:"default,omitempty"`
	Variables   map[string]interface{} `json:"variables,omitempty"`
	EvalTimeout time.Duration          `json:"eval_timeout,omitempty"`
}

ConditionalConfig 条件Agent配置

type DSLBuilder

type DSLBuilder struct {
	// contains filtered or unexported fields
}

DSLBuilder DSL构建器

func NewDSLBuilder

func NewDSLBuilder(id, name string) *DSLBuilder

NewDSLBuilder 创建DSL构建器

func (*DSLBuilder) AddConditionNode

func (b *DSLBuilder) AddConditionNode(id, name string, condition *ConditionDef, position Position) *DSLBuilder

AddConditionNode 添加条件节点

func (*DSLBuilder) AddEdge

func (b *DSLBuilder) AddEdge(id, from, to string, label string, condition string) *DSLBuilder

AddEdge 添加边

func (*DSLBuilder) AddEndNode

func (b *DSLBuilder) AddEndNode(id string, position Position) *DSLBuilder

AddEndNode 添加结束节点

func (*DSLBuilder) AddInput

func (b *DSLBuilder) AddInput(name, varType, description string, required bool, defaultValue interface{}) *DSLBuilder

AddInput 添加输入

func (*DSLBuilder) AddLoopNode

func (b *DSLBuilder) AddLoopNode(id, name string, loop *LoopDef, position Position) *DSLBuilder

AddLoopNode 添加循环节点

func (*DSLBuilder) AddOutput

func (b *DSLBuilder) AddOutput(name, varType, description string) *DSLBuilder

AddOutput 添加输出

func (*DSLBuilder) AddParallelNode

func (b *DSLBuilder) AddParallelNode(id, name string, parallel *ParallelDef, position Position) *DSLBuilder

AddParallelNode 添加并行节点

func (*DSLBuilder) AddStartNode

func (b *DSLBuilder) AddStartNode(id string, position Position) *DSLBuilder

AddStartNode 添加开始节点

func (*DSLBuilder) AddTaskNode

func (b *DSLBuilder) AddTaskNode(id, name string, agent *AgentRef, position Position) *DSLBuilder

AddTaskNode 添加任务节点

func (*DSLBuilder) Build

func (b *DSLBuilder) Build() *WorkflowDefinition

Build 构建工作流定义

func (*DSLBuilder) SetConfig

func (b *DSLBuilder) SetConfig(config *WorkflowConfig) *DSLBuilder

SetConfig 设置配置

func (*DSLBuilder) SetDescription

func (b *DSLBuilder) SetDescription(desc string) *DSLBuilder

SetDescription 设置描述

type EdgeDef

// EdgeDef is an edge definition.
type EdgeDef struct {
	ID        string            `json:"id"`
	From      string            `json:"from"` // source node ID
	To        string            `json:"to"`   // target node ID
	Label     string            `json:"label,omitempty"`
	Condition string            `json:"condition,omitempty"` // edge condition
	Metadata  map[string]string `json:"metadata,omitempty"`
}

EdgeDef 边定义

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine 工作流执行引擎

func NewEngine

func NewEngine(config *EngineConfig) (*Engine, error)

NewEngine 创建工作流引擎

func (*Engine) CancelExecution

func (e *Engine) CancelExecution(executionID string) error

CancelExecution 取消执行

func (*Engine) Execute

func (e *Engine) Execute(ctx context.Context, workflowID string, inputs map[string]interface{}) (*WorkflowResult, error)

Execute 执行工作流

func (*Engine) ExecuteAsync

func (e *Engine) ExecuteAsync(ctx context.Context, workflowID string, inputs map[string]interface{}) (string, error)

ExecuteAsync 异步执行工作流

func (*Engine) GetExecution

func (e *Engine) GetExecution(executionID string) (*WorkflowExecution, error)

GetExecution 获取执行状态

func (*Engine) GetMetrics

func (e *Engine) GetMetrics() *EngineMetrics

GetMetrics 获取引擎指标

func (*Engine) ListExecutions

func (e *Engine) ListExecutions(workflowID string, status WorkflowStatus, limit int) ([]*WorkflowExecution, error)

ListExecutions 列出执行记录

func (*Engine) PauseExecution

func (e *Engine) PauseExecution(executionID string) error

PauseExecution 暂停执行

func (*Engine) ResumeExecution

func (e *Engine) ResumeExecution(executionID string) error

ResumeExecution 恢复执行

func (*Engine) SetDependencies

func (e *Engine) SetDependencies(factory AgentFactory, sessionMgr SessionManager, eventBus EventBus)

SetDependencies 设置依赖

type EngineConfig

// EngineConfig is the engine configuration.
type EngineConfig struct {
	// Concurrency settings
	MaxConcurrentWorkflows int `json:"max_concurrent_workflows"`
	MaxConcurrentNodes     int `json:"max_concurrent_nodes"`

	// Timeout settings
	DefaultNodeTimeout     time.Duration `json:"default_node_timeout"`
	DefaultWorkflowTimeout time.Duration `json:"default_workflow_timeout"`

	// Retry settings
	DefaultRetryPolicy *RetryDef `json:"default_retry_policy"`

	// Cache settings
	EnableResultCache bool          `json:"enable_result_cache"`
	CacheTTL          time.Duration `json:"cache_ttl"`

	// Monitoring settings
	EnableMetrics bool `json:"enable_metrics"`
	EnableTracing bool `json:"enable_tracing"`

	// Security settings
	EnableSandbox bool `json:"enable_sandbox"`
	EnableAudit   bool `json:"enable_audit"`
}

EngineConfig 引擎配置

type EngineMetrics

// EngineMetrics holds engine metrics.
type EngineMetrics struct {
	TotalExecutions      int64         `json:"total_executions"`
	RunningExecutions    int64         `json:"running_executions"`
	CompletedExecutions  int64         `json:"completed_executions"`
	FailedExecutions     int64         `json:"failed_executions"`
	AverageExecutionTime time.Duration `json:"average_execution_time"`
}

EngineMetrics 引擎指标

type ErrorHandling

// ErrorHandling is an error-handling strategy.
type ErrorHandling string

ErrorHandling 错误处理策略

// Declared ErrorHandling values.
const (
	ErrorHandlingStop     ErrorHandling = "stop"     // stop all branches when an error occurs
	ErrorHandlingContinue ErrorHandling = "continue" // continue with the other branches
	ErrorHandlingRetry    ErrorHandling = "retry"    // retry the failed branches
	ErrorHandlingIgnore   ErrorHandling = "ignore"   // ignore errors
)

type EventBus

// EventBus is the event bus interface.
type EventBus interface {
	Publish(ctx context.Context, event *WorkflowEvent) error
	Subscribe(eventType string, handler EventHandler) error
	Unsubscribe(eventType string, handler EventHandler) error
}

EventBus 事件总线接口

type EventHandler

// EventHandler is an event handler: a function that receives a
// *WorkflowEvent and returns an error.
type EventHandler func(ctx context.Context, event *WorkflowEvent) error

EventHandler 事件处理器

type ExpressionEvaluator

type ExpressionEvaluator struct {
	// contains filtered or unexported fields
}

ExpressionEvaluator 表达式求值器

func NewExpressionEvaluator

func NewExpressionEvaluator(variables map[string]interface{}) *ExpressionEvaluator

NewExpressionEvaluator 创建表达式求值器

func (*ExpressionEvaluator) EvaluateBool

func (e *ExpressionEvaluator) EvaluateBool(expression string) (bool, error)

EvaluateBool 评估布尔表达式

type FunctionStep

type FunctionStep struct {
	// contains filtered or unexported fields
}

func NewFunctionStep

func NewFunctionStep(name string, executor func(ctx context.Context, input *StepInput) (*StepOutput, error)) *FunctionStep

func SimpleFunction

func SimpleFunction(name string, fn func(input interface{}) (interface{}, error)) *FunctionStep

func TransformFunction

func TransformFunction(name string, transform func(input interface{}) interface{}) *FunctionStep

func (*FunctionStep) Config

func (s *FunctionStep) Config() *StepConfig

func (*FunctionStep) Description

func (s *FunctionStep) Description() string

func (*FunctionStep) Execute

func (s *FunctionStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*FunctionStep) ID

func (s *FunctionStep) ID() string

func (*FunctionStep) Name

func (s *FunctionStep) Name() string

func (*FunctionStep) Type

func (s *FunctionStep) Type() StepType

func (*FunctionStep) WithDescription

func (s *FunctionStep) WithDescription(desc string) *FunctionStep

func (*FunctionStep) WithTimeout

func (s *FunctionStep) WithTimeout(timeout time.Duration) *FunctionStep

type JoinType

// JoinType is a join type.
type JoinType string

JoinType 连接类型

// Declared JoinType values.
const (
	JoinTypeWait     JoinType = "wait"     // wait for all branches to complete
	JoinTypeFirst    JoinType = "first"    // wait for the first branch to complete
	JoinTypeSuccess  JoinType = "success"  // wait for one successful branch to complete
	JoinTypeMajority JoinType = "majority" // wait for a majority of branches to complete
)

type LoggingConfig

// LoggingConfig is the logging configuration.
type LoggingConfig struct {
	Level   string   `json:"level"`   // debug, info, warn, error
	Format  string   `json:"format"`  // json, text
	Output  string   `json:"output"`  // stdout, file, syslog
	Exclude []string `json:"exclude"` // excluded nodes
	Include []string `json:"include"` // included nodes
}

LoggingConfig 日志配置

type LoopDef

// LoopDef is a loop definition.
type LoopDef struct {
	Type      LoopType `json:"type"`      // for, while, until, foreach
	Variable  string   `json:"variable"`  // loop variable name
	Iterator  string   `json:"iterator"`  // iterator expression
	Condition string   `json:"condition"` // loop condition
	MaxLoops  int      `json:"max_loops"` // maximum number of iterations (0 = unlimited)
}

LoopDef 循环定义

type LoopStep

type LoopStep struct {
	// contains filtered or unexported fields
}

func NewLoopStep

func NewLoopStep(name string, body Step, maxIterations int) *LoopStep

func (*LoopStep) Config

func (s *LoopStep) Config() *StepConfig

func (*LoopStep) Description

func (s *LoopStep) Description() string

func (*LoopStep) Execute

func (s *LoopStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*LoopStep) ID

func (s *LoopStep) ID() string

func (*LoopStep) Name

func (s *LoopStep) Name() string

func (*LoopStep) Type

func (s *LoopStep) Type() StepType

func (*LoopStep) WithStopCondition

func (s *LoopStep) WithStopCondition(condition func(*StepOutput) bool) *LoopStep

type LoopType

// LoopType is a loop type.
type LoopType string

LoopType 循环类型

// Declared LoopType values; each trailing comment sketches the loop form.
const (
	LoopTypeFor     LoopType = "for"     // for i in range(10)
	LoopTypeWhile   LoopType = "while"   // while condition
	LoopTypeUntil   LoopType = "until"   // until condition
	LoopTypeForEach LoopType = "foreach" // foreach item in list
)

type MonitoringConfig

// MonitoringConfig is the monitoring configuration.
type MonitoringConfig struct {
	EnableMetrics bool   `json:"enable_metrics"`
	MetricsPath   string `json:"metrics_path"`
	EnableTracing bool   `json:"enable_tracing"`
	TracingPath   string `json:"tracing_path"`
}

MonitoringConfig 监控配置

type MultiLevelConditionalAgent

type MultiLevelConditionalAgent struct {
	// contains filtered or unexported fields
}

MultiLevelConditionalAgent 多级条件Agent(嵌套条件)

func NewMultiLevelConditionalAgent

func NewMultiLevelConditionalAgent(config MultiLevelConditionalConfig) (*MultiLevelConditionalAgent, error)

NewMultiLevelConditionalAgent 创建多级条件Agent

func (*MultiLevelConditionalAgent) Execute

Execute 执行多级条件

func (*MultiLevelConditionalAgent) Name

Name 返回Agent名称

type MultiLevelConditionalConfig

// MultiLevelConditionalConfig is the configuration of a multi-level
// conditional Agent.
type MultiLevelConditionalConfig struct {
	Name      string                 `json:"name"`
	Levels    []ConditionLevel       `json:"levels"`
	Variables map[string]interface{} `json:"variables,omitempty"`
	MaxDepth  int                    `json:"max_depth"`
}

MultiLevelConditionalConfig 多级条件Agent配置

type NodeDef

// NodeDef is a node definition.
//
// NOTE(review): Timeout is a time.Duration, which encoding/json emits as an
// integer nanosecond count; confirm that is the intended wire format.
type NodeDef struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Type      NodeType               `json:"type"`
	Position  Position               `json:"position"`
	Config    map[string]interface{} `json:"config,omitempty"`
	Agent     *AgentRef              `json:"agent,omitempty"`     // Agent node
	Condition *ConditionDef          `json:"condition,omitempty"` // condition node
	Loop      *LoopDef               `json:"loop,omitempty"`      // loop node
	Parallel  *ParallelDef           `json:"parallel,omitempty"`  // parallel node
	Timeout   time.Duration          `json:"timeout,omitempty"`   // timeout
	Retry     *RetryDef              `json:"retry,omitempty"`     // retry configuration
}

NodeDef 节点定义

type NodeRef

// NodeRef is a reference to a node.
type NodeRef struct {
	ID     string `json:"id"`
	Name   string `json:"name"`
	Branch string `json:"branch,omitempty"`
}

NodeRef 节点引用

type NodeResult

// NodeResult is the result of executing a node.
type NodeResult struct {
	NodeID     string                 `json:"node_id"`
	NodeName   string                 `json:"node_name"`
	NodeType   NodeType               `json:"node_type"`
	Status     WorkflowStatus         `json:"status"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time"`
	Duration   time.Duration          `json:"duration"`
	Inputs     map[string]interface{} `json:"inputs"`
	Outputs    map[string]interface{} `json:"outputs"`
	Error      string                 `json:"error,omitempty"`
	RetryCount int                    `json:"retry_count"`
	Metadata   map[string]interface{} `json:"metadata"`
}

NodeResult 节点执行结果

type NodeType

// NodeType is a node type.
type NodeType string

NodeType 节点类型

// Declared NodeType values.
const (
	NodeTypeStart     NodeType = "start"     // start node
	NodeTypeEnd       NodeType = "end"       // end node
	NodeTypeTask      NodeType = "task"      // task node (Agent)
	NodeTypeCondition NodeType = "condition" // condition node
	NodeTypeLoop      NodeType = "loop"      // loop node
	NodeTypeParallel  NodeType = "parallel"  // parallel node
	NodeTypeMerge     NodeType = "merge"     // merge node
	NodeTypeTimeout   NodeType = "timeout"   // timeout node
	NodeTypeError     NodeType = "error"     // error-handling node
	NodeTypeSubflow   NodeType = "subflow"   // sub-workflow node
)

type ParallelBranch

// ParallelBranch is a parallel branch.
type ParallelBranch struct {
	Name    string
	ID      string
	Agent   AgentRef
	Timeout time.Duration
}

ParallelBranch 并行分支

type ParallelBranchResult

// ParallelBranchResult is the execution result of a parallel branch.
type ParallelBranchResult struct {
	Branch   ParallelBranch
	Success  bool
	Duration time.Duration
	Output   interface{}
	Error    error
	Metrics  map[string]interface{}
}

ParallelBranchResult 并行分支执行结果

type ParallelConditionalAgent

type ParallelConditionalAgent struct {
	// contains filtered or unexported fields
}

ParallelConditionalAgent 并行条件Agent(同时评估多个条件)

func NewParallelConditionalAgent

func NewParallelConditionalAgent(config ParallelConditionalConfig) (*ParallelConditionalAgent, error)

NewParallelConditionalAgent 创建并行条件Agent

func (*ParallelConditionalAgent) Execute

Execute 执行并行条件评估

func (*ParallelConditionalAgent) Name

func (p *ParallelConditionalAgent) Name() string

Name 返回Agent名称

type ParallelConditionalConfig

// ParallelConditionalConfig is the configuration of a parallel conditional
// Agent.
type ParallelConditionalConfig struct {
	Name        string                 `json:"name"`
	Conditions  []BranchCondition      `json:"conditions"`
	Default     *AgentRef              `json:"default,omitempty"`
	MaxParallel int                    `json:"max_parallel"`
	Timeout     time.Duration          `json:"timeout"`
	Variables   map[string]interface{} `json:"variables,omitempty"`
	Strategy    ParallelStrategy       `json:"strategy"` // first, all, majority
}

ParallelConditionalConfig 并行条件Agent配置

type ParallelDef

// ParallelDef is a parallel definition.
type ParallelDef struct {
	Type     ParallelType `json:"type"`      // all, any, race
	Branches []NodeRef    `json:"branches"`  // parallel branches
	JoinType JoinType     `json:"join_type"` // wait, first, success, majority
}

ParallelDef 并行定义

type ParallelMetrics

// ParallelMetrics holds parallel-execution metrics.
type ParallelMetrics struct {
	TotalBranches     int           `json:"total_branches"`
	CompletedBranches int           `json:"completed_branches"`
	FailedBranches    int           `json:"failed_branches"`
	TotalDuration     time.Duration `json:"total_duration"`
	AverageDuration   time.Duration `json:"average_duration"`
	MaxDuration       time.Duration `json:"max_duration"`
	MinDuration       time.Duration `json:"min_duration"`
	StartTime         time.Time     `json:"start_time"`
	EndTime           time.Time     `json:"end_time"`
}

ParallelMetrics 并行执行指标

type ParallelStep

type ParallelStep struct {
	// contains filtered or unexported fields
}

func NewParallelStep

func NewParallelStep(name string, steps ...Step) *ParallelStep

func (*ParallelStep) Config

func (s *ParallelStep) Config() *StepConfig

func (*ParallelStep) Description

func (s *ParallelStep) Description() string

func (*ParallelStep) Execute

func (s *ParallelStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*ParallelStep) ID

func (s *ParallelStep) ID() string

func (*ParallelStep) Name

func (s *ParallelStep) Name() string

func (*ParallelStep) Type

func (s *ParallelStep) Type() StepType

type ParallelStrategy

// ParallelStrategy is a parallel strategy.
type ParallelStrategy string

ParallelStrategy 并行策略

// Declared ParallelStrategy values.
const (
	StrategyFirst    ParallelStrategy = "first"    // the first successful result
	StrategyAll      ParallelStrategy = "all"      // all results
	StrategyMajority ParallelStrategy = "majority" // the majority result
)

type ParallelType

// ParallelType is a parallel type.
type ParallelType string

ParallelType 并行类型

// Declared ParallelType values.
const (
	ParallelTypeAll  ParallelType = "all"  // run all branches
	ParallelTypeAny  ParallelType = "any"  // run any one branch
	ParallelTypeRace ParallelType = "race" // race the branches; the fastest wins
)

type ParallelWorkFlowAgent

type ParallelWorkFlowAgent struct {
	// contains filtered or unexported fields
}

ParallelWorkFlowAgent 并行工作流Agent

func NewParallelWorkFlowAgent

func NewParallelWorkFlowAgent(name string, branches []ParallelBranch, joinType JoinType, timeout time.Duration) *ParallelWorkFlowAgent

NewParallelWorkFlowAgent 创建并行工作流Agent

func (*ParallelWorkFlowAgent) Execute

func (p *ParallelWorkFlowAgent) Execute(ctx context.Context, message types.Message, yield func(*session.Event, error) bool) error

Execute 执行并行工作流

func (*ParallelWorkFlowAgent) GetMetrics

func (p *ParallelWorkFlowAgent) GetMetrics() *ParallelMetrics

GetMetrics 获取并行执行指标

func (*ParallelWorkFlowAgent) Name

func (p *ParallelWorkFlowAgent) Name() string

Name 返回Agent名称

type Position

// Position holds position information as integer X/Y coordinates.
type Position struct {
	X int `json:"x"`
	Y int `json:"y"`
}

Position 位置信息

type RetryDef

// RetryDef is a retry definition.
//
// NOTE(review): the original comment on MaxAttempts says "maximum number of
// retries" while the field is named "attempts"; confirm whether the first
// try is counted. Delay and MaxDelay are time.Duration values, which
// encoding/json emits as integer nanosecond counts.
type RetryDef struct {
	MaxAttempts int           `json:"max_attempts"` // maximum number of retries
	Delay       time.Duration `json:"delay"`        // retry delay
	Backoff     BackoffType   `json:"backoff"`      // backoff strategy
	MaxDelay    time.Duration `json:"max_delay"`    // maximum delay
}

RetryDef 重试定义

type RoomStep added in v0.13.0

type RoomStep struct {
	// contains filtered or unexported fields
}

func NewRoomStep added in v0.13.0

func NewRoomStep(name string, room *core.Room) *RoomStep

func (*RoomStep) Config added in v0.13.0

func (s *RoomStep) Config() *StepConfig

func (*RoomStep) Description added in v0.13.0

func (s *RoomStep) Description() string

func (*RoomStep) Execute added in v0.13.0

func (s *RoomStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*RoomStep) ID added in v0.13.0

func (s *RoomStep) ID() string

func (*RoomStep) Name added in v0.13.0

func (s *RoomStep) Name() string

func (*RoomStep) Type added in v0.13.0

func (s *RoomStep) Type() StepType

func (*RoomStep) WithDescription added in v0.13.0

func (s *RoomStep) WithDescription(desc string) *RoomStep

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router 动态路由器 - 根据输入动态选择要执行的步骤。类似 agno 的 Router,支持返回多个步骤并顺序链接执行。

func ChainRouter

func ChainRouter(name string, selector func(*StepInput) []string, routes map[string]Step) *Router

ChainRouter 创建链式路由器:根据条件选择多个步骤顺序执行。

func DynamicRouter

func DynamicRouter(name string, selector func(*StepInput) []Step) *Router

DynamicRouter 创建动态路由器:完全自定义的步骤选择逻辑。

func NewRouter

func NewRouter(name string, selector func(*StepInput) []Step, choices []Step) *Router

NewRouter 创建新的 Router

func SimpleRouter

func SimpleRouter(name string, condition func(*StepInput) string, routes map[string]Step) *Router

SimpleRouter 创建简单的条件路由器:根据条件选择单个步骤执行。

func (*Router) Config

func (r *Router) Config() *StepConfig

func (*Router) Description

func (r *Router) Description() string

func (*Router) Execute

func (r *Router) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

Execute 执行 Router - 选择步骤并顺序链接执行

func (*Router) ExecuteStream

func (r *Router) ExecuteStream(ctx context.Context, input *StepInput, streamEvents bool) *stream.Reader[interface{}]

ExecuteStream 流式执行 Router - 支持实时事件流

func (*Router) ID

func (r *Router) ID() string

func (*Router) Name

func (r *Router) Name() string

func (*Router) Type

func (r *Router) Type() StepType

func (*Router) WithDescription

func (r *Router) WithDescription(desc string) *Router

WithDescription 设置描述

func (*Router) WithTimeout

func (r *Router) WithTimeout(timeout time.Duration) *Router

WithTimeout 设置超时

type RouterEvent

// RouterEvent is an event record associated with a Router.
// NOTE(review): Type is a plain string; presumably one of the
// RouterEventStarted / RouterEventCompleted / RouterEventFailed constants —
// confirm against the emitter.
type RouterEvent struct {
	Type          string
	RouterName    string
	SelectedSteps []string
	ExecutedSteps int
	Timestamp     time.Time
}

type RouterStep

type RouterStep struct {
	// contains filtered or unexported fields
}

func NewRouterStep

func NewRouterStep(name string, router func(*StepInput) string, routes map[string]Step) *RouterStep

func (*RouterStep) Config

func (s *RouterStep) Config() *StepConfig

func (*RouterStep) Description

func (s *RouterStep) Description() string

func (*RouterStep) Execute

func (s *RouterStep) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*RouterStep) ID

func (s *RouterStep) ID() string

func (*RouterStep) Name

func (s *RouterStep) Name() string

func (*RouterStep) Type

func (s *RouterStep) Type() StepType

func (*RouterStep) WithDefault

func (s *RouterStep) WithDefault(step Step) *RouterStep

type RunEvent

// RunEvent is a workflow run event.
type RunEvent struct {
	Type         WorkflowEventType
	EventID      string
	WorkflowID   string
	WorkflowName string
	RunID        string
	StepID       string
	StepName     string
	Data         interface{}
	Timestamp    time.Time
	Metadata     map[string]interface{}
}

RunEvent Workflow 运行事件

type RunMetrics

// RunMetrics holds workflow run metrics.
// NOTE(review): TotalExecutionTime is a float64 with no unit visible here
// (seconds? milliseconds?) — confirm against the code that populates it.
type RunMetrics struct {
	TotalExecutionTime float64
	TotalSteps         int
	SuccessfulSteps    int
	FailedSteps        int
	SkippedSteps       int
	TotalInputTokens   int
	TotalOutputTokens  int
	TotalTokens        int
	StepMetrics        map[string]*StepMetrics
}

RunMetrics Workflow 运行指标

type RunStatus

// RunStatus is a run status.
type RunStatus string

RunStatus 运行状态

// Declared RunStatus values.
const (
	RunStatusPending   RunStatus = "pending"
	RunStatusRunning   RunStatus = "running"
	RunStatusCompleted RunStatus = "completed"
	RunStatusFailed    RunStatus = "failed"
	RunStatusCancelled RunStatus = "cancelled"
)

type SecurityConfig

// SecurityConfig is the security configuration.
type SecurityConfig struct {
	EnableAuth     bool     `json:"enable_auth"`
	AllowedRoles   []string `json:"allowed_roles"`
	DataEncryption bool     `json:"data_encryption"`
	AuditLogging   bool     `json:"audit_logging"`
	SandboxMode    bool     `json:"sandbox_mode"`
}

SecurityConfig 安全配置

type SecurityManager

// SecurityManager is the security manager.
type SecurityManager struct {
	EnableAuth   bool     `json:"enable_auth"`
	AllowedRoles []string `json:"allowed_roles"`
	SandboxMode  bool     `json:"sandbox_mode"`
	EnableAudit  bool     `json:"enable_audit"`
}

SecurityManager 安全管理器

type SessionManager

// SessionManager is the session manager interface.
type SessionManager interface {
	CreateSession(ctx context.Context, workflowID string) (*session.Session, error)
	GetSession(sessionID string) (*session.Session, error)
	CloseSession(sessionID string) error
}

SessionManager 会话管理器接口

type Step

// Step is the step interface.
type Step interface {
	ID() string
	Name() string
	Type() StepType
	Description() string
	// Execute takes a *StepInput and returns a stream reader of *StepOutput.
	Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]
	Config() *StepConfig
}

Step 步骤接口

type StepConfig

// StepConfig is the step configuration.
type StepConfig struct {
	ID                    string
	Name                  string
	Description           string
	Type                  StepType
	MaxRetries            int
	Timeout               time.Duration
	SkipOnError           bool
	StrictInputValidation bool
	Metadata              map[string]interface{}
}

StepConfig 步骤配置

type StepInput

// StepInput is the input to a step.
type StepInput struct {
	Input               interface{}
	PreviousStepContent interface{}
	PreviousStepOutputs map[string]*StepOutput
	AdditionalData      map[string]interface{}
	SessionState        map[string]interface{}
	Images              []interface{}
	Videos              []interface{}
	Audio               []interface{}
	Files               []interface{}
	WorkflowSession     *WorkflowSession
}

StepInput 步骤输入

func (*StepInput) GetInputAsString

func (si *StepInput) GetInputAsString() string

func (*StepInput) GetStepContent

func (si *StepInput) GetStepContent(stepName string) interface{}

func (*StepInput) GetStepOutput

func (si *StepInput) GetStepOutput(stepName string) *StepOutput

type StepMetrics

// StepMetrics holds step metrics.
// NOTE(review): ExecutionTime is a float64 with no unit visible here —
// confirm against the code that populates it.
type StepMetrics struct {
	ExecutionTime float64
	InputTokens   int
	OutputTokens  int
	TotalTokens   int
	RetryCount    int
	Custom        map[string]interface{}
}

StepMetrics 步骤指标

type StepOutput

// StepOutput is the output of a step.
// NOTE(review): Duration is a float64 rather than a time.Duration; its unit
// is not visible here — confirm against the code that populates it.
type StepOutput struct {
	StepID      string
	StepName    string
	StepType    StepType
	Content     interface{}
	Error       error
	Metadata    map[string]interface{}
	Metrics     *StepMetrics
	NestedSteps []*StepOutput
	StartTime   time.Time
	EndTime     time.Time
	Duration    float64
}

StepOutput 步骤输出

type StepType

// StepType is a step type.
type StepType string

StepType 步骤类型

// Declared StepType values.
const (
	StepTypeAgent     StepType = "agent"
	StepTypeRoom      StepType = "room"
	StepTypeFunction  StepType = "function"
	StepTypeCondition StepType = "condition"
	StepTypeLoop      StepType = "loop"
	StepTypeParallel  StepType = "parallel"
	StepTypeRouter    StepType = "router"
	StepTypeSteps     StepType = "steps"
)

type StepsGroup

// StepsGroup is created by NewStepsGroup from a name and a list of Steps.
// Its fields are unexported.
type StepsGroup struct {
	// contains filtered or unexported fields
}

func NewStepsGroup

func NewStepsGroup(name string, steps ...Step) *StepsGroup

func (*StepsGroup) Config

func (s *StepsGroup) Config() *StepConfig

func (*StepsGroup) Description

func (s *StepsGroup) Description() string

func (*StepsGroup) Execute

func (s *StepsGroup) Execute(ctx context.Context, input *StepInput) *stream.Reader[*StepOutput]

func (*StepsGroup) ID

func (s *StepsGroup) ID() string

func (*StepsGroup) Name

func (s *StepsGroup) Name() string

func (*StepsGroup) Type

func (s *StepsGroup) Type() StepType

type SwitchAgent

// SwitchAgent is a switch-branching agent (similar to a programming
// language's switch statement). It is created by NewSwitchAgent from a
// SwitchConfig.
type SwitchAgent struct {
	// contains filtered or unexported fields
}

SwitchAgent Switch分支Agent(类似编程语言的switch语句)

func NewSwitchAgent

func NewSwitchAgent(config SwitchConfig) (*SwitchAgent, error)

NewSwitchAgent 创建Switch Agent

func (*SwitchAgent) Execute

func (s *SwitchAgent) Execute(ctx context.Context, message string) *stream.Reader[*session.Event]

Execute 执行Switch分支

func (*SwitchAgent) Name

func (s *SwitchAgent) Name() string

Name 返回Agent名称

type SwitchCase

// SwitchCase is one branch of a switch.
type SwitchCase struct {
	Value       string    `json:"value"`       // value to match
	Agent       *AgentRef `json:"agent"`       // agent for this branch
	Name        string    `json:"name"`        // branch name
	Fallthrough bool      `json:"fallthrough"` // whether to continue matching the next case
}

SwitchCase Switch分支

type SwitchConfig

// SwitchConfig is the configuration of a SwitchAgent.
type SwitchConfig struct {
	Name     string       `json:"name"`
	Variable string       `json:"variable"` // name of the switch variable
	Cases    []SwitchCase `json:"cases"`
	Default  *AgentRef    `json:"default,omitempty"`
}

SwitchConfig SwitchAgent配置

type VariableDef

// VariableDef is a variable definition; WorkflowDefinition uses it for its
// Inputs and Outputs.
type VariableDef struct {
	Name        string      `json:"name"`
	Type        string      `json:"type"` // string, number, boolean, object, array
	Description string      `json:"description"`
	Required    bool        `json:"required"`
	Default     interface{} `json:"default,omitempty"`
	Validation  string      `json:"validation,omitempty"` // JSON Schema or validation rules
}

VariableDef 变量定义

type Workflow

// Workflow is the unified workflow execution system.
type Workflow struct {
	// Identity
	ID          string
	Name        string
	Description string

	// Steps
	Steps []Step

	// Database
	DB store.Store

	// Session
	SessionID    string
	UserID       string
	SessionState map[string]interface{}
	CacheSession bool

	// Configuration
	MaxRetries int
	Timeout    time.Duration
	RetryDelay time.Duration

	// Streaming
	Stream               bool
	StreamEvents         bool
	StreamExecutorEvents bool

	// Debugging
	DebugMode bool

	// Storage
	StoreEvents          bool
	StoreExecutorOutputs bool
	SkipEvents           []WorkflowEventType

	// Input validation
	InputSchema interface{} // Type for input validation

	// Metadata
	Metadata map[string]interface{}

	// History
	AddWorkflowHistory bool
	NumHistoryRuns     int
	// contains filtered or unexported fields
}

Workflow 统一的 Workflow 执行系统

func New

func New(name string) *Workflow

New 创建新的 Workflow

func (*Workflow) AddStep

func (w *Workflow) AddStep(step Step) *Workflow

AddStep 添加步骤

func (*Workflow) AgenticExecute

func (wf *Workflow) AgenticExecute(ctx context.Context, agent *WorkflowAgent, input string) (string, error)

AgenticExecute Agentic 方式执行 - Agent 决定何时运行 workflow

func (*Workflow) AgenticExecuteStream

func (wf *Workflow) AgenticExecuteStream(ctx context.Context, agent *WorkflowAgent, input string) <-chan AgentStreamEvent

AgenticExecuteStream Agentic 方式流式执行

func (*Workflow) CreateSession

func (w *Workflow) CreateSession(sessionID, userID string) *WorkflowSession

CreateSession 创建会话

func (*Workflow) Execute

func (w *Workflow) Execute(ctx context.Context, input *WorkflowInput) *stream.Reader[*RunEvent]

Execute 执行 Workflow

func (*Workflow) GetLastRun

func (w *Workflow) GetLastRun() (*WorkflowRun, error)

GetLastRun 获取最后一次运行

func (*Workflow) GetOrCreateSession

func (w *Workflow) GetOrCreateSession(sessionID, userID string) *WorkflowSession

GetOrCreateSession 获取或创建会话

func (*Workflow) GetRun

func (w *Workflow) GetRun(runID string) (*WorkflowRun, error)

GetRun 获取运行记录

func (*Workflow) GetSession

func (w *Workflow) GetSession(sessionID string) (*WorkflowSession, error)

GetSession 获取会话

func (*Workflow) GetWorkflowData

func (w *Workflow) GetWorkflowData() map[string]interface{}

GetWorkflowData 获取 Workflow 数据

func (*Workflow) InitializeSession

func (w *Workflow) InitializeSession(sessionID, userID string) (string, string)

InitializeSession 初始化会话

func (*Workflow) SaveRun

func (w *Workflow) SaveRun(run *WorkflowRun) error

SaveRun 保存运行记录

func (*Workflow) SaveSession

func (w *Workflow) SaveSession(session *WorkflowSession) error

SaveSession 保存会话

func (*Workflow) Validate

func (w *Workflow) Validate() error

Validate 验证配置

func (*Workflow) ValidateInput

func (w *Workflow) ValidateInput(input interface{}) error

ValidateInput 验证输入

func (*Workflow) WithAgent

func (wf *Workflow) WithAgent(agent *WorkflowAgent) *Workflow

WithAgent 为 Workflow 设置 Agent(Agentic Workflow)

func (*Workflow) WithDB

func (w *Workflow) WithDB(db store.Store) *Workflow

WithDB 设置数据库

func (*Workflow) WithDebug

func (w *Workflow) WithDebug() *Workflow

WithDebug 启用调试

func (*Workflow) WithHistory

func (w *Workflow) WithHistory(numRuns int) *Workflow

WithHistory 启用历史记录

func (*Workflow) WithMetadata

func (w *Workflow) WithMetadata(key string, value interface{}) *Workflow

WithMetadata 添加元数据

func (*Workflow) WithSession

func (w *Workflow) WithSession(sessionID string) *Workflow

WithSession 设置会话

func (*Workflow) WithStream

func (w *Workflow) WithStream() *Workflow

WithStream 启用流式

func (*Workflow) WithTimeout

func (w *Workflow) WithTimeout(timeout time.Duration) *Workflow

WithTimeout 设置超时

type WorkflowAgent

// WorkflowAgent is a restricted agent specifically designed for workflow
// orchestration.
type WorkflowAgent struct {
	ID                 string
	Name               string
	Instructions       string
	Model              string
	AddWorkflowHistory bool
	NumHistoryRuns     int
	// contains filtered or unexported fields
}

WorkflowAgent 专门用于 Workflow 编排的受限 Agent。A restricted Agent specifically designed for workflow orchestration.

func NewWorkflowAgent

func NewWorkflowAgent(model, instructions string, addHistory bool, numRuns int) *WorkflowAgent

NewWorkflowAgent 创建新的 WorkflowAgent

func (*WorkflowAgent) AttachWorkflow

func (wa *WorkflowAgent) AttachWorkflow(wf *Workflow) *WorkflowAgent

AttachWorkflow 将 workflow 附加到 agent

func (*WorkflowAgent) CreateWorkflowTool

func (wa *WorkflowAgent) CreateWorkflowTool(
	session *WorkflowSession,
	executionInput *WorkflowInput,
	stream bool,
) WorkflowToolFunc

CreateWorkflowTool 创建 workflow 执行工具

func (*WorkflowAgent) EnableHistory

func (wa *WorkflowAgent) EnableHistory(enable bool) *WorkflowAgent

EnableHistory 启用/禁用历史记录

func (*WorkflowAgent) GetWorkflow

func (wa *WorkflowAgent) GetWorkflow() *Workflow

GetWorkflow 获取关联的 workflow

func (*WorkflowAgent) GetWorkflowHistory

func (wa *WorkflowAgent) GetWorkflowHistory() []WorkflowHistoryItem

GetWorkflowHistory 获取 workflow 历史

func (*WorkflowAgent) Run

func (wa *WorkflowAgent) Run(ctx context.Context, input string) (string, error)

Run 运行 WorkflowAgent

func (*WorkflowAgent) RunStream

func (wa *WorkflowAgent) RunStream(ctx context.Context, input string) <-chan AgentStreamEvent

RunStream 流式运行 WorkflowAgent

func (*WorkflowAgent) WithHistorySize

func (wa *WorkflowAgent) WithHistorySize(num int) *WorkflowAgent

WithHistorySize 设置历史记录数量

func (*WorkflowAgent) WithInstructions

func (wa *WorkflowAgent) WithInstructions(instructions string) *WorkflowAgent

WithInstructions 设置指令

func (*WorkflowAgent) WithModel

func (wa *WorkflowAgent) WithModel(model string) *WorkflowAgent

WithModel 设置模型

type WorkflowConfig

// WorkflowConfig is the workflow configuration.
type WorkflowConfig struct {
	// Timeout configuration
	DefaultTimeout time.Duration `json:"default_timeout"`

	// Retry configuration
	DefaultRetry *RetryDef `json:"default_retry,omitempty"`

	// Concurrency configuration
	MaxConcurrency int `json:"max_concurrency"`

	// Monitoring configuration
	Monitoring *MonitoringConfig `json:"monitoring,omitempty"`

	// Logging configuration
	Logging *LoggingConfig `json:"logging,omitempty"`

	// Security configuration
	Security *SecurityConfig `json:"security,omitempty"`
}

WorkflowConfig 工作流配置

type WorkflowContext

// WorkflowContext is the execution context of a workflow.
type WorkflowContext struct {
	// Basic information
	WorkflowID  string         `json:"workflow_id"`
	ExecutionID string         `json:"execution_id"`
	StartTime   time.Time      `json:"start_time"`
	Status      WorkflowStatus `json:"status"`

	// Variable storage
	Variables map[string]interface{} `json:"variables"`
	Inputs    map[string]interface{} `json:"inputs"`
	Outputs   map[string]interface{} `json:"outputs"`

	// Execution state
	// NOTE(review): Failed holds error values; encoding/json has no special
	// handling for the error interface, so entries typically encode as {}
	// unless the concrete type exports fields or implements json.Marshaler —
	// confirm this is the intended serialization.
	CurrentNode string           `json:"current_node"`
	Completed   map[string]bool  `json:"completed"`
	Failed      map[string]error `json:"failed"`

	// Metadata
	Metadata map[string]interface{} `json:"metadata"`

	// Context (both fields are tagged json:"-" and so are omitted from JSON)
	// NOTE(review): the context package documentation advises against storing
	// a Context inside a struct — confirm this is intentional.
	Context context.Context  `json:"-"`
	Session *session.Session `json:"-"`
}

WorkflowContext 工作流执行上下文

type WorkflowDefinition

// WorkflowDefinition is the definition of a workflow.
type WorkflowDefinition struct {
	// Basic information
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description"`
	Version     string   `json:"version"`
	Tags        []string `json:"tags"`

	// Input and output definitions
	Inputs  []VariableDef `json:"inputs"`
	Outputs []VariableDef `json:"outputs"`

	// Workflow graph
	Nodes []NodeDef `json:"nodes"`
	Edges []EdgeDef `json:"edges"`

	// Execution configuration
	Config *WorkflowConfig `json:"config,omitempty"`

	// Metadata
	Metadata map[string]string `json:"metadata,omitempty"`
}

WorkflowDefinition 工作流定义

func ParseFromJSON

func ParseFromJSON(data []byte) (*WorkflowDefinition, error)

ParseFromJSON 从JSON解析工作流定义

func ParseFromYAML

func ParseFromYAML(data []byte) (*WorkflowDefinition, error)

ParseFromYAML 从YAML解析工作流定义

func (*WorkflowDefinition) ToJSON

func (w *WorkflowDefinition) ToJSON() ([]byte, error)

ToJSON 转换为JSON

func (*WorkflowDefinition) ToYAML

func (w *WorkflowDefinition) ToYAML() ([]byte, error)

ToYAML 转换为YAML

type WorkflowError

// WorkflowError describes a workflow error. The Error field is a plain
// string message, not an error value.
type WorkflowError struct {
	NodeID    string    `json:"node_id"`
	NodeName  string    `json:"node_name"`
	Error     string    `json:"error"`
	Timestamp time.Time `json:"timestamp"`
	Retryable bool      `json:"retryable"`
}

WorkflowError 工作流错误

type WorkflowEvent

// WorkflowEvent is a workflow event.
type WorkflowEvent struct {
	Type        string                 `json:"type"`
	ExecutionID string                 `json:"execution_id"`
	NodeID      string                 `json:"node_id"`
	Timestamp   time.Time              `json:"timestamp"`
	Data        map[string]interface{} `json:"data"`
}

WorkflowEvent 工作流事件

type WorkflowEventType

// WorkflowEventType is the type of an event, represented as a string.
type WorkflowEventType string

WorkflowEventType 事件类型

// Declared WorkflowEventType values.
const (
	EventWorkflowStarted   WorkflowEventType = "workflow_started"
	EventStepStarted       WorkflowEventType = "step_started"
	EventStepProgress      WorkflowEventType = "step_progress"
	EventStepCompleted     WorkflowEventType = "step_completed"
	EventStepFailed        WorkflowEventType = "step_failed"
	EventStepSkipped       WorkflowEventType = "step_skipped"
	EventWorkflowCompleted WorkflowEventType = "workflow_completed"
	EventWorkflowFailed    WorkflowEventType = "workflow_failed"
	EventWorkflowCancelled WorkflowEventType = "workflow_cancelled"
)

type WorkflowExecution

// WorkflowExecution is a workflow execution instance.
type WorkflowExecution struct {
	// Basic information
	ID         string              `json:"id"`
	WorkflowID string              `json:"workflow_id"`
	Definition *WorkflowDefinition `json:"definition"`
	Status     WorkflowStatus      `json:"status"`
	Context    *WorkflowContext    `json:"context"`

	// Execution state
	// NOTE(review): FailedNodes holds error values; encoding/json typically
	// encodes these as {} unless the concrete type exports fields or
	// implements json.Marshaler — confirm this is the intended serialization.
	CurrentNodes   []string               `json:"current_nodes"`
	CompletedNodes map[string]bool        `json:"completed_nodes"`
	FailedNodes    map[string]error       `json:"failed_nodes"`
	NodeResults    map[string]*NodeResult `json:"node_results"`

	// Timing information
	StartTime    time.Time `json:"start_time"`
	EndTime      time.Time `json:"end_time"`
	LastActivity time.Time `json:"last_activity"`

	// Error handling
	Errors   []WorkflowError `json:"errors"`
	Warnings []string        `json:"warnings"`
	// contains filtered or unexported fields
}

WorkflowExecution 工作流执行实例

type WorkflowExecutorVisualizer

// WorkflowExecutorVisualizer visualizes a workflow execution. It is created
// by NewWorkflowExecutorVisualizer from a *WorkflowContext.
type WorkflowExecutorVisualizer struct {
	// contains filtered or unexported fields
}

WorkflowExecutorVisualizer 工作流执行可视化器

func NewWorkflowExecutorVisualizer

func NewWorkflowExecutorVisualizer(execution *WorkflowContext) *WorkflowExecutorVisualizer

NewWorkflowExecutorVisualizer 创建执行可视化器

func (*WorkflowExecutorVisualizer) GenerateExecutionState

func (v *WorkflowExecutorVisualizer) GenerateExecutionState() string

GenerateExecutionState 生成执行状态的可视化

func (*WorkflowExecutorVisualizer) GenerateProgressDiagram

func (v *WorkflowExecutorVisualizer) GenerateProgressDiagram() string

GenerateProgressDiagram 生成进度图

type WorkflowHistoryItem

// WorkflowHistoryItem is one entry of workflow history, as returned by
// WorkflowAgent.GetWorkflowHistory.
type WorkflowHistoryItem struct {
	RunID     string
	Input     interface{}
	Output    interface{}
	Status    string
	StartTime time.Time
	EndTime   time.Time
	// NOTE(review): the unit of Duration is not shown here (presumably
	// seconds) — confirm.
	Duration  float64
	Metrics   *RunMetrics
}

WorkflowHistoryItem Workflow 历史项

type WorkflowInput

// WorkflowInput is the input to a workflow; Workflow.Execute takes a
// *WorkflowInput.
type WorkflowInput struct {
	Input          interface{}
	AdditionalData map[string]interface{}
	Images         []interface{}
	Videos         []interface{}
	Audio          []interface{}
	Files          []interface{}
	SessionID      string
	UserID         string
	SessionState   map[string]interface{}
}

WorkflowInput Workflow 输入

type WorkflowMetrics

// WorkflowMetrics holds workflow metrics: node counts and node timing
// figures expressed as time.Duration.
type WorkflowMetrics struct {
	TotalNodes      int           `json:"total_nodes"`
	CompletedNodes  int           `json:"completed_nodes"`
	FailedNodes     int           `json:"failed_nodes"`
	SkippedNodes    int           `json:"skipped_nodes"`
	TotalDuration   time.Duration `json:"total_duration"`
	AverageNodeTime time.Duration `json:"average_node_time"`
	MaxNodeTime     time.Duration `json:"max_node_time"`
	MinNodeTime     time.Duration `json:"min_node_time"`
}

WorkflowMetrics 工作流指标

type WorkflowOutput

// WorkflowOutput is the output of a workflow.
type WorkflowOutput struct {
	RunID        string
	WorkflowID   string
	WorkflowName string
	Content      interface{}
	Error        error
	StepOutputs  map[string]*StepOutput
	SessionID    string
	SessionState map[string]interface{}
	Metrics      *RunMetrics
	Status       RunStatus
	StartTime    time.Time
	EndTime      time.Time
	// NOTE(review): the unit of Duration is not shown here (presumably
	// seconds) — confirm.
	Duration     float64
}

WorkflowOutput Workflow 输出

type WorkflowResult

// WorkflowResult is the result of a workflow execution.
type WorkflowResult struct {
	ExecutionID string                 `json:"execution_id"`
	WorkflowID  string                 `json:"workflow_id"`
	Status      WorkflowStatus         `json:"status"`
	StartTime   time.Time              `json:"start_time"`
	EndTime     time.Time              `json:"end_time"`
	Duration    time.Duration          `json:"duration"`
	Outputs     map[string]interface{} `json:"outputs"`
	Errors      []WorkflowError        `json:"errors,omitempty"`
	Metrics     *WorkflowMetrics       `json:"metrics,omitempty"`
	Trace       []WorkflowStep         `json:"trace,omitempty"`
}

WorkflowResult 工作流执行结果

type WorkflowRun

// WorkflowRun is the record of one workflow run. The Error field is a plain
// string message, not an error value.
type WorkflowRun struct {
	RunID       string
	SessionID   string
	WorkflowID  string
	Input       interface{}
	Output      interface{}
	StepOutputs map[string]*StepOutput
	Status      RunStatus
	Error       string
	Metrics     *RunMetrics
	StartTime   time.Time
	EndTime     time.Time
	// NOTE(review): the unit of Duration is not shown here (presumably
	// seconds) — confirm.
	Duration    float64
}

WorkflowRun Workflow 运行记录

type WorkflowSession

// WorkflowSession is a workflow session; it carries session state and a
// history of WorkflowRun records.
type WorkflowSession struct {
	ID         string
	WorkflowID string
	State      map[string]interface{}
	History    []*WorkflowRun
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

WorkflowSession Workflow 会话

type WorkflowStatus

// WorkflowStatus is the status of a workflow, represented as a string.
type WorkflowStatus string

WorkflowStatus 工作流状态

// Declared WorkflowStatus values.
const (
	StatusPending   WorkflowStatus = "pending"   // waiting to execute
	StatusRunning   WorkflowStatus = "running"   // executing
	StatusPaused    WorkflowStatus = "paused"    // paused
	StatusCompleted WorkflowStatus = "completed" // completed
	StatusFailed    WorkflowStatus = "failed"    // failed
	StatusCancelled WorkflowStatus = "cancelled" // cancelled
	StatusTimeout   WorkflowStatus = "timeout"   // timed out
)

type WorkflowStep

// WorkflowStep is a workflow step record; WorkflowResult.Trace is a slice of
// these.
type WorkflowStep struct {
	NodeID     string                 `json:"node_id"`
	NodeName   string                 `json:"node_name"`
	NodeType   NodeType               `json:"node_type"`
	Status     WorkflowStatus         `json:"status"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time"`
	Duration   time.Duration          `json:"duration"`
	Inputs     map[string]interface{} `json:"inputs"`
	Outputs    map[string]interface{} `json:"outputs"`
	Error      string                 `json:"error,omitempty"`
	RetryCount int                    `json:"retry_count"`
	Metadata   map[string]interface{} `json:"metadata"`
}

WorkflowStep 工作流步骤

type WorkflowToolFunc

// WorkflowToolFunc is the function type of a workflow tool: it takes a
// context and a query string and returns a result and an error. It is the
// type returned by WorkflowAgent.CreateWorkflowTool.
type WorkflowToolFunc func(ctx context.Context, query string) (interface{}, error)

WorkflowToolFunc Workflow 工具函数类型

type WorkflowVisualizer

// WorkflowVisualizer visualizes a workflow. It is created by
// NewWorkflowVisualizer from a *WorkflowDefinition.
type WorkflowVisualizer struct {
	// contains filtered or unexported fields
}

WorkflowVisualizer 工作流可视化器

func NewWorkflowVisualizer

func NewWorkflowVisualizer(workflow *WorkflowDefinition) *WorkflowVisualizer

NewWorkflowVisualizer 创建工作流可视化器

func (*WorkflowVisualizer) GenerateASCII

func (v *WorkflowVisualizer) GenerateASCII() string

GenerateASCII 生成ASCII艺术格式的工作流图

func (*WorkflowVisualizer) GenerateDOT

func (v *WorkflowVisualizer) GenerateDOT() string

GenerateDOT 生成DOT格式的可视化图形

func (*WorkflowVisualizer) GenerateMermaid

func (v *WorkflowVisualizer) GenerateMermaid() string

GenerateMermaid 生成Mermaid格式的流程图

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL