taskpool

package
v0.18.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2021 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

非阻塞协程池,协程数量可动态增长,可配置最大协程并发数量,可手动释放空闲的协程

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrTaskPool = errors.New("naza.taskpool: fxxk")

Functions

func Go

func Go(task TaskFn, param ...interface{})

func Init

func Init(modOptions ...ModOption) error

func KillIdleWorkers

func KillIdleWorkers()

func NewWorker

func NewWorker(p *pool) *worker

Types

type ModOption

type ModOption func(option *Option)

type Option

type Option struct {
	// 创建池对象时,预先开启的worker(协程)数量,如果为0,则不预先开启。只是一个小优化
	InitWorkerNum int

	// - 如果为0,则无协程数量限制。向池中添加任务时如果无空闲协程,会无条件创建新的协程。
	// - 如果不为0,则池内总协程数量达到阈值后,将不再创建新的协程。此时任务会被缓存,等待有空闲协程时才被执行。
	//   可用来控制任务的最大并发数
	MaxWorkerNum int
}

type Pool

type Pool interface {
	// 向池内放入任务
	// 非阻塞函数,不会等待task执行
	Go(task TaskFn, param ...interface{})

	// 获取当前的状态,注意,只是一个瞬时值
	GetCurrentStatus() Status

	// 关闭池内所有的空闲协程
	KillIdleWorkers()
}

func NewPool

func NewPool(modOptions ...ModOption) (Pool, error)
Example

并发计算0+1+2+...+1000 演示怎么向协程池中添加带参数的函数任务

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/cfeeling/naza/pkg/taskpool"
)

func main() {
	pool, _ := taskpool.NewPool(func(option *taskpool.Option) {
		// 限制最大并发数
		option.MaxWorkerNum = 16
	})
	var sum int32
	var wg sync.WaitGroup
	n := 1000
	wg.Add(n)
	for i := 0; i < n; i++ {
		pool.Go(func(param ...interface{}) {
			ii := param[0].(int)
			atomic.AddInt32(&sum, int32(ii))
			wg.Done()
		}, i)
	}
	wg.Wait()
	fmt.Println(sum)
}
Output:

499500

type Status

type Status struct {
	TotalWorkerNum int // 总协程数量
	IdleWorkerNum  int // 空闲协程数量
	BlockTaskNum   int // 等待执行的任务数。注意,只在协程数量有最大限制的情况下,该值才可能不为0,具体见Option.MaxWorkerNum
}

func GetCurrentStatus

func GetCurrentStatus() Status

type TaskFn

type TaskFn func(param ...interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL