Documentation
¶
Index ¶
- type CodecBlockExtractor
- type DGAEdge
- type DGAEvaluationContext
- func (c *DGAEvaluationContext) All() map[string]any
- func (c *DGAEvaluationContext) Get(key string) (any, bool)
- func (c *DGAEvaluationContext) Iteration() int
- func (c *DGAEvaluationContext) WithIteration(iteration int) EvaluationContext
- func (c *DGAEvaluationContext) WithNode(node Node) EvaluationContext
- func (c *DGAEvaluationContext) WithParams(params map[string]any) EvaluationContext
- func (c *DGAEvaluationContext) WithPipeline(pipeline Pipeline) EvaluationContext
- type DGAGraph
- func (dga *DGAGraph) AddEdge(edge Edge) error
- func (dga *DGAGraph) AddEntryNode(nodeID string)
- func (dga *DGAGraph) AddExitNode(nodeID string)
- func (dga *DGAGraph) AddVertex(node Node)
- func (dga *DGAGraph) BackEdges() []Edge
- func (dga *DGAGraph) Edges() []Edge
- func (dga *DGAGraph) EntryNodes() []string
- func (dga *DGAGraph) ExitNodes() []string
- func (dga *DGAGraph) GetEdge(srcID, destID string) (Edge, bool)
- func (dga *DGAGraph) GetNode(nodeID string) (Node, bool)
- func (dga *DGAGraph) HasCycle() bool
- func (dga *DGAGraph) IncomingEdges(nodeID string) []Edge
- func (dga *DGAGraph) IsCyclic() bool
- func (dga *DGAGraph) LoopNodeSet(backEdge Edge) map[string]bool
- func (dga *DGAGraph) Nodes() map[string]Node
- func (dga *DGAGraph) OutgoingEdges(nodeID string) []Edge
- func (dga *DGAGraph) RemoveEdge(srcID, destID string) error
- func (dga *DGAGraph) RemoveVertex(nodeID string) error
- func (dga *DGAGraph) Traversal(ctx context.Context, evalCtx EvaluationContext, fn TraversalFn) error
- func (dga *DGAGraph) TraversalSteps(evalCtx EvaluationContext) [][]string
- type DGANode
- func (dgaNode *DGANode) EnsureIds()
- func (dgaNode *DGANode) Get(key string) string
- func (dgaNode *DGANode) GetConfig() map[string]any
- func (dgaNode *DGANode) GetExecutor() string
- func (dgaNode *DGANode) GetRuntimeStatus() *core.NodeRuntimeStatus
- func (dgaNode *DGANode) GetStepRuntimeStatus(stepName string) *core.StepRuntimeStatus
- func (dgaNode *DGANode) GetSteps() []core.Step
- func (dgaNode *DGANode) Id() string
- func (dgaNode *DGANode) PipelineId() string
- func (dgaNode *DGANode) Set(key string, value any)
- func (dgaNode *DGANode) SetRuntimeStatus(status *core.NodeRuntimeStatus)
- func (dgaNode *DGANode) SetStepRuntimeStatus(stepStatus *core.StepRuntimeStatus)
- func (dgaNode *DGANode) Status() string
- type Edge
- type EdgeModification
- type EdgeRemoval
- type EvaluationContext
- type Event
- type ExecutorProvider
- type Graph
- type GraphModifications
- type GraphReader
- type Listener
- type ListeningFn
- type Metadata
- type Node
- type OutputExtractor
- type Pipeline
- type PipelineImpl
- func (p *PipelineImpl) Cancel()
- func (p *PipelineImpl) CurrentNode() Node
- func (p *PipelineImpl) Done() <-chan struct{}
- func (p *PipelineImpl) DoneChanForTest() chan struct{}
- func (p *PipelineImpl) GetGraph() Graph
- func (p *PipelineImpl) GetTemplateEngine() template.TemplateEngine
- func (p *PipelineImpl) Id() string
- func (p *PipelineImpl) IsModifiable() bool
- func (p *PipelineImpl) Listening(fn Listener)
- func (p *PipelineImpl) Metadata() Metadata
- func (p *PipelineImpl) Notify()
- func (p *PipelineImpl) NotifyEvent(event Event)
- func (p *PipelineImpl) ParamForTest() map[string]core.FieldItem
- func (p *PipelineImpl) Pause() error
- func (p *PipelineImpl) Resume(ctx context.Context) error
- func (p *PipelineImpl) Run(ctx context.Context) error
- func (p *PipelineImpl) SetExecutorProvider(provider ExecutorProvider)
- func (p *PipelineImpl) SetGraph(graph Graph)
- func (p *PipelineImpl) SetMaxLoopIterations(max int)
- func (p *PipelineImpl) SetMetadata(store metadata.MetadataStore)
- func (p *PipelineImpl) SetParam(param map[string]interface{})
- func (p *PipelineImpl) SetParamForTest(param map[string]interface{})
- func (p *PipelineImpl) SetPusher(pusher logger.Pusher)
- func (p *PipelineImpl) SetStatusForTest(status string)
- func (p *PipelineImpl) SetTemplateEngine(engine template.TemplateEngine)
- func (p *PipelineImpl) Status() string
- type PipelineSnapshotter
- type RegexExtractor
- type Snapshotter
- type TraversalFn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CodecBlockExtractor ¶
type CodecBlockExtractor struct {
// contains filtered or unexported fields
}
CodecBlockExtractor 代码块提取器 只识别 ```flowx-yaml 代码块,支持行尾注释提取 description
func NewCodecBlockExtractor ¶
func NewCodecBlockExtractor(maxSize int) *CodecBlockExtractor
NewCodecBlockExtractor 创建代码块提取器
type DGAEdge ¶
type DGAEdge struct {
// contains filtered or unexported fields
}
DGAEdge 是Edge接口的实现
func (*DGAEdge) Evaluate ¶
func (e *DGAEdge) Evaluate(ctx EvaluationContext) (bool, error)
Evaluate 评估条件表达式 如果表达式为空,返回true(无条件边总是可以通过) 如果有表达式但没有设置模板引擎,使用默认的Pongo2模板引擎
func (*DGAEdge) SetEngine ¶
func (e *DGAEdge) SetEngine(engine template.TemplateEngine)
SetEngine 设置模板引擎(用于延迟初始化)
type DGAEvaluationContext ¶
type DGAEvaluationContext struct {
// contains filtered or unexported fields
}
DGAEvaluationContext 是EvaluationContext接口的实现
func (*DGAEvaluationContext) All ¶
func (c *DGAEvaluationContext) All() map[string]any
All 返回上下文中所有数据的副本 合并了:基础数据、节点数据、流水线数据 将 "NodeID.key" 格式的扁平键转换为嵌套结构
func (*DGAEvaluationContext) Get ¶
func (c *DGAEvaluationContext) Get(key string) (any, bool)
Get 从上下文中获取值
func (*DGAEvaluationContext) Iteration ¶
func (c *DGAEvaluationContext) Iteration() int
Iteration 返回当前迭代计数器值
func (*DGAEvaluationContext) WithIteration ¶
func (c *DGAEvaluationContext) WithIteration(iteration int) EvaluationContext
WithIteration 设置迭代计数器并返回新的上下文(链式调用)
func (*DGAEvaluationContext) WithNode ¶
func (c *DGAEvaluationContext) WithNode(node Node) EvaluationContext
WithNode 设置当前节点并返回新的上下文(链式调用)
func (*DGAEvaluationContext) WithParams ¶
func (c *DGAEvaluationContext) WithParams(params map[string]any) EvaluationContext
WithParams 添加参数到上下文并返回新的上下文(链式调用)
func (*DGAEvaluationContext) WithPipeline ¶
func (c *DGAEvaluationContext) WithPipeline(pipeline Pipeline) EvaluationContext
WithPipeline 设置流水线并返回新的上下文(链式调用)
type DGAGraph ¶
type DGAGraph struct {
// contains filtered or unexported fields
}
保存了流水线的图结构
func NewDGAGraph ¶
func NewDGAGraph() *DGAGraph
func (*DGAGraph) AddEntryNode ¶
addEntryNode 添加入口节点(从 [*] 指向的节点)
func (*DGAGraph) AddExitNode ¶
addExitNode 添加出口节点(指向 [*] 的节点)
func (*DGAGraph) IncomingEdges ¶
IncomingEdges 返回指向指定节点的所有边
func (*DGAGraph) LoopNodeSet ¶
LoopNodeSet 计算回边涉及的循环节点集合 从回边的 target 出发,沿 forward edges BFS 到达 source 为止的所有节点
func (*DGAGraph) OutgoingEdges ¶
OutgoingEdges 返回从指定节点出发的所有边
func (*DGAGraph) RemoveEdge ¶
RemoveEdge 删除指定的边
func (*DGAGraph) RemoveVertex ¶
RemoveVertex 删除节点及其所有关联边
func (*DGAGraph) Traversal ¶
func (dga *DGAGraph) Traversal(ctx context.Context, evalCtx EvaluationContext, fn TraversalFn) error
Traversal 对图执行广度优先遍历 为图中的每个节点执行提供的 TraversalFn 函数 支持有环图和无环图:使用 forwardGraph(排除回边)计算层级 同一层级内的节点并发执行,层级之间串行执行
func (*DGAGraph) TraversalSteps ¶
func (dga *DGAGraph) TraversalSteps(evalCtx EvaluationContext) [][]string
TraversalSteps 计算 BFS 层级执行计划 返回 [][]string,每个子切片是一层可并发执行的节点ID列表
type DGANode ¶
type DGANode struct {
// contains filtered or unexported fields
}
func NewDGANode ¶
NewDGANode creates a new DGANode with the specified id and state, initializing an empty property map.
func NewDGANodeWithConfig ¶
func NewDGANodeWithConfig(id, state, executor, image string, steps []core.Step, config map[string]any) *DGANode
NewDGANodeWithConfig creates a new DGANode with full configuration.
func (*DGANode) GetExecutor ¶
func (*DGANode) GetRuntimeStatus ¶
func (dgaNode *DGANode) GetRuntimeStatus() *core.NodeRuntimeStatus
GetRuntimeStatus 获取运行时状态
func (*DGANode) GetStepRuntimeStatus ¶
func (dgaNode *DGANode) GetStepRuntimeStatus(stepName string) *core.StepRuntimeStatus
GetStepRuntimeStatus 获取指定步骤的运行时状态
func (*DGANode) PipelineId ¶
func (*DGANode) SetRuntimeStatus ¶
func (dgaNode *DGANode) SetRuntimeStatus(status *core.NodeRuntimeStatus)
SetRuntimeStatus 设置运行时状态
func (*DGANode) SetStepRuntimeStatus ¶
func (dgaNode *DGANode) SetStepRuntimeStatus(stepStatus *core.StepRuntimeStatus)
SetStepRuntimeStatus 设置步骤运行时状态
type Edge ¶
type Edge interface {
// Source 返回边的源节点
Source() Node
// Target 返回边的目标节点
Target() Node
// Expression 返回边的条件表达式(pongo2模板语法)
// 如果返回空字符串,表示无条件边,总是可以遍历
Expression() string
// Evaluate 评估条件表达式,返回bool表示是否通过
Evaluate(ctx EvaluationContext) (bool, error)
// ID 返回边的唯一标识符(格式:source->target)
ID() string
}
Edge 表示DAG中的边,支持条件表达式
func NewConditionalEdge ¶
NewConditionalEdge 创建一条条件边
type EdgeModification ¶
type EdgeModification struct {
Source string `yaml:"source"`
Target string `yaml:"target"`
Expression string `yaml:"expression,omitempty"`
}
EdgeModification 表示一条要添加的边
type EdgeRemoval ¶
EdgeRemoval 表示一条要删除的边
type EvaluationContext ¶
type EvaluationContext interface {
Get(key string) (any, bool)
All() map[string]any
WithNode(node Node) EvaluationContext
WithPipeline(pipeline Pipeline) EvaluationContext
WithParams(params map[string]any) EvaluationContext
WithIteration(iteration int) EvaluationContext
Iteration() int
}
EvaluationContext 表达式求值上下文
func NewEvaluationContext ¶
func NewEvaluationContext() EvaluationContext
NewEvaluationContext 创建一个新的求值上下文
type Event ¶
type Event string
流水线事件
var ( //监听事件 PipelineInit Event = core.EventPipelineInit // 流水线初始化 PipelineStart Event = core.EventPipelineStart // 流水线开始执行 PipelineFinish Event = core.EventPipelineFinish // 流水线完成 PipelineExecutorPrepare Event = core.EventPipelineExecutorPrepare // 流水线执行器开始准备 PipelineExecutorPrepareDone Event = core.EventPipelineExecutorPrepareDone // 流水线执行器准备完毕 PipelineNodeStart Event = core.EventPipelineNodeStart // 节点开始 PipelineNodeFinish Event = core.EventPipelineNodeFinish // 节点完成 PipelineNodeFailed Event = core.EventPipelineNodeFailed // 节点执行失败 PipelinePaused Event = core.EventPipelinePaused // 流水线暂停 PipelineResumed Event = core.EventPipelineResumed // 流水线恢复 PipelineGraphModified Event = core.EventPipelineGraphModified // 图被修改 )
type ExecutorProvider ¶
type ExecutorProvider = executor.ExecutorProvider
ExecutorProvider Executor提供者接口 从executor/core导入
type GraphModifications ¶
type GraphModifications struct {
// AddNodes 要添加的节点配置列表
AddNodes []core.NodeConfig `yaml:"addNodes"`
// RemoveNodes 要删除的节点 ID 列表(同时删除关联边)
RemoveNodes []string `yaml:"removeNodes"`
// AddEdges 要添加的边列表
AddEdges []EdgeModification `yaml:"addEdges"`
// RemoveEdges 要删除的边列表
RemoveEdges []EdgeRemoval `yaml:"removeEdges"`
// AddGraph 可选的 Mermaid stateDiagram-v2 片段,用于批量添加边
AddGraph string `yaml:"addGraph"`
}
GraphModifications 表示一组要原子应用的图修改操作
type GraphReader ¶
type GraphReader interface {
//Nodes
Nodes() map[string]Node
//Edges 返回所有的边
Edges() []Edge
//Traversal 遍历图结构
Traversal(ctx context.Context, evalCtx EvaluationContext, fn TraversalFn) error
//GetNode 根据节点ID查找节点
GetNode(nodeID string) (Node, bool)
//GetEdge 根据源节点和目标节点ID查找边
GetEdge(srcID, destID string) (Edge, bool)
//IncomingEdges 返回指向指定节点的所有边
IncomingEdges(nodeID string) []Edge
//OutgoingEdges 返回从指定节点出发的所有边
OutgoingEdges(nodeID string) []Edge
}
type Listener ¶
type Listener interface {
// 处理对应的事件将事件发生的对应的流水线和对应的事件作为参数传入
Handle(p Pipeline, event Event)
// 获取当前注册的Event
Events() []Event
}
我们将整个流水线的运行过程中的事件抽象成对应的Event 这样我们就能再外部监听Event
type Node ¶
type Node interface {
//ID 获取节点唯一id
Id() string
//PipelineId 获取节点所属的流水线id
PipelineId() string
//Status 获取节点状态
Status() string
//Get 获取节点属性数据
Get(key string) string
// Set 设置节点属性数据
Set(key string, value any)
// GetExecutor 获取节点执行器名称
GetExecutor() string
// GetSteps 获取节点执行步骤
GetSteps() []core.Step
// GetConfig 获取节点配置
GetConfig() map[string]any
// 运行时状态管理
GetRuntimeStatus() *core.NodeRuntimeStatus
SetRuntimeStatus(status *core.NodeRuntimeStatus)
// 步骤状态管理
GetStepRuntimeStatus(stepName string) *core.StepRuntimeStatus
SetStepRuntimeStatus(stepStatus *core.StepRuntimeStatus)
// 初始化ID
EnsureIds()
}
type OutputExtractor ¶
type OutputExtractor interface {
// Extract 从命令输出中提取数据
// output: 命令完整输出
// 返回: 提取的键值对(值为 FieldItem)
Extract(output string) (map[string]core.FieldItem, error)
}
OutputExtractor 输出提取器接口
type Pipeline ¶
type Pipeline interface {
//ID 流水线的id
Id() string
//GetGraph 返回图结构
GetGraph() Graph
//SetGraph 设置图结构
SetGraph(graph Graph)
//Status 返回流水线的整体状态
Status() string
//SetMetadata 设置元数据
SetMetadata(store metadata.MetadataStore)
//Metadata 获取元数据
Metadata() Metadata
//Listening 流水线执行事件监听设置
Listening(listener Listener)
//Done流水线是否执行完成
Done() <-chan struct{}
//Run执行流水线
Run(ctx context.Context) error
//Notify 执行的步骤通知流水线
Notify()
//Cancel 取消流水线
Cancel()
//SetExecutorProvider 设置Executor提供者
SetExecutorProvider(provider ExecutorProvider)
//SetTemplateEngine 设置模板引擎
SetTemplateEngine(engine template.TemplateEngine)
//GetTemplateEngine 获取模板引擎
GetTemplateEngine() template.TemplateEngine
//SetPusher 设置日志推送器
SetPusher(pusher logger.Pusher)
//Pause 暂停流水线,等待当前层执行完成后暂停
Pause() error
//Resume 恢复暂停的流水线
Resume(ctx context.Context) error
//IsModifiable 判断当前是否可修改图
IsModifiable() bool
//CurrentNode 返回当前正在执行的节点(如有)
CurrentNode() Node
}
func NewPipeline ¶
type PipelineImpl ¶
type PipelineImpl struct {
// contains filtered or unexported fields
}
func (*PipelineImpl) CurrentNode ¶
func (p *PipelineImpl) CurrentNode() Node
CurrentNode 返回当前正在执行的节点(如有)
func (*PipelineImpl) DoneChanForTest ¶
func (p *PipelineImpl) DoneChanForTest() chan struct{}
DoneChanForTest returns the doneChan for testing purposes.
func (*PipelineImpl) GetTemplateEngine ¶
func (p *PipelineImpl) GetTemplateEngine() template.TemplateEngine
GetTemplateEngine 获取模板引擎
func (*PipelineImpl) IsModifiable ¶
func (p *PipelineImpl) IsModifiable() bool
IsModifiable 判断当前是否可修改图
func (*PipelineImpl) Listening ¶
func (p *PipelineImpl) Listening(fn Listener)
Listening 设置流水线执行事件监听器
func (*PipelineImpl) Notify ¶
func (p *PipelineImpl) Notify()
Notify 这个主要是在运行过程中节点状态或者流水线状态变化,就会触发这个函数 节点 我们就可以在这里做一些处理 执行ListeningFn函数
func (*PipelineImpl) NotifyEvent ¶
func (p *PipelineImpl) NotifyEvent(event Event)
NotifyEvent 通知监听器特定事件
func (*PipelineImpl) ParamForTest ¶
func (p *PipelineImpl) ParamForTest() map[string]core.FieldItem
ParamForTest returns the pipeline param for testing purposes.
func (*PipelineImpl) Resume ¶
func (p *PipelineImpl) Resume(ctx context.Context) error
Resume 恢复暂停的流水线
func (*PipelineImpl) SetExecutorProvider ¶
func (p *PipelineImpl) SetExecutorProvider(provider ExecutorProvider)
SetExecutorProvider 设置Executor提供者
func (*PipelineImpl) SetMaxLoopIterations ¶
func (p *PipelineImpl) SetMaxLoopIterations(max int)
SetMaxLoopIterations 设置循环图最大迭代次数
func (*PipelineImpl) SetMetadata ¶
func (p *PipelineImpl) SetMetadata(store metadata.MetadataStore)
SetMetadata 设置流水线的元数据存储
func (*PipelineImpl) SetParam ¶
func (p *PipelineImpl) SetParam(param map[string]interface{})
SetParam 设置 param 值
func (*PipelineImpl) SetParamForTest ¶
func (p *PipelineImpl) SetParamForTest(param map[string]interface{})
SetParamForTest sets the pipeline param for testing purposes.
func (*PipelineImpl) SetPusher ¶
func (p *PipelineImpl) SetPusher(pusher logger.Pusher)
SetPusher 设置日志推送器
func (*PipelineImpl) SetStatusForTest ¶
func (p *PipelineImpl) SetStatusForTest(status string)
SetStatusForTest sets the pipeline status for testing purposes. This is intentionally exported to allow integration tests in other packages to set up specific pipeline states without modifying the production API.
func (*PipelineImpl) SetTemplateEngine ¶
func (p *PipelineImpl) SetTemplateEngine(engine template.TemplateEngine)
SetTemplateEngine 设置模板引擎
type PipelineSnapshotter ¶
type PipelineSnapshotter struct{}
PipelineSnapshotter 实现
func NewPipelineSnapshotter ¶
func NewPipelineSnapshotter() *PipelineSnapshotter
func (*PipelineSnapshotter) TakeSnapshot ¶
func (ps *PipelineSnapshotter) TakeSnapshot(pipeline Pipeline, originalConfig *core.PipelineConfig) (*core.PipelineConfig, error)
TakeSnapshot 将当前状态序列化为 PipelineConfig
func (*PipelineSnapshotter) ToYAML ¶
func (ps *PipelineSnapshotter) ToYAML(config *core.PipelineConfig) (string, error)
ToYAML 将配置转换为 YAML
type RegexExtractor ¶
type RegexExtractor struct {
// contains filtered or unexported fields
}
RegexExtractor 正则表达式提取器
func NewRegexExtractor ¶
func NewRegexExtractor(patterns map[string]string, maxSize int) (*RegexExtractor, error)
NewRegexExtractor 创建正则表达式提取器
type Snapshotter ¶
type Snapshotter interface {
// TakeSnapshot 从 Pipeline 生成带状态的配置
TakeSnapshot(pipeline Pipeline, originalConfig *core.PipelineConfig) (*core.PipelineConfig, error)
// ToYAML 将配置转换为 YAML
ToYAML(config *core.PipelineConfig) (string, error)
}
Snapshotter 状态快照接口