workerpool

package module
v1.2.0
Published: Apr 8, 2023 License: BSD-3-Clause Imports: 7 Imported by: 0

README

WorkerPool

This package offers a convenient and efficient worker(goroutine) pool solution, featuring a straightforward concurrent pattern called "pipeline" for effortless integration and usage.

The WorkerPool is extremely useful when we are facing the "morestack" issue. Some options also enable lockless operations under certain circumstances by using the worker id.

Documentation

Overview

Package workerpool offers a convenient and efficient worker(goroutine) pool solution, featuring a straightforward concurrent pattern called "pipeline" for effortless integration and usage.

Index

Examples

Constants

This section is empty.

Variables

var (
	// ErrNoWorkersAvailable is returned if there are no workers available
	// when both WaitIfNoWorkersAvailable and CreateIfNoWorkersAvailable are disabled.
	ErrNoWorkersAvailable = fmt.Errorf("workerpool: no workers available")
	// ErrInvalidWorkerPool indicates that the WaitDone function has been called.
	ErrInvalidWorkerPool = fmt.Errorf("workerpool: invalid worker pool")
)

var (
	// ErrPipelineFrozen means the pipeline does not accept any further operations
	// since Pipeline.Join has been called.
	ErrPipelineFrozen = fmt.Errorf("workerpool: pipeline is frozen")
)

Functions

func GoSpawn added in v1.1.0

func GoSpawn(ctx context.Context, fn Func) error

GoSpawn is an implementation of AsyncExecutor that spawns a goroutine and directly executes the function (fn) within it.
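
A minimal sketch of what a GoSpawn-style AsyncExecutor could look like (an illustration of the described behavior, not necessarily the actual implementation):

func goSpawnSketch(ctx context.Context, fn Func) error {
	// Fire-and-forget: run fn in its own goroutine and return immediately.
	go fn(ctx)
	return nil
}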

func WorkerID

func WorkerID(ctx context.Context) (uint32, bool)

WorkerID returns the worker id associated with this context. It is only available if the option CreateWorkerID is enabled. NOTE that the worker id always starts at 1.

func Wrap added in v1.1.0

func Wrap[In, Out any](p *WorkerPool, f func(context.Context, In) (Out, error)) func(context.Context, In) (Out, error)

Wrap wraps a function for ease of future use, allowing the wrapped function to be executed within the WorkerPool.

Example
pool := New(Options{
	Capacity:                 8,
	WaitIfNoWorkersAvailable: true,
})

increase := func(a int) int {
	return a + 1
}
wrappedIncrease := Wrap(pool, func(_ context.Context, i int) (int, error) {
	return increase(i), nil
})

count := 0
for i := 0; i < 100; i++ {
	count, _ = wrappedIncrease(context.TODO(), count)
}
_ = pool.WaitDone(context.TODO())
fmt.Println(count)
Output:

100

Types

type AsyncExecutor added in v1.1.0

type AsyncExecutor func(ctx context.Context, fn Func) error

AsyncExecutor is a function type used for executing a function asynchronously.

type Func

type Func func(context.Context)

Func is the type of function called by a worker in the pool. It is the caller's responsibility to recover from panics.
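
Since the pool does not recover panics, callers may want to wrap tasks themselves. A hedged sketch (the safe helper below is hypothetical, not part of this package):

func safe(fn Func) Func {
	return func(ctx context.Context) {
		defer func() {
			// Recover here so a panicking task does not crash the process.
			if r := recover(); r != nil {
				log.Printf("workerpool: task panicked: %v", r)
			}
		}()
		fn(ctx)
	}
}

// Usage: _ = p.Submit(ctx, safe(task))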

type None added in v1.1.0

type None struct{}

None is a placeholder for convenience when there are no parameters or no return value.

type Options

type Options struct {
	// Capacity specifies the maximum number of resident running workers(goroutines),
	// 0 means no limit.
	Capacity uint32
	// IdleTimeout is the maximum amount of time a worker(goroutine) will
	// remain idle before terminating itself. Zero means no limit, the workers
	// never die if the pool is valid.
	IdleTimeout time.Duration
	// ResetInterval defines how often the worker(goroutine) must be restarted,
	// zero to disable it.
	// With this option enabled, a worker can reset its stack so that large stacks
	// don't live in memory forever; 25% jitter will be applied.
	ResetInterval time.Duration
	// WaitIfNoWorkersAvailable will wait until there is a worker available
	// if all resident workers are busy.
	// It only works if the option Capacity is greater than zero.
	// This option conflicts with CreateIfNoWorkersAvailable.
	WaitIfNoWorkersAvailable bool
	// CreateIfNoWorkersAvailable will create an ephemeral worker only
	// if all resident workers are busy.
	// It only works if the option Capacity is greater than zero and the option
	// WaitIfNoWorkersAvailable is disabled.
	CreateIfNoWorkersAvailable bool
	// CreateWorkerID will inject a worker id into the context of Func.
	// It may be useful; for example, we can use it to perform lockless operations
	// under some circumstances when we have a fixed number of workers and those
	// workers live long enough.
	CreateWorkerID bool
}

Options configure the WorkerPool.

type Pipeline added in v1.1.0

type Pipeline[In, Out any] struct {
	// contains filtered or unexported fields
}

Pipeline utilizes an input and an output channel to create a concurrent processor with three stages.

Example
pool := New(Options{
	Capacity:                 4,
	WaitIfNoWorkersAvailable: true,
})

pipeline := NewPipelineWith[int, int](PipelineOptions{
	FeederAsyncExecutor: GoSpawn,
	WorkerAsyncExecutor: pool.Submit,
})
inputs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
_ = pipeline.StartFeeder(context.Background(), inputs)
_ = pipeline.StartWorkerN(context.Background(), 4, func(_ context.Context, i int) int {
	return i * 2
})

sum := 0
for v := range pipeline.Join() {
	sum += v
}
fmt.Println("zero canceled:", len(inputs) == pipeline.ProcessedCount())
fmt.Println("sum:", sum)
_ = pool.WaitDone(context.TODO()) // Clean up.
Output:

zero canceled: true
sum: 272

func NewPipeline added in v1.1.0

func NewPipeline[In, Out any]() *Pipeline[In, Out]

NewPipeline creates a new pipeline that utilizes fire-and-forget goroutines.
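
A minimal sketch of the fire-and-forget variant, mirroring the Pipeline example above but without a WorkerPool (illustrative only):

pipeline := NewPipeline[int, int]()
_ = pipeline.StartFeeder(context.Background(), []int{1, 2, 3})
_ = pipeline.StartWorker(context.Background(), func(_ context.Context, i int) int {
	return i * 10
})
sum := 0
for v := range pipeline.Join() {
	sum += v // The channel is closed once all tasks are done.
}
fmt.Println(sum) // 60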

func NewPipelineWith added in v1.1.0

func NewPipelineWith[In, Out any](opts PipelineOptions) *Pipeline[In, Out]

NewPipelineWith creates a new Pipeline with PipelineOptions.

func (*Pipeline[In, Out]) Join added in v1.1.0

func (p *Pipeline[In, Out]) Join() <-chan Out

Join returns an output channel; the channel will be closed after all tasks are done. It is the caller's responsibility to check whether all inputs were processed; the ProcessedCount method serves this purpose. The pipeline is frozen after the join.

func (*Pipeline[In, Out]) ProcessedCount added in v1.1.0

func (p *Pipeline[In, Out]) ProcessedCount() int

ProcessedCount keeps track of the number of inputs that have been processed. The count is stable once the output channel has been closed.

func (*Pipeline[In, Out]) StartFeeder added in v1.1.0

func (p *Pipeline[In, Out]) StartFeeder(ctx context.Context, items []In) error

StartFeeder initiates the feeding process for a slice of inputs within the AsyncExecutor. The feeding process can be interrupted by the context.Context, without any signal to the caller.

This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.

func (*Pipeline[In, Out]) StartFeederFunc added in v1.1.0

func (p *Pipeline[In, Out]) StartFeederFunc(ctx context.Context, feedLoop func(context.Context, chan<- In)) error

StartFeederFunc initiates a feeding process within the AsyncExecutor. It is important for the caller to check the context.Context inside feedLoop and ensure that the feeding process stops properly.

This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.
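
A sketch of a feedLoop that honors cancellation, as advised above (it assumes the pipeline manages the input channel's lifecycle; the loop only stops writing):

_ = pipeline.StartFeederFunc(context.Background(), func(ctx context.Context, in chan<- int) {
	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			return // Stop feeding promptly once the context is canceled.
		case in <- i:
		}
	}
})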

func (*Pipeline[In, Out]) StartWorker added in v1.1.0

func (p *Pipeline[In, Out]) StartWorker(ctx context.Context, workOne func(context.Context, In) Out) error

StartWorker initiates a worker to process the inputs within the AsyncExecutor. The worker will stop if either the context is done or all inputs have been processed.

This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.

func (*Pipeline[In, Out]) StartWorkerN added in v1.1.0

func (p *Pipeline[In, Out]) StartWorkerN(ctx context.Context, concurrency int, workOne func(context.Context, In) Out) error

StartWorkerN starts N workers to process inputs within the AsyncExecutor. The workers will stop if either the context is done or all inputs have been processed.

This method must be invoked prior to Join, failing which ErrPipelineFrozen will be returned.

type PipelineOptions added in v1.1.0

type PipelineOptions struct {
	// FeederAsyncExecutor is the AsyncExecutor used by feeder.
	FeederAsyncExecutor AsyncExecutor
	// WorkerAsyncExecutor is the AsyncExecutor used by worker.
	WorkerAsyncExecutor AsyncExecutor
	// InputBufferSize is the buffer size of input channel.
	InputBufferSize int
	// OutputBufferSize is the buffer size of output channel.
	OutputBufferSize int
}

PipelineOptions configure the Pipeline.

NOTE that if you enable the WaitIfNoWorkersAvailable option with a small Capacity and small channel buffer sizes while using the same WorkerPool for both the feeder and the workers, it may result in a deadlock. Additionally, chaining multiple Pipelines under such circumstances may also cause a deadlock.
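
For illustration, a hedged sketch of the risky shape described above (assuming the default unbuffered channels): the feeder occupies the pool's only worker while blocking on the input channel, so the pipeline worker may never be scheduled to drain it. The Pipeline example above avoids this by using GoSpawn for the feeder.

pool := New(Options{Capacity: 1, WaitIfNoWorkersAvailable: true})
pipeline := NewPipelineWith[int, int](PipelineOptions{
	FeederAsyncExecutor: pool.Submit, // Feeder holds the only worker slot...
	WorkerAsyncExecutor: pool.Submit, // ...so starting a worker can block forever.
})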

type Stats

type Stats struct {
	// ResidentWorkers counts the number of resident workers.
	ResidentWorkers uint32
	// EphemeralWorkers counts the number of ephemeral workers when
	// the option CreateIfNoWorkersAvailable is enabled.
	EphemeralWorkers uint32
	// IdleWorkers counts all idle workers including any newly created workers.
	IdleWorkers uint32
	// PendingSubmits counts all pending Submit* calls.
	PendingSubmits uint32
}

Stats contains a list of worker counters.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool offers a pool of reusable workers(goroutines). NOTE that the WorkerPool does not handle panics.

It is extremely useful when we are facing the "morestack" issue. Additionally, certain options can enable us to perform lockless operations under specific circumstances by utilizing the worker ID.

Example
p := New(Options{
	Capacity:                   10,
	IdleTimeout:                5 * time.Minute,
	CreateIfNoWorkersAvailable: true,
})

count := uint32(0)
for i := 0; i < 100; i++ {
	n := uint32(i + 1)
	_ = p.Submit(context.TODO(), func(context.Context) {
		atomic.AddUint32(&count, n)
	})
}
_ = p.WaitDone(context.TODO())

fmt.Println(count)
Output:

5050
Example (LocklessOperation)
p := New(Options{
	Capacity:                 8,
	WaitIfNoWorkersAvailable: true,
	CreateWorkerID:           true,
})

values := make([]uint32, 8)
for i := 0; i < 100; i++ {
	n := uint32(i + 1)
	_ = p.Submit(context.TODO(), func(ctx context.Context) {
		id, ok := WorkerID(ctx)
		if !ok {
			panic("not possible")
		}
		// The worker id starts with 1.
		values[id-1] += n
		time.Sleep(10 * time.Millisecond) // Too fast, sleep for a while..
	})
}
_ = p.WaitDone(context.TODO())

sum := uint32(0)
count := 0
for _, v := range values {
	if v > 0 {
		count++
	}
	sum += v
}
fmt.Println(count)
fmt.Println(sum)
Output:

8
5050

func New

func New(opts Options) *WorkerPool

New creates a new WorkerPool. A pool with the default (empty) Options has an unlimited number of workers, and the workers never die.

func (*WorkerPool) Stats

func (p *WorkerPool) Stats() Stats

Stats returns the current stats.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(ctx context.Context, fn Func) error

Submit submits a task and waits until it is acquired by an available worker, or until the context is done if WaitIfNoWorkersAvailable is enabled. The "same" ctx will be passed into Func. NOTE that it panics if ctx == nil; pass context.TODO() or context.Background() instead.

func (*WorkerPool) SubmitConcurrentDependent

func (p *WorkerPool) SubmitConcurrentDependent(ctx context.Context, fns ...Func) error

SubmitConcurrentDependent submits multiple *concurrent dependent* tasks and waits until all of them are acquired by available workers, or until the context is done if WaitIfNoWorkersAvailable is enabled. The "same" ctx will be passed into Func.
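
A sketch of why the *concurrent dependent* grouping matters, assuming a pool p created with New: two tasks that synchronize over an unbuffered channel must run concurrently, and submitting them separately could deadlock if only one worker is free.

ch := make(chan int)
_ = p.SubmitConcurrentDependent(context.TODO(),
	func(context.Context) { ch <- 42 },          // Sender blocks until the receiver runs.
	func(context.Context) { fmt.Println(<-ch) }, // Receiver blocks until the sender runs.
)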

func (*WorkerPool) WaitDone

func (p *WorkerPool) WaitDone(ctx context.Context) error

WaitDone waits until all tasks are done or the context is done. The pool becomes unusable (read only) after this operation. If you want to wait multiple times, use an extra sync.WaitGroup. NOTE that it panics if ctx == nil; pass context.TODO() or context.Background() instead.
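
A sketch of waiting for a batch without invalidating the pool, using an extra sync.WaitGroup as suggested above:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	wg.Add(1)
	_ = p.Submit(context.TODO(), func(context.Context) {
		defer wg.Done()
		// ... do the actual work here ...
	})
}
wg.Wait() // All ten tasks have finished; the pool remains usable.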
