goroutine_pool

package module
v0.0.0-...-34a2e4c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2023 License: MIT Imports: 7 Imported by: 0

README

Golang协程池(Goroutine Pool)

无需创建线程池,全局限制最大运行的协程数(根据内存自动调整)

空闲释放

任务模式

两种任务模式:

  • 提交任务payload和worker,使用相同的worker运行不同的payload
  • 忽略payload的概念,直接运行worker函数

兼容复杂任务场景,多个Pool自由搭建DAG

一个收缩的例子

TODO

  • 支持往当前池子增加任务

Documentation

Index

Constants

View Source
const (

	// ConsumerStateIdle 消费者处于空闲状态
	ConsumerStateIdle = iota

	// ConsumerStateBusy 消费者在忙着处理任务
	ConsumerStateBusy

	// ConsumerStateShutdown 消费者已经退出,不会再参与任务处理
	ConsumerStateShutdown
)
View Source
const (

	// TaskQueueMaxLengthUnlimited 表示队列中等待执行的任务的数量时没有限制的,会随着任务的提交自动增长
	TaskQueueMaxLengthUnlimited = 0

	// DefaultTaskQueueMaxLength 默认的任务队列的最大长度,当任务队列中挤压数超过此数量时提交任务时就会卡住直到任务队列有空闲或者超时
	DefaultTaskQueueMaxLength = 100000
)
View Source
const (

	// DefaultInitConsumerNum 初始化的任务数量
	DefaultInitConsumerNum = 100

	// DefaultMaxConsumerNum 最大消费者的数量
	DefaultMaxConsumerNum = 100

	// DefaultMinConsumerNum 最小的消费者数量
	DefaultMinConsumerNum = 100

	// DefaultConsumerMaxIdle 任务空闲多久之后会被释放掉
	DefaultConsumerMaxIdle = time.Minute * 5
)

空闲任务的参数控制

View Source
const (

	// DefaultConsumerIdleCheckInterval 默认情况下隔多久检查一下消费者是否处于空闲状态
	DefaultConsumerIdleCheckInterval = time.Minute

	// DefaultRunTaskTimeout 默认情况下任务运行多久认为是超时了
	DefaultRunTaskTimeout = time.Minute * 5
)

Variables

View Source
var (
	// ErrPayloadConsumeFuncNil 项执行payload类型的任务,但是没有设置PayloadConsumeFunc,只能返回错误了
	ErrPayloadConsumeFuncNil = errors.New("Options.TaskPayloadConsumeFunc nil")

	// ErrPoolNotFound 协程池不存在
	ErrPoolNotFound = errors.New("pool not found")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {

	// 当前消费者自己独享的一个存储空间,可以进行一些存储之类的,比如每个消费者初始化一个数据库连接之类的或其他资源,还可避免出现锁竞争的情况
	*ConsumerStorage
	// contains filtered or unexported fields
}

Consumer 用于处理协程池中的任务的一个消费者协程

func NewConsumer

func NewConsumer(pool *GoroutinePool) *Consumer

NewConsumer 创建一个新的消费者

func (*Consumer) Await

func (x *Consumer) Await()

func (*Consumer) Consume

func (x *Consumer) Consume(pool *GoroutinePool)

Consume 阻塞性的任务,会一直消费给定的池子中的任务直到自己被关闭或者任务队列消费完毕

func (*Consumer) Idle

func (x *Consumer) Idle() time.Duration

Idle 当前消费者距离上次有任务已经过去多长时间了

func (*Consumer) IsIdle

func (x *Consumer) IsIdle() bool

IsIdle 当前消费者是否处于闲置状态

func (*Consumer) IsShutdown

func (x *Consumer) IsShutdown() bool

IsShutdown 当前消费者是否处于关闭状态

func (*Consumer) Shutdown

func (x *Consumer) Shutdown()

Shutdown 让此消费者退出,如果退出的时候有任务正在执行,则会执行完手头的任务再退出

func (*Consumer) State

func (x *Consumer) State() ConsumerState

State 获取消费者的状态

type ConsumerManager

type ConsumerManager struct {
	// contains filtered or unexported fields
}

ConsumerManager 消费者管理器,用来控制消费者的生生死死

func NewConsumerManager

func NewConsumerManager(pool *GoroutinePool) *ConsumerManager

func (*ConsumerManager) Await

func (x *ConsumerManager) Await()

Await 等待所有消费者都退出后自己也退出

func (*ConsumerManager) Run

func (x *ConsumerManager) Run()

Run 启动消费者管理器,开始替协程池打工,管理它名下的消费者

func (*ConsumerManager) Shutdown

func (x *ConsumerManager) Shutdown()

Shutdown 关闭管理器,同时强制关闭所有管理器创建的消费者

type ConsumerState

type ConsumerState int

ConsumerState 用于表示消费者的状态信息

type ConsumerStorage

type ConsumerStorage struct {
	// contains filtered or unexported fields
}

ConsumerStorage 保证每个worker都有自己单独的Storage空间用来暂存一些东西 Note: 非线程安全,必须保证所有操作都在同一个协程中

func NewWorkerStorage

func NewWorkerStorage() *ConsumerStorage

NewWorkerStorage 创建一个用来存储数据的东东

func (*ConsumerStorage) Clear

func (x *ConsumerStorage) Clear()

Clear 清空存储空间

func (*ConsumerStorage) List

func (x *ConsumerStorage) List() []*tuple.Tuple2[string, any]

List 列出当前存储的所有KV对

func (*ConsumerStorage) Load

func (x *ConsumerStorage) Load(key string) any

Load 从Consumer的存储空间读取内容

func (*ConsumerStorage) LoadString

func (x *ConsumerStorage) LoadString(key string) string

func (*ConsumerStorage) Size

func (x *ConsumerStorage) Size() int

Size 返回当前存储的大小

func (*ConsumerStorage) Store

func (x *ConsumerStorage) Store(key string, value any)

Store 把内容暂存到Consumer的存储空间中,相当于是一个简单的KV数据库

type CreateGoroutinePoolOptions

type CreateGoroutinePoolOptions struct {

	// 可以为pool取一个更便于理解的名字,如果没有取的话,则会默认生成一个,这个名字在开启监控的时候可以用来区分不同的协程池
	PoolName string

	// 任务队列的最大长度,当任务队列中挤压的任务数量超过这个数字时,新提交的任务就会卡住,直到任务队列有名额或者提交超时
	PoolTaskQueueMaxLength uint64

	// 使用payload形式提交的任务需要的运行函数,这样提交任务的时候就只需要提交任务参数就可以了
	TaskPayloadConsumeFunc TaskPayloadConsumeFunc

	// 下面这几个参数是当协程池中的协程消费者的数量需要动态的调整的时候,用来控制如何调整的
	// 初始化的时候有几个消费者在执行,协程池创建的时候就会启动这么多的消费者
	InitConsumerNum uint64
	// 最大的消费者数量,即使任务挤压再多,启动的消费者的数量也不会超过这个数量限制
	MaxConsumerNum uint64
	// 最小工作的消费者数量,
	MinConsumerNum uint64
	// 当worker空闲超出多长时间之后将其释放掉
	ConsumerMaxIdle time.Duration

	// 当执行的任务返回error的时候的回调方法
	TaskErrorCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, err error)

	// 每个Worker初始化的时候调用一次,当返回的error不为空的会放弃启动此Consumer
	ConsumerInitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer) error

	// 每个Worker退出的时候调用一次,包括正常退出和空闲太久被退出
	ConsumerExitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer)

	// TODO 空闲检查每隔多长时间进行一次
	ConsumerIdleCheckInterval time.Duration

	// Worker执行任务的时候是否开启全panic捕获,防止异常退出,
	RunTaskEnablePanicRecovery *bool
	// 当执行任务的时候发生panic的时候会执行此方法
	RunTaskEnablePanicRecoveryFunc func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, recoveryResult any)
	RunTaskTimeout                 time.Duration
}

CreateGoroutinePoolOptions 创建协程池的各种选项,用于深度定制协程池

func NewCreateGoroutinePoolOptions

func NewCreateGoroutinePoolOptions() *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetConsumerExitCallback

func (x *CreateGoroutinePoolOptions) SetConsumerExitCallback(consumerExitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer)) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetConsumerInitCallback

func (x *CreateGoroutinePoolOptions) SetConsumerInitCallback(consumerInitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer) error) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetConsumerMaxIdle

func (x *CreateGoroutinePoolOptions) SetConsumerMaxIdle(consumerMaxIdle time.Duration) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetInitConsumerNum

func (x *CreateGoroutinePoolOptions) SetInitConsumerNum(initConsumerNum uint64) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetMaxConsumerNum

func (x *CreateGoroutinePoolOptions) SetMaxConsumerNum(maxConsumerNum uint64) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetMinConsumerNum

func (x *CreateGoroutinePoolOptions) SetMinConsumerNum(minConsumerNum uint64) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetPoolName

func (*CreateGoroutinePoolOptions) SetPoolTaskQueueMaxLength

func (x *CreateGoroutinePoolOptions) SetPoolTaskQueueMaxLength(taskQueueMaxLength uint64) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecovery

func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecovery(runTaskEnablePanicRecovery *bool) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecoveryFunc

func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecoveryFunc(runTaskEnablePanicRecoveryFunc func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, recoveryResult any)) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetRunTaskTimeout

func (x *CreateGoroutinePoolOptions) SetRunTaskTimeout(runTaskTimeout time.Duration) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetTaskErrorCallback

func (x *CreateGoroutinePoolOptions) SetTaskErrorCallback(taskErrorCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, err error)) *CreateGoroutinePoolOptions

func (*CreateGoroutinePoolOptions) SetTaskPayloadConsumeFunc

func (x *CreateGoroutinePoolOptions) SetTaskPayloadConsumeFunc(taskPayloadConsumeFunc TaskPayloadConsumeFunc) *CreateGoroutinePoolOptions

type GoroutinePool

type GoroutinePool struct {

	// 协程池的运行参数
	Options *CreateGoroutinePoolOptions
	// contains filtered or unexported fields
}

GoroutinePool 协程池 泛型参数是任务的payload的类型

func NewGoroutinePool

func NewGoroutinePool(options *CreateGoroutinePoolOptions) (*GoroutinePool, error)

NewGoroutinePool 创建一个协程池

func NewGoroutinePoolWithDefaultOptions

func NewGoroutinePoolWithDefaultOptions() *GoroutinePool

NewGoroutinePoolWithDefaultOptions 创建协程池,使用默认的选项

func (*GoroutinePool) AddNextPool

func (x *GoroutinePool) AddNextPool(pool *GoroutinePool) *GoroutinePool

AddNextPool 协程池允许桥接,桥接的协程池可以把任务发送到下一个池子中,这样当有很多个互相依赖的任务的时候,就可以自己用线程池搭建DAG

func (*GoroutinePool) IsShutdown

func (x *GoroutinePool) IsShutdown() bool

IsShutdown 支持是否已经关闭提交任务

func (*GoroutinePool) RemoveNextPool

func (x *GoroutinePool) RemoveNextPool(pool *GoroutinePool) *GoroutinePool

RemoveNextPool 移除next pool

func (*GoroutinePool) ShutdownAndAwait

func (x *GoroutinePool) ShutdownAndAwait()

ShutdownAndAwait 关闭当前线程池,并等待当前线程池运行完毕

func (*GoroutinePool) SubmitNextTaskFunc

func (x *GoroutinePool) SubmitNextTaskFunc(ctx context.Context, taskFunc TaskFunc) error

func (*GoroutinePool) SubmitNextTaskFuncByFunc

func (x *GoroutinePool) SubmitNextTaskFuncByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, taskFunc TaskFunc) error

SubmitNextTaskFuncByFunc 往下一个阶段的池子中提交函数类型的任务

func (*GoroutinePool) SubmitNextTaskFuncByPoolName

func (x *GoroutinePool) SubmitNextTaskFuncByPoolName(ctx context.Context, poolName string, taskFunc TaskFunc) error

SubmitNextTaskFuncByPoolName 根据协程池的名字提交后续的任务,当DAG有多个后续协程池时使用

func (*GoroutinePool) SubmitNextTaskPayload

func (x *GoroutinePool) SubmitNextTaskPayload(ctx context.Context, taskPayload any) error

func (*GoroutinePool) SubmitNextTaskPayloadByFunc

func (x *GoroutinePool) SubmitNextTaskPayloadByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, taskPayload any) error

SubmitNextTaskPayloadByFunc 往下一个阶段的池子中提交任务,如果下一个阶段的池子有多个,则每个都会被提交一个任务 ctx: 超时 Task: 任务的payload

func (*GoroutinePool) SubmitNextTaskPayloadByPoolName

func (x *GoroutinePool) SubmitNextTaskPayloadByPoolName(ctx context.Context, poolName string, taskPayload any) error

SubmitNextTaskPayloadByPoolName 根据协程池的名称提交Payload类型的任务,当DAG有多个后续协程池时使用

func (*GoroutinePool) SubmitTask

func (x *GoroutinePool) SubmitTask(ctx context.Context, task *Task) error

SubmitTask 提交一个任务到任务队列

func (*GoroutinePool) SubmitTaskByFunc

func (x *GoroutinePool) SubmitTaskByFunc(ctx context.Context, taskFunc TaskFunc) error

SubmitTaskByFunc 提交一个函数作为任务运行

func (*GoroutinePool) SubmitTaskByPayload

func (x *GoroutinePool) SubmitTaskByPayload(ctx context.Context, taskPayload any) error

SubmitTaskByPayload 提交一个payload类型的任务

func (*GoroutinePool) TaskQueueSize

func (x *GoroutinePool) TaskQueueSize() int

TaskQueueSize 任务队列中还剩下多少个任务没有执行

type Task

type Task struct {

	// 任务的类型
	TaskType TaskType

	// 任务是一个直接运行的函数,那就直接运行
	TaskFunc TaskFunc

	// 任务是提交的一些值参数,传递给额外配置的worker来运行
	TaskPayload any
}

Task 表示一个待运行的任务

func NewTaskByFunc

func NewTaskByFunc(taskFunc TaskFunc) *Task

func NewTaskByPayload

func NewTaskByPayload(taskPayload any) *Task

type TaskFunc

type TaskFunc func(ctx context.Context, pool *GoroutinePool, worker *Consumer) error

TaskFunc 函数类型的任务,提交一个符合此签名的任务到队列中作为一个任务,被调度的时候会执行此任务 如果此任务有参数需要传递,通过闭包来传递,暂不支持通过形参传递 ctx: Context pool: 执行任务的池子 worker: 执行任务的consumer

type TaskPayloadConsumeFunc

type TaskPayloadConsumeFunc func(ctx context.Context, pool *GoroutinePool, worker *Consumer, taskPayload any) error

TaskPayloadConsumeFunc payload类型的任务,需要一个公共的能够执行payload的函数,这个函数需要符合此签名 ctx: Context pool: 执行任务的池子 worker: 执行任务的consumer taskPayload: 被执行的任务的payload,任务的每个实例都会不同,比如可能是爬虫任务的url,可能是etl任务的每条数据的id

type TaskType

type TaskType int

TaskType 任务类型,支持函数类型和payload类型

const (

	// TaskTypeNone 未指定类型时的初始值
	TaskTypeNone TaskType = iota

	// TaskTypeFunc 函数类型的任务就是向队列中提交一个函数,任务的逻辑是封装在这个函数中的,执行任务的时候会直接执行这个函数
	TaskTypeFunc

	// TaskTypePayload 值类型的任务,封装的是任务的一些参数,需要依赖ConsumerPayloadFunc来执行
	TaskTypePayload
)

Directories

Path Synopsis
example
dag

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL