Documentation
¶
Overview ¶
Package taskgroup 实现了多任务并发执行,聚合收集最终所有任务的执行结果与执行状态,
taskgroup.TaskGroup 引用 sync.WaitGroup, 但增加了任务结果聚合和错误返回(首个必要成功任务的错误信息, 且会立即停止后续任务的运行)
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var If = func(cond bool, a, b interface{}) interface{} {
if cond {
return a
}
return b
}
If 简单的三元表达式实现
Functions ¶
This section is empty.
Types ¶
type Option ¶ added in v1.0.5
type Option func(*TaskGroup)
Option 表示任务组对象默认行为的修改
func WithWorkerNums ¶ added in v1.0.5
WithWorkerNums 指定任务组执行时所需的协程数`workerNums`
Worker 数量的选择:
1. 硬件资源:系统上的 CPU 核心数量、内存大小和网络带宽等因素会限制可以并行运行的 worker 的数量。如果 worker 数量超过硬件资源能够支持的程度,那么增加更多的 worker 并不会提高整体性能,反而可能因为上下文切换和资源争用而降低性能
2. 任务的性质:任务可能是 I/O 密集型(如网络请求或磁盘读写)或 CPU 密集型(如复杂的数学计算)。对于 I/O 密集型任务,增加 worker 数量可以更有效地利用等待时间,因为当一个 worker 在等待 I/O 操作完成时,其他 worker 可以继续执行。然而,对于 CPU 密集型任务,增加 worker 数量可能会导致 CPU 资源耗尽,从而降低性能。
3. 任务的粒度:任务的粒度(即每个任务所需的时间)也会影响 worker 的数量。如果任务粒度很小,那么可能需要更多的 worker 来确保 CPU 始终保持忙碌状态。但是,如果任务粒度很大,那么少量的 worker 就足以处理所有任务,增加 worker 数量可能是不必要的。
type TaskGroup ¶
type TaskGroup struct {
// contains filtered or unexported fields
}
TaskGroup 表示可将多个任务进行安全并发执行的一个对象
Example (Abnormal) ¶
Abnormal 展示了异常的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等
defer func() { if r := recover(); r != nil { fmt.Printf("err: %+v\n", r) } }() tasks := []*taskgroup.Task{ taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), true), taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true), taskgroup.NewTask(2, task3ReturnFailWrapper(3, false), true), nil, nil, } taskResults, err := taskgroup.NewTaskGroup(taskgroup.WithWorkerNums(uint32(len(tasks)))).AddTask(tasks...).Run() if err != nil { fmt.Printf("err: %+v\n", err) return } for fno, result := range taskResults { fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error()) }
Output: err: AddTask: Already have the same Task 2
Example (Default) ¶
Default 展示了默认配置的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等
tasks := []*taskgroup.Task{ taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false), taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true), taskgroup.NewTask(3, task3ReturnFailWrapper(3, false), true), } taskResults, err := new(taskgroup.TaskGroup).AddTask(tasks...).Run() if err != nil { fmt.Printf("err: %+v\n", err) return } for fno, result := range taskResults { fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error()) }
Output: err: fno: 3, TASK3 err
Example (JustNotBad) ¶
JustNotBad 展示了非最佳的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等
tasks := []*taskgroup.Task{ taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false), taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true), taskgroup.NewTask(3, task3ReturnFailWrapper(2, false), false), nil, nil, } taskResults, err := taskgroup.NewTaskGroup(nil).AddTask(tasks...).Run() if err != nil { fmt.Printf("err: %+v\n", err) return } for fno, result := range taskResults { fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error()) }
Output: FNO: 3, RESULT: TASK3: The data is 928 , STATUS: fno: 2, TASK3 err FNO: 1, RESULT: 1127 , STATUS: fno: 1, TASK1 err FNO: 2, RESULT: {1112 mlee} , STATUS: <nil>
Example (Typical) ¶
Typical 展示了典型的使用案例,包括,多任务创建、任务执行、结果收集,错误处理等
tasks := []*taskgroup.Task{ taskgroup.NewTask(1, task1ReturnFailWrapper(1, false), false), taskgroup.NewTask(2, task2ReturnSuccessWrapper(2, false), true), taskgroup.NewTask(3, task3ReturnFailWrapper(3, false), false), } taskResults, err := taskgroup.NewTaskGroup(taskgroup.WithWorkerNums(uint32(len(tasks)))).AddTask(tasks...).Run() if err != nil { fmt.Printf("err: %+v\n", err) return } for fno, result := range taskResults { fmt.Printf("FNO: %d, RESULT: %v , STATUS: %v\n", fno, result.Result(), result.Error()) }
Output: FNO: 3, RESULT: TASK3: The data is 928 , STATUS: fno: 3, TASK3 err FNO: 2, RESULT: {1112 mlee} , STATUS: <nil> FNO: 1, RESULT: 1127 , STATUS: fno: 1, TASK1 err
func NewTaskGroup ¶ added in v1.0.5
NewTaskGroup 创建一个任务组对象
func (*TaskGroup) Run ¶
func (tg *TaskGroup) Run() (map[uint32]*TaskResult, error)
Run 启动并运行任务组中的所有任务
当返回`non-nil`错误时,则,返回的任务执行结果将不可信
func (*TaskGroup) RunExactlyOnce ¶
func (tg *TaskGroup) RunExactlyOnce() (result map[uint32]*TaskResult, err error)
RunExactlyOnce 启动并运行任务组中的所有任务(运行当且仅当一次)
type TaskResult ¶ added in v1.0.4
type TaskResult struct {
// contains filtered or unexported fields
}
TaskResult 表示任务的执行结果与执行状态
func (*TaskResult) Result ¶ added in v1.0.4
func (tr *TaskResult) Result() interface{}
Result 获取任务执行结果