running

package module
v0.2.3
Published: Nov 11, 2022 License: MIT Imports: 7 Imported by: 0

README

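Running: a simple graph-node execution framework

Like other graph-node execution frameworks, running abstracts tasks as nodes and dependencies as edges.

Nodes and edges together form a graph, which is the abstraction of the whole execution flow.

How to organize nodes so that they execute according to a predefined flow is the problem running sets out to solve.

Key features

Focus on concurrent execution

running is designed for scenarios where many tasks need to execute concurrently, so it is better suited to online computation than to offline computation.

For example, running resolves dependencies in real time during execution instead of precomputing the execution steps and running them in order. As soon as a node's upstream dependencies are resolved, the node runs as early as possible.

running also pays particular attention to the cold-start problem, mitigating the performance issues caused by sudden bursts of concurrency.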
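The idea of resolving dependencies while the graph runs can be sketched without the library. The snippet below is a minimal, standalone illustration and not running's actual scheduler: the `vertex` type, `link`, and `execute` are hypothetical names. Each vertex keeps a count of unfinished upstream vertices and is started in its own goroutine the moment that count reaches zero.

```go
package main

import (
	"fmt"
	"sync"
)

// vertex is a hypothetical graph vertex: a task, its downstream vertices,
// and a count of upstream dependencies that have not finished yet.
type vertex struct {
	name    string
	run     func()
	next    []*vertex
	pending int
}

// link adds the edge from -> to and bumps the dependency count of to.
func link(from, to *vertex) {
	from.next = append(from.next, to)
	to.pending++
}

// execute runs every vertex in its own goroutine as soon as all of its
// upstream vertices have finished, and returns when the whole graph is done.
func execute(vertices []*vertex) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards the pending counters
	)

	var start func(v *vertex)
	start = func(v *vertex) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v.run()

			// Resolve this vertex as a dependency of its downstream vertices.
			mu.Lock()
			var ready []*vertex
			for _, n := range v.next {
				n.pending--
				if n.pending == 0 {
					ready = append(ready, n)
				}
			}
			mu.Unlock()

			for _, n := range ready {
				start(n)
			}
		}()
	}

	// Collect the roots before starting anything, so a vertex released by a
	// running goroutine is never started a second time by this loop.
	var roots []*vertex
	for _, v := range vertices {
		if v.pending == 0 {
			roots = append(roots, v)
		}
	}
	for _, v := range roots {
		start(v)
	}
	wg.Wait()
}

// runDiamond executes A -> {B, C} -> D and returns the observed run order.
func runDiamond() []string {
	var mu sync.Mutex
	var order []string
	record := func(name string) func() {
		return func() {
			mu.Lock()
			order = append(order, name)
			mu.Unlock()
		}
	}
	a := &vertex{name: "A", run: record("A")}
	b := &vertex{name: "B", run: record("B")}
	c := &vertex{name: "C", run: record("C")}
	d := &vertex{name: "D", run: record("D")}
	link(a, b)
	link(a, c)
	link(b, d)
	link(c, d)
	execute([]*vertex{a, b, c, d})
	return order
}

func main() {
	// A is always first and D always last; B and C may appear in either order.
	fmt.Println(runDiamond())
}
```

No execution order is computed up front: B and C start as soon as A finishes, and D starts when the later of the two completes.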

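Support for complex execution flows

In some cases it is hard to define an execution flow purely as a graph.

Loops and branch selection are examples. running defines the cluster to solve this problem.

A cluster is a special kind of node that contains several sub-nodes. When these sub-nodes execute is no longer determined by resolving dependencies but is decided by the cluster itself.

Because a cluster is also a node, it can be nested inside other clusters to build more complex flows.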
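To make the cluster idea concrete, here is a small standalone sketch of a loop cluster. It redeclares a `Node` interface with the same three methods listed in the API section so that it compiles on its own; `counter`, `loopCluster`, and `nestedRuns` are hypothetical names, and the common package may implement loops differently.

```go
package main

import (
	"context"
	"fmt"
)

// Node is redeclared here with the same method set as the Node interface
// in the API listing, so the sketch does not depend on the library.
type Node interface {
	Name() string
	Run(ctx context.Context)
	Reset()
}

// counter is a hypothetical leaf node that counts how often it ran.
type counter struct {
	name string
	runs int
}

func (c *counter) Name() string        { return c.name }
func (c *counter) Run(context.Context) { c.runs++ }
func (c *counter) Reset()              {}

// loopCluster is a hypothetical cluster: it decides by itself when its
// sub-nodes run, here by running all of them in order a fixed number of times.
type loopCluster struct {
	name  string
	times int
	subs  []Node
}

func (l *loopCluster) Name() string { return l.name }

// Inject receives the sub-nodes, mirroring Cluster.Inject.
func (l *loopCluster) Inject(nodes []Node) { l.subs = nodes }

func (l *loopCluster) Run(ctx context.Context) {
	for i := 0; i < l.times; i++ {
		for _, n := range l.subs {
			if ctx.Err() != nil {
				return // stop early when the context is cancelled
			}
			n.Run(ctx)
		}
	}
}

func (l *loopCluster) Reset() {
	for _, n := range l.subs {
		n.Reset()
	}
}

// nestedRuns nests one loop cluster inside another and reports how many
// times the innermost leaf node ran.
func nestedRuns() int {
	leaf := &counter{name: "leaf"}
	inner := &loopCluster{name: "inner", times: 3}
	inner.Inject([]Node{leaf})
	outer := &loopCluster{name: "outer", times: 2}
	outer.Inject([]Node{inner})
	outer.Run(context.Background())
	return leaf.runs
}

func main() {
	fmt.Println(nestedRuns()) // 6: two outer iterations times three inner ones
}
```

Because the cluster satisfies `Node` itself, nesting requires no special handling: the outer loop simply treats the inner loop as one of its sub-nodes.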

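Dynamic node enhancement

Encapsulating task logic into nodes inevitably adds some cognitive overhead.

running defines the decorator, called wrapper, to enhance nodes dynamically, using composition to reduce that overhead.

First, it makes nodes more reusable, so you do not need to keep writing nodes with similar functionality. Second, it adds flexibility: new functionality can be added whenever it is needed.

A wrapper is also a special kind of node. It contains the target node to enhance and can run enhancement logic before and after the target node executes. Wrappers can also be nested.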
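The decorator pattern described above can be sketched in a few lines. This is a standalone illustration, not code from the library: `Node` is redeclared locally, and `logNode`, `traceWrapper`, and `wrappedTrace` are hypothetical names. It shows a wrapper running logic before and after its target, and two wrappers nested around one node.

```go
package main

import (
	"context"
	"fmt"
)

// Node is redeclared locally with the method set from the API listing.
type Node interface {
	Name() string
	Run(ctx context.Context)
	Reset()
}

// logNode is a hypothetical task node that records its own name when run.
type logNode struct {
	name string
	log  *[]string
}

func (n *logNode) Name() string        { return n.name }
func (n *logNode) Run(context.Context) { *n.log = append(*n.log, n.name) }
func (n *logNode) Reset()              {}

// traceWrapper is a hypothetical wrapper that records an entry before and
// after the wrapped target runs.
type traceWrapper struct {
	label  string
	target Node
	log    *[]string
}

// Wrap stores the target node, mirroring Wrapper.Wrap.
func (w *traceWrapper) Wrap(target Node) { w.target = target }

func (w *traceWrapper) Name() string { return w.target.Name() }

func (w *traceWrapper) Run(ctx context.Context) {
	*w.log = append(*w.log, w.label+":before")
	w.target.Run(ctx)
	*w.log = append(*w.log, w.label+":after")
}

func (w *traceWrapper) Reset() { w.target.Reset() }

// wrappedTrace wraps one task node in two nested wrappers and returns the
// recorded sequence of events.
func wrappedTrace() []string {
	var log []string
	task := &logNode{name: "task", log: &log}

	inner := &traceWrapper{label: "inner", log: &log}
	inner.Wrap(task)
	outer := &traceWrapper{label: "outer", log: &log}
	outer.Wrap(inner) // a wrapper is a Node, so it can itself be wrapped

	outer.Run(context.Background())
	return log
}

func main() {
	for _, event := range wrappedTrace() {
		fmt.Println(event)
	}
}
```

The recorded sequence is outer:before, inner:before, task, inner:after, outer:after, which is the usual onion ordering of nested decorators.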

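Easy export and import of graph definitions

running supports serializing and deserializing a built graph, which makes it easy to keep different environments in sync.

For example, once a workflow has been verified in the test environment, you can export the serialized graph, and the production environment can hot-load the graph definition to pick up the workflow immediately.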
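As a rough illustration of the export and import round trip, the sketch below serializes a small graph definition with `encoding/json` and loads it back. The struct fields copy the exported fields of `JsonNode`, `GraphNode`, and `JsonPlan` from the API listing, but the types are redeclared locally under lowercase names; the node names, node types, and props key are made up, and the exact JSON layout the library emits is not claimed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The three types below copy the exported fields of JsonNode, GraphNode and
// JsonPlan from the API listing; they are local stand-ins, not the library types.
type jsonNode struct {
	Name     string
	Type     string
	SubNodes []*jsonNode
	Wrappers []string
	ReUse    bool
}

type graphNode struct {
	Node      *jsonNode
	NextNodes []string
}

type jsonPlan struct {
	Props json.RawMessage
	Graph []graphNode
}

// samplePlan builds a hypothetical two-node graph: A -> B, with B wrapped.
func samplePlan() jsonPlan {
	return jsonPlan{
		Props: json.RawMessage(`{"A.retries":3}`),
		Graph: []graphNode{
			{Node: &jsonNode{Name: "A", Type: "fetch"}, NextNodes: []string{"B"}},
			{Node: &jsonNode{Name: "B", Type: "store", Wrappers: []string{"timer"}}},
		},
	}
}

// roundTrip exports the plan to JSON bytes and loads it back, which is what
// one environment would do on export and another on import.
func roundTrip(p jsonPlan) (jsonPlan, error) {
	data, err := json.Marshal(p)
	if err != nil {
		return jsonPlan{}, err
	}
	var loaded jsonPlan
	if err := json.Unmarshal(data, &loaded); err != nil {
		return jsonPlan{}, err
	}
	return loaded, nil
}

func main() {
	loaded, err := roundTrip(samplePlan())
	if err != nil {
		panic(err)
	}
	fmt.Println(loaded.Graph[0].Node.Name, "->", loaded.Graph[0].NextNodes) // A -> [B]
}
```

With the real package, the corresponding entry points are ExportPlan and LoadPlanFromJson, documented in the Functions section.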

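Getting started

package common

The common package defines some general-purpose cluster, wrapper, and node implementations.

Together with the corresponding test code and method comments, they are enough to learn the basics of using running.

Reading this code after finishing this document is highly recommended. Corrections are welcome.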

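running.Engine

running.Engine provides a set of methods for registering node builder functions and for managing plans (which include the graph definition and node initialization parameters) and worker pools.

Because Engine rarely needs customization, running initializes a global instance and exposes the corresponding functions.

You can therefore call functions such as running.RegisterNodeBuilder directly, without creating an Engine instance yourself.

If needed, for example for isolation, you can create a new Engine instance with running.NewDefaultEngine.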

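running.Plan

A plan contains the graph definition and the nodes' initialization parameters. The graph definition is determined by a series of options.

After registering a plan with the engine through RegisterPlan, you can execute it by calling ExecPlan.

The function that creates a plan has the following signature:

func NewPlan(props Props, prebuilt []Node, options ...Option) *Plan

  • props holds the node parameters. running provides StandardProps, an implementation based on map[string]interface{}.
    • By convention the key is "node name.parameter name" and the value is the parameter value.
  • prebuilt holds prebuilt nodes. They must implement the Clone method and can be used to reduce the cost of building nodes at run time.
  • options are the graph operations, including linking nodes, merging nodes into a cluster, wrapping nodes, and so on.
    • For the available operations, see plan_options.go and the test code.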
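The key convention for props can be illustrated with a tiny map-backed type. This is a sketch under assumptions, not the source of StandardProps: the `props` type and the sample keys are hypothetical, and whether the real SubGet falls back to a global key when the node-specific key is missing is not stated in this document.

```go
package main

import "fmt"

// props is a hypothetical stand-in for a map-backed Props implementation.
type props map[string]interface{}

// Get looks up a global key.
func (p props) Get(key string) (interface{}, bool) {
	v, ok := p[key]
	return v, ok
}

// SubGet looks up a node-specific key using the "node name.parameter name"
// convention. No fallback to the global key is attempted in this sketch.
func (p props) SubGet(sub, key string) (interface{}, bool) {
	v, ok := p[sub+"."+key]
	return v, ok
}

func main() {
	p := props{
		"timeout":       5, // global parameter
		"fetch.retries": 3, // parameter of the node named "fetch"
	}

	if v, ok := p.SubGet("fetch", "retries"); ok {
		fmt.Println("fetch retries:", v)
	}
	if _, ok := p.SubGet("store", "retries"); !ok {
		fmt.Println("store has no retries parameter")
	}
	if v, ok := p.Get("timeout"); ok {
		fmt.Println("global timeout:", v)
	}
}
```

A node builder (BuildNodeFunc) receives the node name and the props, so it can read its own parameters with SubGet and shared ones with Get.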
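running.Base & running.BaseWrapper

To simplify writing nodes, running provides running.Base and running.BaseWrapper.

Embedding running.Base in a struct automatically implements most of the methods of the Node and Cluster interfaces, and running.BaseWrapper does the same for the Wrapper interface, so you only need to implement the Run method. If the generic implementation does not meet your needs, override the corresponding methods.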
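The embedding technique works through Go's method promotion. The sketch below uses a local `base` type as a hypothetical stand-in for running.Base (the real type has more fields and methods) and a made-up `greeter` node that overrides only Run.

```go
package main

import (
	"context"
	"fmt"
)

// Node is redeclared locally with the method set from the API listing.
type Node interface {
	Name() string
	Run(ctx context.Context)
	Reset()
}

// base is a hypothetical stand-in for running.Base: it supplies default
// implementations that embedding types inherit through method promotion.
type base struct {
	name string
}

func (b *base) Name() string        { return b.name }
func (b *base) SetName(name string) { b.name = name }
func (b *base) Run(context.Context) {}
func (b *base) Reset()              {}

// greeter embeds base and overrides only Run.
type greeter struct {
	base
	out []string
}

func (g *greeter) Run(ctx context.Context) {
	g.out = append(g.out, "hello from "+g.Name())
}

// Compile-time check that *greeter satisfies Node via the promoted methods.
var _ Node = (*greeter)(nil)

// greet builds a greeter, runs it through the Node interface and returns
// what it recorded.
func greet(name string) string {
	g := &greeter{}
	g.SetName(name)
	var n Node = g
	n.Run(context.Background())
	return g.out[0]
}

func main() {
	fmt.Println(greet("n1")) // hello from n1
}
```

The method defined on `greeter` shadows the promoted `Run` from `base`, which is how "override as needed" works in practice.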

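running.State

running.State is used for communication between nodes at run time. Any node that implements the running.Stateful interface has a state bound to it at run time.

Nodes communicate by adding, updating, and querying entries in the state.

Note that running provides StandardState, an implementation of State. StandardState uses a read-write lock to provide a degree of concurrency safety, but only on the condition that objects returned by Query are never modified (especially reference types). When a modification is needed, use the Update or Transform method.

If you have custom safety or performance requirements, you can implement the State interface yourself and set the Engine's StateBuilder.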
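The reason to prefer Transform over a Query followed by an Update is that the read-modify-write happens under one lock. The following standalone sketch shows a lock-guarded state with the same three method names as the State interface. It is not the source of StandardState; the `state` type and `countConcurrently` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical lock-guarded key-value store with the same method
// names as the State interface.
type state struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newState() *state {
	return &state{data: make(map[string]interface{})}
}

// Query returns the value under a read lock. Callers must not mutate it.
func (s *state) Query(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Update replaces the value under a write lock.
func (s *state) Update(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Transform computes the new value from the old one while holding the write
// lock, so concurrent read-modify-write cycles cannot interleave.
func (s *state) Transform(key string, transform func(from interface{}) interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = transform(s.data[key])
}

// countConcurrently increments one counter from many goroutines and returns
// the final value.
func countConcurrently(workers int) int {
	s := newState()
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Transform("hits", func(from interface{}) interface{} {
				n, _ := from.(int) // a missing key yields nil, so n starts at 0
				return n + 1
			})
		}()
	}
	wg.Wait()
	v, _ := s.Query("hits")
	return v.(int)
}

func main() {
	fmt.Println(countConcurrently(100)) // 100
}
```

Doing the same increment as a Query followed by an Update would release the lock between the two calls and could lose updates under contention.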

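Status and roadmap

The project is still at an early stage and is not recommended for production use. Issues are welcome to help make running better.

  • More flexible props operations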

Documentation

Index

Constants

This section is empty.

Variables

var AddNodes = func(typ string, names ...string) Option {
	return func(dag *_DAG) {
		for _, name := range names {
			if _, ok := dag.NodeRefs[name]; !ok {
				dag.NodeRefs[name] = &_NodeRef{
					NodeName: name,
					NodeType: typ,
				}
			}
		}
	}
}

AddNodes adds nodes. typ declares the node type and names declares the name of each node. Nodes must be added before other options are applied to them.

var Global = NewDefaultEngine()

Global is the default Engine used by the package-level functions such as RegisterNodeBuilder and ExecPlan.

var LinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root))
				}
			}
		}

		if dag.Vertexes[nodes[0]] != nil {
			for _, node := range nodes[1:] {
				if dag.Vertexes[node] != nil {
					dag.Vertexes[nodes[0]].Next = append(dag.Vertexes[nodes[0]].Next, dag.Vertexes[node])
					dag.Vertexes[node].Prev++
				}
			}
		}
	}
}

LinkNodes links the first node to each of the others. Example: LinkNodes("A", "B", "C") => A -> B, A -> C.

var MergeNodes = func(cluster string, subNodes ...string) Option {
	return func(dag *_DAG) {
		if clusterRef, ok := dag.NodeRefs[cluster]; !ok {
			dag.Warning = append(dag.Warning, fmt.Sprintf("cluster %s ref not found", cluster))
			return
		} else {
			for _, node := range subNodes {
				if _, ok := dag.NodeRefs[node]; ok {
					clusterRef.SubRefs = append(clusterRef.SubRefs, dag.NodeRefs[node])
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("sub node %s ref not found", node))
				}
			}
		}
	}
}

MergeNodes merges the other nodes into the first node as its sub-nodes. Example: MergeNodes("A", "B", "C"). If node "A" implements the Cluster interface, nodes "B" and "C" are injected into it, and "A" can then use "B" and "C" as sub-nodes.

var RLinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root))
				}
			}
		}

		if dag.Vertexes[nodes[0]] != nil {
			for _, node := range nodes[1:] {
				if dag.Vertexes[node] != nil {
					dag.Vertexes[node].Next = append(dag.Vertexes[node].Next, dag.Vertexes[nodes[0]])
					dag.Vertexes[nodes[0]].Prev++
				}
			}
		}
	}
}

RLinkNodes links each of the other nodes to the first node, the reverse of LinkNodes. Example: RLinkNodes("A", "B", "C") => B -> A, C -> A.

var ReUseNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		for _, node := range nodes {
			if dag.NodeRefs[node] != nil {
				dag.NodeRefs[node].ReUse = true
			}
		}
	}
}

ReUseNodes marks nodes for reuse to avoid unnecessary rebuilds. It fits nodes whose properties do not change and that implement the Clone method.

var SLinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("Slink target node %s ref not found", root))
				}
			}
		}

		for i := range nodes {
			if i < len(nodes)-1 {
				prev, next := dag.Vertexes[nodes[i]], dag.Vertexes[nodes[i+1]]

				if prev != nil && next != nil {
					prev.Next = append(prev.Next, next)
					next.Prev++
				}
			}
		}
	}
}

SLinkNodes links nodes serially. Example: SLinkNodes("A", "B", "C") => A -> B -> C.
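The difference between LinkNodes, RLinkNodes, and SLinkNodes is easiest to see as edge lists. The helper functions below are hypothetical and standalone; they only reproduce the edge patterns stated in the examples above and leave out the vertex bookkeeping and warnings of the real options.

```go
package main

import "fmt"

// edge is a directed edge between two node names.
type edge struct {
	from, to string
}

// linkEdges fans out: the first node points at every other node.
func linkEdges(nodes ...string) []edge {
	if len(nodes) < 1 {
		return nil
	}
	var edges []edge
	for _, n := range nodes[1:] {
		edges = append(edges, edge{from: nodes[0], to: n})
	}
	return edges
}

// rlinkEdges fans in: every other node points at the first node.
func rlinkEdges(nodes ...string) []edge {
	if len(nodes) < 1 {
		return nil
	}
	var edges []edge
	for _, n := range nodes[1:] {
		edges = append(edges, edge{from: n, to: nodes[0]})
	}
	return edges
}

// slinkEdges chains: each node points at the one that follows it.
func slinkEdges(nodes ...string) []edge {
	var edges []edge
	for i := 0; i+1 < len(nodes); i++ {
		edges = append(edges, edge{from: nodes[i], to: nodes[i+1]})
	}
	return edges
}

func main() {
	fmt.Println("link: ", linkEdges("A", "B", "C"))  // edges A->B and A->C
	fmt.Println("rlink:", rlinkEdges("A", "B", "C")) // edges B->A and C->A
	fmt.Println("slink:", slinkEdges("A", "B", "C")) // edges A->B and B->C
}
```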

var WrapAllNodes = func(wrappers ...string) Option {
	return func(dag *_DAG) {
		for _, wrapper := range wrappers {
			for _, ref := range dag.NodeRefs {
				if ref != nil {
					ref.Wrappers = append(ref.Wrappers, wrapper)
				}
			}
		}
	}
}

WrapAllNodes wraps all nodes with one or more wrappers. It only affects nodes added before this option.

var WrapNodes = func(wrapper string, targets ...string) Option {
	return func(dag *_DAG) {
		for _, target := range targets {
			if targetNodeRef := dag.NodeRefs[target]; targetNodeRef != nil {
				targetNodeRef.Wrappers = append(targetNodeRef.Wrappers, wrapper)
			} else {
				dag.Warning = append(dag.Warning, fmt.Sprintf("wrap target node %s ref not found", target))
			}
		}
	}
}

WrapNodes wraps nodes to enhance them. wrapper is the type of a node that implements Wrapper; targets are the names of the nodes to wrap.

Functions

func ClearPool

func ClearPool(name string)

ClearPool clears the worker pool of a plan. Call it to make an updated plan take effect immediately.

  • name: name of the plan

func ExecPlan

func ExecPlan(name string, ctx context.Context) <-chan Output

ExecPlan executes a plan registered in Global.
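ExecPlan returns a receive-only channel, so the caller reads the result from it. The sketch below shows that calling pattern with a local stand-in. `execPlan`, `output`, and `waitFor` are hypothetical, the stub takes different parameters from the real function, and how the real engine reacts to context cancellation is an assumption that this document does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// output is a hypothetical stand-in for running.Output.
type output struct {
	Err   error
	Value string
}

// execPlan is a stand-in for a function that runs work in the background and
// delivers exactly one result on the returned channel. It is NOT the library
// function: it fakes the work with a timer.
func execPlan(ctx context.Context, work time.Duration) <-chan output {
	ch := make(chan output, 1) // buffered so the goroutine never blocks on send
	go func() {
		select {
		case <-time.After(work):
			ch <- output{Value: "done"}
		case <-ctx.Done():
			ch <- output{Err: ctx.Err()}
		}
	}()
	return ch
}

// waitFor runs the fake plan with a timeout and blocks until the result arrives.
func waitFor(workMs, timeoutMs int) output {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return <-execPlan(ctx, time.Duration(workMs)*time.Millisecond)
}

func main() {
	fast := waitFor(1, 1000)
	fmt.Println("fast:", fast.Value, fast.Err)

	slow := waitFor(1000, 5)
	fmt.Println("slow timed out:", errors.Is(slow.Err, context.DeadlineExceeded))
}
```

With the real package, the value received from the channel is an Output whose Err and State fields carry the result of the run.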

func ExportPlan added in v0.2.1

func ExportPlan(name string) ([]byte, error)

ExportPlan exports a plan registered in Global and returns it as JSON bytes.

func LoadPlanFromJson

func LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error

LoadPlanFromJson loads a plan from JSON data.

  • name: name of the plan to load
  • jsonData: JSON data of the plan
  • prebuilt: prebuilt nodes, can be nil

func RegisterNodeBuilder

func RegisterNodeBuilder(name string, builder BuildNodeFunc)

RegisterNodeBuilder registers a node builder with Global.

func RegisterPlan

func RegisterPlan(name string, plan *Plan) error

RegisterPlan registers a plan with Global.

func UpdatePlan

func UpdatePlan(name string, update func(plan *Plan)) error

UpdatePlan updates a plan registered in Global.

func WarmupPool added in v0.2.2

func WarmupPool(name string, size int)

WarmupPool warms up the worker pool to avoid a cold start.

  • name: plan name
  • size: size of the worker buffer queue
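Why warming up a pool helps with cold starts can be shown with a generic buffered-channel pool. This is one common way to build such a pool and is not taken from the library's source: `pool`, `worker`, `warmup`, and `buildsDuringBurst` are hypothetical names. The point is that after a warm-up, a burst of requests is served from prebuilt workers instead of paying the construction cost on the hot path.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker is a hypothetical unit that is expensive to construct.
type worker struct {
	id int64
}

// pool keeps idle workers in a buffered channel and counts constructions.
type pool struct {
	builds int64 // number of workers built so far, updated atomically
	idle   chan *worker
}

func newPool(size int) *pool {
	return &pool{idle: make(chan *worker, size)}
}

// build constructs a new worker; in a real system this is the costly step.
func (p *pool) build() *worker {
	return &worker{id: atomic.AddInt64(&p.builds, 1)}
}

// warmup prebuilds up to n workers and parks them in the idle buffer.
func (p *pool) warmup(n int) {
	for i := 0; i < n; i++ {
		w := p.build()
		select {
		case p.idle <- w:
		default:
			return // buffer is full; the extra worker is dropped
		}
	}
}

// get takes an idle worker if one is available and builds one otherwise.
func (p *pool) get() *worker {
	select {
	case w := <-p.idle:
		return w
	default:
		return p.build()
	}
}

// buildsDuringBurst reports how many workers had to be built while serving
// a burst of requests, after warming the pool with warm workers.
func buildsDuringBurst(warm, burst int) int64 {
	p := newPool(warm)
	p.warmup(warm)
	before := atomic.LoadInt64(&p.builds)
	for i := 0; i < burst; i++ {
		p.get()
	}
	return atomic.LoadInt64(&p.builds) - before
}

func main() {
	fmt.Println("cold pool builds during burst:", buildsDuringBurst(0, 4))
	fmt.Println("warm pool builds during burst:", buildsDuringBurst(4, 4))
}
```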

Types

type Base

type Base struct {
	NodeName string

	State State

	SubNodes []Node

	SubNodesMap map[string]Node
}

Base is a simple implementation of Node, Cluster, and Stateful. Embed it in a custom node and override interface methods as needed.

func (*Base) Bind

func (base *Base) Bind(state State)

func (*Base) Inject

func (base *Base) Inject(nodes []Node)

func (*Base) Name

func (base *Base) Name() string

func (*Base) Reset

func (base *Base) Reset()

func (*Base) ResetSubNodes

func (base *Base) ResetSubNodes()

func (*Base) Revert added in v0.2.1

func (base *Base) Revert(ctx context.Context)

func (*Base) Run

func (base *Base) Run(ctx context.Context)

func (*Base) SetName

func (base *Base) SetName(name string)

type BaseWrapper

type BaseWrapper struct {
	Target Node

	State State
}

func (*BaseWrapper) Bind

func (wrapper *BaseWrapper) Bind(state State)

func (*BaseWrapper) Name

func (wrapper *BaseWrapper) Name() string

func (*BaseWrapper) Reset

func (wrapper *BaseWrapper) Reset()

func (*BaseWrapper) Run

func (wrapper *BaseWrapper) Run(ctx context.Context)

func (*BaseWrapper) Wrap

func (wrapper *BaseWrapper) Wrap(target Node)

type BuildNodeFunc

type BuildNodeFunc func(name string, props Props) (Node, error)

type Cloneable

type Cloneable interface {
	Node

	// Clone self
	Clone() Node
}

Cloneable is a class of nodes that can be cloned.

type Cluster

type Cluster interface {
	Node

	// Inject deliver the sub-nodes, will be called when engine build the cluster
	Inject(nodes []Node)
}

Cluster is a class of nodes that can contain other nodes.

type Edge added in v0.2.3

type Edge struct {
	From string
	To   string
}

type EmptyProps

type EmptyProps struct{}

func (EmptyProps) Copy added in v0.2.1

func (props EmptyProps) Copy() Props

func (EmptyProps) Get

func (props EmptyProps) Get(key string) (value interface{}, exists bool)

func (EmptyProps) SubGet

func (props EmptyProps) SubGet(sub, key string) (value interface{}, exists bool)

type Engine

type Engine struct {
	StateBuilder func() State
	// contains filtered or unexported fields
}

func NewDefaultEngine

func NewDefaultEngine() *Engine

func (*Engine) ClearPool

func (engine *Engine) ClearPool(name string)

ClearPool clears the worker pool of a plan. Call it to make an updated plan take effect immediately.

  • name: name of the plan

func (*Engine) ExecPlan

func (engine *Engine) ExecPlan(name string, ctx context.Context) <-chan Output

ExecPlan executes a plan registered in the engine.

func (*Engine) ExportPlan added in v0.2.1

func (engine *Engine) ExportPlan(name string) ([]byte, error)

func (*Engine) LoadPlanFromJson

func (engine *Engine) LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error

LoadPlanFromJson loads a plan from JSON data.

  • name: name of the plan to load
  • jsonData: JSON data of the plan
  • prebuilt: prebuilt nodes, can be nil

func (*Engine) RegisterNodeBuilder

func (engine *Engine) RegisterNodeBuilder(name string, builder BuildNodeFunc)

RegisterNodeBuilder registers a node builder with the engine.

func (*Engine) RegisterPlan

func (engine *Engine) RegisterPlan(name string, plan *Plan) error

RegisterPlan registers a plan with the engine.

func (*Engine) UpdatePlan

func (engine *Engine) UpdatePlan(name string, update func(plan *Plan)) error

UpdatePlan updates a plan registered in the engine.

func (*Engine) WarmupPool added in v0.2.2

func (engine *Engine) WarmupPool(name string, size int)

WarmupPool warms up the worker pool to avoid a cold start.

  • name: plan name
  • size: size of the worker buffer queue

type ExportableProps added in v0.2.3

type ExportableProps interface {
	Raw() map[string]interface{}
}

ExportableProps can be used to look up the raw data and for serialization.

type GraphNode added in v0.1.2

type GraphNode struct {
	Node *JsonNode

	NextNodes []string
}

type Inspector added in v0.2.3

type Inspector struct {
	// contains filtered or unexported fields
}

func Inspect added in v0.2.3

func Inspect(e *Engine) Inspector

func (Inspector) DescribePlan added in v0.2.3

func (i Inspector) DescribePlan(name string) PlanInfo

func (Inspector) GetNodeBuildersName added in v0.2.3

func (i Inspector) GetNodeBuildersName() []string

func (Inspector) GetPlansName added in v0.2.3

func (i Inspector) GetPlansName() []string

type JsonNode

type JsonNode struct {
	Name string

	Type string

	SubNodes []*JsonNode

	Wrappers []string

	ReUse bool
}

type JsonPlan

type JsonPlan struct {
	Props json.RawMessage

	Graph []GraphNode
}

type Node

type Node interface {
	Name() string

	// Run will be called when all deps solved or cluster invoke it
	Run(ctx context.Context)

	// Reset will be called when the node will no longer execute until the next execution plan
	Reset()
}

Node is the basic unit of execution.

type NodeInfo added in v0.2.3

type NodeInfo struct {
	NodeType string

	NodeName string

	Props map[string]interface{}

	Wrappers []string

	ReUse bool

	SubNodes []NodeInfo
}

type Option

type Option func(*_DAG)

type Output

type Output struct {
	Err error

	State State
}

type Plan

type Plan struct {
	Props Props

	Prebuilt []Node

	Options []Option

	Strict bool
	// contains filtered or unexported fields
}

Plan describes how to execute nodes.

func NewPlan

func NewPlan(props Props, prebuilt []Node, options ...Option) *Plan

NewPlan creates a plan.

  • props: build props of the nodes
  • prebuilt: prebuilt nodes, which reduce the cost of building nodes; nil is fine
  • options: AddNodes, LinkNodes, and so on

func (*Plan) Init

func (plan *Plan) Init() error

Init initializes the plan. A plan takes effect only after initialization. If the plan is invalid, for example because of circular dependencies, Init returns an error.
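Circular dependencies in a graph can be detected with Kahn's algorithm, which repeatedly removes vertices that have no unresolved predecessors. The sketch below is a standalone illustration of that technique. How Init actually validates a plan is not shown in this document, so `hasCycle` is a hypothetical helper and not the library's implementation.

```go
package main

import "fmt"

// hasCycle reports whether the directed graph, given as an adjacency list
// from node name to downstream node names, contains a cycle.
func hasCycle(next map[string][]string) bool {
	// Count incoming edges for every node that appears in the graph.
	prev := make(map[string]int)
	for from, tos := range next {
		prev[from] += 0 // make sure source-only nodes are registered
		for _, to := range tos {
			prev[to]++
		}
	}

	// Start from the nodes that have no predecessors.
	var queue []string
	for name, n := range prev {
		if n == 0 {
			queue = append(queue, name)
		}
	}

	// Remove resolved nodes one by one, releasing their downstream nodes.
	visited := 0
	for len(queue) > 0 {
		name := queue[0]
		queue = queue[1:]
		visited++
		for _, to := range next[name] {
			prev[to]--
			if prev[to] == 0 {
				queue = append(queue, to)
			}
		}
	}

	// Nodes on a cycle never reach zero predecessors, so they are never visited.
	return visited != len(prev)
}

func main() {
	chain := map[string][]string{"A": {"B"}, "B": {"C"}}
	loop := map[string][]string{"A": {"B"}, "B": {"C"}, "C": {"A"}}
	fmt.Println(hasCycle(chain)) // false
	fmt.Println(hasCycle(loop))  // true
}
```

The predecessor counter plays the same role as the Prev field that the link options increment on each vertex.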

func (*Plan) MarshalJSON

func (plan *Plan) MarshalJSON() ([]byte, error)

func (*Plan) UnmarshalJSON

func (plan *Plan) UnmarshalJSON(bytes []byte) error

type PlanInfo added in v0.2.3

type PlanInfo struct {
	Version string

	Vertexes []VertexInfo

	Edges []Edge

	GlobalProps map[string]interface{}
}

type Props

type Props interface {
	// Get return global value of the key
	Get(key string) (interface{}, bool)

	//SubGet node value of the key, deliver node name as sub
	SubGet(sub, key string) (interface{}, bool)

	// Copy safe use of copies
	Copy() Props
}

Props provides build parameters for the node builder.

type Reversible added in v0.2.1

type Reversible interface {
	Node

	Revert(ctx context.Context)
}

Reversible is a class of nodes that can be reverted.

type StandardProps

type StandardProps map[string]interface{}

func (StandardProps) Copy added in v0.2.1

func (props StandardProps) Copy() Props

func (StandardProps) Get

func (props StandardProps) Get(key string) (value interface{}, exists bool)

func (StandardProps) Raw added in v0.2.3

func (props StandardProps) Raw() map[string]interface{}

func (StandardProps) SubGet

func (props StandardProps) SubGet(sub, key string) (value interface{}, exists bool)

type StandardState

type StandardState struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStandardState

func NewStandardState() *StandardState

func (*StandardState) Query

func (state *StandardState) Query(key string) (value interface{}, exists bool)

func (*StandardState) Transform

func (state *StandardState) Transform(key string, transform TransformStateFunc)

func (*StandardState) Update

func (state *StandardState) Update(key string, value interface{})

type State

type State interface {
	// Query return value of the key
	Query(key string) (interface{}, bool)

	// Update set a new value for the key
	Update(key string, value interface{})

	// Transform set a new value for the key, according to the old value
	Transform(key string, transform TransformStateFunc)
}

State stores the state of nodes.

type Stateful

type Stateful interface {
	Node

	// Bind deliver the state, should be called before engine run the node
	Bind(state State)
}

Stateful is a class of nodes that need to record or query state.

type TransformStateFunc

type TransformStateFunc func(from interface{}) interface{}

type VertexInfo added in v0.2.3

type VertexInfo struct {
	VertexName string

	NodeInfo NodeInfo
}

type Wrapper

type Wrapper interface {
	Node

	Wrap(target Node)
}

Wrapper is a class of nodes that can wrap another node.

Directories

Path Synopsis
Package common implements some nodes, clusters, and state for common usage.
