taskgroup

package module
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2024 License: MIT Imports: 4 Imported by: 0

README

taskgroup

golang并发执行多任务(网络i/o、磁盘i/o、内存i/o等),并聚合收集多任务执行结果与执行状态(如任务组执行失败,将返回首个必要成功任务的错误信息, 且会立即停止后续任务的运行)。 使用文档

使用: go get github.com/mlee-msl/taskgroup

功能特点

  • 并发安全的执行多个任务
  • 将多个任务的执行结果与执行状态进行聚合
  • 通过扇出/扇入模式,结合线程安全channel实现高效协程间通信
  • 多任务复用(共享)同一协程,避免了因协程频繁创建或销毁带来的开销,一定程度上也减少了上下文切换(特别是,内核线程与用户协程间的切换)的频次
  • early-return,当出现必要成功的任务失败时,将停止执行所有goroutine上还未启动的所有其他任务

    NOTEs,当所有任务都设置为非必要成功时,即可退化为errgroup包的使用场景

vs官方扩展库errgroup

  • errgroup 没有任务添加阶段,直接会使用协程执行指定的任务

    可通过限制协程数量上限,控制并发量(指定buffer sizechannel实现),当协程数达到上限时,需要等待现有任务执行结束,然后开启新的协程,会增加协程创建或销毁的成本

    另外,当协程量不指定时(默认协程量不受限),随着任务量的增加,会增加内核态线程与用户态协程间的上下文切换成本

  • errgroup可支持带有取消Context的模式,但实际上,该种模式下仍需要所有执行任务的goroutine执行完毕(每个任务都会绑定到新的goroutine上)
  • 仅返回了首个出现错误的接口(任务)状态,未将所有接口(任务)的执行结果(或执行状态)进行收集

IDEAs & TODOs

  • 当使用errgroup.WithContext时,出现错误cancel后,需要同步(如,atomic同步原语)后续协程(因任务阻塞还未开始执行的协程)不再启动(后续执行已无意义,业务层面已选择了带取消的上下文执行方式), 也确保避免了出现内存泄露(协程泄露等)等可能的问题
  • 增加一个默认的context.Context,和errgroup.WithContext的处理逻辑保持一致,统一使用context.Context处理错误 (功能实现上,整体可能会更加优雅点)

关于性能

指标

cpu性能

go test -benchmem -run=^$ -bench ^BenchmarkTaskGroup$ -cpu='1,2,4,8,16' [-benchtime=10x|-benchtime=1s] -count=3 -cpuprofile='cpu.pprof' .

内存性能

go test -benchmem -run=^$ -bench ^BenchmarkTaskGroup$ -cpu='1,2,4,8,16' [-benchtime=10x|-benchtime=1s] -count=3 -memprofile='mem.pprof' .

阻塞性能

go test -benchmem -run=^$ -bench ^BenchmarkTaskGroup$ -cpu='1,2,4,8,16' [-benchtime=10x|-benchtime=1s] -count=3 -blockprofile='block.pprof' .

锁性能

go test -benchmem -run=^$ -bench ^BenchmarkTaskGroup$ -cpu='1,2,4,8,16' [-benchtime=10x|-benchtime=1s] -count=3 -mutexprofile='mutex.pprof' .

效果对比

taskgroup

并发任务量: 6 taskgroup_6.jpg
并发任务量: 60
taskgroup_60.jpg

errgroup

并发任务量: 6
errgroup_6.jpg
并发任务量: 60
errgroup_60.jpg

可以看出,在增加任务执行结果聚合、失败快速返回的能力前提下:

当并发任务量不多时,前者相较于后者的综合性能提升还不明显;
当并发任务量较多时,前者相较于后者的综合性能表现提升较大;
深度验证后发现,随着并发任务量的增加,其综合性能表现的差距将会越明显。

其他

  • 核心功能的测试用例均采用Fuzz Test模糊测试,并测试通过
  • 包中提供的核心功能,均有Example Test样例用例,并执行通过,更多详情

Documentation

Overview

Package taskgroup 实现了多任务并发执行,聚合收集最终所有任务的执行结果与执行状态,

taskgroup.TaskGroup 引用 sync.WaitGroup, 但增加了任务结果聚合和错误返回(首个必要成功任务的错误信息, 且会立即停止后续任务的运行)

Index

Examples

Constants

This section is empty.

Variables

View Source
var If = func(cond bool, a, b interface{}) interface{} {
	if cond {
		return a
	}
	return b
}

If 简单的三元表达式实现

Functions

This section is empty.

Types

type Option added in v1.0.5

type Option func(*TaskGroup)

Option 表示任务组对象默认行为的修改

func WithWorkerNums added in v1.0.5

func WithWorkerNums(workerNums uint32) Option

WithWorkerNums 指定任务组执行时所需的协程数`workerNums`

Worker 数量的选择:

1. 硬件资源:系统上的 CPU 核心数量、内存大小和网络带宽等因素会限制可以并行运行的 worker 的数量。如果 worker 数量超过硬件资源能够支持的程度,那么增加更多的 worker 并不会提高整体性能,反而可能因为上下文切换和资源争用而降低性能

2. 任务的性质:任务可能是 I/O 密集型(如网络请求或磁盘读写)或 CPU 密集型(如复杂的数学计算)。对于 I/O 密集型任务,增加 worker 数量可以更有效地利用等待时间,因为当一个 worker 在等待 I/O 操作完成时,其他 worker 可以继续执行。然而,对于 CPU 密集型任务,增加 worker 数量可能会导致 CPU 资源耗尽,从而降低性能。

3. 任务的粒度:任务的粒度(即每个任务所需的时间)也会影响 worker 的数量。如果任务粒度很小,那么可能需要更多的 worker 来确保 CPU 始终保持忙碌状态。但是,如果任务粒度很大,那么少量的 worker 就足以处理所有任务,增加 worker 数量可能是不必要的。

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(fNO uint32, f TaskFunc, mustSuccess bool) *Task

NewTask 创建一个任务,`fNO`用以表示任务`f`的唯一标识, `mustSuccess`则表示该任务`f`是否为必须成功,当`true`时, 且任务`f`执行失败,表示整个任务组将执行失败

type TaskFunc added in v1.0.5

type TaskFunc func() (interface{}, error)

TaskFunc 任务函数的签名

type TaskGroup

type TaskGroup struct {
	// contains filtered or unexported fields
}

TaskGroup 表示可将多个任务进行安全并发执行的一个对象

Example (Abnormal)

Abnormal 展示了异常的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等

defer func() {
	if r := recover(); r != nil {
		fmt.Printf("err: %+v\n", r)
	}
}()

tasks := []*taskgroup.Task{
	taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), true),
	taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true),
	taskgroup.NewTask(2, task3ReturnFailWrapper(3, false), true),
	nil,
	nil,
}

taskResults, err := taskgroup.NewTaskGroup(taskgroup.WithWorkerNums(uint32(len(tasks)))).AddTask(tasks...).Run()
if err != nil {
	fmt.Printf("err: %+v\n", err)
	return
}
for fno, result := range taskResults {
	fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error())
}
Output:

err: AddTask: Already have the same Task 2
Example (Default)

Default 展示了默认配置的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等

tasks := []*taskgroup.Task{
	taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false),
	taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true),
	taskgroup.NewTask(3, task3ReturnFailWrapper(3, false), true),
}

taskResults, err := new(taskgroup.TaskGroup).AddTask(tasks...).Run()
if err != nil {
	fmt.Printf("err: %+v\n", err)
	return
}
for fno, result := range taskResults {
	fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error())
}
Output:

err: fno: 3, TASK3 err
Example (JustNotBad)

JustNotBad 展示了非最佳的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等

tasks := []*taskgroup.Task{
	taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false),
	taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true),
	taskgroup.NewTask(3, task3ReturnFailWrapper(2, false), false),
	nil,
	nil,
}

taskResults, err := taskgroup.NewTaskGroup(nil).AddTask(tasks...).Run()
if err != nil {
	fmt.Printf("err: %+v\n", err)
	return
}
for fno, result := range taskResults {
	fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error())
}
Output:

FNO: 3, RESULT: TASK3: The data is 928 , STATUS: fno: 2, TASK3 err
FNO: 1, RESULT: 1127 , STATUS: fno: 1, TASK1 err
FNO: 2, RESULT: {1112 mlee} , STATUS: <nil>
Example (Typical)

Typical 展示了典型的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等

tasks := []*taskgroup.Task{
	taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false),
	taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true),
	taskgroup.NewTask(3, task3ReturnFailWrapper(3, false), false),
}

taskResults, err := taskgroup.NewTaskGroup(taskgroup.WithWorkerNums(uint32(len(tasks)))).AddTask(tasks...).Run()
if err != nil {
	fmt.Printf("err: %+v\n", err)
	return
}
for fno, result := range taskResults {
	fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error())
}
Output:

FNO: 3, RESULT: TASK3: The data is 928 , STATUS: fno: 3, TASK3 err
FNO: 2, RESULT: {1112 mlee} , STATUS: <nil>
FNO: 1, RESULT: 1127 , STATUS: fno: 1, TASK1 err

func NewTaskGroup added in v1.0.5

func NewTaskGroup(opts ...Option) *TaskGroup

NewTaskGroup 创建一个任务组对象

func (*TaskGroup) AddTask

func (tg *TaskGroup) AddTask(tasks ...*Task) *TaskGroup

AddTask 向任务组中添加若干待执行的任务`tasks`

NOTEs: 出现了相同的任务(任务的标识相等),将会`panic`

func (*TaskGroup) Run

func (tg *TaskGroup) Run() (map[uint32]*TaskResult, error)

Run 启动并运行任务组中的所有任务

当返回`non-nil`错误时,则,返回的任务执行结果将不可信

func (*TaskGroup) RunExactlyOnce

func (tg *TaskGroup) RunExactlyOnce() (result map[uint32]*TaskResult, err error)

RunExactlyOnce 启动并运行任务组中的所有任务(运行当且仅当一次)

type TaskResult added in v1.0.4

type TaskResult struct {
	// contains filtered or unexported fields
}

TaskResult 表示任务的执行结果与执行状态

func (*TaskResult) Error added in v1.0.4

func (tr *TaskResult) Error() error

Error 获取任务执行状态

func (*TaskResult) FNO added in v1.2.0

func (tr *TaskResult) FNO() uint32

FNO 获取任务的唯一标识号

func (*TaskResult) Result added in v1.0.4

func (tr *TaskResult) Result() interface{}

Result 获取任务执行结果

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL