Documentation
¶
Overview ¶
Package workerpool provides a handy and fast worker(goroutine) pool.
It is extremely useful when we facing "morestack" issue. Also some options can enable us to do lockless operations under some circumstances by using the worker id.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoWorkersAvaiable is returned if there is no workers available // in condition of both WaitIfNoWorkersAvailable and CreateIfNoWorkersAvailable are disabled. ErrNoWorkersAvaiable = fmt.Errorf("workerpool: not workers available") // ErrInvalidWorkerPool indicates WaitDone function has been called. ErrInvalidWorkerPool = fmt.Errorf("workerpool: invalid worker pool") )
Functions ¶
Types ¶
type Func ¶
Func is the type of the function called by worker in the pool. It is the caller's responsibility to recover the panic.
type Options ¶
type Options struct {
// Capacity specifies the maximum number of resident running workers(goroutines),
// 0 means no limit.
Capacity uint32
// IdleTimeout is the maximum amount of time a worker(goroutine) will
// remain idle before terminating itself. Zero means no limit, the workers
// never die if the pool is valid.
IdleTimeout time.Duration
// ResetInterval defines how often the worker(goroutine) must be restarted,
// zero to disable it.
// With this options enabled, a worker can reset its stack so that large stacks
// don't live in memory forever, 25% jitter will be applied.
ResetInterval time.Duration
// WaitIfNoWorkersAvailable will wait until there is a worker available
// if all resident workers are busy.
// It only works if the option Capacity greater than zero.
// This option will conflict with CreateIfNoWorkersAvailable.
WaitIfNoWorkersAvailable bool
// CreateIfNoWorkersAvailable will create an ephemeral worker only
// if all resident workers are busy.
// It only works if the option Capacity greater than zero and the option
// WaitIfNoWorkerAvailable is disabled.
CreateIfNoWorkersAvailable bool
// CreateWorkerID will inject a worker id into the context of Func.
// It may be useful, for example, we can use it to do some lockless operations
// under some circumstances when we have fixed number of workers and those workers live long enough.
CreateWorkerID bool
}
Options configurates the WorkerPool.
type Stats ¶
type Stats struct {
// ResidentWorkers counts the number of resident workers.
ResidentWorkers uint32
// EphemeralWorkers counts the number of ephemeral workers when
// the option CreateIfNoWorkersAvailable is enabled.
EphemeralWorkers uint32
// IdleWorkers counts all idle workers including any newly created workers.
IdleWorkers uint32
// PendingSubmits counts all pending Submit(*).
PendingSubmits uint32
}
Stats contains a list of worker counters.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool offers a pool of reusable workers(goroutines).
NOTE that the WorkerPool does not handle panics.
Example ¶
p := New(Options{
Capacity: 10,
IdleTimeout: 5 * time.Minute,
CreateIfNoWorkersAvailable: true,
})
count := uint32(0)
for i := 0; i < 100; i++ {
n := uint32(i + 1)
_ = p.Submit(context.TODO(), func(context.Context) {
atomic.AddUint32(&count, n)
})
}
_ = p.WaitDone(context.TODO())
fmt.Println(count)
Output: 5050
Example (LocklessOperation) ¶
p := New(Options{
Capacity: 8,
WaitIfNoWorkersAvailable: true,
CreateWorkerID: true,
})
values := make([]uint32, 8, 8)
for i := 0; i < 100; i++ {
n := uint32(i + 1)
_ = p.Submit(context.TODO(), func(ctx context.Context) {
id, ok := WorkerID(ctx)
if !ok {
panic("not possible")
}
// The worker id starts with 1.
values[id-1] += n
time.Sleep(10 * time.Millisecond) // Too fast, sleep for a while..
})
}
_ = p.WaitDone(context.TODO())
sum := uint32(0)
count := 0
for _, v := range values {
if v > 0 {
count++
}
sum += v
}
fmt.Println(count)
fmt.Println(sum)
Output: 8 5050
func New ¶
func New(opts Options) *WorkerPool
New creates a new WorkerPool. The pool with default(empty) Options has infinite workers and the workers never die.
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(ctx context.Context, fn Func) error
Submit submits a task and waits until it acquired by an available worker or wait until the context done if WaitIfNoWorkersAvailable enabled. The "same" ctx will be passed into Func. NOTE it panics if ctx==nil, pass context.TODO() or context.Background() instead.
func (*WorkerPool) SubmitConcurrentDependent ¶
func (p *WorkerPool) SubmitConcurrentDependent(ctx context.Context, fns ...Func) error
SubmitConcurrentDependent submits multiple *concurrent dependent* tasks and waits until all of them are acquired by available workers or wait until the context done if WaitIfNoWorkersAvailable enabled. The "same" ctx will be passed into Func.
func (*WorkerPool) WaitDone ¶
func (p *WorkerPool) WaitDone(ctx context.Context) error
WaitDone waits until all tasks done or the context done. The pool becomes unusable(read only) after this operation. If you want to wait multiple times, using an extra sync.WaitGroup. NOTE it panics if ctx==nil, pass context.TODO() or context.Background() instead.