taskpool

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: GPL-3.0 Imports: 7 Imported by: 0

README

协程池

无论协程创建、销毁都比较耗资源,还是同时存在大量的协程会让协程M:N模型调度器更累

功能

核心协程:作为核心,当提交任务到协程池中,核心协程数足够则直接创建协程,期间直到服务停止;

任务队列(同步队列):当核心协程数达到后,不会立即启用业余协程,而是将任务放到任务队列中;

业余协程:业余协程在无任务情况下,允许存活一段时间;当队列满了之后之间启用业务协程;

拒绝策略:当任务队列、业余协程忙中,将采用某种拒绝策略;

停止协程池:慢性通知--先通知池停止提交任务,所有协程后台将所有任务处理完;

主动通知--停止提交任务,等待一段时间后台所有协程处理所有任务,再判断是否还有协程在跑;

终止协程池:把所有协程当前手上的任务都处理好,直接退出;

使用

详情请见go_routine_pool_test.go

func init() {
   log.SetGlobalLog("taskpool", true)
}

const TestSize = 100000

func TestGoRoutinePool(t *testing.T) {
   q := NewChanTaskQueue(10000)
   p := NewGoRoutinePool(2000, 4000, 3*time.Second, q, &CallerRunsPolicy{})
   var wg sync.WaitGroup
   wg.Add(TestSize)
   start := time.Now()
   log.Info("开始任务")
   fmt.Println("开始任务?")
   for i := 0; i < TestSize; i++ {
      ret, err := p.Execute(&testR{name: strconv.Itoa(i)})
      if err != nil {
         log.Info("p.Execute", log.Any("ret", ret), log.ErrorField(err))
      }
      if i%15000 == 0 {
         time.Sleep(1 * time.Second)
      }
      if p.queue.Size() == 10000 && p.largestPoolSize.Load() >= 3900 {
         time.Sleep(1 * time.Second)
      }
      wg.Done()
   }
   log.Info("start")
   wg.Wait()
   log.Info("end")
   if ret, err := p.AwaitTermination(3 * time.Second); err != nil {
      log.Info("终止条件失败", log.Any("ret", ret), log.ErrorField(err))
   }
   defer log.Info("完成任务数量", log.Any("", p.completedTaskCount.Load()))
   log.Info("总耗时:", log.Any("", time.Now().Sub(start)))
}

type testR struct {
	name string
}

func (t *testR) Run() (bool, error) {
	time.Sleep(1 * time.Second)
	log.Info(" ->", log.Any("name", t.name))
	return true, nil
}

设计

GoRoutine Worker

//	GoRoutineWork 执行者
type GoRoutineWork struct {
	core             bool
	allowCoreTimeOut bool //是否允许核心协程超时
	status           int
	name             string //工作者名称
	completedTasks   int64
	keepAliveTime    int64 //秒级时间
	task             Runnable
	pool             IGoRoutinePool
}

面向接口编程,提供扩展空间;

//	ITaskPool 线程池:核心数量、最大数量、业余队列存活时间&时间单元、同步队列&队列满后执行策略
type IGoRoutinePool interface {
	Execute(Runnable) (bool, error)
	IsShutdown() bool
	TryTerminate() (bool, error)
	AwaitTermination(unit time.Duration) (bool, error)
	GetTask(allowCoreThreadTimeOut bool) (Runnable, error) //从同步队列取任务
	AddTask(Runnable) (bool, error)                        //增加任务
	Queue() IGoRoutinePoolQueue
	RemoveWorker(string, bool) (bool, error) //取出任务队列
	AddCompletedTasks(int64) bool            //	增加已经完成的任务数
}

队列主要存放Runnable实现类

//	ITaskPoolQueue 线程安全 Put Pool
type IGoRoutinePoolQueue interface {
	IsEmpty() bool
	Size() int64
	//	直接存任务,可能会失败
	Add(r Runnable) (bool, error)
	//	直接取任务,可能为空
	Take() (Runnable, error)
	//	尝试一定时间内放任务
	Offer(r Runnable, unit time.Duration) (bool, error)
	//	尝试一定时间取任务
	Poll(unit time.Duration) (Runnable, error)
}
//	协程池满了,队列满了,要怎么拒绝任务呢?
type IGoRoutinePoolRejectedPolicy interface {
	RejectedExecution(Runnable, IGoRoutinePool) (bool, error)
}
type Runnable interface {
	Run() (bool, error)
}

Documentation

Index

Constants

View Source
const (
	Init                 = 0
	Running              = 1
	CorePoolRunning      = 2
	MaxPoolRunning       = 3
	Terminate            = 4
	Stop                 = 5
	TerminateNoCompleted = Terminate<<5 + Terminate
	TerminateCompleted   = Terminate<<6 + Terminate
	StopNoCompleted      = Stop<<5 + Stop
	StopCompleted        = Stop<<6 + Stop
)

Variables

View Source
var AbortPolicyError = errors.New("abort reject execution")
View Source
var CallerRunsPolicyError = errors.New("caller runs policy reject execution")
View Source
var DiscardOldestPolicyError = errors.New("discard oldest policy reject execution")
View Source
var DiscardPolicyPolicyError = errors.New("discard policy reject execution")
View Source
var ExecuteAddWorkerError = errors.New("execute add worker error")
View Source
var ExecuteRunnableError = errors.New("execute runnable error")
View Source
var Nil = errors.New("anything is nil")
View Source
var PoolStatusError = errors.New("pool status error")
View Source
var PoolStopStatusError = errors.New("pool stop status error")
View Source
var PoolTerminateStatusError = errors.New("pool terminate status error")
View Source
var QueueEmpty = errors.New("Queue empty")
View Source
var QueueFull = errors.New("Queue full")
View Source
var RunnableNil = errors.New("runnable is nil")

Functions

func GoRoutinePool

func GoRoutinePool(allowCoreThreadTimeOut bool,
	corePoolSize, maxPoolSize int64,
	keepAliveTime time.Duration,
	queue IGoRoutinePoolQueue,
	policy IGoRoutinePoolRejectedPolicy) *goRoutinePool

func NewChanTaskQueue

func NewChanTaskQueue(maxQueueSize int64) *chanTaskQueue

func NewGoRoutinePool

func NewGoRoutinePool(corePoolSize, maxPoolSize int64, keepAliveTime time.Duration, queue IGoRoutinePoolQueue, rejectedPolicy IGoRoutinePoolRejectedPolicy) *goRoutinePool

Types

type AbortPolicy

type AbortPolicy struct {
}

func (*AbortPolicy) RejectedExecution

func (p *AbortPolicy) RejectedExecution(Runnable, IGoRoutinePool) (bool, error)

type CallerRunsPolicy

type CallerRunsPolicy struct {
}

func (*CallerRunsPolicy) RejectedExecution

func (p *CallerRunsPolicy) RejectedExecution(r Runnable, pool IGoRoutinePool) (bool, error)

type DiscardOldestPolicy

type DiscardOldestPolicy struct {
}

DiscardOldestPolicy:把队列最老的丢了,再执行当前的

func (*DiscardOldestPolicy) RejectedExecution

func (p *DiscardOldestPolicy) RejectedExecution(r Runnable, pool IGoRoutinePool) (bool, error)

type DiscardPolicy

type DiscardPolicy struct {
}

DiscardPolicy:任务直接丢弃

func (*DiscardPolicy) RejectedExecution

func (p *DiscardPolicy) RejectedExecution(r Runnable, pool IGoRoutinePool) (bool, error)

type GoRoutineWork

type GoRoutineWork struct {
	// contains filtered or unexported fields
}

GoRoutineWork 执行者

func NewWork

func NewWork(task Runnable, name string, core bool, pool IGoRoutinePool) *GoRoutineWork

func (*GoRoutineWork) Work

func (w *GoRoutineWork) Work() (bool, error)

Run:核心任务允许不停下来

type IGoRoutinePool

type IGoRoutinePool interface {
	Execute(Runnable) (bool, error)
	IsShutdown() bool
	TryTerminate() (bool, error)
	AwaitTermination(unit time.Duration) (bool, error)
	GetTask(allowCoreThreadTimeOut bool) (Runnable, error) //从同步队列取任务
	AddTask(Runnable) (bool, error)                        //增加任务
	Queue() IGoRoutinePoolQueue
	RemoveWorker(string, bool) (bool, error) //取出任务队列
	AddCompletedTasks(int64) bool            //	增加已经完成的任务数
}

ITaskPool 线程池:核心数量、最大数量、业余队列存活时间&时间单元、同步队列&队列满后执行策略

type IGoRoutinePoolQueue

type IGoRoutinePoolQueue interface {
	IsEmpty() bool
	Size() int64
	//	直接存任务,可能会失败
	Add(r Runnable) (bool, error)
	//	直接取任务,可能为空
	Take() (Runnable, error)
	//	尝试一定时间内放任务
	Offer(r Runnable, unit time.Duration) (bool, error)
	//	尝试一定时间取任务
	Poll(unit time.Duration) (Runnable, error)
}

ITaskPoolQueue 线程安全 Put Pool

type IGoRoutinePoolRejectedPolicy

type IGoRoutinePoolRejectedPolicy interface {
	RejectedExecution(Runnable, IGoRoutinePool) (bool, error)
}

协程池满了,队列满了,要怎么拒绝任务呢?

type NewGoRoutineRunsPolicy

type NewGoRoutineRunsPolicy struct {
}

func (*NewGoRoutineRunsPolicy) RejectedExecution

func (p *NewGoRoutineRunsPolicy) RejectedExecution(r Runnable, pool IGoRoutinePool) (bool, error)

type Runnable

type Runnable interface {
	Run() (bool, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL