gotaskflow

package module
v1.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 9, 2025 License: Apache-2.0 Imports: 12 Imported by: 1

README

go-taskflow

go-taskflow is a general-purpose task-parallel programming framework for Go, inspired by taskflow-cpp. It builds on Go's native concurrency model and simplicity, which makes it well suited to managing complex dependencies among concurrent tasks.

Features

  • High Extensibility: Easily extend the framework to adapt to various specific use cases.

  • Native Go Concurrency Model: Leverages Go's goroutines for efficient concurrent task execution.

  • User-Friendly Programming Interface: Simplifies complex task dependency management in Go.

  • Static, Subflow, Conditional, and Cyclic Tasking: Define static tasks, condition nodes, nested subflows, and cyclic flows to enhance modularity and programmability.

  • Priority Task Scheduling: Assign task priorities so that higher-priority tasks are scheduled first.

  • Built-in Visualization and Profiling Tools: Generate visual representations of tasks and profile task execution performance using integrated tools, simplifying debugging and optimization.

Use Cases

  • Data Pipelines: Orchestrate data processing stages with complex dependencies.
  • AI Agent Workflow Automation: Define and execute AI agent workflows with clear sequences and dependency structures.
  • Parallel Graph Tasking: Execute graph-based tasks concurrently to maximize CPU utilization.

Installation

Install the latest version of go-taskflow with:

go get -u github.com/noneback/go-taskflow

Documentation

DeepWiki Page

Example

Below is an example of using go-taskflow to implement a parallel merge sort:

package main

import (
    "fmt"
    "log"
    "math/rand"
    "os"
    "slices"
    "strconv"
    "sync"

    gtf "github.com/noneback/go-taskflow"
)

// mergeInto merges two sorted slices and returns the result as a new sorted slice.
func mergeInto(dest, src []int) []int {
    size := len(dest) + len(src)
    tmp := make([]int, 0, size)
    i, j := 0, 0
    for i < len(dest) && j < len(src) {
        if dest[i] < src[j] {
            tmp = append(tmp, dest[i])
            i++
        } else {
            tmp = append(tmp, src[j])
            j++
        }
    }

    if i < len(dest) {
        tmp = append(tmp, dest[i:]...)
    } else {
        tmp = append(tmp, src[j:]...)
    }

    return tmp
}

func main() {
    size := 100
    randomArr := make([][]int, 10)
    sortedArr := make([]int, 0, 10*size)
    mutex := &sync.Mutex{}

    for i := 0; i < 10; i++ {
        for j := 0; j < size; j++ {
            randomArr[i] = append(randomArr[i], rand.Int())
        }
    }

    sortTasks := make([]*gtf.Task, 10)
    tf := gtf.NewTaskFlow("merge sort")
    done := tf.NewTask("Done", func() {
        if !slices.IsSorted(sortedArr) {
            log.Fatal("Sorting failed")
        }
        fmt.Println("Sorted successfully")
        fmt.Println(sortedArr[:1000])
    })

    for i := 0; i < 10; i++ {
        i := i // give each closure its own copy of i (required before Go 1.22)
        sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
            arr := randomArr[i]
            slices.Sort(arr)
            mutex.Lock()
            defer mutex.Unlock()
            sortedArr = mergeInto(sortedArr, arr)
        })
    }
    done.Succeed(sortTasks...)

    executor := gtf.NewExecutor(1000)

    executor.Run(tf).Wait()

    if err := tf.Dump(os.Stdout); err != nil {
        log.Fatalf("Error dumping taskflow: %v", err)
    }

    if err := executor.Profile(os.Stdout); err != nil {
        log.Fatalf("Error profiling taskflow: %v", err)
    }
}

For more examples, visit the examples directory.

Benchmark

The following benchmark provides a rough estimate of performance. Note that most realistic workloads are I/O-bound, and their performance cannot be accurately reflected by these results. For CPU-intensive tasks, consider using taskflow-cpp.

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/noneback/go-taskflow/benchmark
cpu: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
BenchmarkC32-4    	   23282	     51891 ns/op	    7295 B/op	     227 allocs/op
BenchmarkS32-4    	    7047	    160199 ns/op	    6907 B/op	     255 allocs/op
BenchmarkC6-4     	   66397	     18289 ns/op	    1296 B/op	      47 allocs/op
BenchmarkC8x8-4   	    7946	    143474 ns/op	   16914 B/op	     504 allocs/op
PASS
ok  	github.com/noneback/go-taskflow/benchmark	5.606s

Understanding Conditional Tasks

Conditional nodes in go-taskflow behave similarly to those in taskflow-cpp: they drive both branching and looping. To avoid common pitfalls, refer to the Conditional Tasking documentation.
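The branching-and-looping behavior can be modeled without the library. The sketch below is a sequential, stdlib-only illustration: `step`, `execute`, and `buildLoop` are hypothetical names, and a real go-taskflow flow would instead create the condition with `tf.NewCondition` and list its successors with `Precede` in index order (here, the equivalent of `cond.Precede(body, done)`). A return value of 0 jumps back to `body`, forming a loop; 1 exits to `done`.

```go
package main

import "fmt"

// step is a hypothetical stand-in for a task. A static step calls run and then
// continues to next; a condition step calls predict and continues to
// succ[predict()], so a return value of i selects the i-th successor.
type step struct {
	name    string
	run     func()
	predict func() uint
	next    *step   // successor of a static step (nil ends the flow)
	succ    []*step // successors of a condition step, in Precede order
}

// execute walks the flow sequentially from start and returns the names of the
// steps it visited, in order.
func execute(start *step) []string {
	var trace []string
	for cur := start; cur != nil; {
		trace = append(trace, cur.name)
		if cur.predict != nil {
			i := cur.predict()
			if int(i) >= len(cur.succ) {
				break // out-of-range result: no successor runs in this sketch
			}
			cur = cur.succ[i]
			continue
		}
		if cur.run != nil {
			cur.run()
		}
		cur = cur.next
	}
	return trace
}

// buildLoop wires init -> body -> cond. For n >= 1, cond returns 0 (back to
// body) until body has run n times, then 1 (on to done).
func buildLoop(n int) *step {
	count := 0
	done := &step{name: "done"}
	body := &step{name: "body", run: func() { count++ }}
	cond := &step{name: "cond", predict: func() uint {
		if count < n {
			return 0
		}
		return 1
	}}
	cond.succ = []*step{body, done}
	body.next = cond
	return &step{name: "init", next: body}
}

func main() {
	fmt.Println(execute(buildLoop(2))) // [init body cond body cond done]
}
```

Unlike this sketch, the framework schedules tasks concurrently; the model only shows how the returned index selects a successor.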

Error Handling in go-taskflow

In Go, errors are values, and handling them is the user's responsibility. The framework handles only unrecovered panics: if a task panics, its entire parent graph is canceled and the remaining tasks are left incomplete. This behavior may change in the future. If you have suggestions, feel free to share them.

To keep a panic from interrupting the graph, recover from it inside the task function when registering the task:

tf.NewTask("not interrupt", func() {
    defer func() {
        if r := recover(); r != nil {
            // Handle the panic.
        }
    }()
    // User-defined logic.
})
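If many tasks need the same guard, the deferred recover can be factored into a wrapper. `safe` below is a hypothetical helper, not part of go-taskflow. It returns a plain `func()`, the shape `NewTask` accepts, so it could be registered as, for example, `tf.NewTask("not interrupt", safe(work, report))`, where `work` and `report` are placeholders for your own functions.

```go
package main

import (
	"fmt"
	"log"
)

// safe returns a func() that runs f and recovers any panic it raises,
// passing the recovered value to onPanic instead of letting it escape.
func safe(f func(), onPanic func(r any)) func() {
	return func() {
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
		f()
	}
}

func main() {
	task := safe(func() {
		var m map[string]int
		m["boom"] = 1 // writing to a nil map panics
	}, func(r any) {
		log.Printf("task recovered from panic: %v", r)
	})
	task() // returns normally instead of crashing the program
	fmt.Println("still running")
}
```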

Visualizing Taskflows

To generate a visual representation of a taskflow, use the Dump method:

if err := tf.Dump(os.Stdout); err != nil {
    log.Fatal(err)
}

The Dump method writes the graph as raw text in Graphviz DOT format. Use the dot tool (for example, dot -Tsvg) to render it as an SVG.


Profiling Taskflows

To profile a taskflow, use the Profile method:

if err := executor.Profile(os.Stdout); err != nil {
    log.Fatal(err)
}

The Profile method writes raw text in flame graph format. Use the flamegraph tool to render it as a flame graph SVG.


Documentation

Index

Constants

const (
	HIGH = TaskPriority(iota + 1)
	NORMAL
	LOW
)
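Because of `iota + 1`, the constants evaluate to HIGH = 1, NORMAL = 2, and LOW = 3, so a higher priority has a smaller value. The sketch below is a hypothetical model, not go-taskflow's scheduler: it redeclares the constants locally and uses `container/heap` to hand out ready tasks in priority order, breaking ties by arrival order. As the `Priority` documentation notes, priority governs the order in which tasks are scheduled, not the order in which their goroutines finish.

```go
package main

import (
	"container/heap"
	"fmt"
)

// TaskPriority and its constants mirror the package's declarations locally.
type TaskPriority uint

const (
	HIGH   = TaskPriority(iota + 1) // 1
	NORMAL                          // 2
	LOW                             // 3
)

type readyTask struct {
	name string
	prio TaskPriority
	seq  int // arrival order, keeps equal priorities first-in first-out
}

// readyQueue is a min-heap: smaller TaskPriority values (higher priority) pop first.
type readyQueue []readyTask

func (q readyQueue) Len() int { return len(q) }
func (q readyQueue) Less(i, j int) bool {
	if q[i].prio != q[j].prio {
		return q[i].prio < q[j].prio
	}
	return q[i].seq < q[j].seq
}
func (q readyQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q *readyQueue) Push(x any)   { *q = append(*q, x.(readyTask)) }
func (q *readyQueue) Pop() any {
	old := *q
	t := old[len(old)-1]
	*q = old[:len(old)-1]
	return t
}

// dispatchOrder returns task names in the order this model would schedule them.
func dispatchOrder(tasks []readyTask) []string {
	q := &readyQueue{}
	for i, t := range tasks {
		t.seq = i
		heap.Push(q, t)
	}
	var order []string
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(readyTask).name)
	}
	return order
}

func main() {
	fmt.Println(dispatchOrder([]readyTask{
		{name: "report", prio: LOW},
		{name: "ingest", prio: HIGH},
		{name: "transform", prio: NORMAL},
		{name: "alert", prio: HIGH},
	})) // [ingest alert transform report]
}
```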

Variables

This section is empty.

Functions

This section is empty.

Types

type Condition added in v0.0.5

type Condition struct {
	// contains filtered or unexported fields
}

Condition wraps a condition task.

type Executor

type Executor interface {
	Wait()                     // Wait blocks until all tasks have finished.
	Profile(w io.Writer) error // Profile writes raw flame graph text to w.
	Run(tf *TaskFlow) Executor // Run starts scheduling and executing tf.
}

Executor schedules and executes taskflows.

func NewExecutor

func NewExecutor(concurrency uint) Executor

NewExecutor returns an Executor with the specified maximum goroutine concurrency. A value larger than runtime.NumCPU() is recommended, and the value **MUST** be larger than the number of subflows.
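The concurrency argument caps how many task goroutines may run at once. A common way to enforce such a cap is a buffered channel used as a semaphore, sketched below with a hypothetical `boundedRunner` whose `Run(...).Wait()` chaining mirrors the `Executor` interface; it is an illustration, not go-taskflow's executor. The same mechanism suggests why the limit must exceed the number of subflows: a parent that holds a slot while it waits for its children needs free slots for those children to run, otherwise the flow can stall.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// boundedRunner starts each function on its own goroutine but lets at most
// cap(sem) of them execute at the same time.
type boundedRunner struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

// newBoundedRunner treats 0 as 1, because an unbuffered semaphore would block
// every job forever.
func newBoundedRunner(concurrency uint) *boundedRunner {
	if concurrency == 0 {
		concurrency = 1
	}
	return &boundedRunner{sem: make(chan struct{}, concurrency)}
}

// Run starts every fn and returns the runner so calls can be chained.
func (r *boundedRunner) Run(fns ...func()) *boundedRunner {
	for _, fn := range fns {
		r.wg.Add(1)
		go func(fn func()) {
			defer r.wg.Done()
			r.sem <- struct{}{}        // acquire a slot; blocks while all are taken
			defer func() { <-r.sem }() // release the slot when fn returns
			fn()
		}(fn)
	}
	return r
}

// Wait blocks until every function passed to Run has returned.
func (r *boundedRunner) Wait() { r.wg.Wait() }

// peakConcurrency runs n trivial jobs under limit and reports the largest
// number of jobs observed running at the same time.
func peakConcurrency(limit uint, n int) int64 {
	var running, peak int64
	jobs := make([]func(), n)
	for i := range jobs {
		jobs[i] = func() {
			cur := atomic.AddInt64(&running, 1)
			for { // raise peak to cur if cur is larger
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			atomic.AddInt64(&running, -1)
		}
	}
	newBoundedRunner(limit).Run(jobs...).Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(peakConcurrency(4, 100) <= 4) // true
}
```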

type Static

type Static struct {
	// contains filtered or unexported fields
}

Static wraps a static task.

type Subflow

type Subflow struct {
	// contains filtered or unexported fields
}

Subflow wraps a subflow task.

func (*Subflow) NewCondition added in v0.1.2

func (sf *Subflow) NewCondition(name string, predict func() uint) *Task

NewCondition returns a condition task. The value returned by predict determines which successor runs.

func (*Subflow) NewSubflow added in v0.1.2

func (sf *Subflow) NewSubflow(name string, f func(sf *Subflow)) *Task

NewSubflow returns a subflow task

func (*Subflow) NewTask added in v0.1.2

func (sf *Subflow) NewTask(name string, f func()) *Task

NewTask returns a static task.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is the basic component of a TaskFlow.

func (*Task) Name

func (t *Task) Name() string

func (*Task) Precede

func (t *Task) Precede(tasks ...*Task)

Precede makes every task in tasks depend on t. In addition, when t is a condition task, the order of tasks corresponds to the predict result: a return value of i selects tasks[i], for i from 0 to len(tasks)-1.

func (*Task) Priority added in v0.0.9

func (t *Task) Priority(p TaskPriority) *Task

Priority sets the task's scheduling priority. Because tasks run as concurrent goroutines, priority only guarantees the order in which tasks are scheduled, not the order in which they execute.

func (*Task) Succeed

func (t *Task) Succeed(tasks ...*Task)

Succeed makes t depend on every task in tasks.
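`Precede` and `Succeed` declare the same kind of edge from opposite ends: `a.Precede(b, c)` makes `b` and `c` wait for `a`, and `d.Succeed(b, c)` makes `d` wait for `b` and `c`. The stdlib-only model below mirrors those two methods on a hypothetical `graph` type and uses Kahn's algorithm to produce one valid execution order. An executor only has to respect these edges, so independent tasks (here `sort_0` and `sort_1`) may run concurrently in either order. The model has no condition tasks, so it reports any cycle as invalid.

```go
package main

import (
	"fmt"
	"sort"
)

// graph is a hypothetical dependency model: deps[x] lists the tasks x waits for.
type graph struct {
	deps map[string][]string
}

func newGraph() *graph { return &graph{deps: map[string][]string{}} }

// add registers a task with no dependencies if it is not already known.
func (g *graph) add(name string) {
	if _, ok := g.deps[name]; !ok {
		g.deps[name] = nil
	}
}

// precede makes every task in later depend on t (cf. Task.Precede).
func (g *graph) precede(t string, later ...string) {
	g.add(t)
	for _, l := range later {
		g.add(l)
		g.deps[l] = append(g.deps[l], t)
	}
}

// succeed makes t depend on every task in earlier (cf. Task.Succeed).
func (g *graph) succeed(t string, earlier ...string) {
	g.add(t)
	for _, e := range earlier {
		g.precede(e, t)
	}
}

// order returns one valid execution order via Kahn's algorithm, taking the
// alphabetically first ready task each step so the result is deterministic.
// ok is false if the dependencies contain a cycle.
func (g *graph) order() (seq []string, ok bool) {
	indeg := make(map[string]int, len(g.deps))
	users := make(map[string][]string) // users[x]: tasks that wait for x
	for t, ds := range g.deps {
		indeg[t] = len(ds)
		for _, d := range ds {
			users[d] = append(users[d], t)
		}
	}
	var ready []string
	for t, n := range indeg {
		if n == 0 {
			ready = append(ready, t)
		}
	}
	for len(ready) > 0 {
		sort.Strings(ready)
		t := ready[0]
		ready = ready[1:]
		seq = append(seq, t)
		for _, u := range users[t] {
			indeg[u]--
			if indeg[u] == 0 {
				ready = append(ready, u)
			}
		}
	}
	return seq, len(seq) == len(g.deps)
}

func main() {
	g := newGraph()
	g.precede("load", "sort_0", "sort_1") // sort_0 and sort_1 wait for load
	g.succeed("Done", "sort_0", "sort_1") // Done waits for sort_0 and sort_1
	fmt.Println(g.order())                // [load sort_0 sort_1 Done] true
}
```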

type TaskFlow

type TaskFlow struct {
	// contains filtered or unexported fields
}

TaskFlow represents a graph of tasks and their dependencies.

func NewTaskFlow

func NewTaskFlow(name string) *TaskFlow

NewTaskFlow returns a new TaskFlow with the given name.

func (*TaskFlow) Dump added in v0.1.3

func (tf *TaskFlow) Dump(writer io.Writer) error

Dump writes the task graph to writer in DOT format.

func (*TaskFlow) Name

func (tf *TaskFlow) Name() string

func (*TaskFlow) NewCondition added in v0.1.2

func (tf *TaskFlow) NewCondition(name string, predict func() uint) *Task

NewCondition returns a condition task attached to tf. NOTICE: the value returned by predict determines which successor runs.

func (*TaskFlow) NewSubflow added in v0.1.2

func (tf *TaskFlow) NewSubflow(name string, instantiate func(sf *Subflow)) *Task

NewSubflow returns a subflow task attached to tf. NOTICE: instantiate is invoked only once, to build the subflow.

func (*TaskFlow) NewTask added in v0.1.2

func (tf *TaskFlow) NewTask(name string, f func()) *Task

NewTask returns a static task attached to tf.

func (*TaskFlow) Reset

func (tf *TaskFlow) Reset()

Reset resets the taskflow.

type TaskPriority added in v0.0.9

type TaskPriority uint

TaskPriority is a task's scheduling priority.

type Visualizer

type Visualizer interface {
	// Visualize generates the DAG as raw DOT text and writes it to writer.
	Visualize(tf *TaskFlow, writer io.Writer) error
}

Directories

Path Synopsis
examples
