dag

package
v0.0.0-...-c693505 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CodecBlockExtractor

type CodecBlockExtractor struct {
	// contains filtered or unexported fields
}

CodecBlockExtractor 代码块提取器 只识别 ```flowx-yaml 代码块,支持行尾注释提取 description

func NewCodecBlockExtractor

func NewCodecBlockExtractor(maxSize int) *CodecBlockExtractor

NewCodecBlockExtractor 创建代码块提取器

func (*CodecBlockExtractor) Extract

func (e *CodecBlockExtractor) Extract(output string) (map[string]core.FieldItem, error)

Extract 从输出中提取代码块 解析 ```flowx-yaml 代码块,提取行尾注释作为 description

type DGAEdge

type DGAEdge struct {
	// contains filtered or unexported fields
}

DGAEdge 是Edge接口的实现

func (*DGAEdge) Evaluate

func (e *DGAEdge) Evaluate(ctx EvaluationContext) (bool, error)

Evaluate 评估条件表达式 如果表达式为空,返回true(无条件边总是可以通过) 如果有表达式但没有设置模板引擎,使用默认的Pongo2模板引擎

func (*DGAEdge) Expression

func (e *DGAEdge) Expression() string

Expression 返回边的条件表达式

func (*DGAEdge) ID

func (e *DGAEdge) ID() string

ID 返回边的唯一标识符

func (*DGAEdge) SetEngine

func (e *DGAEdge) SetEngine(engine template.TemplateEngine)

SetEngine 设置模板引擎(用于延迟初始化)

func (*DGAEdge) Source

func (e *DGAEdge) Source() Node

Source 返回边的源节点

func (*DGAEdge) Target

func (e *DGAEdge) Target() Node

Target 返回边的目标节点

type DGAEvaluationContext

type DGAEvaluationContext struct {
	// contains filtered or unexported fields
}

DGAEvaluationContext 是EvaluationContext接口的实现

func (*DGAEvaluationContext) All

func (c *DGAEvaluationContext) All() map[string]any

All 返回上下文中所有数据的副本 合并了:基础数据、节点数据、流水线数据 将 "NodeID.key" 格式的扁平键转换为嵌套结构

func (*DGAEvaluationContext) Get

func (c *DGAEvaluationContext) Get(key string) (any, bool)

Get 从上下文中获取值

func (*DGAEvaluationContext) Iteration

func (c *DGAEvaluationContext) Iteration() int

Iteration 返回当前迭代计数器值

func (*DGAEvaluationContext) WithIteration

func (c *DGAEvaluationContext) WithIteration(iteration int) EvaluationContext

WithIteration 设置迭代计数器并返回新的上下文(链式调用)

func (*DGAEvaluationContext) WithNode

func (c *DGAEvaluationContext) WithNode(node Node) EvaluationContext

WithNode 设置当前节点并返回新的上下文(链式调用)

func (*DGAEvaluationContext) WithParams

func (c *DGAEvaluationContext) WithParams(params map[string]any) EvaluationContext

WithParams 添加参数到上下文并返回新的上下文(链式调用)

func (*DGAEvaluationContext) WithPipeline

func (c *DGAEvaluationContext) WithPipeline(pipeline Pipeline) EvaluationContext

WithPipeline 设置流水线并返回新的上下文(链式调用)

type DGAGraph

type DGAGraph struct {
	// contains filtered or unexported fields
}

保存了流水线的图结构

func NewDGAGraph

func NewDGAGraph() *DGAGraph

func (*DGAGraph) AddEdge

func (dga *DGAGraph) AddEdge(edge Edge) error

AddEdge 向图中添加边

func (*DGAGraph) AddEntryNode

func (dga *DGAGraph) AddEntryNode(nodeID string)

addEntryNode 添加入口节点(从 [*] 指向的节点)

func (*DGAGraph) AddExitNode

func (dga *DGAGraph) AddExitNode(nodeID string)

addExitNode 添加出口节点(指向 [*] 的节点)

func (*DGAGraph) AddVertex

func (dga *DGAGraph) AddVertex(node Node)

AddVertex 向图中添加顶点(节点) 检查是否存在循环;如果存在循环,则返回 core.ErrHasCycle 否则返回 nil

func (*DGAGraph) BackEdges

func (dga *DGAGraph) BackEdges() []Edge

BackEdges 返回所有被接受的回边

func (*DGAGraph) Edges

func (dga *DGAGraph) Edges() []Edge

Edges 返回所有的边

func (*DGAGraph) EntryNodes

func (dga *DGAGraph) EntryNodes() []string

EntryNodes 返回入口节点列表

func (*DGAGraph) ExitNodes

func (dga *DGAGraph) ExitNodes() []string

ExitNodes 返回出口节点列表

func (*DGAGraph) GetEdge

func (dga *DGAGraph) GetEdge(srcID, destID string) (Edge, bool)

GetEdge 根据源节点和目标节点ID查找边

func (*DGAGraph) GetNode

func (dga *DGAGraph) GetNode(nodeID string) (Node, bool)

GetNode 根据节点ID查找节点

func (*DGAGraph) HasCycle

func (dga *DGAGraph) HasCycle() bool

HasCycle 检查图中是否存在循环

func (*DGAGraph) IncomingEdges

func (dga *DGAGraph) IncomingEdges(nodeID string) []Edge

IncomingEdges 返回指向指定节点的所有边

func (*DGAGraph) IsCyclic

func (dga *DGAGraph) IsCyclic() bool

IsCyclic 返回图是否有被接受的回边(条件循环节点)

func (*DGAGraph) LoopNodeSet

func (dga *DGAGraph) LoopNodeSet(backEdge Edge) map[string]bool

LoopNodeSet 计算回边涉及的循环节点集合 从回边的 target 出发,沿 forward edges BFS 到达 source 为止的所有节点

func (*DGAGraph) Nodes

func (dga *DGAGraph) Nodes() map[string]Node

Nodes 返回所有的节点map

func (*DGAGraph) OutgoingEdges

func (dga *DGAGraph) OutgoingEdges(nodeID string) []Edge

OutgoingEdges 返回从指定节点出发的所有边

func (*DGAGraph) RemoveEdge

func (dga *DGAGraph) RemoveEdge(srcID, destID string) error

RemoveEdge 删除指定的边

func (*DGAGraph) RemoveVertex

func (dga *DGAGraph) RemoveVertex(nodeID string) error

RemoveVertex 删除节点及其所有关联边

func (*DGAGraph) Traversal

func (dga *DGAGraph) Traversal(ctx context.Context, evalCtx EvaluationContext, fn TraversalFn) error

Traversal 对图执行广度优先遍历 为图中的每个节点执行提供的 TraversalFn 函数 支持有环图和无环图:使用 forwardGraph(排除回边)计算层级 同一层级内的节点并发执行,层级之间串行执行

func (*DGAGraph) TraversalSteps

func (dga *DGAGraph) TraversalSteps(evalCtx EvaluationContext) [][]string

TraversalSteps 计算 BFS 层级执行计划 返回 [][]string,每个子切片是一层可并发执行的节点ID列表

type DGANode

type DGANode struct {
	// contains filtered or unexported fields
}

func NewDGANode

func NewDGANode(id, state string) *DGANode

NewDGANode creates a new DGANode with the specified id and state, initializing an empty property map.

func NewDGANodeWithConfig

func NewDGANodeWithConfig(id, state, executor, image string, steps []core.Step, config map[string]any) *DGANode

NewDGANodeWithConfig creates a new DGANode with full configuration.

func (*DGANode) EnsureIds

func (dgaNode *DGANode) EnsureIds()

EnsureIds 确保节点和步骤都有ID

func (*DGANode) Get

func (dgaNode *DGANode) Get(key string) string

func (*DGANode) GetConfig

func (dgaNode *DGANode) GetConfig() map[string]any

func (*DGANode) GetExecutor

func (dgaNode *DGANode) GetExecutor() string

func (*DGANode) GetRuntimeStatus

func (dgaNode *DGANode) GetRuntimeStatus() *core.NodeRuntimeStatus

GetRuntimeStatus 获取运行时状态

func (*DGANode) GetStepRuntimeStatus

func (dgaNode *DGANode) GetStepRuntimeStatus(stepName string) *core.StepRuntimeStatus

GetStepRuntimeStatus 获取指定步骤的运行时状态

func (*DGANode) GetSteps

func (dgaNode *DGANode) GetSteps() []core.Step

func (*DGANode) Id

func (dgaNode *DGANode) Id() string

func (*DGANode) PipelineId

func (dgaNode *DGANode) PipelineId() string

func (*DGANode) Set

func (dgaNode *DGANode) Set(key string, value any)

func (*DGANode) SetRuntimeStatus

func (dgaNode *DGANode) SetRuntimeStatus(status *core.NodeRuntimeStatus)

SetRuntimeStatus 设置运行时状态

func (*DGANode) SetStepRuntimeStatus

func (dgaNode *DGANode) SetStepRuntimeStatus(stepStatus *core.StepRuntimeStatus)

SetStepRuntimeStatus 设置步骤运行时状态

func (*DGANode) Status

func (dgaNode *DGANode) Status() string

type Edge

type Edge interface {
	// Source 返回边的源节点
	Source() Node
	// Target 返回边的目标节点
	Target() Node
	// Expression 返回边的条件表达式(pongo2模板语法)
	// 如果返回空字符串,表示无条件边,总是可以遍历
	Expression() string
	// Evaluate 评估条件表达式,返回bool表示是否通过
	Evaluate(ctx EvaluationContext) (bool, error)
	// ID 返回边的唯一标识符(格式:source->target)
	ID() string
}

Edge 表示DAG中的边,支持条件表达式

func NewConditionalEdge

func NewConditionalEdge(source, target Node, expression string) Edge

NewConditionalEdge 创建一条条件边

func NewDGAEdge

func NewDGAEdge(source, target Node) Edge

NewDGAEdge 创建一条无条件边

type EdgeModification

type EdgeModification struct {
	Source     string `yaml:"source"`
	Target     string `yaml:"target"`
	Expression string `yaml:"expression,omitempty"`
}

EdgeModification 表示一条要添加的边

type EdgeRemoval

type EdgeRemoval struct {
	Source string `yaml:"source"`
	Target string `yaml:"target"`
}

EdgeRemoval 表示一条要删除的边

type EvaluationContext

type EvaluationContext interface {
	Get(key string) (any, bool)
	All() map[string]any
	WithNode(node Node) EvaluationContext
	WithPipeline(pipeline Pipeline) EvaluationContext
	WithParams(params map[string]any) EvaluationContext
	WithIteration(iteration int) EvaluationContext
	Iteration() int
}

EvaluationContext 表达式求值上下文

func NewEvaluationContext

func NewEvaluationContext() EvaluationContext

NewEvaluationContext 创建一个新的求值上下文

type Event

type Event string

流水线事件

var (
	//监听事件
	PipelineInit                Event = core.EventPipelineInit                // 流水线初始化
	PipelineStart               Event = core.EventPipelineStart               // 流水线开始执行
	PipelineFinish              Event = core.EventPipelineFinish              // 流水线完成
	PipelineExecutorPrepare     Event = core.EventPipelineExecutorPrepare     // 流水线执行器开始准备
	PipelineExecutorPrepareDone Event = core.EventPipelineExecutorPrepareDone // 流水线执行器准备完毕
	PipelineNodeStart           Event = core.EventPipelineNodeStart           // 节点开始
	PipelineNodeFinish          Event = core.EventPipelineNodeFinish          // 节点完成
	PipelineNodeFailed          Event = core.EventPipelineNodeFailed          // 节点执行失败
	PipelinePaused              Event = core.EventPipelinePaused              // 流水线暂停
	PipelineResumed             Event = core.EventPipelineResumed             // 流水线恢复
	PipelineGraphModified       Event = core.EventPipelineGraphModified       // 图被修改
)

type ExecutorProvider

type ExecutorProvider = executor.ExecutorProvider

ExecutorProvider Executor提供者接口 从executor/core导入

type Graph

type Graph interface {
	GraphReader
	//AddVertex 添加顶点
	AddVertex(node Node)
	//AddEdge 添加边
	AddEdge(edge Edge) error
	//RemoveVertex 删除节点及其所有关联边
	RemoveVertex(nodeID string) error
	//RemoveEdge 删除指定的边
	RemoveEdge(srcID, destID string) error
	//HasCycle 检查图中是否存在环
	HasCycle() bool
}

type GraphModifications

type GraphModifications struct {
	// AddNodes 要添加的节点配置列表
	AddNodes []core.NodeConfig `yaml:"addNodes"`
	// RemoveNodes 要删除的节点 ID 列表(同时删除关联边)
	RemoveNodes []string `yaml:"removeNodes"`
	// AddEdges 要添加的边列表
	AddEdges []EdgeModification `yaml:"addEdges"`
	// RemoveEdges 要删除的边列表
	RemoveEdges []EdgeRemoval `yaml:"removeEdges"`
	// AddGraph 可选的 Mermaid stateDiagram-v2 片段,用于批量添加边
	AddGraph string `yaml:"addGraph"`
}

GraphModifications 表示一组要原子应用的图修改操作

func (GraphModifications) IsEmpty

func (m GraphModifications) IsEmpty() bool

IsEmpty 判断修改集是否为空

type GraphReader

type GraphReader interface {
	//Nodes
	Nodes() map[string]Node
	//Edges 返回所有的边
	Edges() []Edge
	//Traversal 遍历图结构
	Traversal(ctx context.Context, evalCtx EvaluationContext, fn TraversalFn) error
	//GetNode 根据节点ID查找节点
	GetNode(nodeID string) (Node, bool)
	//GetEdge 根据源节点和目标节点ID查找边
	GetEdge(srcID, destID string) (Edge, bool)
	//IncomingEdges 返回指向指定节点的所有边
	IncomingEdges(nodeID string) []Edge
	//OutgoingEdges 返回从指定节点出发的所有边
	OutgoingEdges(nodeID string) []Edge
}

type Listener

type Listener interface {
	// 处理对应的事件将事件发生的对应的流水线和对应的事件作为参数传入
	Handle(p Pipeline, event Event)
	// 获取当前注册的Event
	Events() []Event
}

我们将整个流水线的运行过程中的事件抽象成对应的Event 这样我们就能再外部监听Event

type ListeningFn

type ListeningFn func(p Pipeline)

PipelineListeningFn 流水线监听函数

type Metadata

type Metadata map[string]core.FieldItem

type Node

type Node interface {
	//ID 获取节点唯一id
	Id() string
	//PipelineId 获取节点所属的流水线id
	PipelineId() string
	//Status 获取节点状态
	Status() string
	//Get 获取节点属性数据
	Get(key string) string
	// Set 设置节点属性数据
	Set(key string, value any)
	// GetExecutor 获取节点执行器名称
	GetExecutor() string
	// GetSteps 获取节点执行步骤
	GetSteps() []core.Step
	// GetConfig 获取节点配置
	GetConfig() map[string]any
	// 运行时状态管理
	GetRuntimeStatus() *core.NodeRuntimeStatus
	SetRuntimeStatus(status *core.NodeRuntimeStatus)
	// 步骤状态管理
	GetStepRuntimeStatus(stepName string) *core.StepRuntimeStatus
	SetStepRuntimeStatus(stepStatus *core.StepRuntimeStatus)
	// 初始化ID
	EnsureIds()
}

type OutputExtractor

type OutputExtractor interface {
	// Extract 从命令输出中提取数据
	// output: 命令完整输出
	// 返回: 提取的键值对(值为 FieldItem)
	Extract(output string) (map[string]core.FieldItem, error)
}

OutputExtractor 输出提取器接口

type Pipeline

type Pipeline interface {
	//ID 流水线的id
	Id() string
	//GetGraph 返回图结构
	GetGraph() Graph
	//SetGraph 设置图结构
	SetGraph(graph Graph)
	//Status 返回流水线的整体状态
	Status() string
	//SetMetadata 设置元数据
	SetMetadata(store metadata.MetadataStore)
	//Metadata 获取元数据
	Metadata() Metadata
	//Listening 流水线执行事件监听设置
	Listening(listener Listener)
	//Done流水线是否执行完成
	Done() <-chan struct{}
	//Run执行流水线
	Run(ctx context.Context) error
	//Notify 执行的步骤通知流水线
	Notify()
	//Cancel 取消流水线
	Cancel()
	//SetExecutorProvider 设置Executor提供者
	SetExecutorProvider(provider ExecutorProvider)
	//SetTemplateEngine 设置模板引擎
	SetTemplateEngine(engine template.TemplateEngine)
	//GetTemplateEngine 获取模板引擎
	GetTemplateEngine() template.TemplateEngine
	//SetPusher 设置日志推送器
	SetPusher(pusher logger.Pusher)
	//Pause 暂停流水线,等待当前层执行完成后暂停
	Pause() error
	//Resume 恢复暂停的流水线
	Resume(ctx context.Context) error
	//IsModifiable 判断当前是否可修改图
	IsModifiable() bool
	//CurrentNode 返回当前正在执行的节点(如有)
	CurrentNode() Node
}

func NewPipeline

func NewPipeline(ctx context.Context) Pipeline

type PipelineImpl

type PipelineImpl struct {
	// contains filtered or unexported fields
}

func (*PipelineImpl) Cancel

func (p *PipelineImpl) Cancel()

Cancel 终止流水线

func (*PipelineImpl) CurrentNode

func (p *PipelineImpl) CurrentNode() Node

CurrentNode 返回当前正在执行的节点(如有)

func (*PipelineImpl) Done

func (p *PipelineImpl) Done() <-chan struct{}

Done 返回一个通道,用于通知流水线何时完成

func (*PipelineImpl) DoneChanForTest

func (p *PipelineImpl) DoneChanForTest() chan struct{}

DoneChanForTest returns the doneChan for testing purposes.

func (*PipelineImpl) GetGraph

func (p *PipelineImpl) GetGraph() Graph

GetGraph 返回流水线的图结构

func (*PipelineImpl) GetTemplateEngine

func (p *PipelineImpl) GetTemplateEngine() template.TemplateEngine

GetTemplateEngine 获取模板引擎

func (*PipelineImpl) Id

func (p *PipelineImpl) Id() string

Id 返回流水线的ID

func (*PipelineImpl) IsModifiable

func (p *PipelineImpl) IsModifiable() bool

IsModifiable 判断当前是否可修改图

func (*PipelineImpl) Listening

func (p *PipelineImpl) Listening(fn Listener)

Listening 设置流水线执行事件监听器

func (*PipelineImpl) Metadata

func (p *PipelineImpl) Metadata() Metadata

Metadata 获取流水线的元数据

func (*PipelineImpl) Notify

func (p *PipelineImpl) Notify()

Notify 这个主要是在运行过程中节点状态或者流水线状态变化,就会触发这个函数 节点 我们就可以在这里做一些处理 执行ListeningFn函数

func (*PipelineImpl) NotifyEvent

func (p *PipelineImpl) NotifyEvent(event Event)

NotifyEvent 通知监听器特定事件

func (*PipelineImpl) ParamForTest

func (p *PipelineImpl) ParamForTest() map[string]core.FieldItem

ParamForTest returns the pipeline param for testing purposes.

func (*PipelineImpl) Pause

func (p *PipelineImpl) Pause() error

Pause 暂停流水线,等待当前层执行完成后暂停

func (*PipelineImpl) Resume

func (p *PipelineImpl) Resume(ctx context.Context) error

Resume 恢复暂停的流水线

func (*PipelineImpl) Run

func (p *PipelineImpl) Run(ctx context.Context) error

Run 执行流水线

func (*PipelineImpl) SetExecutorProvider

func (p *PipelineImpl) SetExecutorProvider(provider ExecutorProvider)

SetExecutorProvider 设置Executor提供者

func (*PipelineImpl) SetGraph

func (p *PipelineImpl) SetGraph(graph Graph)

SetGraph 设置流水线的图结构

func (*PipelineImpl) SetMaxLoopIterations

func (p *PipelineImpl) SetMaxLoopIterations(max int)

SetMaxLoopIterations 设置循环图最大迭代次数

func (*PipelineImpl) SetMetadata

func (p *PipelineImpl) SetMetadata(store metadata.MetadataStore)

SetMetadata 设置流水线的元数据存储

func (*PipelineImpl) SetParam

func (p *PipelineImpl) SetParam(param map[string]interface{})

SetParam 设置 param 值

func (*PipelineImpl) SetParamForTest

func (p *PipelineImpl) SetParamForTest(param map[string]interface{})

SetParamForTest sets the pipeline param for testing purposes.

func (*PipelineImpl) SetPusher

func (p *PipelineImpl) SetPusher(pusher logger.Pusher)

SetPusher 设置日志推送器

func (*PipelineImpl) SetStatusForTest

func (p *PipelineImpl) SetStatusForTest(status string)

SetStatusForTest sets the pipeline status for testing purposes. This is intentionally exported to allow integration tests in other packages to set up specific pipeline states without modifying the production API.

func (*PipelineImpl) SetTemplateEngine

func (p *PipelineImpl) SetTemplateEngine(engine template.TemplateEngine)

SetTemplateEngine 设置模板引擎

func (*PipelineImpl) Status

func (p *PipelineImpl) Status() string

Status 返回流水线的整体状态

type PipelineSnapshotter

type PipelineSnapshotter struct{}

PipelineSnapshotter 实现

func NewPipelineSnapshotter

func NewPipelineSnapshotter() *PipelineSnapshotter

func (*PipelineSnapshotter) TakeSnapshot

func (ps *PipelineSnapshotter) TakeSnapshot(pipeline Pipeline, originalConfig *core.PipelineConfig) (*core.PipelineConfig, error)

TakeSnapshot 将当前状态序列化为 PipelineConfig

func (*PipelineSnapshotter) ToYAML

func (ps *PipelineSnapshotter) ToYAML(config *core.PipelineConfig) (string, error)

ToYAML 将配置转换为 YAML

type RegexExtractor

type RegexExtractor struct {
	// contains filtered or unexported fields
}

RegexExtractor 正则表达式提取器

func NewRegexExtractor

func NewRegexExtractor(patterns map[string]string, maxSize int) (*RegexExtractor, error)

NewRegexExtractor 创建正则表达式提取器

func (*RegexExtractor) Extract

func (e *RegexExtractor) Extract(output string) (map[string]core.FieldItem, error)

Extract 使用正则表达式从输出中提取数据 返回值为 FieldItem,Description 和 SrcNode 由调用方设置

type Snapshotter

type Snapshotter interface {
	// TakeSnapshot 从 Pipeline 生成带状态的配置
	TakeSnapshot(pipeline Pipeline, originalConfig *core.PipelineConfig) (*core.PipelineConfig, error)
	// ToYAML 将配置转换为 YAML
	ToYAML(config *core.PipelineConfig) (string, error)
}

Snapshotter 状态快照接口

type TraversalFn

type TraversalFn func(ctx context.Context, node Node) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL