Documentation
¶
Overview ¶
Generic worker pool (work queue) library with auto-scaling, backpressure, and easy composability of pools into pipelines.
Index ¶
- func ConnectPools[I, O, C, O2, C2 any](p1 *Pool[I, O, C], p2 *Pool[O, O2, C2], handleError func(Result[I, O]))
- func ErrorWrapPauseWorker(dur time.Duration, err error) error
- func ErrorWrapRetryable(err error) error
- func ErrorWrapRetryableUnaccounted(err error) error
- func FixedWorkers() func(c *poolConfig) error
- func LoggerDebug(l *log.Logger) func(c *poolConfig) error
- func LoggerInfo(l *log.Logger) func(c *poolConfig) error
- func MaxActiveWorkers(n int) func(c *poolConfig) error
- func Name(s string) func(c *poolConfig) error
- func ReinitDelay(d time.Duration) func(c *poolConfig) error
- func Retries(n int) func(c *poolConfig) error
- func StopWorkerAfterNumOfJobs(numOfJobs int) func(c *poolConfig) error
- func StopWorkerAfterNumOfJobsFor(numOfJobs int, stopDurationMultiplier float64) func(c *poolConfig) error
- func TargetLoad(v float64) func(c *poolConfig) error
- type Job
- type Pool
- func NewPoolSimple[I any](numOfWorkers int, handler func(job Job[I], workerID int) error, ...) (*Pool[I, struct{}, struct{}], error)
- func NewPoolWithInit[I, C any](numOfWorkers int, handler func(job Job[I], workerID int, connection C) error, ...) (*Pool[I, struct{}, C], error)
- func NewPoolWithResults[I, O any](numOfWorkers int, handler func(job Job[I], workerID int) (O, error), ...) (*Pool[I, O, struct{}], error)
- func NewPoolWithResultsAndInit[I, O, C any](numOfWorkers int, ...) (*Pool[I, O, C], error)
- type Result
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConnectPools ¶
func ConnectPools[I, O, C, O2, C2 any](p1 *Pool[I, O, C], p2 *Pool[O, O2, C2], handleError func(Result[I, O]))
ConnectPools starts a goroutine that reads the results of the first pool, and submits them to the second, if there is no error, or passes them to the handleError function if there is an error.
Once you call StopAndWait() on the first pool, after a while p1.Results get closed, and p2.StopAndWait() is called. This way StopAndWait() propagates through the pipeline.
WARNING: Should only be used if the first pool has a not-nil Results channel. Which means it was created by the constructors NewPoolWithResults() or NewPoolWithResultsAndInit().
func ErrorWrapPauseWorker ¶
ErrorWrapPauseWorker stops the worker for at least the given duration of time, and starts another worker so concurrency will not decrease. After the pause duration has passed, the worker can start again if it is needed.
func ErrorWrapRetryable ¶
ErrorWrapRetryable marks an error as retryable.
func ErrorWrapRetryableUnaccounted ¶
ErrorWrapRetryableUnaccounted marks an error as retryable and unaccounted. This means the pool will keep retrying the job indefinitely, without incrementing the attempt counter of the job.
func FixedWorkers ¶
func FixedWorkers() func(c *poolConfig) error
FixedWorkers disables auto-scaling and makes the pool use a fixed number of workers equal to the value of maxActiveWorkers.
func LoggerDebug ¶
LoggerDebug sets a logger for debug level logging.
func LoggerInfo ¶
LoggerInfo sets a logger for info level logging.
func MaxActiveWorkers ¶
MaxActiveWorkers sets the maximum number of active workers, if you need it to be lower than numOfWorkers.
Default value = numOfWorkers
func ReinitDelay ¶
ReinitDelay sets the time duration the worker should wait before reattempting init after failure.
func Retries ¶
Retries sets the number of times a job will be retried if it fails with a retryable error (see function ErrorWrapRetryable).
Default value = 1
func StopWorkerAfterNumOfJobs ¶
StopWorkerAfterNumOfJobs stops workers once they process numOfJobs jobs. Stopped workers will be restarted when they are needed.
func StopWorkerAfterNumOfJobsFor ¶
func StopWorkerAfterNumOfJobsFor(numOfJobs int, stopDurationMultiplier float64) func(c *poolConfig) error
StopWorkerAfterNumOfJobsFor stops workers once they process numOfJobs jobs. Workers will remain stopped for at least the time they were active multiplied by stopDurationMultiplier.
func TargetLoad ¶
TargetLoad sets the target load of the pool.
load = n / c, where n = number of jobs in queue or processing, and c = concurrency (current number of started workers).
If load is higher than target load, new workers are started. If it's lower, some workers are stopped.
Types ¶
type Pool ¶
type Pool[I, O, C any] struct { // If the pool is created by the constructors // NewPoolWithResults() or NewPoolWithResultsAndInit(), // results are written to this channel, // and you must consume from this channel in a loop until it is closed. // If the pool is created by the constructors // NewPoolSimple() or NewPoolWithInit(), // this channel is nil. Results chan Result[I, O] // contains filtered or unexported fields }
func NewPoolSimple ¶
func NewPoolSimple[I any](numOfWorkers int, handler func(job Job[I], workerID int) error, options ...func(*poolConfig) error) (*Pool[I, struct{}, struct{}], error)
NewPoolSimple creates a new worker pool.
func NewPoolWithInit ¶
func NewPoolWithInit[I, C any](numOfWorkers int, handler func(job Job[I], workerID int, connection C) error, workerInit func(workerID int) (C, error), workerDeinit func(workerID int, connection C) error, options ...func(*poolConfig) error) (*Pool[I, struct{}, C], error)
NewPoolWithInit creates a new worker pool with workerInit() and workerDeinit() functions.
func NewPoolWithResults ¶
func NewPoolWithResults[I, O any](numOfWorkers int, handler func(job Job[I], workerID int) (O, error), options ...func(*poolConfig) error) (*Pool[I, O, struct{}], error)
NewPoolWithResults creates a new worker pool with Results channel. You must consume from this channel in a loop until it is closed.
func NewPoolWithResultsAndInit ¶
func NewPoolWithResultsAndInit[I, O, C any](numOfWorkers int, handler func(job Job[I], workerID int, connection C) (O, error), workerInit func(workerID int) (C, error), workerDeinit func(workerID int, connection C) error, options ...func(*poolConfig) error) (*Pool[I, O, C], error)
NewPoolWithResultsAndInit creates a new worker pool with workerInit() and workerDeinit() functions and Results channel. You must consume from this channel in a loop until it is closed.
func (*Pool[I, O, C]) StopAndWait ¶
func (p *Pool[I, O, C]) StopAndWait()
StopAndWait shuts down the pool. Once called no more jobs can be submitted, and waits for all enqueued jobs to finish and workers to stop.