flow

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2025 License: Apache-2.0 Imports: 9 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Predefined Operator instances.
//
// Every literal names its fields explicitly. Disabled is omitted throughout
// and therefore keeps its zero value (false). Desc holds the display text for
// the operator and Order is an integer rank.
// NOTE(review): IN and NOT_IN share Order 30, and LIKE and IN_LIKE share the
// same Desc text — confirm both are intentional.
var (
	LIKE      = Operator{Value: "LIKE", Desc: "文本包含", ValueType: String, Order: 10}
	IN_LIKE   = Operator{Value: "IN_LIKE", Desc: "文本包含", ValueType: StringList, Order: 20}
	IN        = Operator{Value: "IN", Desc: "IN", ValueType: StringList, Order: 30}
	NOT_IN    = Operator{Value: "NOT_IN", Desc: "非IN", ValueType: StringList, Order: 30}
	SAME      = Operator{Value: "SAME", Desc: "完全匹配", ValueType: String, Order: 100}
	EQ        = Operator{Value: "EQ", Desc: "数值等于", ValueType: Number, Order: 150}
	NE        = Operator{Value: "NE", Desc: "数值不等于", ValueType: Number, Order: 200}
	GT        = Operator{Value: "GT", Desc: "数值大于", ValueType: Number, Order: 300}
	GTE       = Operator{Value: "GTE", Desc: "数值大于等于", ValueType: Number, Order: 400}
	LT        = Operator{Value: "LT", Desc: "数值小于", ValueType: Number, Order: 500}
	LTE       = Operator{Value: "LTE", Desc: "数值小于等于", ValueType: Number, Order: 600}
	NOT_EMPTY = Operator{Value: "NOT_EMPTY", Desc: "存在", ValueType: Single, Order: 0}
	EMPTY     = Operator{Value: "EMPTY", Desc: "不存在", ValueType: Single, Order: 1}
	BETWEEN   = Operator{Value: "BETWEEN", Desc: "数值介于", ValueType: NumberTuple, Order: 700}
)

预定义的 Operator 实例

Functions

func CompareNumeric

func CompareNumeric(op, ks, vs string) bool

func ConditionValidator

func ConditionValidator(ctx *PipelineContext, condition Condition) bool

func Eval

func Eval(o string, k string, v string) bool

func RegisterUnit

func RegisterUnit(name string, unit any)

Types

type BaseUnit

// BaseUnit carries the basic attributes shared by units: an ID, a unit name,
// an optional IO configuration, the UnFlowable flag and a job status.
//
// NOTE(review): UnFlowable is serialized under the JSON key "flowable", which
// reads as the opposite of the field name. Confirm which polarity producers
// and consumers of the JSON expect before relying on it.
type BaseUnit struct {
	ID         string    `json:"id"`
	UnitName   string    `json:"unit_name,omitempty"`
	IOConfig   *IOConfig `json:"io_config,omitempty"`
	UnFlowable bool      `json:"flowable,omitempty"` // whether the unit does NOT accept external input
	Status     JobStatus `json:"status,omitempty"`
}

BaseUnit 为所有单元提供基础属性(如 ID)

func (*BaseUnit) GetFlowable

func (u *BaseUnit) GetFlowable() bool

func (*BaseUnit) GetID

func (u *BaseUnit) GetID() string

func (*BaseUnit) GetIOConfig

func (u *BaseUnit) GetIOConfig() *IOConfig

func (*BaseUnit) GetUnitName

func (u *BaseUnit) GetUnitName() string

func (*BaseUnit) Next

func (u *BaseUnit) Next(ctx *PipelineContext, i *Input) []PhaseUnit

func (*BaseUnit) PresetID

func (u *BaseUnit) PresetID()

type Condition

// Condition is the condition structure. It can nest further conditions via
// Children.
//
// NOTE(review): JointNext is a Joiner ("AND" / "OR"); presumably it states how
// this condition is combined with the one that follows — confirm against the
// evaluator.
type Condition struct {
	Key       string      `json:"key,omitempty"`      // condition key
	Operator  string      `json:"operator,omitempty"` // operator
	Value     string      `json:"value,omitempty"`    // value
	Label     string      `json:"label,omitempty"`    // label
	Script    string      `json:"script,omitempty"`   // script
	JointNext Joiner      `json:"joint_next,omitempty"`
	Children  []Condition `json:"children,omitempty"`
}

Condition 结构体

type ConditionFunc

// ConditionFunc is a function that evaluates a condition against the given
// pipeline context and reports the result as a bool.
type ConditionFunc func(ctx *PipelineContext) bool

ConditionFunc 定义了一个根据上下文判断条件的函数

type DictValueType

// DictValueType is a string-typed tag naming a kind of value. It is the type
// of Operator.ValueType.
type DictValueType string

// Known DictValueType values.
const (
	String      DictValueType = "STRING"
	StringList  DictValueType = "STRING_LIST"
	Number      DictValueType = "NUMBER"
	NumberTuple DictValueType = "NUMBER_TUPLE"
	Single      DictValueType = "SINGLE"
	JSON        DictValueType = "JSON"
	JSON_ARRAY  DictValueType = "JSON_ARRAY"
)

type IOConfig

// IOConfig groups a unit's Input, DefaultInput and Output descriptions.
//
// NOTE(review): DefaultInput is presumably used when Input is not supplied —
// confirm against GetInput.
type IOConfig struct {
	Input        Input  `json:"input,omitempty"`
	DefaultInput Input  `json:"default_input,omitempty"`
	Output       Output `json:"output,omitempty"`
}

type IfUnit

// IfUnit implements if / else-if / else control: it embeds BaseUnit and holds
// one if-condition with its units, a list of else-if conditions with a list of
// unit lists, and the else units.
//
// NOTE(review): ElseIfUnits[i] presumably pairs with ElseIfConditions[i] —
// confirm against Execute/Next.
type IfUnit struct {
	BaseUnit
	IfCondition      Condition     `json:"if_condition"` // note: functions cannot be serialized directly
	ElseIfConditions []Condition   `json:"else_if,omitempty"`
	IfUnits          []PhaseUnit   `json:"if_units,omitempty"`
	ElseIfUnits      [][]PhaseUnit `json:"else_if_units,omitempty"`
	ElseUnits        []PhaseUnit   `json:"else_units,omitempty"`
}

IfUnit 实现 if–else if–else 控制,根据条件选择执行 if、else-if 或 else 分支中的单元

func (*IfUnit) Execute

func (t *IfUnit) Execute(ctx *PipelineContext, i *Input) (*Output, error)

func (*IfUnit) GetUnitName

func (t *IfUnit) GetUnitName() string

func (*IfUnit) Next

func (t *IfUnit) Next(ctx *PipelineContext, i *Input) []PhaseUnit

func (*IfUnit) UnmarshalJSON

func (t *IfUnit) UnmarshalJSON(data []byte) error

type Input

// Input describes the data handed to a unit: the payload, a data-type label
// and whether the input is slottable.
type Input struct {
	//Template string `json:"template"`  // e.g. "This is a passage whose content is {{output}}; now denoise the content in {{slots}}"
	Data      any    `json:"data,omitempty"`      // final output
	DataType  string `json:"data_type,omitempty"` // plaintext, json, json_array,socket
	Slottable bool   `json:"slottable,omitempty"` // whether it is slottable
}

func GetInput

func GetInput(unit PhaseUnit, env map[string]any) (*Input, error)

type JobStatus

// JobStatus is a string-typed status value. It is used by BaseUnit.Status and
// PipeStatus.Status.
type JobStatus string

// Known JobStatus values.
const (
	TaskPending   JobStatus = "PENDING"
	TaskRunning   JobStatus = "RUNNING"
	TaskCompleted JobStatus = "COMPLETED"
	TaskFailed    JobStatus = "FAILED"
)

type Joiner

// Joiner is an alias for string. It is the type of Condition.JointNext.
type Joiner = string // AND, OR

type Line

// Line is a From/To pair of strings; StoryBoard holds a slice of them.
//
// NOTE(review): From and To are presumably unit IDs describing a connection
// between two StoryBoard units — confirm against StoryBoard.Build.
type Line struct {
	From string
	To   string
}

type NextResult

// NextResult bundles a list of units with a ReplaceAll flag.
type NextResult struct {
	Units      []PhaseUnit
	ReplaceAll bool // true = jump in control flow (e.g. if/while)
}

type Operator

// Operator is an enumeration-style record describing one operator: its
// Value (identifier string), Desc (description text), ValueType (a
// DictValueType), a Disabled flag and an integer Order.
//
// The predefined instances are built with unkeyed composite literals, so the
// field order here must not change.
type Operator struct {
	Value     string        `json:"value,omitempty"`
	Desc      string        `json:"desc,omitempty"`
	ValueType DictValueType `json:"value_type,omitempty"`
	Disabled  bool          `json:"disabled,omitempty"`
	Order     int           `json:"order,omitempty"`
}

Operator 枚举

type Output

// Output describes the data produced by a unit: the payload, a data-type
// label and whether the output is slottable. Its fields and JSON keys match
// those of Input.
type Output struct {
	Data      any    `json:"data,omitempty"`
	DataType  string `json:"data_type,omitempty"`
	Slottable bool   `json:"slottable,omitempty"` // whether it is slottable
}

type PhaseUnit

// PhaseUnit is the work-unit interface. An implementation must provide all
// seven methods below: the accessors GetID, GetUnitName, GetFlowable and
// GetIOConfig, plus PresetID, Execute and Next.
//
// Execute takes the pipeline context and an input and returns an output or an
// error; Next takes the same arguments and returns a slice of units.
// NOTE(review): Next presumably yields the units to run after this one —
// confirm against Pipeline.Run.
type PhaseUnit interface {
	GetID() string
	PresetID()
	GetUnitName() string
	GetFlowable() bool
	GetIOConfig() *IOConfig
	Execute(ctx *PipelineContext, input *Input) (*Output, error)
	Next(ctx *PipelineContext, input *Input) []PhaseUnit
}

PhaseUnit 定义了工作单元接口,所有单元必须实现 GetID、PresetID、GetUnitName、GetFlowable、GetIOConfig、Execute 与 Next 方法

func ParsePhaseUnits

func ParsePhaseUnits(jsonData []byte, typeField string) ([]PhaseUnit, error)

func ParsePhaseUnitsFromMap

func ParsePhaseUnitsFromMap(rawList []map[string]any) ([]PhaseUnit, error)

func PrepareUnits

func PrepareUnits(units []PhaseUnit) []PhaseUnit

func SafeUnits

func SafeUnits(units []PhaseUnit) []PhaseUnit

type PipeStatus

// PipeStatus is a status snapshot with a total count, a step label and a
// JobStatus. It is stored on PipelineContext and passed to its Handler.
type PipeStatus struct {
	Total  int32     `json:"total,omitempty"`
	Step   string    `json:"step,omitempty"`
	Status JobStatus `json:"status,omitempty"` // running status
}

type Pipeline

// Pipeline is composed of multiple PhaseUnits and holds the execution
// context. Context is excluded from JSON via the "-" tag; Interrupted and
// LastOutput carry no JSON tags.
type Pipeline struct {
	Units       []PhaseUnit      `json:"units,omitempty"`
	Context     *PipelineContext `json:"-"`
	Interrupted bool
	LastOutput  Output
}

Pipeline 由多个 PhaseUnit 组成,并持有执行上下文

func NewPipeline

func NewPipeline(units []PhaseUnit) *Pipeline

func (*Pipeline) Interrupt

func (p *Pipeline) Interrupt()

func (*Pipeline) Run

func (p *Pipeline) Run() error

type PipelineContext

// PipelineContext stores the environment variables used during execution,
// together with the current PipeStatus, a status Handler callback and a
// context.Context.
//
// Handler is excluded from JSON with the "-" tag. A func value cannot be
// encoded by encoding/json, so exposing the field under any key makes
// json.Marshal fail for every PipelineContext.
// NOTE(review): Context has no JSON tag and is therefore still serialized
// under the key "Context" — confirm whether it should be skipped as well.
type PipelineContext struct {
	Env        map[string]any                                `json:"env,omitempty"`
	PipeStatus PipeStatus                                    `json:"pipe_status,omitempty"`
	Handler    func(ctx *PipelineContext, status PipeStatus) `json:"-"`
	Context    context.Context
}

PipelineContext 用于保存执行过程中的环境变量

func (PipelineContext) SetEnv

func (c PipelineContext) SetEnv(k string, v any)

type StoryBoard

// StoryBoard holds a set of units and the lines between them. Its Build
// method returns a Pipeline or an error.
type StoryBoard struct {
	Units []PhaseUnit // not the real Stage here; needs a UI-Data wrapper layer, StageVo -> Stage
	Lines []Line
}

StoryBoard(故事板)与 Pipeline 的相似之处:1. 都有 Units[];2. 都描述 Units 之间的关系。只要存在连线且校验通过,即可从 StoryBoard 创建 Pipeline。

func (*StoryBoard) Build

func (u *StoryBoard) Build() (p *Pipeline, e error)

type UnitOutput

// UnitOutput pairs an ID with an output value of any type.
// NOTE(review): ID is presumably the ID of the unit that produced Output —
// confirm against the code that fills it in.
type UnitOutput struct {
	ID     string `json:"id,omitempty"`
	Output any    `json:"output,omitempty"`
}

type UnitRepository

// UnitRepository keeps the registered unit types, keyed by string, as
// reflect.Type values.
// NOTE(review): the key is presumably the name passed to RegisterUnit —
// confirm against its implementation.
type UnitRepository struct {
	Mappings map[string]reflect.Type // stores the mapping for all units
	// contains filtered or unexported fields
}

func (*UnitRepository) ParsePhaseUnits

func (r *UnitRepository) ParsePhaseUnits(jsonData []byte, typeField string) ([]PhaseUnit, error)

func (*UnitRepository) ParsePhaseUnitsFromMap

func (r *UnitRepository) ParsePhaseUnitsFromMap(rawList []map[string]any) ([]PhaseUnit, error)

func (*UnitRepository) RegisterUnit

func (r *UnitRepository) RegisterUnit(name string, unit any)

type WhileUnit

// WhileUnit is the loop unit: it embeds BaseUnit and holds a Condition plus a
// list of units.
// NOTE(review): Units are presumably executed repeatedly while Condition
// holds — confirm against Execute.
type WhileUnit struct {
	BaseUnit
	Condition Condition   `json:"condition,omitempty"`
	Units     []PhaseUnit `json:"units,omitempty"`
}

WhileUnit 循环单元

func (*WhileUnit) Execute

func (t *WhileUnit) Execute(ctx *PipelineContext, i *Input) (*Output, error)

func (*WhileUnit) Next

func (t *WhileUnit) Next(ctx *PipelineContext, i *Input) []PhaseUnit

func (*WhileUnit) UnmarshalJSON

func (t *WhileUnit) UnmarshalJSON(data []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL