running

Version: v0.2.2
Warning

This package is not in the latest version of its module.

Published: Oct 23, 2022 License: MIT Imports: 7 Imported by: 0

README

Running

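What is Running

Running is a DAG-based graph execution framework for Golang.

Its goal is to make it easy and flexible to change how operators are combined and the order in which they execute, while taking advantage of Golang's concurrency.

Features
  • Three steps: define Nodes, define a plan, execute the plan
  • Built-in basic implementations, aiming to work out of the box
  • Whatever can run in parallel does run in parallel, aiming for high performance
  • No third-party dependencies, aiming for stability and reliability

Usage

Basic usage
Example code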
package example

import (
	"context"
	"fmt"
	"log"

	"github.com/symphony09/running"
	"github.com/symphony09/running/common"
)

func BaseUsage() {
	running.RegisterNodeBuilder("Greet",
		common.NewSimpleNodeBuilder(func(ctx context.Context) {
			fmt.Println("Hello!")
		}))

	running.RegisterNodeBuilder("Introduce",
		common.NewSimpleNodeBuilder(func(ctx context.Context) {
			fmt.Println("This is", ctx.Value("name"), ".")
		}))

	err := running.RegisterPlan("Plan1",
		running.NewPlan(nil, nil,
			running.AddNodes("Greet", "Greet1"),
			running.AddNodes("Introduce", "Introduce1"),
			running.SLinkNodes("Greet1", "Introduce1")))

	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.WithValue(context.Background(), "name", "RUNNING")

	<-running.ExecPlan("Plan1", ctx)
}

Output
Hello!
This is RUNNING .
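Explanation

The example code does the following:

  1. Registers builder functions for two Nodes, Greet and Introduce

A Node is the engine's unit of execution, and the engine manages how Nodes are built and run. That is why you register a Node's builder function rather than a concrete Node.

common.NewSimpleNodeBuilder takes a function with the signature func(ctx context.Context) and returns a builder function for SimpleNode.

SimpleNode is one of the engine's built-in Node implementations.

The function passed to NewSimpleNodeBuilder is wrapped inside the SimpleNode, and the engine calls it whenever it runs that SimpleNode.

  2. Registers Plan1

A Plan is the engine's execution plan. Once you have Nodes that encapsulate the computation, you can plan how they are executed.

Ignore the first two parameters of running.NewPlan for now. From the third parameter onward it is variadic, and each argument defines an operation:

  • AddNodes: adds Nodes

    • The first parameter is the Node type, which corresponds to a previously registered Node builder function
    • The remaining parameters are variadic and give the names of the concrete Nodes. One Node is created per name.
  • SLinkNodes: variadic; links the added Nodes together serially

The plan in the example can be written simply as Greet1 -> Introduce1

Greet1 is built by the builder registered for Greet and prints Hello! when it runs.

Introduce1 is built by the builder registered for Introduce and prints This is followed by the name value from the context.

By default, referencing a Node in another operation such as SLinkNodes without first adding it through AddNodes does not cause an error.

For example, change running.SLinkNodes("Greet1", "Introduce1"))) in the example code to running.SLinkNodes("Greet1", "Introduce1", "END")))

END was never added through AddNodes, so the engine still executes Greet1 -> Introduce1 and simply ignores END.

If the Plan's Strict field is set to true, the Plan is enforced strictly: because END cannot be found, registering the Plan returns an error.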

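  3. Executes Plan1

Once a plan is registered, it can be executed at any time and any number of times.

running.ExecPlan takes two parameters: the Plan name and the execution context. The engine passes the context on to each Node's run function.

ExecPlan returns a channel immediately. The actual execution happens asynchronously, and the result is delivered through the channel when it finishes.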
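The "return a channel now, deliver the result later" pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the engine's implementation: output, ctxKey, execAsync and execGreeting are hypothetical names introduced here.

```go
package main

import (
	"context"
	"fmt"
)

// output is a hypothetical local result type, not the library's Output.
type output struct {
	Err   error
	Value string
}

// ctxKey avoids using a bare string as a context key.
type ctxKey string

// execAsync starts the work in a goroutine and returns immediately.
// The channel is buffered so the goroutine can finish even if nobody receives.
func execAsync(ctx context.Context, work func(context.Context) (string, error)) <-chan output {
	ch := make(chan output, 1)
	go func() {
		defer close(ch)
		v, err := work(ctx)
		ch <- output{Err: err, Value: v}
	}()
	return ch
}

// execGreeting passes a value through the context, as the README example does.
func execGreeting(name string) <-chan output {
	ctx := context.WithValue(context.Background(), ctxKey("name"), name)
	return execAsync(ctx, func(ctx context.Context) (string, error) {
		return fmt.Sprintf("This is %v.", ctx.Value(ctxKey("name"))), nil
	})
}

func main() {
	ch := execGreeting("RUNNING")
	// Other work could happen here; receiving blocks until the result is ready.
	out := <-ch
	fmt.Println(out.Value, out.Err)
}
```

Receiving from the channel, as in <-running.ExecPlan("Plan1", ctx), is what turns the asynchronous call back into a blocking one.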

Custom Nodes

When the engine's built-in Node implementations do not meet your needs, you can define your own Node.

The Node interface is defined as:

type Node interface {
	Name() string

	Run(ctx context.Context)

	Reset()
}

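The Node interface has three methods, used to get the Node's name, run its logic, and reset its state.

Reset is called once the Node will not run again during the current plan execution. The engine does not create new Nodes for every plan execution, so Reset is what returns a Node to its initial state.

Example code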
type IntroduceNode struct {
	running.Base

	Words string
}

func NewIntroduceNode(name string, props running.Props) (running.Node, error) {
	node := new(IntroduceNode)
	node.SetName(name)

	helper := utils.ProxyProps(props)
	node.Words = helper.SubGetString(name, "words")

	return node, nil
}

func (i *IntroduceNode) Run(ctx context.Context) {
	fmt.Println(i.Words)
}

func BaseUsage02() {
	running.RegisterNodeBuilder("Greet",
		common.NewSimpleNodeBuilder(func(ctx context.Context) {
			fmt.Println("Hello!")
		}))

	running.RegisterNodeBuilder("Introduce", NewIntroduceNode)

	props := running.StandardProps(map[string]interface{}{
		"Introduce1.words": "This is RUNNING .",
	})

	err := running.RegisterPlan("Plan2",
		running.NewPlan(props, nil,
			running.AddNodes("Greet", "Greet1"),
			running.AddNodes("Introduce", "Introduce1"),
			running.SLinkNodes("Greet1", "Introduce1")))

	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.Background()

	<-running.ExecPlan("Plan2", ctx)
}
Output
Hello!
This is RUNNING .
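Explanation

A few points are worth noting:

  • IntroduceNode does not need to implement Name and Reset because the embedded running.Base already implements them. SetName is also implemented by running.Base.
  • Props supplies build parameters to Node builder functions
    • The engine has a built-in map-based Props implementation, StandardProps. Keys have the form Node name + "." + parameter name
    • utils.ProxyProps simplifies type assertions on parameters
More complex Plans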

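Besides AddNodes and SLinkNodes, described above, the engine also supports MergeNodes, WrapNodes, and LinkNodes.

  • MergeNodes: merges some Nodes into another Node as its sub-Nodes. The parent Node decides how the sub-Nodes are executed.
  • WrapNodes: wraps Nodes to add functionality such as timing or logging. See test/wrap_test.go for reference.
  • LinkNodes: similar to SLinkNodes, but the Nodes are connected differently: LinkNodes makes all the other Nodes successors of the first Node.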
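The difference between the two linking styles is easiest to see as the edges each one produces. The sketch below only models edge lists with hypothetical helpers (edge, fanOut, chain); it does not use the engine's own data structures.

```go
package main

import "fmt"

// edge is a directed connection from one node name to another.
type edge struct{ From, To string }

// fanOut models the LinkNodes style: the first node gets every
// other node as a direct successor.
func fanOut(nodes ...string) []edge {
	var edges []edge
	if len(nodes) == 0 {
		return edges
	}
	for _, n := range nodes[1:] {
		edges = append(edges, edge{nodes[0], n})
	}
	return edges
}

// chain models the SLinkNodes style: each node links to the next one.
func chain(nodes ...string) []edge {
	var edges []edge
	for i := 0; i+1 < len(nodes); i++ {
		edges = append(edges, edge{nodes[i], nodes[i+1]})
	}
	return edges
}

func main() {
	fmt.Println(fanOut("A", "B", "C")) // [{A B} {A C}]
	fmt.Println(chain("A", "B", "C"))  // [{A B} {B C}]
}
```

With the fan-out form, B and C depend only on A, so nothing forces one to run before the other.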
Example code
func BaseUsage03() {
	running.RegisterNodeBuilder("Greet",
		common.NewSimpleNodeBuilder(func(ctx context.Context) {
			fmt.Println("Hello!")
		}))

	running.RegisterNodeBuilder("Bye",
		common.NewSimpleNodeBuilder(func(ctx context.Context) {
			fmt.Println("bye!")
		}))

	running.RegisterNodeBuilder("Introduce", NewIntroduceNode)

	ops := []running.Option{
		running.AddNodes("Greet", "Greet1"),
		running.AddNodes("Bye", "Bye1"),
		running.AddNodes("Introduce", "Introduce1", "Introduce2", "Introduce3"),
		running.AddNodes("Select", "Select1"),
		running.MergeNodes("Select1", "Introduce2", "Introduce3"),
		running.LinkNodes("Greet1", "Select1", "Introduce1"),
		running.SLinkNodes("Introduce1", "Bye1"),
		running.SLinkNodes("Select1", "Bye1"),
	}

	props := running.StandardProps(map[string]interface{}{
		"Introduce1.words":         "This is RUNNING .",
		"Select1.Introduce2.words": "A good day .",
		"Select1.Introduce3.words": "A terrible day .",
		"Select1.selected":         "Introduce2",
	})

	err := running.RegisterPlan("Plan3", running.NewPlan(props, nil, ops...))

	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.Background()

	<-running.ExecPlan("Plan3", ctx)
}
Output
Hello!
This is RUNNING .
A good day .
bye!
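Explanation

Introduce1 and Select1 are both successors of Greet1, so the program executes in one of the following two orders:

Greet1 -> Introduce1 -> Select1.Introduce2 -> Bye1

Greet1 -> Select1.Introduce2 -> Introduce1 -> Bye1

In the example code, Select is a Node that is registered automatically when the common package is imported. Select can merge other Nodes; such a Node is called a Cluster.

Select picks which of its merged Nodes to run based on the parameters passed in through props.

Custom Clusters

The Cluster interface is defined as: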

type Cluster interface {
	Node

	Inject(nodes []Node)
}

The engine uses the Inject method to inject sub-Nodes according to the plan. Embedding running.Base implements this method automatically:

func (base *Base) Inject(nodes []Node) {
	base.SubNodes = append(base.SubNodes, nodes...)

	if base.SubNodesMap == nil {
		base.SubNodesMap = make(map[string]Node)
	}

	for _, node := range nodes {
		base.SubNodesMap[node.Name()] = node
	}
}

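A struct that embeds running.Base can reach the SubNodes and SubNodesMap fields through Base and use them to run those Nodes.

For concrete implementations, see the source code of the common package.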
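To make the idea of a cluster deciding how its sub-Nodes run more concrete, here is a self-contained sketch. It uses a local node interface that mirrors the documented Node methods rather than importing the library, and firstN, recorder and runFirstN are hypothetical names invented for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// node is a local stand-in for the documented Node interface.
type node interface {
	Name() string
	Run(ctx context.Context)
	Reset()
}

// recorder is a trivial node that appends its name to a shared log.
type recorder struct {
	name string
	log  *[]string
}

func (r *recorder) Name() string            { return r.name }
func (r *recorder) Run(ctx context.Context) { *r.log = append(*r.log, r.name) }
func (r *recorder) Reset()                  {}

// firstN is a hypothetical cluster: it runs at most limit of its
// sub-nodes, in the order they were injected.
type firstN struct {
	name     string
	limit    int
	subNodes []node
}

func (c *firstN) Name() string        { return c.name }
func (c *firstN) Inject(nodes []node) { c.subNodes = append(c.subNodes, nodes...) }

func (c *firstN) Run(ctx context.Context) {
	for i, n := range c.subNodes {
		if i >= c.limit {
			break
		}
		n.Run(ctx)
	}
}

// Reset forwards to the sub-nodes so they are ready for the next execution.
func (c *firstN) Reset() {
	for _, n := range c.subNodes {
		n.Reset()
	}
}

// runFirstN builds a cluster over the named recorders and runs it once.
func runFirstN(limit int, names ...string) []string {
	var log []string
	var subs []node
	for _, name := range names {
		subs = append(subs, &recorder{name: name, log: &log})
	}
	c := &firstN{name: "FirstN1", limit: limit}
	c.Inject(subs)
	c.Run(context.Background())
	return log
}

func main() {
	fmt.Println(runFirstN(2, "A", "B", "C")) // [A B]
}
```

In real code the struct would embed running.Base instead of carrying its own subNodes slice, as described above.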

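Communication between Nodes

In the engine, Nodes communicate through State. When execution finishes, the State is also returned through the channel as the result of ExecPlan.

To use State, a Node must implement the Stateful interface: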

type Stateful interface {
	Node

	Bind(state State)
}

The engine uses Bind to bind a State to a Node. Embedding running.Base implements this method automatically.

func (base *Base) Bind(state State) {
	base.State = state

	for _, node := range base.SubNodes {
		if statefulNode, ok := node.(Stateful); ok {
			statefulNode.Bind(state)
		}
	}
}

A struct that embeds running.Base can reach the State field through Base and use it to read and write State.

State is defined as follows:

type State interface {
	Query(key string) (interface{}, bool)

	Update(key string, value interface{})

	Transform(key string, transform TransformStateFunc)
}

These methods query, update, and transform State respectively. The engine has a built-in, concurrency-safe implementation, StandardState.
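Transform exists alongside Query and Update because a read-modify-write done as two separate calls can lose updates when Nodes run in parallel. The following sketch shows one way such a state could be built with a mutex. It is an assumption-laden illustration (safeState and countConcurrently are hypothetical), not the source of StandardState.

```go
package main

import (
	"fmt"
	"sync"
)

// safeState is a hypothetical mutex-guarded key/value store.
type safeState struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newSafeState() *safeState {
	return &safeState{data: make(map[string]interface{})}
}

func (s *safeState) Query(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

func (s *safeState) Update(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Transform reads the old value and stores the new one under a single
// lock, so concurrent callers cannot interleave between read and write.
func (s *safeState) Transform(key string, f func(from interface{}) interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = f(s.data[key])
}

// countConcurrently increments "count" from n goroutines and returns the total.
func countConcurrently(n int) int {
	s := newSafeState()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Transform("count", func(from interface{}) interface{} {
				if c, ok := from.(int); ok {
					return c + 1
				}
				return 1 // no previous value yet
			})
		}()
	}
	wg.Wait()
	v, _ := s.Query("count")
	c, _ := v.(int)
	return c
}

func main() {
	fmt.Println(countConcurrently(100)) // 100
}
```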

Example code
type Counter struct {
	running.Base
}

func (node *Counter) Run(ctx context.Context) {
	node.State.Transform("count", func(from interface{}) interface{} {
		if from == nil {
			return 1
		}
		if count, ok := from.(int); ok {
			count++
			return count
		} else {
			return from
		}
	})
}

type Reporter struct {
	running.Base
}

func (node *Reporter) Run(ctx context.Context) {
	count, _ := node.State.Query("count")
	fmt.Printf("count = %d\n", count)
}

func BaseUsage04() {
	running.RegisterNodeBuilder("Counter", func(name string, props running.Props) (running.Node, error) {
		node := new(Counter)
		node.SetName(name)
		return node, nil
	})

	running.RegisterNodeBuilder("Reporter", func(name string, props running.Props) (running.Node, error) {
		node := new(Reporter)
		node.SetName(name)
		return node, nil
	})

	ops := []running.Option{
		running.AddNodes("Counter", "Counter1"),
		running.AddNodes("Reporter", "Reporter1"),
		running.AddNodes("Loop", "Loop1"),
		running.MergeNodes("Loop1", "Counter1"),
		running.SLinkNodes("Loop1", "Reporter1"),
	}

	props := running.StandardProps(map[string]interface{}{
		"Loop1.max_loop": 3,
	})

	err := running.RegisterPlan("Plan4", running.NewPlan(props, nil, ops...))

	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.Background()

	<-running.ExecPlan("Plan4", ctx)
}
Output
count = 3
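Explanation

Loop is also a cluster defined in the common package. It runs its sub-Nodes repeatedly, as controlled by the max_loop parameter in props.

With Loop1's max_loop set to 3, Counter1 runs at most 3 times.

Each time Counter1 runs it writes count to State, and Reporter1 reads count from State and prints it.

The utils package also has helpers that simplify type assertions on State values.

The channel returned by ExecPlan has the type <-chan Output. Output is defined as follows: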

type Output struct {
	Err error

	State State
}

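If the plan executes successfully, the engine exposes the State so that external code can use it.

Updating a Plan

The update function is running.UpdatePlan, with the signature func UpdatePlan(name string, fastMode bool, update func(plan *Plan)) error

The first parameter is the name of the plan to update. The second controls whether the update takes effect quickly: if it is true, the Worker pool is cleared. The third is the function that performs the update on the plan.

The API reference for this version (v0.2.2) lists the signature without fastMode, as func UpdatePlan(name string, update func(plan *Plan)) error, and provides ClearPool for making an updated plan take effect immediately.

Prebuilt Nodes

Building a Node can sometimes be expensive. The engine already reuses Nodes through a Worker pool to reduce this cost, but the cost can still be too high whenever a new Worker has to be created. This is most noticeable when a plan has only been executed a few times or when the execution rate suddenly increases.

To address this, the engine can take copies of Nodes prebuilt in the plan instead of building them again, which reduces the build cost.

The second parameter of running.NewPlan accepts a slice of prebuilt Nodes. For example: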

c2 := new(TestNode3)
c2.SetName("C2")
a6 := new(TestNode2)
a6.SetName("C2.A6")
c2.Inject([]running.Node{a6})

plan := running.NewPlan(running.EmptyProps{}, []running.Node{c2}, ops...)

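Note that Nodes copied from the same prebuilt Node may affect one another through shared pointers, which is usually not what you want.

It is therefore best to implement the Cloneable interface for prebuilt Nodes: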

type Cloneable interface {
	Node

	Clone() Node
}

The engine then calls the prebuilt Node's Clone method to obtain a cloned Node instead of making a shallow copy of the prebuilt Node.
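The hazard with shallow copies is plain Go behaviour and can be shown without the engine. In the sketch below, tagged is a hypothetical struct standing in for a prebuilt Node that holds a slice; copying the struct copies the slice header but shares the backing array, while Clone allocates a fresh one.

```go
package main

import "fmt"

// tagged stands in for a prebuilt node with reference-typed state.
type tagged struct {
	name string
	tags []string // a struct copy shares this slice's backing array
}

// Clone returns a copy whose tags slice has its own backing array.
func (t *tagged) Clone() *tagged {
	c := &tagged{name: t.name, tags: make([]string, len(t.tags))}
	copy(c.tags, t.tags)
	return c
}

// shallowShares reports whether a write through a shallow copy
// is visible in the original.
func shallowShares() bool {
	orig := &tagged{name: "C2", tags: []string{"a"}}
	cp := *orig // shallow copy: same backing array
	cp.tags[0] = "changed"
	return orig.tags[0] == "changed"
}

// cloneShares reports the same for a copy made with Clone.
func cloneShares() bool {
	orig := &tagged{name: "C2", tags: []string{"a"}}
	cp := orig.Clone()
	cp.tags[0] = "changed"
	return orig.tags[0] == "changed"
}

func main() {
	fmt.Println(shallowShares()) // true
	fmt.Println(cloneShares())   // false
}
```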

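Exporting and loading Plans

Plan supports JSON serialization and deserialization, so it is easy to export and save.

Before serializing, call the Plan's Init method to initialize it.

To load a Plan, call the engine's LoadPlanFromJson method.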
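As a rough picture of what a serialized plan might carry, the sketch below round-trips a small graph through encoding/json. The local types copy the field names of the documented JsonPlan, GraphNode and JsonNode types, but whether the library's exported JSON has exactly this layout is an assumption; roundTrip and samplePlan are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented JSON-facing types (field names only).
type jsonNode struct {
	Name     string
	Type     string
	SubNodes []*jsonNode
	Wrappers []string
	ReUse    bool
}

type graphNode struct {
	Node      *jsonNode
	NextNodes []string
}

type jsonPlan struct {
	Props json.RawMessage
	Graph []graphNode
}

// samplePlan describes Greet1 -> Introduce1 with one prop.
func samplePlan() jsonPlan {
	return jsonPlan{
		Props: json.RawMessage(`{"Introduce1.words":"This is RUNNING ."}`),
		Graph: []graphNode{
			{Node: &jsonNode{Name: "Greet1", Type: "Greet"}, NextNodes: []string{"Introduce1"}},
			{Node: &jsonNode{Name: "Introduce1", Type: "Introduce"}},
		},
	}
}

// roundTrip marshals the plan and unmarshals it into a fresh value.
func roundTrip(p jsonPlan) (jsonPlan, error) {
	data, err := json.Marshal(p)
	if err != nil {
		return jsonPlan{}, err
	}
	var out jsonPlan
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(samplePlan())
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	for _, g := range out.Graph {
		fmt.Println(g.Node.Name, "->", g.NextNodes)
	}
}
```

With the real library, the equivalent calls would be ExportPlan to obtain the bytes and LoadPlanFromJson to register a plan from them.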

Documentation

Constants

This section is empty.

Variables

var AddNodes = func(typ string, names ...string) Option {
	return func(dag *_DAG) {
		for _, name := range names {
			if _, ok := dag.NodeRefs[name]; !ok {
				dag.NodeRefs[name] = &_NodeRef{
					NodeName: name,
					NodeType: typ,
				}
			}
		}
	}
}

AddNodes adds nodes. typ declares the node type, and names declares the name of each node. Nodes must be added before they are used in other options.

var Global = NewDefaultEngine()
var LinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root))
				}
			}
		}

		if dag.Vertexes[nodes[0]] != nil {
			for _, node := range nodes[1:] {
				if dag.Vertexes[node] != nil {
					dag.Vertexes[nodes[0]].Next = append(dag.Vertexes[nodes[0]].Next, dag.Vertexes[node])
					dag.Vertexes[node].Prev++
				}
			}
		}
	}
}

LinkNodes links the first node to each of the others. Example: LinkNodes("A", "B", "C") => A -> B, A -> C.

var MergeNodes = func(cluster string, subNodes ...string) Option {
	return func(dag *_DAG) {
		if clusterRef, ok := dag.NodeRefs[cluster]; !ok {
			dag.Warning = append(dag.Warning, fmt.Sprintf("cluster %s ref not found", cluster))
			return
		} else {
			for _, node := range subNodes {
				if _, ok := dag.NodeRefs[node]; ok {
					clusterRef.SubRefs = append(clusterRef.SubRefs, dag.NodeRefs[node])
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("sub node %s ref not found", node))
				}
			}
		}
	}
}

MergeNodes merges the other nodes into the first node as its sub-nodes. Example: MergeNodes("A", "B", "C"). If node "A" implements the Cluster interface, nodes "B" and "C" are injected into it, and "A" can then use "B" and "C" as sub-nodes.

var RLinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root))
				}
			}
		}

		if dag.Vertexes[nodes[0]] != nil {
			for _, node := range nodes[1:] {
				if dag.Vertexes[node] != nil {
					dag.Vertexes[node].Next = append(dag.Vertexes[node].Next, dag.Vertexes[nodes[0]])
					dag.Vertexes[nodes[0]].Prev++
				}
			}
		}
	}
}

RLinkNodes links each of the other nodes to the first node. Example: RLinkNodes("A", "B", "C") => B -> A, C -> A.

var ReUseNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		for _, node := range nodes {
			if dag.NodeRefs[node] != nil {
				dag.NodeRefs[node].ReUse = true
			}
		}
	}
}

ReUseNodes marks nodes for reuse to avoid unnecessary rebuilds. It suits nodes whose properties do not change and that implement the Clone method.

var SLinkNodes = func(nodes ...string) Option {
	return func(dag *_DAG) {
		if len(nodes) < 1 {
			return
		}

		for _, root := range nodes {
			if _, ok := dag.Vertexes[root]; !ok {
				if _, ok := dag.NodeRefs[root]; ok {
					dag.Vertexes[root] = &_Vertex{
						RefRoot: dag.NodeRefs[root],
					}
				} else {
					dag.Warning = append(dag.Warning, fmt.Sprintf("Slink target node %s ref not found", root))
				}
			}
		}

		for i := range nodes {
			if i < len(nodes)-1 {
				prev, next := dag.Vertexes[nodes[i]], dag.Vertexes[nodes[i+1]]

				if prev != nil && next != nil {
					prev.Next = append(prev.Next, next)
					next.Prev++
				}
			}
		}
	}
}

SLinkNodes links nodes serially. Example: SLinkNodes("A", "B", "C") => A -> B -> C.

var WrapAllNodes = func(wrappers ...string) Option {
	return func(dag *_DAG) {
		for _, wrapper := range wrappers {
			for _, ref := range dag.NodeRefs {
				if ref != nil {
					ref.Wrappers = append(ref.Wrappers, wrapper)
				}
			}
		}
	}
}

WrapAllNodes wraps all nodes with one or more wrappers. It only affects nodes added before this option.

var WrapNodes = func(wrapper string, targets ...string) Option {
	return func(dag *_DAG) {
		for _, target := range targets {
			if targetNodeRef := dag.NodeRefs[target]; targetNodeRef != nil {
				targetNodeRef.Wrappers = append(targetNodeRef.Wrappers, wrapper)
			} else {
				dag.Warning = append(dag.Warning, fmt.Sprintf("wrap target node %s ref not found", target))
			}
		}
	}
}

WrapNodes wraps nodes to enhance them. wrapper: a node type that implements Wrapper. targets: the nodes to wrap.

Functions

func ClearPool

func ClearPool(name string)

ClearPool clears the worker pool of a plan. Call it to make a plan take effect immediately after an update. name: name of the plan.

func ExecPlan

func ExecPlan(name string, ctx context.Context) <-chan Output

ExecPlan executes a plan registered in Global.

func ExportPlan added in v0.2.1

func ExportPlan(name string) ([]byte, error)

ExportPlan exports a plan registered in Global and returns it as JSON bytes.

func LoadPlanFromJson

func LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error

LoadPlanFromJson loads a plan from JSON data. name: name of the plan to load. jsonData: JSON data of the plan. prebuilt: prebuilt nodes, can be nil.

func RegisterNodeBuilder

func RegisterNodeBuilder(name string, builder BuildNodeFunc)

RegisterNodeBuilder registers a node builder in Global.

func RegisterPlan

func RegisterPlan(name string, plan *Plan) error

RegisterPlan registers a plan in Global.

func UpdatePlan

func UpdatePlan(name string, update func(plan *Plan)) error

UpdatePlan updates a plan registered in Global.

func WarmupPool added in v0.2.2

func WarmupPool(name string, size int)

WarmupPool warms up the pool to avoid a cold start. name: plan name. size: size of the worker buffer queue.

Types

type Base

type Base struct {
	NodeName string

	State State

	SubNodes []Node

	SubNodesMap map[string]Node
}

Base is a simple implementation of Node, Cluster, and Stateful. Embed it in a custom node and override interface methods as needed.

func (*Base) Bind

func (base *Base) Bind(state State)

func (*Base) Inject

func (base *Base) Inject(nodes []Node)

func (*Base) Name

func (base *Base) Name() string

func (*Base) Reset

func (base *Base) Reset()

func (*Base) ResetSubNodes

func (base *Base) ResetSubNodes()

func (*Base) Revert added in v0.2.1

func (base *Base) Revert(ctx context.Context)

func (*Base) Run

func (base *Base) Run(ctx context.Context)

func (*Base) SetName

func (base *Base) SetName(name string)

type BaseWrapper

type BaseWrapper struct {
	Target Node

	State State
}

func (*BaseWrapper) Bind

func (wrapper *BaseWrapper) Bind(state State)

func (*BaseWrapper) Name

func (wrapper *BaseWrapper) Name() string

func (*BaseWrapper) Reset

func (wrapper *BaseWrapper) Reset()

func (*BaseWrapper) Run

func (wrapper *BaseWrapper) Run(ctx context.Context)

func (*BaseWrapper) Wrap

func (wrapper *BaseWrapper) Wrap(target Node)

type BuildNodeFunc

type BuildNodeFunc func(name string, props Props) (Node, error)

type Cloneable

type Cloneable interface {
	Node

	// Clone self
	Clone() Node
}

Cloneable is a class of nodes that can be cloned.

type Cluster

type Cluster interface {
	Node

	// Inject deliver the sub-nodes, will be called when engine build the cluster
	Inject(nodes []Node)
}

Cluster is a class of nodes that can contain other nodes.

type EmptyProps

type EmptyProps struct{}

func (EmptyProps) Copy added in v0.2.1

func (props EmptyProps) Copy() Props

func (EmptyProps) Get

func (props EmptyProps) Get(key string) (value interface{}, exists bool)

func (EmptyProps) SubGet

func (props EmptyProps) SubGet(sub, key string) (value interface{}, exists bool)

type Engine

type Engine struct {
	StateBuilder func() State
	// contains filtered or unexported fields
}

func NewDefaultEngine

func NewDefaultEngine() *Engine

func (*Engine) ClearPool

func (engine *Engine) ClearPool(name string)

ClearPool clears the worker pool of a plan. Call it to make a plan take effect immediately after an update. name: name of the plan.

func (*Engine) ExecPlan

func (engine *Engine) ExecPlan(name string, ctx context.Context) <-chan Output

ExecPlan executes a plan registered in the engine.

func (*Engine) ExportPlan added in v0.2.1

func (engine *Engine) ExportPlan(name string) ([]byte, error)

func (*Engine) LoadPlanFromJson

func (engine *Engine) LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error

LoadPlanFromJson loads a plan from JSON data. name: name of the plan to load. jsonData: JSON data of the plan. prebuilt: prebuilt nodes, can be nil.

func (*Engine) RegisterNodeBuilder

func (engine *Engine) RegisterNodeBuilder(name string, builder BuildNodeFunc)

RegisterNodeBuilder registers a node builder in the engine.

func (*Engine) RegisterPlan

func (engine *Engine) RegisterPlan(name string, plan *Plan) error

RegisterPlan registers a plan in the engine.

func (*Engine) UpdatePlan

func (engine *Engine) UpdatePlan(name string, update func(plan *Plan)) error

UpdatePlan updates a plan registered in the engine.

func (*Engine) WarmupPool added in v0.2.2

func (engine *Engine) WarmupPool(name string, size int)

WarmupPool warms up the pool to avoid a cold start. name: plan name. size: size of the worker buffer queue.

type GraphNode added in v0.1.2

type GraphNode struct {
	Node *JsonNode

	NextNodes []string
}

type JsonNode

type JsonNode struct {
	Name string

	Type string

	SubNodes []*JsonNode

	Wrappers []string

	ReUse bool
}

type JsonPlan

type JsonPlan struct {
	Props json.RawMessage

	Graph []GraphNode
}

type Node

type Node interface {
	Name() string

	// Run will be called when all deps solved or cluster invoke it
	Run(ctx context.Context)

	// Reset will be called when the node will no longer execute until the next execution plan
	Reset()
}

Node is the basic unit of execution.

type Option

type Option func(*_DAG)

type Output

type Output struct {
	Err error

	State State
}

type Plan

type Plan struct {
	Props Props

	Prebuilt []Node

	Options []Option

	Strict bool
	// contains filtered or unexported fields
}

Plan describes how to execute nodes.

func NewPlan

func NewPlan(props Props, prebuilt []Node, options ...Option) *Plan

NewPlan creates a new plan. props: build props of nodes. prebuilt: prebuilt nodes, which reduce the cost of building nodes; nil is fine. options: AddNodes, LinkNodes, and so on.

func (*Plan) Init

func (plan *Plan) Init() error

Init initializes the plan; a Plan takes effect only after initialization. If the plan is invalid, for example because of circular dependencies, Init returns an error.
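How Init detects circular dependencies is not stated here. One common technique for a graph that tracks predecessor counts is Kahn's algorithm: repeatedly remove nodes with no remaining predecessors, and report a cycle if some nodes can never be removed. The sketch below shows that technique on a plain adjacency map; hasCycle is a hypothetical helper, not the library's code.

```go
package main

import "fmt"

// hasCycle reports whether the directed graph contains a cycle.
// edges maps each node name to the names of its successors.
func hasCycle(edges map[string][]string) bool {
	// Count incoming edges for every node mentioned in the graph.
	inDegree := map[string]int{}
	for from, tos := range edges {
		if _, ok := inDegree[from]; !ok {
			inDegree[from] = 0
		}
		for _, to := range tos {
			inDegree[to]++
		}
	}

	// Start with the nodes that have no predecessors.
	var ready []string
	for n, d := range inDegree {
		if d == 0 {
			ready = append(ready, n)
		}
	}

	// Remove ready nodes one by one, releasing their successors.
	visited := 0
	for len(ready) > 0 {
		n := ready[len(ready)-1]
		ready = ready[:len(ready)-1]
		visited++
		for _, to := range edges[n] {
			inDegree[to]--
			if inDegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}

	// Nodes on a cycle never reach in-degree zero, so they are never visited.
	return visited != len(inDegree)
}

func main() {
	fmt.Println(hasCycle(map[string][]string{"A": {"B"}, "B": {"C"}})) // false
	fmt.Println(hasCycle(map[string][]string{"A": {"B"}, "B": {"A"}})) // true
}
```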

func (*Plan) MarshalJSON

func (plan *Plan) MarshalJSON() ([]byte, error)

func (*Plan) UnmarshalJSON

func (plan *Plan) UnmarshalJSON(bytes []byte) error

type Props

type Props interface {
	// Get return global value of the key
	Get(key string) (interface{}, bool)

	//SubGet node value of the key, deliver node name as sub
	SubGet(sub, key string) (interface{}, bool)

	// Copy safe use of copies
	Copy() Props
}

Props provides build parameters for the node builder.

type Reversible added in v0.2.1

type Reversible interface {
	Node

	Revert(ctx context.Context)
}

Reversible is a class of nodes that can be reverted.

type StandardProps

type StandardProps map[string]interface{}

func (StandardProps) Copy added in v0.2.1

func (props StandardProps) Copy() Props

func (StandardProps) Get

func (props StandardProps) Get(key string) (value interface{}, exists bool)

func (StandardProps) MarshalJSON added in v0.2.1

func (props StandardProps) MarshalJSON() ([]byte, error)

func (StandardProps) SubGet

func (props StandardProps) SubGet(sub, key string) (value interface{}, exists bool)

type StandardState

type StandardState struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStandardState

func NewStandardState() *StandardState

func (*StandardState) Query

func (state *StandardState) Query(key string) (value interface{}, exists bool)

func (*StandardState) Transform

func (state *StandardState) Transform(key string, transform TransformStateFunc)

func (*StandardState) Update

func (state *StandardState) Update(key string, value interface{})

type State

type State interface {
	// Query return value of the key
	Query(key string) (interface{}, bool)

	// Update set a new value for the key
	Update(key string, value interface{})

	// Transform set a new value for the key, according to the old value
	Transform(key string, transform TransformStateFunc)
}

State stores the state of nodes.

type Stateful

type Stateful interface {
	Node

	// Bind deliver the state, should be called before engine run the node
	Bind(state State)
}

Stateful is a class of nodes that need to record or query state.

type TransformStateFunc

type TransformStateFunc func(from interface{}) interface{}

type Wrapper

type Wrapper interface {
	Node

	Wrap(target Node)
}

Wrapper is a class of nodes that can wrap other nodes.
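A wrapper is essentially a decorator: it holds a target node, forwards Run to it, and adds behaviour around the call. The sketch below illustrates this with local stand-in types; tracer, counterNode and traceOnce are hypothetical and do not come from the library.

```go
package main

import (
	"context"
	"fmt"
)

// node is a local stand-in for the documented Node interface.
type node interface {
	Name() string
	Run(ctx context.Context)
	Reset()
}

// counterNode counts how many times it has run.
type counterNode struct {
	name string
	runs int
}

func (n *counterNode) Name() string            { return n.name }
func (n *counterNode) Run(ctx context.Context) { n.runs++ }
func (n *counterNode) Reset()                  {}

// tracer is a hypothetical wrapper that records a line before and
// after its target runs.
type tracer struct {
	target node
	lines  []string
}

func (w *tracer) Wrap(target node) { w.target = target }
func (w *tracer) Name() string     { return w.target.Name() }
func (w *tracer) Reset()           { w.target.Reset() }

func (w *tracer) Run(ctx context.Context) {
	w.lines = append(w.lines, "start "+w.Name())
	w.target.Run(ctx)
	w.lines = append(w.lines, "end "+w.Name())
}

// traceOnce wraps a fresh node, runs it once, and returns the trace
// together with the number of times the inner node ran.
func traceOnce(name string) ([]string, int) {
	inner := &counterNode{name: name}
	w := &tracer{}
	w.Wrap(inner)
	w.Run(context.Background())
	return w.lines, inner.runs
}

func main() {
	lines, runs := traceOnce("Greet1")
	fmt.Println(lines, runs) // [start Greet1 end Greet1] 1
}
```

Because the wrapper reports the target's name, the rest of the graph can keep referring to the node as before.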

Directories

Path Synopsis
Package common implement some node, cluster, state for common usage
