Documentation
¶
Overview ¶
Package workerpool offers a convenient and efficient worker(goroutine) pool solution, featuring a straightforward concurrent pattern called "pipeline" for effortless integration and usage.
Index ¶
- Variables
- func GoSpawn(ctx context.Context, fn Func) error
- func WorkerID(ctx context.Context) (uint32, bool)
- func Wrap[In, Out any](p *WorkerPool, f func(context.Context, In) (Out, error)) func(context.Context, In) (Out, error)
- type AsyncExecutor
- type Func
- type None
- type Options
- type Pipeline
- func (p *Pipeline[In, Out]) Join() <-chan Out
- func (p *Pipeline[In, Out]) ProcessedCount() int
- func (p *Pipeline[In, Out]) StartFeeder(ctx context.Context, items []In) error
- func (p *Pipeline[In, Out]) StartFeederFunc(ctx context.Context, feedLoop func(context.Context, chan<- In)) error
- func (p *Pipeline[In, Out]) StartWorker(ctx context.Context, workOne func(context.Context, In) Out) error
- func (p *Pipeline[In, Out]) StartWorkerN(ctx context.Context, concurrency int, workOne func(context.Context, In) Out) error
- type PipelineOptions
- type Stats
- type WorkerPool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoWorkersAvailable is returned if there is no workers available // in condition of both WaitIfNoWorkersAvailable and CreateIfNoWorkersAvailable are disabled. ErrNoWorkersAvailable = fmt.Errorf("workerpool: not workers available") // ErrInvalidWorkerPool indicates WaitDone function has been called. ErrInvalidWorkerPool = fmt.Errorf("workerpool: invalid worker pool") )
var ( // ErrPipelineFrozen means the pipeline does not accept any further operations // since Pipeline.Join has been called. ErrPipelineFrozen = fmt.Errorf("workerpool: pipeline is frozen") )
Functions ¶
func GoSpawn ¶ added in v1.1.0
GoSpawn is an implementation of AsyncExecutor that spawns a goroutine and directly executes the function (fn) within it.
func WorkerID ¶
WorkerID returns the worker id associated with this context. Only available if the option CreateWorkerID enabled. NOTE that the worker id always starts with 1.
func Wrap ¶ added in v1.1.0
func Wrap[In, Out any](p *WorkerPool, f func(context.Context, In) (Out, error)) func(context.Context, In) (Out, error)
Wrap wraps a function for ease of future use, allowing the wrapped function to be executed within the WorkerPool.
Example ¶
pool := New(Options{ Capacity: 8, WaitIfNoWorkersAvailable: true, }) increase := func(a int) int { return a + 1 } wrappedIncrease := Wrap(pool, func(_ context.Context, i int) (int, error) { return increase(i), nil }) count := 0 for i := 0; i < 100; i++ { count, _ = wrappedIncrease(context.TODO(), count) } _ = pool.WaitDone(context.TODO()) fmt.Println(count)
Output: 100
Types ¶
type AsyncExecutor ¶ added in v1.1.0
AsyncExecutor is a function type used for executing a function asynchronously.
type Func ¶
Func is the type of function called by worker in the pool. It is the caller's responsibility to recover the panic.
type None ¶ added in v1.1.0
type None struct{}
None is a placeholder for convinience if there is no parameters or no return value.
type Options ¶
type Options struct { // Capacity specifies the maximum number of resident running workers(goroutines), // 0 means no limit. Capacity uint32 // IdleTimeout is the maximum amount of time a worker(goroutine) will // remain idle before terminating itself. Zero means no limit, the workers // never die if the pool is valid. IdleTimeout time.Duration // ResetInterval defines how often the worker(goroutine) must be restarted, // zero to disable it. // With this options enabled, a worker can reset its stack so that large stacks // don't live in memory forever, 25% jitter will be applied. ResetInterval time.Duration // WaitIfNoWorkersAvailable will wait until there is a worker available // if all resident workers are busy. // It only works if the option Capacity greater than zero. // This option will conflict with CreateIfNoWorkersAvailable. WaitIfNoWorkersAvailable bool // CreateIfNoWorkersAvailable will create an ephemeral worker only // if all resident workers are busy. // It only works if the option Capacity greater than zero and the option // WaitIfNoWorkerAvailable is disabled. CreateIfNoWorkersAvailable bool // CreateWorkerID will inject a worker id into the context of Func. // It may be useful, for example, we can use it to do some lockless operations // under some circumstances when we have fixed number of workers and those workers live long enough. CreateWorkerID bool }
Options configure the WorkerPool.
type Pipeline ¶ added in v1.1.0
type Pipeline[In, Out any] struct { // contains filtered or unexported fields }
Pipeline utilizes an input and an output channel to create a concurrent processor with three stages.
Example ¶
pool := New(Options{ Capacity: 4, WaitIfNoWorkersAvailable: true, }) pipeline := NewPipelineWith[int, int](PipelineOptions{ FeederAsyncExecutor: GoSpawn, WorkerAsyncExecutor: pool.Submit, }) inputs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} _ = pipeline.StartFeeder(context.Background(), inputs) _ = pipeline.StartWorkerN(context.Background(), 4, func(_ context.Context, i int) int { return i * 2 }) sum := 0 for v := range pipeline.Join() { sum += v } fmt.Println("zero canceled:", len(inputs) == pipeline.ProcessedCount()) fmt.Println("sum:", sum) _ = pool.WaitDone(context.TODO()) // Clean up.
Output: zero canceled: true sum: 272
func NewPipeline ¶ added in v1.1.0
NewPipeline creates a new pipeline that utilizes fire-and-forget goroutines.
func NewPipelineWith ¶ added in v1.1.0
func NewPipelineWith[In, Out any](opts PipelineOptions) *Pipeline[In, Out]
NewPipelineWith creates a new Pipeline with PipelineOptions.
func (*Pipeline[In, Out]) Join ¶ added in v1.1.0
func (p *Pipeline[In, Out]) Join() <-chan Out
Join returns an output channel, the channel will be closed after all tasks are done. It is the caller's responsibility to check if all inputs are processed; the ProcessedCount variable serves this purpose. The pipeline is frozen after the join.
func (*Pipeline[In, Out]) ProcessedCount ¶ added in v1.1.0
ProcessedCount keeps track of the number of inputs that have been processed. The count is stable if the output channel has been closed.
func (*Pipeline[In, Out]) StartFeeder ¶ added in v1.1.0
StartFeeder initiates the feeding process of an array of inputs within the AsyncExecutor. The feeding process can be interrupted by the context.Context without any signal.
This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.
func (*Pipeline[In, Out]) StartFeederFunc ¶ added in v1.1.0
func (p *Pipeline[In, Out]) StartFeederFunc(ctx context.Context, feedLoop func(context.Context, chan<- In)) error
StartFeeder initiates a feeding process within the asynchronous executor. It's important for the caller to check the context.Context inside the feedLoop and ensure that the feeding process is stopped properly.
This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.
func (*Pipeline[In, Out]) StartWorker ¶ added in v1.1.0
func (p *Pipeline[In, Out]) StartWorker(ctx context.Context, workOne func(context.Context, In) Out) error
StartWorker initiates a worker to process the inputs within the AsyncExecutor. The worker will stop if either the context is done or all inputs have been processed.
This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.
func (*Pipeline[In, Out]) StartWorkerN ¶ added in v1.1.0
func (p *Pipeline[In, Out]) StartWorkerN(ctx context.Context, concurrency int, workOne func(context.Context, In) Out) error
StartWorkerN starts N workers to process inputs within the AsyncExecutor. The workers will stop if either the context is done or all inputs have been processed.
This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.
type PipelineOptions ¶ added in v1.1.0
type PipelineOptions struct { // FeederAsyncExecutor is the AsyncExecutor used by feeder. FeederAsyncExecutor AsyncExecutor // WorkerAsyncExecutor is the AsyncExecutor used by worker. WorkerAsyncExecutor AsyncExecutor // InputBufferSize is the buffer size of input channel. InputBufferSize int // OutputBufferSize is the buffer size of output channel. OutputBufferSize int }
PipelineOptions configure the Pipeline.
NOTE that if you enable the WaitIfNoWorkersAvailable option with a small Capacity and BufferSize while using the WorkerPool for both Feeder and Worker, it may result in a deadlock. Additionally, chaining multiple Pipelines under such circumstances may also cause a deadlock.
type Stats ¶
type Stats struct { // ResidentWorkers counts the number of resident workers. ResidentWorkers uint32 // EphemeralWorkers counts the number of ephemeral workers when // the option CreateIfNoWorkersAvailable is enabled. EphemeralWorkers uint32 // IdleWorkers counts all idle workers including any newly created workers. IdleWorkers uint32 // PendingSubmits counts all pending Submit(*). PendingSubmits uint32 }
Stats contains a list of worker counters.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool offers a pool of reusable workers(goroutines). NOTE that the WorkerPool does not handle panics.
It is extremely useful when we are facing the "morestack" issue. Additionally, certain options can enable us to perform lockless operations under specific circumstances by utilizing the worker ID.
Example ¶
p := New(Options{ Capacity: 10, IdleTimeout: 5 * time.Minute, CreateIfNoWorkersAvailable: true, }) count := uint32(0) for i := 0; i < 100; i++ { n := uint32(i + 1) _ = p.Submit(context.TODO(), func(context.Context) { atomic.AddUint32(&count, n) }) } _ = p.WaitDone(context.TODO()) fmt.Println(count)
Output: 5050
Example (LocklessOperation) ¶
p := New(Options{ Capacity: 8, WaitIfNoWorkersAvailable: true, CreateWorkerID: true, }) values := make([]uint32, 8, 8) for i := 0; i < 100; i++ { n := uint32(i + 1) _ = p.Submit(context.TODO(), func(ctx context.Context) { id, ok := WorkerID(ctx) if !ok { panic("not possible") } // The worker id starts with 1. values[id-1] += n time.Sleep(10 * time.Millisecond) // Too fast, sleep for a while.. }) } _ = p.WaitDone(context.TODO()) sum := uint32(0) count := 0 for _, v := range values { if v > 0 { count++ } sum += v } fmt.Println(count) fmt.Println(sum)
Output: 8 5050
func New ¶
func New(opts Options) *WorkerPool
New creates a new WorkerPool. The pool with default(empty) Options has infinite workers and the workers never die.
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(ctx context.Context, fn Func) error
Submit submits a task and waits until it acquired by an available worker or wait until the context done if WaitIfNoWorkersAvailable enabled. The "same" ctx will be passed into Func. NOTE it panics if ctx==nil, pass context.TODO() or context.Background() instead.
func (*WorkerPool) SubmitConcurrentDependent ¶
func (p *WorkerPool) SubmitConcurrentDependent(ctx context.Context, fns ...Func) error
SubmitConcurrentDependent submits multiple *concurrent dependent* tasks and waits until all of them are acquired by available workers or wait until the context done if WaitIfNoWorkersAvailable enabled. The "same" ctx will be passed into Func.
func (*WorkerPool) WaitDone ¶
func (p *WorkerPool) WaitDone(ctx context.Context) error
WaitDone waits until all tasks done or the context done. The pool becomes unusable(read only) after this operation. If you want to wait multiple times, using an extra sync.WaitGroup. NOTE it panics if ctx==nil, pass context.TODO() or context.Background() instead.