Documentation ¶
Index ¶
- Variables
- type DefaultNoPool
- type DefaultPool
- type DependencyExpression
- func MakeAndExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
- func MakeNotExpr(Expr DependencyExpression) DependencyExpression
- func MakeOrExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
- func MakeXorExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
- type ErrAborted
- type ErrCancelled
- type ErrLoopDependency
- type ErrNoTermination
- type ErrPoolUnsupport
- type ErrSilentFail
- type ErrorMessage
- type Executor
- func (e *Executor) DependencyExpr() DependencyExpression
- func (e *Executor) Name() string
- func (e *Executor) NewDependencyExpr(d *Executor) DependencyExpression
- func (e *Executor) SetDependency(Expr DependencyExpression) *Executor
- func (e *Executor) SetUndoFunc(undo func(args map[string]interface{}) error, skipError bool) *Executor
- type GoroutinePool
- type State
- type StateMessage
- type TCController
- func (m *TCController) AddTask(name string, f func(args map[string]interface{}) (interface{}, error), ...) *Executor
- func (m *TCController) BatchRun() (map[string]interface{}, error)
- func (m *TCController) NewTerminationExpr(d *Executor) DependencyExpression
- func (m *TCController) PoolRun(pool GoroutinePool) (map[string]interface{}, error)
- func (m *TCController) SetTermination(Expr DependencyExpression)
- func (m *TCController) String() string
- func (m *TCController) TerminationExpr() DependencyExpression
Constants ¶
This section is empty.
Variables ¶
var DefaultFalseExpr = DependencyExpression{
	// contains filtered or unexported fields
}
Default dependency expression that always evaluates to false.
var DefaultTrueExpr = DependencyExpression{
	// contains filtered or unexported fields
}
Default dependency expression that always evaluates to true.
var EmptyUndoFunc = func(args map[string]interface{}) error { return nil }
Default undo function: it does nothing and returns nil.
Functions ¶
This section is empty.
Types ¶
type DefaultNoPool ¶ added in v1.3.0
type DefaultNoPool struct{}
Default goroutine pool. It is not really a pool: each call to Go simply launches a new goroutine.
func (DefaultNoPool) Go ¶ added in v1.3.0
func (DefaultNoPool) Go(task func()) error
type DefaultPool ¶ added in v1.3.0
type DefaultPool struct {
// contains filtered or unexported fields
}
Default goroutine pool, based on the ants pool.
func NewDefaultPool ¶ added in v1.3.0
func NewDefaultPool(size int) DefaultPool
Create a default goroutine pool with capacity `size`.
func (DefaultPool) Close ¶ added in v1.3.0
func (p DefaultPool) Close()
func (DefaultPool) Go ¶ added in v1.3.0
func (p DefaultPool) Go(task func()) error
type DependencyExpression ¶
type DependencyExpression struct {
// contains filtered or unexported fields
}
A dependency expression is a filter that describes a task's dependencies. A task is launched only when its expression evaluates to true.
func MakeAndExpr ¶
func MakeAndExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
func MakeNotExpr ¶
func MakeNotExpr(Expr DependencyExpression) DependencyExpression
func MakeOrExpr ¶
func MakeOrExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
func MakeXorExpr ¶
func MakeXorExpr(Expr1 DependencyExpression, Expr2 DependencyExpression) DependencyExpression
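The four constructors above compose expressions the way boolean operators compose predicates. The following is a minimal standalone sketch of that idea, not this package's implementation: `Expr`, `Dep`, `And`, `Or`, `Xor` and `Not` are hypothetical stand-ins, and "a task has finished" is modelled as an entry in a map.

```go
package main

import "fmt"

// Expr is a hypothetical stand-in for DependencyExpression: a predicate
// over the set of tasks that have finished so far.
type Expr func(done map[string]bool) bool

// Dep is true once the named task has finished.
func Dep(name string) Expr {
	return func(done map[string]bool) bool { return done[name] }
}

// And, Or, Xor and Not mirror MakeAndExpr, MakeOrExpr, MakeXorExpr and MakeNotExpr.
func And(a, b Expr) Expr { return func(d map[string]bool) bool { return a(d) && b(d) } }
func Or(a, b Expr) Expr  { return func(d map[string]bool) bool { return a(d) || b(d) } }
func Xor(a, b Expr) Expr { return func(d map[string]bool) bool { return a(d) != b(d) } }
func Not(a Expr) Expr    { return func(d map[string]bool) bool { return !a(d) } }

func main() {
	// Ready when (hello AND world) OR NOT foo.
	ready := Or(And(Dep("hello"), Dep("world")), Not(Dep("foo")))

	fmt.Println(ready(map[string]bool{"foo": true}))                               // false
	fmt.Println(ready(map[string]bool{"foo": true, "hello": true, "world": true})) // true
}
```

Because every combinator returns the same type it accepts, expressions nest to any depth, which is also how the real constructors are used in the examples further down.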
type ErrAborted ¶
type ErrAborted struct {
	TaskErrors []*ErrorMessage
	UndoErrors []*ErrorMessage
	Cancelled  []*StateMessage
}
It means one or more fatal errors occurred, so the execution failed. It aggregates several kinds of errors. TaskErrors: errors returned by task functions. UndoErrors: errors returned by undo functions. Cancelled: tasks that were running and then cancelled.
func (ErrAborted) Error ¶
func (e ErrAborted) Error() string
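Since ErrAborted has a value receiver for Error, a caller can recover it from a returned error with errors.As and inspect the three lists. The sketch below is standalone: the ErrAborted type is copied from the declaration above, while the fields of `ErrorMessage` and `StateMessage` and the helper `summarize` are assumptions made only so the example compiles.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins: the real ErrorMessage and StateMessage fields are not
// shown in this documentation, so Text is a hypothetical field.
type ErrorMessage struct{ Text string }
type StateMessage struct{ Text string }

// ErrAborted mirrors the documented struct.
type ErrAborted struct {
	TaskErrors []*ErrorMessage
	UndoErrors []*ErrorMessage
	Cancelled  []*StateMessage
}

func (e ErrAborted) Error() string { return "execution aborted" }

// summarize is a hypothetical helper that reports how a run ended.
func summarize(err error) string {
	var aborted ErrAborted
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &aborted):
		return fmt.Sprintf("task=%d undo=%d cancelled=%d",
			len(aborted.TaskErrors), len(aborted.UndoErrors), len(aborted.Cancelled))
	default:
		return "other: " + err.Error()
	}
}

func main() {
	// Simulate a failed run; errors.As also sees through wrapping.
	err := fmt.Errorf("run failed: %w", ErrAborted{
		TaskErrors: []*ErrorMessage{{Text: "boom"}},
	})
	fmt.Println(summarize(err)) // task=1 undo=0 cancelled=0
}
```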
type ErrCancelled ¶ added in v1.1.0
type ErrCancelled struct {
State State
}
It means the task was cancelled by the controller because some other task returned a fatal error. If a task stops after observing args["CANCEL"].(context.Context).Done(), it should return ErrCancelled, and it may record its running state in the error. The task and its state are then added to the Cancelled list.
func (ErrCancelled) Error ¶ added in v1.1.0
func (e ErrCancelled) Error() string
type ErrLoopDependency ¶ added in v1.2.0
type ErrLoopDependency struct {
State State
}
It means there is a dependency loop (cycle) among the tasks.
func (ErrLoopDependency) Error ¶ added in v1.2.0
func (e ErrLoopDependency) Error() string
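To make "dependency loop" concrete, here is a generic depth-first search that detects a cycle in a task graph. It is only an illustration of the condition this error reports; how the package itself detects loops is not documented here, and `hasCycle` and the map representation are assumptions.

```go
package main

import "fmt"

// hasCycle reports whether the dependency graph contains a loop.
// deps maps a task name to the names of the tasks it depends on.
func hasCycle(deps map[string][]string) bool {
	const (
		unvisited = iota
		inProgress
		finished
	)
	state := make(map[string]int)

	var visit func(name string) bool
	visit = func(name string) bool {
		switch state[name] {
		case inProgress:
			return true // reached a task that is still on the current path
		case finished:
			return false
		}
		state[name] = inProgress
		for _, d := range deps[name] {
			if visit(d) {
				return true
			}
		}
		state[name] = finished
		return false
	}

	for name := range deps {
		if visit(name) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasCycle(map[string][]string{"helloworld": {"hello", "world"}})) // false
	fmt.Println(hasCycle(map[string][]string{"a": {"b"}, "b": {"a"}}))           // true
}
```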
type ErrNoTermination ¶
type ErrNoTermination struct{}
It means the controller's termination condition has not been set.
func (ErrNoTermination) Error ¶
func (ErrNoTermination) Error() string
type ErrPoolUnsupport ¶ added in v1.3.0
type ErrPoolUnsupport struct{}
It means the controller doesn't support PoolRun() because not all dependency expressions are `AND`.
func (ErrPoolUnsupport) Error ¶ added in v1.3.0
func (ErrPoolUnsupport) Error() string
type ErrSilentFail ¶ added in v1.2.0
type ErrSilentFail struct{}
It means the task failed, but the overall execution should not be aborted. The failed task is added to TaskErrors if the execution eventually fails.
func (ErrSilentFail) Error ¶ added in v1.2.0
func (e ErrSilentFail) Error() string
type ErrorMessage ¶
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func (*Executor) DependencyExpr ¶
func (e *Executor) DependencyExpr() DependencyExpression
Get dependency expression of the executor.
func (*Executor) NewDependencyExpr ¶
func (e *Executor) NewDependencyExpr(d *Executor) DependencyExpression
Create a dependency expression for the executor. It means the task launching may depend on executor `d`.
func (*Executor) SetDependency ¶
func (e *Executor) SetDependency(Expr DependencyExpression) *Executor
Set dependency expression for the executor. `Expr` is a dependency expression.
type GoroutinePool ¶ added in v1.3.0
type GoroutinePool interface {
Go(task func()) error
}
Goroutine pool interface. Go should block until a worker is available.
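Any type with a matching Go method satisfies this interface. As a rough sketch of the blocking behaviour described above, the following standalone program bounds concurrency with a buffered channel. `semPool`, `newSemPool` and `maxConcurrency` are hypothetical names, and the interface is repeated locally only so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// GoroutinePool is repeated from the documentation so the sketch is self-contained.
type GoroutinePool interface {
	Go(task func()) error
}

// semPool is a hypothetical pool that uses a buffered channel as a semaphore.
type semPool struct{ slots chan struct{} }

func newSemPool(size int) *semPool {
	return &semPool{slots: make(chan struct{}, size)}
}

// Go blocks until a slot is free, then runs task in a new goroutine.
func (p *semPool) Go(task func()) error {
	p.slots <- struct{}{} // blocks while all workers are busy
	go func() {
		defer func() { <-p.slots }() // release the slot when the task returns
		task()
	}()
	return nil
}

// maxConcurrency submits n short tasks and returns the peak number running at once.
func maxConcurrency(pool GoroutinePool, n int) int64 {
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		_ = pool.Go(func() {
			defer wg.Done()
			cur := atomic.AddInt64(&running, 1)
			for { // raise peak to cur if cur is larger
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			atomic.AddInt64(&running, -1)
		})
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxConcurrency(newSemPool(2), 10) <= 2) // true
}
```

A value like `newSemPool(2)` could then be passed wherever a GoroutinePool is expected, in place of the ants-based default.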
type StateMessage ¶ added in v1.1.0
type TCController ¶
type TCController struct {
// contains filtered or unexported fields
}
Task Concurrency Controller
func NewTCController ¶
func NewTCController() *TCController
Create an empty task concurrency controller
func (*TCController) AddTask ¶
func (m *TCController) AddTask(name string, f func(args map[string]interface{}) (interface{}, error), args interface{}) *Executor
Add a task to the controller. `name` is a user-defined string identifier of the task. `f` is the task function. `args` is the argument bound to the task; inside the task function it is available as args["BIND"].
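A task function therefore has the shape func(args map[string]interface{}) (interface{}, error) and reads its bound argument from the "BIND" key. The standalone sketch below calls such a function directly with a hand-built map to show the contract; `double` is a hypothetical task, and the map stands in for what the controller would pass after a call like AddTask("double", double, 21).

```go
package main

import (
	"errors"
	"fmt"
)

// double is a hypothetical task function with the signature AddTask expects.
func double(args map[string]interface{}) (interface{}, error) {
	// The value bound at AddTask time is expected under the "BIND" key.
	n, ok := args["BIND"].(int)
	if !ok {
		return nil, errors.New("double: BIND is not an int")
	}
	return n * 2, nil
}

func main() {
	// Hand-built stand-in for the argument map the controller would supply.
	v, err := double(map[string]interface{}{"BIND": 21})
	fmt.Println(v, err) // 42 <nil>
}
```

Using the checked form of the type assertion keeps a wrongly typed bound argument from panicking inside the task.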
func (*TCController) BatchRun ¶ added in v1.3.0
func (m *TCController) BatchRun() (map[string]interface{}, error)
Run the execution. On success, return a map[name]value whose keys are the names of the tasks that the termination expression depends on and whose values are their return values. On failure, return ErrNoTermination, ErrLoopDependency or ErrAborted.
Example ¶
// in this example:
// hello -+
//        +-(&&)-> helloworld +
// world -+                   +
// foo   -+                   +-(&&)-> [termination]
//        +-(||)-> foobar     +
// bar   -+
controller := NewTCController()
hello := controller.AddTask("hello", hello, 0)
world := controller.AddTask("world", world, 1)
helloworld := controller.AddTask("helloworld", helloworld, 2)
foo := controller.AddTask("foo", foo, 3)
bar := controller.AddTask("bar", bar, 4)
foobar := controller.AddTask("foobar", foobar, 5)
helloworld.SetDependency(MakeAndExpr(helloworld.NewDependencyExpr(hello), helloworld.NewDependencyExpr(world)))
foobar.SetDependency(MakeOrExpr(foobar.NewDependencyExpr(foo), foobar.NewDependencyExpr(bar)))
controller.SetTermination(MakeAndExpr(controller.NewTerminationExpr(foobar), controller.NewTerminationExpr(helloworld)))
_, err := controller.BatchRun()
if err != nil {
	panic(err)
}
Output: hello world helloworld foo foobar bar
func (*TCController) NewTerminationExpr ¶
func (m *TCController) NewTerminationExpr(d *Executor) DependencyExpression
Create a termination dependency expression for the controller. It means the execution termination may depend on task `d`.
func (*TCController) PoolRun ¶ added in v1.3.0
func (m *TCController) PoolRun(pool GoroutinePool) (map[string]interface{}, error)
Run the execution with a goroutine pool. On success, return a map[name]value whose keys are the names of the tasks that the termination expression depends on and whose values are their return values. On failure, return ErrNoTermination, ErrLoopDependency or ErrAborted.
Example ¶
// in this example:
// we use ants pool (panjf2000/ants) with cap=2
// A(1) -+
//       +-(&&)-> E(2) +
// B(3) -+             |
// C(1) -+             +-(&&)-> H(1) --> [termination]
//       +-(&&)-> F(2) +
// D(2) -+             |
// G(2) ---------------+
controller := NewTCController()
A := controller.AddTask("A", sleeptask, 1)
B := controller.AddTask("B", sleeptask, 3)
C := controller.AddTask("C", sleeptask, 1)
D := controller.AddTask("D", sleeptask, 2)
E := controller.AddTask("E", sleeptask, 2)
F := controller.AddTask("F", sleeptask, 2)
G := controller.AddTask("G", sleeptask, 2)
H := controller.AddTask("H", sleeptask, 1)
E.SetDependency(MakeAndExpr(E.NewDependencyExpr(A), E.NewDependencyExpr(B)))
F.SetDependency(MakeAndExpr(F.NewDependencyExpr(C), F.NewDependencyExpr(D)))
H.SetDependency(MakeAndExpr(
	MakeAndExpr(H.NewDependencyExpr(E), H.NewDependencyExpr(F)),
	H.NewDependencyExpr(G),
))
controller.SetTermination(controller.NewTerminationExpr(H))
pool := NewDefaultPool(2)
defer pool.Close()
_, err := controller.PoolRun(pool)
if err != nil {
	panic(err)
}
Output: A C B D G E F H
func (*TCController) SetTermination ¶
func (m *TCController) SetTermination(Expr DependencyExpression)
Set termination condition for the controller. `Expr` is a dependency expression.
func (*TCController) String ¶ added in v1.2.0
func (m *TCController) String() string
Return the inner state of the controller as a string.
func (*TCController) TerminationExpr ¶
func (m *TCController) TerminationExpr() DependencyExpression
Get termination condition of the controller.