Documentation ¶
Overview ¶
Package workerpool implements a concurrency limiting worker pool. Worker routines are spawned on demand as tasks are submitted; up to the configured limit of concurrent workers.
When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional as it prevents from accumulating tasks which could grow unbounded. Therefore, it is the responsibility of the caller to queue up tasks if that's the intended behavior.
One caveat is that while the number of concurrently running workers is limited, task results are not and they accumulate until they are collected. Therefore, if a large number of tasks can be expected, the workerpool should be periodically drained (e.g. every 10k tasks).
Example ¶
// SPDX-License-Identifier: Apache-2.0 // Copyright 2021 Authors of Cilium package main import ( "context" "fmt" "os" "runtime" "github.com/cilium/workerpool" ) // IsPrime returns true if n is prime, false otherwise. func IsPrime(n int64) bool { if n < 2 { return false } for p := int64(2); p*p <= n; p++ { if n%p == 0 { return false } } return true } func main() { wp := workerpool.New(runtime.NumCPU()) for i, n := 0, int64(1_000_000_000_000_000_000); n < 1_000_000_000_000_000_100; i, n = i+1, n+1 { n := n // https://golang.org/doc/faq#closures_and_goroutines id := fmt.Sprintf("task #%d", i) // Use Submit to submit tasks for processing. Submit blocks when no // worker is available to pick up the task. err := wp.Submit(id, func(_ context.Context) error { fmt.Println("isprime", n) if IsPrime(n) { fmt.Println(n, "is prime!") } return nil }) // Submit fails when the pool is closed (ErrClosed) or being drained // (ErrDrained). Check for the error when appropriate. if err != nil { fmt.Fprintln(os.Stderr, err) return } } // Drain prevents submitting new tasks and blocks until all submitted tasks // complete. tasks, err := wp.Drain() if err != nil { fmt.Fprintln(os.Stderr, err) return } // Iterating over the results is useful if non-nil errors can be expected. for _, task := range tasks { // Err returns the error that the task returned after execution. if err := task.Err(); err != nil { fmt.Println("task", task, "failed:", err) } } // Close should be called once the worker pool is no longer necessary. if err := wp.Close(); err != nil { fmt.Fprintln(os.Stderr, err) } }
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrDraining is returned when an operation is not possible because // draining is in progress. ErrDraining = errors.New("drain operation in progress") // ErrClosed is returned when operations are attempted after a call to Close. ErrClosed = errors.New("worker pool is closed") )
Functions ¶
This section is empty.
Types ¶
type Task ¶
type Task interface { // String returns the task identifier. fmt.Stringer // Err returns the error resulting from processing the // unit of work. Err() error }
Task is a unit of work.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool spawns, on demand, a number of worker routines to process submitted tasks concurrently. The number of concurrent routines never exceeds the specified limit.
func New ¶
func New(n int) *WorkerPool
New creates a new pool of workers where at most n workers process submitted tasks concurrently. New panics if n ≤ 0.
func (*WorkerPool) Cap ¶
func (wp *WorkerPool) Cap() int
Cap returns the concurrent workers capacity, see New().
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close() error
Close closes the worker pool, rendering it unable to process new tasks. Close sends the cancellation signal to any running task and waits for all workers, if any, to return. Close will return ErrClosed if it has already been called.
func (*WorkerPool) Drain ¶
func (wp *WorkerPool) Drain() ([]Task, error)
Drain waits until all tasks are completed. This operation prevents submitting new tasks to the worker pool. Drain returns the results of the tasks that have been processed. If a drain operation is already in progress, ErrDraining is returned. If the worker pool is closed, ErrClosed is returned.
func (*WorkerPool) Len ¶ added in v1.1.0
func (wp *WorkerPool) Len() int
Len returns the count of concurrent workers currently running.
func (*WorkerPool) Submit ¶
Submit submits f for processing by a worker. The given id is useful for identifying the task once it is completed. The task f must return when the context ctx is cancelled.
Submit blocks until a routine start processing the task.
If a drain operation is in progress, ErrDraining is returned and the task is not submitted for processing. If the worker pool is closed, ErrClosed is returned and the task is not submitted for processing.