Documentation
¶
Index ¶
- Constants
- Variables
- type Consumer
- type ConsumerManager
- type ConsumerState
- type ConsumerStorage
- type CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetConsumerExitCallback(...) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetConsumerInitCallback(...) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetConsumerMaxIdle(consumerMaxIdle time.Duration) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetInitConsumerNum(initConsumerNum uint64) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetMaxConsumerNum(maxConsumerNum uint64) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetMinConsumerNum(minConsumerNum uint64) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetPoolName(poolName string) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetPoolTaskQueueMaxLength(taskQueueMaxLength uint64) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecovery(runTaskEnablePanicRecovery *bool) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecoveryFunc(...) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetRunTaskTimeout(runTaskTimeout time.Duration) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetTaskErrorCallback(...) *CreateGoroutinePoolOptions
- func (x *CreateGoroutinePoolOptions) SetTaskPayloadConsumeFunc(taskPayloadConsumeFunc TaskPayloadConsumeFunc) *CreateGoroutinePoolOptions
- type GoroutinePool
- func (x *GoroutinePool) AddNextPool(pool *GoroutinePool) *GoroutinePool
- func (x *GoroutinePool) IsShutdown() bool
- func (x *GoroutinePool) RemoveNextPool(pool *GoroutinePool) *GoroutinePool
- func (x *GoroutinePool) ShutdownAndAwait()
- func (x *GoroutinePool) SubmitNextTaskFunc(ctx context.Context, taskFunc TaskFunc) error
- func (x *GoroutinePool) SubmitNextTaskFuncByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, ...) error
- func (x *GoroutinePool) SubmitNextTaskFuncByPoolName(ctx context.Context, poolName string, taskFunc TaskFunc) error
- func (x *GoroutinePool) SubmitNextTaskPayload(ctx context.Context, taskPayload any) error
- func (x *GoroutinePool) SubmitNextTaskPayloadByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, ...) error
- func (x *GoroutinePool) SubmitNextTaskPayloadByPoolName(ctx context.Context, poolName string, taskPayload any) error
- func (x *GoroutinePool) SubmitTask(ctx context.Context, task *Task) error
- func (x *GoroutinePool) SubmitTaskByFunc(ctx context.Context, taskFunc TaskFunc) error
- func (x *GoroutinePool) SubmitTaskByPayload(ctx context.Context, taskPayload any) error
- func (x *GoroutinePool) TaskQueueSize() int
- type Task
- type TaskFunc
- type TaskPayloadConsumeFunc
- type TaskType
Constants ¶
const ( // ConsumerStateIdle 消费者处于空闲状态 ConsumerStateIdle = iota // ConsumerStateBusy 消费者在忙着处理任务 ConsumerStateBusy // ConsumerStateShutdown 消费者已经退出,不会再参与任务处理 ConsumerStateShutdown )
const ( // TaskQueueMaxLengthUnlimited 表示队列中等待执行的任务的数量时没有限制的,会随着任务的提交自动增长 TaskQueueMaxLengthUnlimited = 0 // DefaultTaskQueueMaxLength 默认的任务队列的最大长度,当任务队列中挤压数超过此数量时提交任务时就会卡住直到任务队列有空闲或者超时 DefaultTaskQueueMaxLength = 100000 )
const ( // DefaultInitConsumerNum 初始化的任务数量 DefaultInitConsumerNum = 100 // DefaultMaxConsumerNum 最大消费者的数量 DefaultMaxConsumerNum = 100 // DefaultMinConsumerNum 最小的消费者数量 DefaultMinConsumerNum = 100 // DefaultConsumerMaxIdle 任务空闲多久之后会被释放掉 DefaultConsumerMaxIdle = time.Minute * 5 )
空闲任务的参数控制
const ( // DefaultConsumerIdleCheckInterval 默认情况下隔多久检查一下消费者是否处于空闲状态 DefaultConsumerIdleCheckInterval = time.Minute // DefaultRunTaskTimeout 默认情况下任务运行多久认为是超时了 DefaultRunTaskTimeout = time.Minute * 5 )
Variables ¶
var ( // ErrPayloadConsumeFuncNil 项执行payload类型的任务,但是没有设置PayloadConsumeFunc,只能返回错误了 ErrPayloadConsumeFuncNil = errors.New("Options.TaskPayloadConsumeFunc nil") // ErrPoolNotFound 协程池不存在 ErrPoolNotFound = errors.New("pool not found") )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { // 当前消费者自己独享的一个存储空间,可以进行一些存储之类的,比如每个消费者初始化一个数据库连接之类的或其他资源,还可避免出现锁竞争的情况 *ConsumerStorage // contains filtered or unexported fields }
Consumer 用于处理协程池中的任务的一个消费者协程
func (*Consumer) Consume ¶
func (x *Consumer) Consume(pool *GoroutinePool)
Consume 阻塞性的任务,会一直消费给定的池子中的任务直到自己被关闭或者任务队列消费完毕
type ConsumerManager ¶
type ConsumerManager struct {
// contains filtered or unexported fields
}
ConsumerManager 消费者管理器,用来控制消费者的生生死死
func NewConsumerManager ¶
func NewConsumerManager(pool *GoroutinePool) *ConsumerManager
func (*ConsumerManager) Shutdown ¶
func (x *ConsumerManager) Shutdown()
Shutdown 关闭管理器,同时强制关闭所有管理器创建的消费者
type ConsumerStorage ¶
type ConsumerStorage struct {
// contains filtered or unexported fields
}
ConsumerStorage 保证每个worker都有自己单独的Storage空间用来暂存一些东西 Note: 非线程安全,必须保证所有操作都在同一个协程中
func (*ConsumerStorage) List ¶
func (x *ConsumerStorage) List() []*tuple.Tuple2[string, any]
List 列出当前存储的所有KV对
func (*ConsumerStorage) Load ¶
func (x *ConsumerStorage) Load(key string) any
Load 从Consumer的存储空间读取内容
func (*ConsumerStorage) LoadString ¶
func (x *ConsumerStorage) LoadString(key string) string
func (*ConsumerStorage) Store ¶
func (x *ConsumerStorage) Store(key string, value any)
Store 把内容暂存到Consumer的存储空间中,相当于是一个简单的KV数据库
type CreateGoroutinePoolOptions ¶
type CreateGoroutinePoolOptions struct { // 可以为pool取一个更便于理解的名字,如果没有取的话,则会默认生成一个,这个名字在开启监控的时候可以用来区分不同的协程池 PoolName string // 任务队列的最大长度,当任务队列中挤压的任务数量超过这个数字时,新提交的任务就会卡住,直到任务队列有名额或者提交超时 PoolTaskQueueMaxLength uint64 // 使用payload形式提交的任务需要的运行函数,这样提交任务的时候就只需要提交任务参数就可以了 TaskPayloadConsumeFunc TaskPayloadConsumeFunc // 下面这几个参数是当协程池中的协程消费者的数量需要动态的调整的时候,用来控制如何调整的 // 初始化的时候有几个消费者在执行,协程池创建的时候就会启动这么多的消费者 InitConsumerNum uint64 // 最大的消费者数量,即使任务挤压再多,启动的消费者的数量也不会超过这个数量限制 MaxConsumerNum uint64 // 最小工作的消费者数量, MinConsumerNum uint64 // 当worker空闲超出多长时间之后将其释放掉 ConsumerMaxIdle time.Duration // 当执行的任务返回error的时候的回调方法 TaskErrorCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, err error) // 每个Worker初始化的时候调用一次,当返回的error不为空的会放弃启动此Consumer ConsumerInitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer) error // 每个Worker退出的时候调用一次,包括正常退出和空闲太久被退出 ConsumerExitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer) // TODO 空闲检查每隔多长时间进行一次 ConsumerIdleCheckInterval time.Duration // Worker执行任务的时候是否开启全panic捕获,防止异常退出, RunTaskEnablePanicRecovery *bool // 当执行任务的时候发生panic的时候会执行此方法 RunTaskEnablePanicRecoveryFunc func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, recoveryResult any) RunTaskTimeout time.Duration }
CreateGoroutinePoolOptions 创建协程池的各种选项,用于深度定制协程池
func NewCreateGoroutinePoolOptions ¶
func NewCreateGoroutinePoolOptions() *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetConsumerExitCallback ¶
func (x *CreateGoroutinePoolOptions) SetConsumerExitCallback(consumerExitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer)) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetConsumerInitCallback ¶
func (x *CreateGoroutinePoolOptions) SetConsumerInitCallback(consumerInitCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer) error) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetConsumerMaxIdle ¶
func (x *CreateGoroutinePoolOptions) SetConsumerMaxIdle(consumerMaxIdle time.Duration) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetInitConsumerNum ¶
func (x *CreateGoroutinePoolOptions) SetInitConsumerNum(initConsumerNum uint64) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetMaxConsumerNum ¶
func (x *CreateGoroutinePoolOptions) SetMaxConsumerNum(maxConsumerNum uint64) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetMinConsumerNum ¶
func (x *CreateGoroutinePoolOptions) SetMinConsumerNum(minConsumerNum uint64) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetPoolName ¶
func (x *CreateGoroutinePoolOptions) SetPoolName(poolName string) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetPoolTaskQueueMaxLength ¶
func (x *CreateGoroutinePoolOptions) SetPoolTaskQueueMaxLength(taskQueueMaxLength uint64) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecovery ¶
func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecovery(runTaskEnablePanicRecovery *bool) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecoveryFunc ¶
func (x *CreateGoroutinePoolOptions) SetRunTaskEnablePanicRecoveryFunc(runTaskEnablePanicRecoveryFunc func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, recoveryResult any)) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetRunTaskTimeout ¶
func (x *CreateGoroutinePoolOptions) SetRunTaskTimeout(runTaskTimeout time.Duration) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetTaskErrorCallback ¶
func (x *CreateGoroutinePoolOptions) SetTaskErrorCallback(taskErrorCallback func(ctx context.Context, pool *GoroutinePool, consumer *Consumer, task *Task, err error)) *CreateGoroutinePoolOptions
func (*CreateGoroutinePoolOptions) SetTaskPayloadConsumeFunc ¶
func (x *CreateGoroutinePoolOptions) SetTaskPayloadConsumeFunc(taskPayloadConsumeFunc TaskPayloadConsumeFunc) *CreateGoroutinePoolOptions
type GoroutinePool ¶
type GoroutinePool struct { // 协程池的运行参数 Options *CreateGoroutinePoolOptions // contains filtered or unexported fields }
GoroutinePool 协程池 泛型参数是任务的payload的类型
func NewGoroutinePool ¶
func NewGoroutinePool(options *CreateGoroutinePoolOptions) (*GoroutinePool, error)
NewGoroutinePool 创建一个协程池
func NewGoroutinePoolWithDefaultOptions ¶
func NewGoroutinePoolWithDefaultOptions() *GoroutinePool
NewGoroutinePoolWithDefaultOptions 创建协程池,使用默认的选项
func (*GoroutinePool) AddNextPool ¶
func (x *GoroutinePool) AddNextPool(pool *GoroutinePool) *GoroutinePool
AddNextPool 协程池允许桥接,桥接的协程池可以把任务发送到下一个池子中,这样当有很多个互相依赖的任务的时候,就可以自己用线程池搭建DAG
func (*GoroutinePool) IsShutdown ¶
func (x *GoroutinePool) IsShutdown() bool
IsShutdown 支持是否已经关闭提交任务
func (*GoroutinePool) RemoveNextPool ¶
func (x *GoroutinePool) RemoveNextPool(pool *GoroutinePool) *GoroutinePool
RemoveNextPool 移除next pool
func (*GoroutinePool) ShutdownAndAwait ¶
func (x *GoroutinePool) ShutdownAndAwait()
ShutdownAndAwait 关闭当前线程池,并等待当前线程池运行完毕
func (*GoroutinePool) SubmitNextTaskFunc ¶
func (x *GoroutinePool) SubmitNextTaskFunc(ctx context.Context, taskFunc TaskFunc) error
func (*GoroutinePool) SubmitNextTaskFuncByFunc ¶
func (x *GoroutinePool) SubmitNextTaskFuncByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, taskFunc TaskFunc) error
SubmitNextTaskFuncByFunc 往下一个阶段的池子中提交函数类型的任务
func (*GoroutinePool) SubmitNextTaskFuncByPoolName ¶
func (x *GoroutinePool) SubmitNextTaskFuncByPoolName(ctx context.Context, poolName string, taskFunc TaskFunc) error
SubmitNextTaskFuncByPoolName 根据协程池的名字提交后续的任务,当DAG有多个后续协程池时使用
func (*GoroutinePool) SubmitNextTaskPayload ¶
func (x *GoroutinePool) SubmitNextTaskPayload(ctx context.Context, taskPayload any) error
func (*GoroutinePool) SubmitNextTaskPayloadByFunc ¶
func (x *GoroutinePool) SubmitNextTaskPayloadByFunc(ctx context.Context, chooseNextPoolFunc func(pool *GoroutinePool) bool, taskPayload any) error
SubmitNextTaskPayloadByFunc 往下一个阶段的池子中提交任务,如果下一个阶段的池子有多个,则每个都会被提交一个任务 ctx: 超时 Task: 任务的payload
func (*GoroutinePool) SubmitNextTaskPayloadByPoolName ¶
func (x *GoroutinePool) SubmitNextTaskPayloadByPoolName(ctx context.Context, poolName string, taskPayload any) error
SubmitNextTaskPayloadByPoolName 根据协程池的名称提交Payload类型的任务,当DAG有多个后续协程池时使用
func (*GoroutinePool) SubmitTask ¶
func (x *GoroutinePool) SubmitTask(ctx context.Context, task *Task) error
SubmitTask 提交一个任务到任务队列
func (*GoroutinePool) SubmitTaskByFunc ¶
func (x *GoroutinePool) SubmitTaskByFunc(ctx context.Context, taskFunc TaskFunc) error
SubmitTaskByFunc 提交一个函数作为任务运行
func (*GoroutinePool) SubmitTaskByPayload ¶
func (x *GoroutinePool) SubmitTaskByPayload(ctx context.Context, taskPayload any) error
SubmitTaskByPayload 提交一个payload类型的任务
func (*GoroutinePool) TaskQueueSize ¶
func (x *GoroutinePool) TaskQueueSize() int
TaskQueueSize 任务队列中还剩下多少个任务没有执行
type Task ¶
type Task struct { // 任务的类型 TaskType TaskType // 任务是一个直接运行的函数,那就直接运行 TaskFunc TaskFunc // 任务是提交的一些值参数,传递给额外配置的worker来运行 TaskPayload any }
Task 表示一个待运行的任务
func NewTaskByFunc ¶
func NewTaskByPayload ¶
type TaskFunc ¶
type TaskFunc func(ctx context.Context, pool *GoroutinePool, worker *Consumer) error
TaskFunc 函数类型的任务,提交一个符合此签名的任务到队列中作为一个任务,被调度的时候会执行此任务 如果此任务有参数需要传递,通过闭包来传递,暂不支持通过形参传递 ctx: Context pool: 执行任务的池子 worker: 执行任务的consumer
type TaskPayloadConsumeFunc ¶
type TaskPayloadConsumeFunc func(ctx context.Context, pool *GoroutinePool, worker *Consumer, taskPayload any) error
TaskPayloadConsumeFunc payload类型的任务,需要一个公共的能够执行payload的函数,这个函数需要符合此签名 ctx: Context pool: 执行任务的池子 worker: 执行任务的consumer taskPayload: 被执行的任务的payload,任务的每个实例都会不同,比如可能是爬虫任务的url,可能是etl任务的每条数据的id