gotcc

v1.4.1

Warning: This package is not in the latest version of its module.

Published: May 9, 2023 License: MIT Imports: 7 Imported by: 0

README

gotcc

🤖 gotcc is a Golang package for Task Concurrency Control. You define tasks and their dependencies, and the controller runs the tasks concurrently while respecting those dependencies.

Features of gotcc

  • Automatic task concurrency control based on dependency declarations.
  • Supports dependency logic expressions: not, and, or, xor, and any combination of them.
  • Many-to-many result delivery between tasks.
  • Supports task rollback in case of any error.
  • Supports collection of multiple errors.
  • Supports goroutine pools: the default one (based on panjf2000/ants) or a user-defined one.

Installation

go get github.com/piaodazhu/gotcc

Usage

A simple usage:

package main

import (
	"fmt"

	"github.com/piaodazhu/gotcc"
)

// User-defined task function: prints its bound argument and returns "DONE".
func ExampleFunc1(args map[string]interface{}) (interface{}, error) {
	fmt.Println(args["BIND"].(string))
	return "DONE", nil
}

// User-defined task function: returns its bound argument as its result.
func ExampleFunc2(args map[string]interface{}) (interface{}, error) {
	return args["BIND"].(string), nil
}

// User-defined undo function
func ExampleUndo(args map[string]interface{}) error {
	fmt.Println("Undo > ", args["BIND"].(string))
	return nil
}

func main() {
	// 1. Create a new controller
	controller := gotcc.NewTCController()

	// 2. Add tasks to the controller
	//   TaskA: bind argument "BindArg-A" to ExampleFunc1
	taskA := controller.AddTask("taskA", ExampleFunc1, "BindArg-A")
	//   TaskB: like TaskA, but also set an undo function (skipError = true)
	taskB := controller.AddTask("taskB", ExampleFunc1, "BindArg-B").SetUndoFunc(ExampleUndo, true)
	//   TaskC: bind argument "BindArg-C" to ExampleFunc2
	taskC := controller.AddTask("taskC", ExampleFunc2, "BindArg-C")
	//   TaskD: bind argument "BindArg-D" to ExampleFunc2
	taskD := controller.AddTask("taskD", ExampleFunc2, "BindArg-D")

	// 3. Define dependencies
	//   B depends on A
	taskB.SetDependency(taskB.NewDependencyExpr(taskA))
	//   C depends on A
	taskC.SetDependency(taskC.NewDependencyExpr(taskA))
	//   D depends on B and C
	taskD.SetDependency(gotcc.MakeAndExpr(taskD.NewDependencyExpr(taskB), taskD.NewDependencyExpr(taskC)))

	// 4. Define the termination condition (required)
	//   the execution terminates when TaskD finishes
	controller.SetTermination(controller.NewTerminationExpr(taskD))

	// 5. Run the tasks
	result, err := controller.BatchRun()
	if err != nil {
		// ErrAborted carries the task errors and the undo errors.
		if aborted, ok := err.(gotcc.ErrAborted); ok {
			fmt.Println("task errors:", len(aborted.TaskErrors))
			fmt.Println("undo errors:", len(aborted.UndoErrors))
		}
		fmt.Println("execution failed:", err)
		return
	}

	// 6. Prints "BindArg-D"
	fmt.Println(result["taskD"].(string))
}

The tasks run concurrently, but taskB and taskC will not start until taskA completes, and taskD will not start until both taskB and taskC complete. If taskD fails (returns a non-nil error), ExampleUndo is executed with taskB's arguments (BIND = "BindArg-B").

More detailed usage can be found in the test files: example_test.go shows a more complex dependency topology, dependency_test.go covers advanced use of dependency logic expressions, and tcc_test.go covers task rollback and error collection.

Specifications

Execution

In summary, a single execution of a TCController consists of multiple tasks. Some tasks may depend on others, and the execution terminates once the tasks named in the termination condition have completed. Therefore, controller.SetTermination must be called before calling controller.BatchRun or controller.PoolRun; otherwise they return ErrNoTermination.

The TCController can execute tasks in two modes: BatchRun and PoolRun. BatchRun creates at most one goroutine per task. If you need to limit the number of concurrently running goroutines, PoolRun is recommended. A default goroutine pool based on panjf2000/ants is provided. A user-defined pool must implement the following interface, where Go() submits a task and should block while all workers are busy:

type GoroutinePool interface {
	Go(task func()) error
}

Note that PoolRun mode is only available when all dependency expressions are AND.

Task Function

The task function must have this form:

func (args map[string]interface{}) (interface{}, error)

There are some built-in keys when running the task function:

  • NAME: the value is the name of this task.
  • BIND: the value is the third argument passed to controller.AddTask().
  • CANCEL: the value is a cancellable context.Context.

Every other key is the name of a task this task depends on, and the corresponding value is that task's return value.

IMPORTANT: Inside a task function, if the task is cancelled (that is, args["CANCEL"].(context.Context).Done() is closed), the function should return gotcc.ErrCancelled, optionally carrying its state. If the task fails but you do not want to abort the execution, it should return gotcc.ErrSilentFail.

Undo Function

The undo function must have this form:

func (args map[string]interface{}) error

There are some built-in keys when running the undo function:

  • NAME: the value is the name of this task.
  • BIND: the value is the third argument passed to controller.AddTask().
  • TASKERR: the value type is []*gotcc.ErrorMessage, recording the errors of task execution.
  • UNDOERR: the value type is []*gotcc.ErrorMessage, recording the errors of previously run undo functions.
  • CANCELLED: the value type is []*gotcc.StateMessage, recording the state of cancelled tasks (for example, which steps of a task had been completed before it was cancelled).

Undo functions run in the reverse order in which their tasks completed. The second argument of SetUndoFunc (skipError) controls whether an error returned by this undo function is skipped.

An undo function is executed when:

  1. Some task returns a non-nil error while the controller executes the tasks.
  2. The corresponding task has completed.
  3. The preceding undo functions have completed or been skipped.

When an undo function runs, its args contain all the arguments of its corresponding task, in addition to the undo-specific keys listed above.

Errors

During the execution of a TCController, multiple tasks may fail, and after a failure multiple tasks may be cancelled. During rollback, multiple undo functions may also return errors. Therefore, the error returned by BatchRun and PoolRun is defined as follows:

type ErrAborted struct {
	TaskErrors []*ErrorMessage
	UndoErrors []*ErrorMessage
	Cancelled  []*StateMessage
}

type ErrorMessage struct {
	TaskName string
	Error    error
}

type StateMessage struct {
	TaskName string
	State    State
}

Dependency Expression

Supported dependency logic expressions are not, and, or, xor and any combination of them.

For taskB, create a dependency expression about taskA:

ExprAB := taskB.NewDependencyExpr(taskA)

Combine existing dependency expressions into a new one:

Expr3 := gotcc.MakeOrExpr(Expr1, Expr2)

Get the current dependency expression of taskA.

Expr := taskA.DependencyExpr()

Set the dependency expression for taskA.

taskA.SetDependency(Expr)

The termination condition is set up the same way, using controller.NewTerminationExpr, controller.TerminationExpr and controller.SetTermination.
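To make the meaning of these combinators concrete, here is a toy model, not gotcc's implementation, in which an expression is a predicate over the set of finished tasks. Expr, Dep, Not, And, Or and Xor are hypothetical names, and treating `not` as "has not finished" is an assumption of this model.

```go
package main

import "fmt"

// Expr is a hypothetical conceptual model of a dependency expression: a
// predicate over the set of tasks that have finished. It is not gotcc's
// internal representation.
type Expr func(done map[string]bool) bool

// Dep is true once the named task has finished.
func Dep(name string) Expr { return func(done map[string]bool) bool { return done[name] } }

func Not(e Expr) Expr    { return func(d map[string]bool) bool { return !e(d) } }
func And(a, b Expr) Expr { return func(d map[string]bool) bool { return a(d) && b(d) } }
func Or(a, b Expr) Expr  { return func(d map[string]bool) bool { return a(d) || b(d) } }
func Xor(a, b Expr) Expr { return func(d map[string]bool) bool { return a(d) != b(d) } }

func main() {
	// foobar may launch once foo OR bar has finished (as in the BatchRun example).
	foobar := Or(Dep("foo"), Dep("bar"))
	fmt.Println(foobar(map[string]bool{"foo": true})) // true
	fmt.Println(foobar(map[string]bool{}))            // false

	// Xor: exactly one of the two tasks has finished.
	either := Xor(Dep("a"), Dep("b"))
	fmt.Println(either(map[string]bool{"a": true, "b": true})) // false
}
```

In this model a task would be launched as soon as its expression evaluates to true, which matches the documented rule that a task is launched only if its expression is true.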

Performance

goos: linux
goarch: amd64
pkg: github.com/piaodazhu/gotcc
cpu: 11th Gen Intel(R) Core(TM) i7-11700 @ 2.50GHz
BenchmarkBatchRunSerialized10-4            54928             19533 ns/op            7106 B/op         84 allocs/op
BenchmarkBatchRunSerialized100-4            6452            172314 ns/op           68873 B/op        750 allocs/op
BenchmarkBatchRunSerialized1000-4            507           2301349 ns/op          772775 B/op       7493 allocs/op
BenchmarkBatchRunManyToOne10-4             59264             20095 ns/op            7673 B/op         77 allocs/op
BenchmarkBatchRunManyToOne100-4             5623            201600 ns/op           78210 B/op        659 allocs/op
BenchmarkBatchRunManyToOne1000-4             100          11388471 ns/op          899267 B/op       6178 allocs/op
BenchmarkBatchRunManyToMany10-4            23252             50629 ns/op           21549 B/op        212 allocs/op
BenchmarkBatchRunManyToMany100-4             410           2814498 ns/op         2270278 B/op      15945 allocs/op
BenchmarkBatchRunBinaryTree10-4            34041             37472 ns/op           10857 B/op        116 allocs/op
BenchmarkBatchRunBinaryTree100-4            6380            204777 ns/op           91623 B/op        880 allocs/op
BenchmarkBatchRunBinaryTree1000-4            804           1506047 ns/op          749709 B/op       6848 allocs/op
BenchmarkPoolRunSerialized10-4             49352             24033 ns/op            7622 B/op         91 allocs/op
BenchmarkPoolRunSerialized100-4             5956            180609 ns/op           72491 B/op        754 allocs/op
BenchmarkPoolRunSerialized1000-4             710           1617208 ns/op          783005 B/op       7222 allocs/op
BenchmarkPoolRunManyToOne10-4              38798             31265 ns/op            8193 B/op         84 allocs/op
BenchmarkPoolRunManyToOne100-4              5371            215742 ns/op           81924 B/op        664 allocs/op
BenchmarkPoolRunManyToOne1000-4              100          11651428 ns/op          937995 B/op       6226 allocs/op
BenchmarkPoolRunManyToMany10-4             22332             52499 ns/op           22833 B/op        218 allocs/op
BenchmarkPoolRunManyToMany100-4              336           3698301 ns/op         2360297 B/op      15935 allocs/op
BenchmarkPoolRunBinaryTree10-4             29043             42600 ns/op           11543 B/op        122 allocs/op
BenchmarkPoolRunBinaryTree100-4             5572            216561 ns/op           96321 B/op        885 allocs/op
BenchmarkPoolRunBinaryTree1000-4             826           1394863 ns/op          782612 B/op       6812 allocs/op

License

gotcc is released under the MIT license. See LICENSE for details.

Documentation

Constants

This section is empty.

Variables

var DefaultFalseExpr = DependencyExpression{
	// contains filtered or unexported fields
}

Default dependency expression: always evaluates to false.

var DefaultTrueExpr = DependencyExpression{
	// contains filtered or unexported fields
}

Default dependency expression: always evaluates to true.

var EmptyUndoFunc = func(args map[string]interface{}) error {
	return nil
}

Default undo function: does nothing and returns nil.

Functions

This section is empty.

Types

type DefaultNoPool added in v1.3.0

type DefaultNoPool struct{}

Default goroutine pool: not actually a pool; it simply launches a new goroutine for each task.

func (DefaultNoPool) Go added in v1.3.0

func (DefaultNoPool) Go(task func()) error

type DefaultPool added in v1.3.0

type DefaultPool struct {
	// contains filtered or unexported fields
}

Default goroutine pool, based on the ants pool.

func NewDefaultPool added in v1.3.0

func NewDefaultPool(size int) DefaultPool

Creates a default goroutine pool with capacity `size`.

func (DefaultPool) Close added in v1.3.0

func (p DefaultPool) Close()

func (DefaultPool) Go added in v1.3.0

func (p DefaultPool) Go(task func()) error

type DependencyExpression

type DependencyExpression struct {
	// contains filtered or unexported fields
}

A dependency expression is a filter that describes a task's dependencies. A task is launched only if its expression is true.

type ErrAborted

type ErrAborted struct {
	TaskErrors []*ErrorMessage
	UndoErrors []*ErrorMessage
	Cancelled  []*StateMessage
}

It means fatal errors occurred, so the execution failed. It aggregates multiple errors. TaskErrors: errors returned by task functions. UndoErrors: errors returned by undo functions. Cancelled: tasks that were running but were cancelled.

func (ErrAborted) Error

func (e ErrAborted) Error() string

type ErrCancelled added in v1.1.0

type ErrCancelled struct {
	State State
}

It means the task was cancelled by the controller because some other task returned a fatal error. If a task is aborted because args["CANCEL"].(context.Context).Done() was closed, it should return ErrCancelled, and its running state can be stored in the error. The task and its state will be put into the Cancelled list.

func (ErrCancelled) Error added in v1.1.0

func (e ErrCancelled) Error() string

type ErrLoopDependency added in v1.2.0

type ErrLoopDependency struct {
	State State
}

It means there is a dependency loop among the tasks.

func (ErrLoopDependency) Error added in v1.2.0

func (e ErrLoopDependency) Error() string

type ErrNoTermination

type ErrNoTermination struct{}

It means the controller's termination condition hasn't been set.

func (ErrNoTermination) Error

func (ErrNoTermination) Error() string

type ErrPoolUnsupport added in v1.3.0

type ErrPoolUnsupport struct{}

It means the controller doesn't support PoolRun() because not all dependency expressions are `AND`.

func (ErrPoolUnsupport) Error added in v1.3.0

func (ErrPoolUnsupport) Error() string

type ErrSilentFail added in v1.2.0

type ErrSilentFail struct{}

It means the task failed, but you don't want the execution to be aborted. The failed task will be put into TaskErrors if the execution eventually fails.

func (ErrSilentFail) Error added in v1.2.0

func (e ErrSilentFail) Error() string

type ErrorMessage

type ErrorMessage struct {
	TaskName string
	Error    error
}

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func (*Executor) DependencyExpr

func (e *Executor) DependencyExpr() DependencyExpression

Get dependency expression of the executor.

func (*Executor) Name

func (e *Executor) Name() string

Get task name of the executor.

func (*Executor) NewDependencyExpr

func (e *Executor) NewDependencyExpr(d *Executor) DependencyExpression

Create a dependency expression for the executor, meaning that launching this task may depend on executor `d`.

func (*Executor) SetDependency

func (e *Executor) SetDependency(Expr DependencyExpression) *Executor

Set dependency expression for the executor. `Expr` is a dependency expression.

func (*Executor) SetUndoFunc

func (e *Executor) SetUndoFunc(undo func(args map[string]interface{}) error, skipError bool) *Executor

Set the undo function of the task executor. The undo function will get all arguments of the task function.

type GoroutinePool added in v1.3.0

type GoroutinePool interface {
	Go(task func()) error
}

Goroutine pool interface. Go should block until a worker is available.

type State added in v1.1.0

type State interface {
	String() string
}

type StateMessage added in v1.1.0

type StateMessage struct {
	TaskName string
	State    State
}

type TCController

type TCController struct {
	// contains filtered or unexported fields
}

Task Concurrency Controller

func NewTCController

func NewTCController() *TCController

Create an empty task concurrency controller

func (*TCController) AddTask

func (m *TCController) AddTask(name string, f func(args map[string]interface{}) (interface{}, error), args interface{}) *Executor

Add a task to the controller. `name` is a user-defined string identifier of the task. `f` is the task function. `args` is the argument bound to the task, which the task function can read from args["BIND"].

func (*TCController) BatchRun added in v1.3.0

func (m *TCController) BatchRun() (map[string]interface{}, error)

Run the execution. On success, return a map[name]value whose keys are the names of the tasks the termination condition depends on and whose values are those tasks' return values. On failure, return ErrNoTermination, ErrLoopDependency or ErrAborted.
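Because the returned map holds interface{} values, a direct assertion such as result["taskD"].(string) panics if the key is missing or the value has a different type. A hypothetical generic helper (Go 1.18 or later) that reports these cases as errors instead might look like this; resultAs is an illustrative name, not part of gotcc.

```go
package main

import "fmt"

// resultAs is a hypothetical helper that extracts a typed value from the map
// returned by BatchRun/PoolRun without panicking on a missing key or an
// unexpected type.
func resultAs[T any](result map[string]interface{}, name string) (T, error) {
	var zero T
	v, ok := result[name]
	if !ok {
		return zero, fmt.Errorf("no result for task %q", name)
	}
	t, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("task %q returned %T, not %T", name, v, zero)
	}
	return t, nil
}

func main() {
	// Stand-in for the map BatchRun returns on success.
	result := map[string]interface{}{"taskD": "BindArg-D"}
	s, err := resultAs[string](result, "taskD")
	fmt.Println(s, err) // BindArg-D <nil>
	_, err = resultAs[int](result, "taskD")
	fmt.Println(err) // task "taskD" returned string, not int
}
```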

Example
// in this example:
// hello -+
//        +-(&&)-> helloworld +
// world -+                   +
//  foo  -+                   +-(&&)-> [termination]
//        +-(||)->   foobar   +
//  bar  -+

controller := NewTCController()

hello := controller.AddTask("hello", hello, 0)
world := controller.AddTask("world", world, 1)
helloworld := controller.AddTask("helloworld", helloworld, 2)
foo := controller.AddTask("foo", foo, 3)
bar := controller.AddTask("bar", bar, 4)
foobar := controller.AddTask("foobar", foobar, 5)

helloworld.SetDependency(MakeAndExpr(helloworld.NewDependencyExpr(hello), helloworld.NewDependencyExpr(world)))

foobar.SetDependency(MakeOrExpr(foobar.NewDependencyExpr(foo), foobar.NewDependencyExpr(bar)))

controller.SetTermination(MakeAndExpr(controller.NewTerminationExpr(foobar), controller.NewTerminationExpr(helloworld)))

_, err := controller.BatchRun()
if err != nil {
	panic(err)
}
Output:

hello
world
helloworld
foo
foobar
bar

func (*TCController) NewTerminationExpr

func (m *TCController) NewTerminationExpr(d *Executor) DependencyExpression

Create a termination dependency expression for the controller. It means the execution termination may depend on task `d`.

func (*TCController) PoolRun added in v1.3.0

func (m *TCController) PoolRun(pool GoroutinePool) (map[string]interface{}, error)

Run the execution with a goroutine pool. On success, return a map[name]value whose keys are the names of the tasks the termination condition depends on and whose values are those tasks' return values. On failure, return ErrNoTermination, ErrLoopDependency or ErrAborted.

Example
// in this example:
// we use ants pool (panjf2000/ants) with cap=2
// A(1) -+
//       +-(&&)-> E(2) +
// B(3) -+             |
// C(1) -+             +-(&&)-> H(1) --> [termination]
//       +-(&&)-> F(2) +
// D(2) -+             |
// G(2) ---------------+
controller := NewTCController()

A := controller.AddTask("A", sleeptask, 1)
B := controller.AddTask("B", sleeptask, 3)
C := controller.AddTask("C", sleeptask, 1)
D := controller.AddTask("D", sleeptask, 2)
E := controller.AddTask("E", sleeptask, 2)
F := controller.AddTask("F", sleeptask, 2)
G := controller.AddTask("G", sleeptask, 2)
H := controller.AddTask("H", sleeptask, 1)

E.SetDependency(MakeAndExpr(E.NewDependencyExpr(A), E.NewDependencyExpr(B)))
F.SetDependency(MakeAndExpr(F.NewDependencyExpr(C), F.NewDependencyExpr(D)))
H.SetDependency(MakeAndExpr(
	MakeAndExpr(H.NewDependencyExpr(E), H.NewDependencyExpr(F)),
	H.NewDependencyExpr(G),
))

controller.SetTermination(controller.NewTerminationExpr(H))

pool := NewDefaultPool(2)
defer pool.Close()

_, err := controller.PoolRun(pool)
if err != nil {
	panic(err)
}
Output:

A
C
B
D
G
E
F
H

func (*TCController) SetTermination

func (m *TCController) SetTermination(Expr DependencyExpression)

Set termination condition for the controller. `Expr` is a dependency expression.

func (*TCController) String added in v1.2.0

func (m *TCController) String() string

Returns a string describing the inner state of the controller.

func (*TCController) TerminationExpr

func (m *TCController) TerminationExpr() DependencyExpression

Get termination condition of the controller.
