nodegraphflow

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2022 License: LGPL-2.1 Imports: 3 Imported by: 0

README

node-graph-flow codecov GoDoc

node graph flow

Minimal frame for node graph based task processing.

Disclaimer:

  • Each individual flow can be executed concurrently on separate goroutines.

  • At node level, all nodes of an individual graph should execute on the same goroutine (synchronous).


My intended purpose is to use this with https://github.com/WolvenSpirit/postgres-queue but it is just a few lines of code to provide a frame or example for any sort of node based separation of steps that together can define a task flow.


Why define a generic task or a handler like this?

In Go, handling errors always creates branches within a program (like in most languages), this might make things hard to debug later on.

In contrast, what others consider good code, slim functions with a single purpose etc. does tend to just wrap to many things and makes another mess because things now at top level are just some custom named wrappers that might not even be in the same style within a program.

This brings more confusion.

This package arguably is one way to solve the problems and provides error handling with errors that are recorded throughout the task execution.


TODO:

✅ More tests.

✅ Now makes full use of Go Generics (go 1.18)

𐄂 Provide clean-up (like func onFinish() { clean(*n) } ) hooks on each node. No matter if the node fails or succeeds, a custom clean-up function can run that might perform additional checks or store metrics regarding the outcome.


Examples for using this package can be found in the examples folder.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BindNodes

func BindNodes[T interface{}](parent *Node[T], siblings ...*Node[T])

BindNodes links the parent to the sub nodes and each sub node laterally to each other.

func BuildChain

func BuildChain[T interface{}](stopChain *chan int, nodes ...*Node[T])

BuildChain will bind nodes into a chain which just means that the last node will have the first node as a sibling. Does not bind any siblings and no siblings should be set manually. The usecase for this would be running an refresh loop that has a lifetime which usually would equal that of the program.

func Flow

func Flow[T interface{}](ctx *FlowContext, n *Node[T], i T, SubNodeIndex int, LateralNodeIndex int, err error)

Flow initiates each node sequentially from the start node downstream to all sub nodes. If a parent has more than one sub node, the higher index nodes are fallback nodes. Should the first of the siblings fail, the next lateral node will execute from the siblings slice. If all nodes from a level error out then the context of the flow will be canceled.

func StartFlow

func StartFlow[T interface{}](ctx *FlowContext, n *Node[T])

StartFlow is an alias for calling Flow with the arguments as Flow(ctx, n, nil, 0, 0, nil)

Types

type AbortError

type AbortError struct {
	Message string
}

func (AbortError) Error

func (err AbortError) Error() string

type CircularNodePolicy

type CircularNodePolicy struct {
	Timeout            time.Duration
	RequiredForSuccess bool
	RestartOnError     bool
	IsCircularNode     bool
	StopChain          *chan int
}

CircularNodePolicy enforces a shutdown procedure for the chain iteration

type FlowContext

type FlowContext struct {
	Ctx    context.Context
	Cancel context.CancelFunc
}

Initialized context and cancel func need to be populated

func (*FlowContext) Init

func (c *FlowContext) Init()

func (*FlowContext) IsCanceled

func (c *FlowContext) IsCanceled() (bool, error)

type Input

type Input interface{}

type Node

type Node[T interface{}] struct {
	Name               string                           // Name of the node
	ParentNode         *Node[T]                         // Parent node
	SubNodes           []*Node[T]                       // Children nodes
	Siblings           []*Node[T]                       // Lateral nodes
	Task               func(*FlowContext, T) (T, error) // Task that should be processed
	Input              T                                // Input payload, nil if starting node
	Output             T                                // Output payload
	FlowTrail          []string                         // The order in which nodes were executed
	NodeTrail          NodeTrail                        // Meta data populated after node processing finishes
	Context            *FlowContext                     // Pointer to the flow context
	CircularNodePolicy CircularNodePolicy               // Policy for circular nodes
}

func (*Node[Input]) SetInput

func (n *Node[Input]) SetInput(i Input)

func (*Node[Output]) SetOutput

func (n *Node[Output]) SetOutput(o Output)

type NodeTrail

type NodeTrail struct {
	NodeName   string
	StartedAt  time.Time
	FinishedAt time.Time
	NodeError  error
}

type Output

type Output interface{}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL