light_flow

package module
v1.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2023 License: MIT Imports: 11 Imported by: 0

README

light-flow

Introduction

light-flow is a task arrange framework.

Designed to provide the most efficient orchestration and execution strategy for tasks with dependencies.

Features

Efficient Task Planning: The framework allows you to define task dependencies, enabling the efficient planning and sequencing of steps based on their relationships

Context Connect And Isolation: Tasks can only access to the context of dependent tasks up to the root task. Modifications to the context by the current task will not affect disconnected tasks.

Rich Test Case: Test cases cover every public API and most scenarios.

Minimal Code Requirement: With this framework, you only need to write a minimal amount of code to define and execute your tasks.

Task Dependency Visualization:The framework provides a draw plugin to visualize the dependencies between tasks.

Installation

Install the framework by running the following command:

go get gitee.com/MetaphysicCoding/light-flow

Draw your flow

Step 1: Define Step Functions

Define the step functions that will be executed in the workflow.

Each step function should have a specific signature and return result and errors.

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
	"time"
)

func Step1(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name' in input
    // then we can retrieve the 'name'
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name'in the dependent steps, 
    // then we can retrieve the 'name'
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}
Step 2: Register Work-Flow And Process

**After register, you could add current process in another work-flow. **

And you cloud merge another process into current process, usage see in advance usage

func init() {
	workflow := flow.RegisterFlow("MyFlow")
     // Processes of workflow are parallel
	process := workflow.AddProcess("MyProcess")
}
Step 3: Add Step And Define Dependencies
func init() {
	...
	process := workflow.AddProcess("MyProcess")
    // AddStep automatically uses "Step1" as step name
	process.AddStep(Step1)
    // AddStepWithAlias use alias "Step2" as the step name
    // Identify Step 1 as a dependency of Step 2
	process.AddStepWithAlias("Step2", Step2, Step1)
}
Step 4: Run Work-Flow
func main() {
    // Done flow run work-flow and block until all processes are completed.
    // If you want to run  asynchronously, you could use AsyncFlow in stead
    // AsyncFlow can stop and pause the workflow or process.
	features := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
}
Step 5: Get Execute ResultI By Features
func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
        // Exceptions may include Timeout、Panic、Error
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}
Complete Example
package main

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
)

func Step1(ctx *flow.Context) (any, error) {
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}

func init() {
	workflow := flow.RegisterFlow("MyFlow")
	process := workflow.AddProcess("MyProcess")
	process.AddStep(Step1)
	process.AddStepWithAlias("Step2", Step2, Step1)
}

func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}

Advance Usage

Depends
  1. Depends indicates that the current step will not be executed until all dependent steps are completed.
  2. flow.Context use adjacency list as data structure. The current step can only access context from dependent steps up to the root step.
Context Connect And Isolation

We define dependencies like flow code.

workflow := flow.RegisterFlow("MyFlow")
process := workflow.AddProcess("MyProcess")
process.AddStep(TaskA)
process.AddStep(TaskB, TaskA)
process.AddStep(TaskC, TaskA)
process.AddStep(TaskD, TaskB, TaskC)
process.AddStep(TaskE, TaskC)

The dependency relationship is shown in the figure

Relation

TaskD can access the context of TaskB,TaskC,TaskA.

TaskE can access the context of TaskC,TaskA, but TaskE can't access the context of TaskB.

Note:  
Key first match in its own context, then matched in parents context, 
finally matched in global contxt.

You can use AddPriority to break the order 
Process Reuse And Merge

You can add a registered process to the work-flow and merge a registered process into the current process.

workflow := flow.RegisterFlow("MyFlow")
// Add a registered process called 'RegisterProcess' to the current workflow.
workflow.AddRegisterProcess("RegisterProcess")

process := workflow.AddProcess("MyProcess")
// Merge process called 'AnotherProcess' to the current process.
process.Merge("AnotherProcess")
// You can merge a process to the current process at any position, 
// and you can still add your own steps both before and after the merge operation.
process.AddStep(TaskA)

Merge will eliminates duplicate steps and merges the dependencies of duplicate steps after deduplication.

If merge creates a cycle, then Merge method will panic and it indicates which two steps form the cycle.

Documentation

Index

Constants

View Source
const (
	InternalPrefix = "::"
	WorkflowCtx    = "Flow::"
	ProcessCtx     = "Proc::"
)

these constants are used to indicate the scope of the context

View Source
const (
	Before = "Before"
	After  = "After"
)

Variables

View Source
var (
	End     = &StatusEnum{0b1, "End"}
	Head    = &StatusEnum{0b1 << 1, "Head"}
	HasNext = &StatusEnum{0b1 << 2, "HasNext"}
	Merged  = &StatusEnum{0b1 << 3, "Merged"}
)

these constants are used to indicate the position of the process

View Source
var (
	Pending    = &StatusEnum{0, "Pending"}
	Running    = &StatusEnum{0b1, "Running"}
	Pause      = &StatusEnum{0b1 << 1, "Pause"}
	Success    = &StatusEnum{0b1 << 15, "Success"}
	NormalMask = &StatusEnum{0b1<<16 - 1, "NormalMask"}
	Cancel     = &StatusEnum{0b1 << 16, "Cancel"}
	Timeout    = &StatusEnum{0b1 << 17, "Timeout"}
	Panic      = &StatusEnum{0b1 << 18, "Panic"}
	Error      = &StatusEnum{0b1 << 19, "Error"}
	Stop       = &StatusEnum{0b1 << 20, "Stop"}
	Failed     = &StatusEnum{0b1 << 31, "Failed"}
	// AbnormalMask An abnormal step status will cause the cancellation of dependent unexecuted steps.
	AbnormalMask = &StatusEnum{NormalMask.flag << 16, "AbnormalMask"}
)

these variable are used to indicate the status of the unit

Functions

func CopyProperties

func CopyProperties(src, dst interface{})

func CopyPropertiesSkipNotEmpty added in v1.10.0

func CopyPropertiesSkipNotEmpty(src, dst interface{})

func CreateStruct

func CreateStruct[T any](src any) (target T)

func GetFuncName

func GetFuncName(f interface{}) string

GetFuncName function retrieves the stepName of a provided function. If the provided function is anonymous function, it panics.

func GetStructName added in v1.9.0

func GetStructName(obj any) string

func NewProcessConfig added in v1.11.0

func NewProcessConfig() *processConfig

func SetIdGenerator added in v1.7.1

func SetIdGenerator(method func() string)

Types

type BasicInfo added in v1.10.0

type BasicInfo struct {
	*Status
	Id   string
	Name string
}

func (*BasicInfo) GetId added in v1.10.0

func (bi *BasicInfo) GetId() string

func (*BasicInfo) GetName added in v1.10.0

func (bi *BasicInfo) GetName() string

type BasicInfoI added in v1.11.0

type BasicInfoI interface {
	StatusI

	GetId() string
	GetName() string
	// contains filtered or unexported methods
}

type Callback added in v1.10.0

type Callback[T BasicInfoI] struct {
	// contains filtered or unexported fields
}

func (*Callback[T]) Exclude added in v1.10.0

func (c *Callback[T]) Exclude(status ...*StatusEnum) *Callback[T]

func (*Callback[T]) NotFor added in v1.10.0

func (c *Callback[T]) NotFor(name ...string) *Callback[T]

func (*Callback[T]) OnlyFor added in v1.10.0

func (c *Callback[T]) OnlyFor(name ...string) *Callback[T]

func (*Callback[T]) When added in v1.10.0

func (c *Callback[T]) When(status ...*StatusEnum) *Callback[T]

type CallbackChain added in v1.10.0

type CallbackChain[T BasicInfoI] struct {
	// contains filtered or unexported fields
}

func (*CallbackChain[T]) AddCallback added in v1.10.0

func (cc *CallbackChain[T]) AddCallback(flag string, must bool, run func(info T) (bool, error)) *Callback[T]

func (*CallbackChain[T]) CopyChain added in v1.10.0

func (cc *CallbackChain[T]) CopyChain() []*Callback[T]

type Configuration added in v1.10.0

type Configuration struct {
	*FlowConfig
	// contains filtered or unexported fields
}

func CreateDefaultConfig added in v1.11.0

func CreateDefaultConfig() *Configuration

func (Configuration) AddAfterProcess added in v1.10.0

func (pc Configuration) AddAfterProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (Configuration) AddAfterStep added in v1.10.0

func (pc Configuration) AddAfterStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (Configuration) AddBeforeProcess added in v1.10.0

func (pc Configuration) AddBeforeProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (Configuration) AddBeforeStep added in v1.10.0

func (pc Configuration) AddBeforeStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (Configuration) AddStepRetry added in v1.11.0

func (pc Configuration) AddStepRetry(retry int) *processConfig

func (Configuration) AddStepTimeout added in v1.11.0

func (pc Configuration) AddStepTimeout(timeout time.Duration) *processConfig

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Exposed added in v1.9.0

func (ctx *Context) Exposed(key string, value any)

Exposed method exposes a key-value pair to the scope, so that units within the scope (steps in the process) can access it.

func (*Context) Get

func (ctx *Context) Get(key string) (any, bool)

Get method retrieves the value associated with the given key from the context path. The method first checks the priority context, then own context, finally parents context. Returns the value associated with the key (if found) and a boolean indicating its presence.

func (*Context) GetAll added in v1.10.0

func (ctx *Context) GetAll(key string) map[string]any

func (*Context) GetCtxName added in v1.11.0

func (ctx *Context) GetCtxName() string

func (*Context) GetStepResult

func (ctx *Context) GetStepResult(name string) (any, bool)

GetStepResult method retrieves the result of a step's execution. Each time a step is executed, its execution result is saved in the context of the process. Use this method to retrieve the execution result of a step.

func (*Context) Set

func (ctx *Context) Set(key string, value any)

Set method sets the value associated with the given key in own context.

type Controller added in v1.9.0

type Controller interface {
	Resume()
	Pause()
	Stop()
}

type Feature

type Feature struct {
	*Status
	*BasicInfo
	// contains filtered or unexported fields
}

func (*Feature) Done added in v1.7.1

func (f *Feature) Done()

Done method waits for the corresponding process to complete.

type FlowConfig added in v1.10.0

type FlowConfig struct {
	*CallbackChain[*FlowInfo]
}

func (*FlowConfig) AddAfterFlow added in v1.10.0

func (fc *FlowConfig) AddAfterFlow(must bool, callback func(*FlowInfo) (keepOn bool, err error)) *Callback[*FlowInfo]

func (*FlowConfig) AddBeforeFlow added in v1.10.0

func (fc *FlowConfig) AddBeforeFlow(must bool, callback func(*FlowInfo) (keepOn bool, err error)) *Callback[*FlowInfo]

type FlowController added in v1.10.0

type FlowController interface {
	Controller
	ResultI
	Done() map[string]*Feature
	ListProcess() []string
	ProcessController(name string) Controller
}

func AsyncArgs added in v1.9.0

func AsyncArgs(name string, args ...any) FlowController

func AsyncFlow added in v1.9.0

func AsyncFlow(name string, input map[string]any) FlowController

type FlowInfo added in v1.10.0

type FlowInfo struct {
	*BasicInfo
	*Context
}

type FlowMeta added in v1.9.0

type FlowMeta struct {
	*FlowConfig
	// contains filtered or unexported fields
}

func RegisterFlow added in v1.9.0

func RegisterFlow(name string) *FlowMeta

func (*FlowMeta) AddProcess added in v1.9.0

func (fm *FlowMeta) AddProcess(name string) *ProcessMeta

func (*FlowMeta) AddProcessWithConf added in v1.10.0

func (fm *FlowMeta) AddProcessWithConf(name string, conf *processConfig) *ProcessMeta

func (*FlowMeta) AddRegisterProcess added in v1.9.0

func (fm *FlowMeta) AddRegisterProcess(name string)

func (*FlowMeta) BuildRunFlow added in v1.10.0

func (fm *FlowMeta) BuildRunFlow(input map[string]any) *RunFlow

func (*FlowMeta) NotUseDefault added in v1.10.0

func (fm *FlowMeta) NotUseDefault() *FlowMeta

type ProcessInfo

type ProcessInfo struct {
	*BasicInfo
	*Context
	FlowId string
}

type ProcessMeta added in v1.9.0

type ProcessMeta struct {
	// contains filtered or unexported fields
}

func (ProcessMeta) AddAfterProcess added in v1.11.0

func (pc ProcessMeta) AddAfterProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (ProcessMeta) AddAfterStep added in v1.11.0

func (pc ProcessMeta) AddAfterStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (ProcessMeta) AddBeforeProcess added in v1.11.0

func (pc ProcessMeta) AddBeforeProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (ProcessMeta) AddBeforeStep added in v1.11.0

func (pc ProcessMeta) AddBeforeStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (*ProcessMeta) AddStep added in v1.9.0

func (pm *ProcessMeta) AddStep(run func(ctx *Context) (any, error), depends ...any) *StepMeta

func (ProcessMeta) AddStepRetry added in v1.11.0

func (pc ProcessMeta) AddStepRetry(retry int) *processConfig

func (ProcessMeta) AddStepTimeout added in v1.11.0

func (pc ProcessMeta) AddStepTimeout(timeout time.Duration) *processConfig

func (*ProcessMeta) AddStepWithAlias added in v1.9.0

func (pm *ProcessMeta) AddStepWithAlias(alias string, run func(ctx *Context) (any, error), depends ...any) *StepMeta

func (*ProcessMeta) AddWaitAll added in v1.9.0

func (pm *ProcessMeta) AddWaitAll(alias string, run func(ctx *Context) (any, error)) *StepMeta

func (*ProcessMeta) AddWaitBefore added in v1.9.0

func (pm *ProcessMeta) AddWaitBefore(alias string, run func(ctx *Context) (any, error)) *StepMeta

AddWaitBefore method treats the last added step as a dependency

func (*ProcessMeta) Merge added in v1.9.0

func (pm *ProcessMeta) Merge(name string)

func (*ProcessMeta) NotUseDefault added in v1.10.0

func (pm *ProcessMeta) NotUseDefault()

type ResultI added in v1.10.0

type ResultI interface {
	BasicInfoI
	Features() map[string]*Feature
	FailFeatures() map[string]*Feature
}

func DoneArgs added in v1.9.0

func DoneArgs(name string, args ...any) ResultI

func DoneFlow added in v1.9.0

func DoneFlow(name string, input map[string]any) ResultI

type RunFlow added in v1.9.0

type RunFlow struct {
	*FlowMeta
	*BasicInfo
	*Context
	// contains filtered or unexported fields
}

func BuildRunFlow added in v1.10.0

func BuildRunFlow(name string, input map[string]any) *RunFlow

func (*RunFlow) Done added in v1.9.0

func (rf *RunFlow) Done() map[string]*Feature

Done function will block util all process done.

func (*RunFlow) FailFeatures added in v1.10.0

func (rf *RunFlow) FailFeatures() map[string]*Feature

func (*RunFlow) Features added in v1.10.0

func (rf *RunFlow) Features() map[string]*Feature

func (*RunFlow) Flow added in v1.10.0

func (rf *RunFlow) Flow() map[string]*Feature

Flow function asynchronous execute process of workflow and return immediately.

func (*RunFlow) ListProcess added in v1.9.0

func (rf *RunFlow) ListProcess() []string

func (*RunFlow) Pause added in v1.9.0

func (rf *RunFlow) Pause()

func (*RunFlow) ProcessController added in v1.10.0

func (rf *RunFlow) ProcessController(name string) Controller

func (*RunFlow) Resume added in v1.9.0

func (rf *RunFlow) Resume()

func (*RunFlow) SkipFinishedStep added in v1.9.0

func (rf *RunFlow) SkipFinishedStep(name string, result any) error

func (*RunFlow) Stop added in v1.9.0

func (rf *RunFlow) Stop()

type RunProcess added in v1.9.0

type RunProcess struct {
	*ProcessMeta
	*Context
	*Status
	// contains filtered or unexported fields
}

func (RunProcess) AddAfterProcess added in v1.11.0

func (pc RunProcess) AddAfterProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (RunProcess) AddAfterStep added in v1.11.0

func (pc RunProcess) AddAfterStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (RunProcess) AddBeforeProcess added in v1.11.0

func (pc RunProcess) AddBeforeProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *Callback[*ProcessInfo]

func (RunProcess) AddBeforeStep added in v1.11.0

func (pc RunProcess) AddBeforeStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *Callback[*StepInfo]

func (RunProcess) AddStepRetry added in v1.11.0

func (pc RunProcess) AddStepRetry(retry int) *processConfig

func (RunProcess) AddStepTimeout added in v1.11.0

func (pc RunProcess) AddStepTimeout(timeout time.Duration) *processConfig

func (*RunProcess) Pause added in v1.9.0

func (rp *RunProcess) Pause()

func (*RunProcess) Resume added in v1.9.0

func (rp *RunProcess) Resume()

func (*RunProcess) SkipFinishedStep added in v1.9.0

func (rp *RunProcess) SkipFinishedStep(name string, result any)

func (*RunProcess) Stop added in v1.9.0

func (rp *RunProcess) Stop()

type RunStep added in v1.9.0

type RunStep struct {
	*StepMeta
	*Context
	*Status

	Start time.Time
	End   time.Time
	Err   error
	// contains filtered or unexported fields
}

type Set added in v1.9.0

type Set[T comparable] struct {
	// contains filtered or unexported fields
}

func CreateFromSliceFunc added in v1.9.0

func CreateFromSliceFunc[T any, K comparable](src []T, transfer func(T) K) *Set[K]

func NewRoutineUnsafeSet added in v1.9.0

func NewRoutineUnsafeSet[T comparable]() *Set[T]

func (*Set[T]) Add added in v1.9.0

func (s *Set[T]) Add(item T)

func (*Set[T]) Contains added in v1.9.0

func (s *Set[T]) Contains(item T) bool

func (*Set[T]) Remove added in v1.9.0

func (s *Set[T]) Remove(item T)

func (*Set[T]) Size added in v1.9.0

func (s *Set[T]) Size() int

func (*Set[T]) Slice added in v1.10.0

func (s *Set[T]) Slice() []T

type Status added in v1.10.0

type Status int64

func (*Status) Append added in v1.11.0

func (s *Status) Append(enum *StatusEnum) bool

func (*Status) Contain added in v1.10.0

func (s *Status) Contain(enum *StatusEnum) bool

func (*Status) Exceptions added in v1.10.0

func (s *Status) Exceptions() []string

Exceptions return contain exception's message

func (*Status) ExplainStatus added in v1.10.0

func (s *Status) ExplainStatus() []string

ExplainStatus function explains the status represented by the provided bitmask. The function checks the status against predefined abnormal and normal flags, and returns a slice of strings containing the names of the matching flags. Parameter status is the bitmask representing the status. The returned slice contains the names of the matching flags in the layer they were found. If abnormal flags are found, normal flags will be ignored.

func (*Status) Normal added in v1.10.0

func (s *Status) Normal() bool

Normal return true if not exception occur

func (*Status) Pop added in v1.10.0

func (s *Status) Pop(enum *StatusEnum) bool

Pop function pops a status bit from the specified address. The function checks if the specified status bit exists in the current value. If it exists, it removes the status bit, and returns true indicating successful removal of the status bit. Otherwise, it returns false.

func (*Status) Success added in v1.10.0

func (s *Status) Success() bool

Success return true if finish running and success

type StatusEnum added in v1.10.0

type StatusEnum struct {
	// contains filtered or unexported fields
}

func (*StatusEnum) Contained added in v1.10.0

func (s *StatusEnum) Contained(explain ...string) bool

func (*StatusEnum) Message added in v1.10.0

func (s *StatusEnum) Message() string

type StatusI added in v1.10.0

type StatusI interface {
	Contain(enum *StatusEnum) bool
	Success() bool
	Exceptions() []string
}

type StepConfig

type StepConfig struct {
	StepTimeout time.Duration
	StepRetry   int
}

type StepInfo

type StepInfo struct {
	*BasicInfo
	*Context
	ProcessId string
	FlowId    string
	Prev      map[string]string // prev step stepName to step id
	Next      map[string]string // next step stepName to step id
	Start     time.Time
	End       time.Time
	Err       error
}

func (*StepInfo) Error added in v1.10.0

func (si *StepInfo) Error() error

type StepMeta added in v1.9.0

type StepMeta struct {
	*Status
	*StepConfig
	// contains filtered or unexported fields
}

func (*StepMeta) AddConfig added in v1.9.0

func (meta *StepMeta) AddConfig(config *StepConfig)

AddConfig allow step not using process's config

func (*StepMeta) AddPriority added in v1.9.0

func (meta *StepMeta) AddPriority(priority map[string]any)

func (*StepMeta) CopyDepends added in v1.9.0

func (meta *StepMeta) CopyDepends(src ...any)

Directories

Path Synopsis
core module
graphviz module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL