Documentation
¶
Index ¶
- Variables
- func ClearPool(name string)
- func ExecPlan(name string, ctx context.Context) <-chan Output
- func ExportPlan(name string) ([]byte, error)
- func LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error
- func RegisterNodeBuilder(name string, builder BuildNodeFunc)
- func RegisterPlan(name string, plan *Plan) error
- func UpdatePlan(name string, update func(plan *Plan)) error
- func WarmupPool(name string, size int)
- type Base
- type BaseWrapper
- type BuildNodeFunc
- type Cloneable
- type Cluster
- type Edge
- type EmptyProps
- type Engine
- func (engine *Engine) ClearPool(name string)
- func (engine *Engine) ExecPlan(name string, ctx context.Context) <-chan Output
- func (engine *Engine) ExportPlan(name string) ([]byte, error)
- func (engine *Engine) LoadPlanFromJson(name string, jsonData []byte, prebuilt []Node) error
- func (engine *Engine) RegisterNodeBuilder(name string, builder BuildNodeFunc)
- func (engine *Engine) RegisterPlan(name string, plan *Plan) error
- func (engine *Engine) UpdatePlan(name string, update func(plan *Plan)) error
- func (engine *Engine) WarmupPool(name string, size int)
- type ExportableProps
- type GraphNode
- type Inspector
- type JsonNode
- type JsonPlan
- type Node
- type NodeInfo
- type Option
- type Output
- type Plan
- type PlanInfo
- type Props
- type Reversible
- type StandardProps
- type StandardState
- type State
- type Stateful
- type TransformStateFunc
- type VertexInfo
- type Wrapper
Constants ¶
This section is empty.
Variables ¶
var AddNodes = func(typ string, names ...string) Option {
return func(dag *_DAG) {
for _, name := range names {
if _, ok := dag.NodeRefs[name]; !ok {
dag.NodeRefs[name] = &_NodeRef{
NodeName: name,
NodeType: typ,
}
}
}
}
}
AddNodes add nodes. typ declare node type, names declare name of each one. node must be added before other options.
var Global = NewDefaultEngine()
var LinkNodes = func(nodes ...string) Option { return func(dag *_DAG) { if len(nodes) < 1 { return } for _, root := range nodes { if _, ok := dag.Vertexes[root]; !ok { if _, ok := dag.NodeRefs[root]; ok { dag.Vertexes[root] = &_Vertex{ RefRoot: dag.NodeRefs[root], } } else { dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root)) } } } if dag.Vertexes[nodes[0]] != nil { for _, node := range nodes[1:] { if dag.Vertexes[node] != nil { dag.Vertexes[nodes[0]].Next = append(dag.Vertexes[nodes[0]].Next, dag.Vertexes[node]) dag.Vertexes[node].Prev++ } } } } }
LinkNodes link first node with others. example: LinkNodes("A", "B", "C") => A -> B, A -> C.
var MergeNodes = func(cluster string, subNodes ...string) Option { return func(dag *_DAG) { if clusterRef, ok := dag.NodeRefs[cluster]; !ok { dag.Warning = append(dag.Warning, fmt.Sprintf("cluster %s ref not found", cluster)) return } else { for _, node := range subNodes { if _, ok := dag.NodeRefs[node]; ok { clusterRef.SubRefs = append(clusterRef.SubRefs, dag.NodeRefs[node]) } else { dag.Warning = append(dag.Warning, fmt.Sprintf("sub node %s ref not found", node)) } } } } }
MergeNodes merge other nodes as sub-node of the first node. example: MergeNodes("A", "B", "C"). if node "A" implement the Cluster interface, node "B" and "C" will be injected, then "A" could use "B" and "C" as sub-nodes.
var RLinkNodes = func(nodes ...string) Option { return func(dag *_DAG) { if len(nodes) < 1 { return } for _, root := range nodes { if _, ok := dag.Vertexes[root]; !ok { if _, ok := dag.NodeRefs[root]; ok { dag.Vertexes[root] = &_Vertex{ RefRoot: dag.NodeRefs[root], } } else { dag.Warning = append(dag.Warning, fmt.Sprintf("link target node %s ref not found", root)) } } } if dag.Vertexes[nodes[0]] != nil { for _, node := range nodes[1:] { if dag.Vertexes[node] != nil { dag.Vertexes[node].Next = append(dag.Vertexes[node].Next, dag.Vertexes[nodes[0]]) dag.Vertexes[nodes[0]].Prev++ } } } } }
RLinkNodes link first node with others. example: RLinkNodes("A", "B", "C") => B -> A, C -> A.
var ReUseNodes = func(nodes ...string) Option { return func(dag *_DAG) { for _, node := range nodes { if dag.NodeRefs[node] != nil { dag.NodeRefs[node].ReUse = true } } } }
ReUseNodes reuse node to avoid unnecessary rebuilds, fits nodes whose properties do not change and implements the clone method
var SLinkNodes = func(nodes ...string) Option { return func(dag *_DAG) { if len(nodes) < 1 { return } for _, root := range nodes { if _, ok := dag.Vertexes[root]; !ok { if _, ok := dag.NodeRefs[root]; ok { dag.Vertexes[root] = &_Vertex{ RefRoot: dag.NodeRefs[root], } } else { dag.Warning = append(dag.Warning, fmt.Sprintf("Slink target node %s ref not found", root)) } } } for i := range nodes { if i < len(nodes)-1 { prev, next := dag.Vertexes[nodes[i]], dag.Vertexes[nodes[i+1]] if prev != nil && next != nil { prev.Next = append(prev.Next, next) next.Prev++ } } } } }
SLinkNodes link nodes serially. example: SLinkNodes("A", "B", "C") => A -> B -> C.
var WrapAllNodes = func(wrappers ...string) Option { return func(dag *_DAG) { for _, wrapper := range wrappers { for _, ref := range dag.NodeRefs { if ref != nil { ref.Wrappers = append(ref.Wrappers, wrapper) } } } } }
WrapAllNodes wrap all nodes with single or multi wrappers, will only affect nodes added before this
var WrapNodes = func(wrapper string, targets ...string) Option { return func(dag *_DAG) { for _, target := range targets { if targetNodeRef := dag.NodeRefs[target]; targetNodeRef != nil { targetNodeRef.Wrappers = append(targetNodeRef.Wrappers, wrapper) } else { dag.Warning = append(dag.Warning, fmt.Sprintf("wrap target node %s ref not found", target)) } } } }
WrapNodes wrap node to enhance it, wrapper:node type which implement Wrapper, targets:wrap targets
Functions ¶
func ClearPool ¶
func ClearPool(name string)
ClearPool clear worker pool of plan, invoke it to make plan effect immediately after update name: name of plan
func ExportPlan ¶ added in v0.2.1
ExportPlan export plan register in Global, return json bytes
func LoadPlanFromJson ¶
LoadPlanFromJson load plan from json data name: name of plan to load jsonData: json data of plan prebuilt: prebuilt nodes, can be nil
func RegisterNodeBuilder ¶
func RegisterNodeBuilder(name string, builder BuildNodeFunc)
RegisterNodeBuilder register node builder to Global
func RegisterPlan ¶
RegisterPlan register plan to Global
func UpdatePlan ¶
UpdatePlan update plan register in Global.
func WarmupPool ¶ added in v0.2.2
WarmupPool warm up pool to avoid cold start name: plan name size: set size of worker buf queue
Types ¶
type Base ¶
Base a simple impl of Node, Cluster, Stateful Embed it in custom node and override interface methods as needed
func (*Base) ResetSubNodes ¶
func (base *Base) ResetSubNodes()
type BaseWrapper ¶
func (*BaseWrapper) Bind ¶
func (wrapper *BaseWrapper) Bind(state State)
func (*BaseWrapper) Name ¶
func (wrapper *BaseWrapper) Name() string
func (*BaseWrapper) Reset ¶
func (wrapper *BaseWrapper) Reset()
func (*BaseWrapper) Run ¶
func (wrapper *BaseWrapper) Run(ctx context.Context)
func (*BaseWrapper) Wrap ¶
func (wrapper *BaseWrapper) Wrap(target Node)
type Cluster ¶
type Cluster interface { Node // Inject deliver the sub-nodes, will be called when engine build the cluster Inject(nodes []Node) }
Cluster a class of nodes that can contain other nodes
type EmptyProps ¶
type EmptyProps struct{}
func (EmptyProps) Copy ¶ added in v0.2.1
func (props EmptyProps) Copy() Props
func (EmptyProps) Get ¶
func (props EmptyProps) Get(key string) (value interface{}, exists bool)
func (EmptyProps) SubGet ¶
func (props EmptyProps) SubGet(sub, key string) (value interface{}, exists bool)
type Engine ¶
type Engine struct { StateBuilder func() State // contains filtered or unexported fields }
func NewDefaultEngine ¶
func NewDefaultEngine() *Engine
func (*Engine) ClearPool ¶
ClearPool clear worker pool of plan, invoke it to make plan effect immediately after update name: name of plan
func (*Engine) ExportPlan ¶ added in v0.2.1
func (*Engine) LoadPlanFromJson ¶
LoadPlanFromJson load plan from json data name: name of plan to load jsonData: json data of plan prebuilt: prebuilt nodes, can be nil
func (*Engine) RegisterNodeBuilder ¶
func (engine *Engine) RegisterNodeBuilder(name string, builder BuildNodeFunc)
RegisterNodeBuilder register node builder to engine
func (*Engine) RegisterPlan ¶
RegisterPlan register plan to engine
func (*Engine) UpdatePlan ¶
UpdatePlan update plan register in engine
func (*Engine) WarmupPool ¶ added in v0.2.2
WarmupPool warm up pool to avoid cold start name: plan name size: set size of worker buf queue
type ExportableProps ¶ added in v0.2.3
type ExportableProps interface {
Raw() map[string]interface{}
}
ExportableProps can be used to lookup raw data and serialization
type Inspector ¶ added in v0.2.3
type Inspector struct {
// contains filtered or unexported fields
}
func (Inspector) DescribePlan ¶ added in v0.2.3
func (Inspector) GetNodeBuildersName ¶ added in v0.2.3
func (Inspector) GetPlansName ¶ added in v0.2.3
type JsonPlan ¶
type JsonPlan struct { Props json.RawMessage Graph []GraphNode }
type Node ¶
type Node interface { Name() string // Run will be called when all deps solved or cluster invoke it Run(ctx context.Context) // Reset will be called when the node will no longer execute until the next execution plan Reset() }
Node basic unit of execution
type Plan ¶
type Plan struct { Props Props Prebuilt []Node Options []Option Strict bool // contains filtered or unexported fields }
Plan explain how to execute nodes
func NewPlan ¶
NewPlan new a plan. props: build props of nodes. prebuilt: prebuilt nodes, reduce cost of build node, nil is fine. options: AddNodes, LinkNodes and so on.
func (*Plan) Init ¶
Init Plan take effect only after initialization. if plan is invalid, such as circular dependencies, return error.
func (*Plan) MarshalJSON ¶
func (*Plan) UnmarshalJSON ¶
type PlanInfo ¶ added in v0.2.3
type PlanInfo struct { Version string Vertexes []VertexInfo Edges []Edge GlobalProps map[string]interface{} }
type Props ¶
type Props interface { // Get return global value of the key Get(key string) (interface{}, bool) //SubGet node value of the key, deliver node name as sub SubGet(sub, key string) (interface{}, bool) // Copy safe use of copies Copy() Props }
Props provide build parameters for the node builder
type Reversible ¶ added in v0.2.1
Reversible a class of nodes that can be reverted
type StandardProps ¶
type StandardProps map[string]interface{}
func (StandardProps) Copy ¶ added in v0.2.1
func (props StandardProps) Copy() Props
func (StandardProps) Get ¶
func (props StandardProps) Get(key string) (value interface{}, exists bool)
func (StandardProps) Raw ¶ added in v0.2.3
func (props StandardProps) Raw() map[string]interface{}
func (StandardProps) SubGet ¶
func (props StandardProps) SubGet(sub, key string) (value interface{}, exists bool)
type StandardState ¶
func NewStandardState ¶
func NewStandardState() *StandardState
func (*StandardState) Query ¶
func (state *StandardState) Query(key string) (value interface{}, exists bool)
func (*StandardState) Transform ¶
func (state *StandardState) Transform(key string, transform TransformStateFunc)
func (*StandardState) Update ¶
func (state *StandardState) Update(key string, value interface{})
type State ¶
type State interface { // Query return value of the key Query(key string) (interface{}, bool) // Update set a new value for the key Update(key string, value interface{}) // Transform set a new value for the key, according to the old value Transform(key string, transform TransformStateFunc) }
State store state of nodes
type Stateful ¶
type Stateful interface { Node // Bind deliver the state, should be called before engine run the node Bind(state State) }
Stateful a class of nodes that need record or query state
type TransformStateFunc ¶
type TransformStateFunc func(from interface{}) interface{}