workerpool

package module
Version: v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2022 License: Apache-2.0 Imports: 4 Imported by: 3

README

Workerpool

Go Reference CI Go Report Card

Package workerpool implements a concurrency limiting worker pool. Worker routines are spawned on demand as tasks are submitted; up to the configured limit of concurrent workers.

When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional as it prevents from accumulating tasks which could grow unbounded. Therefore, it is the responsibility of the caller to queue up tasks if that's the intended behavior.

One caveat is that while the number of concurrently running workers is limited, task results are not and they accumulate until they are collected. Therefore, if a large number of tasks can be expected, the workerpool should be periodically drained (e.g. every 10k tasks).

This package is mostly useful when tasks are CPU bound and spawning too many routines would be detrimental to performance. It features a straightforward API and no external dependencies. See the section below for a usage example.

Example

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	for i, n := 0, int64(1_000_000_000_000_000_000); n < 1_000_000_000_000_000_100; i, n = i+1, n+1 {
		n := n // https://golang.org/doc/faq#closures_and_goroutines
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(_ context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed (ErrClosed) or being drained
		// (ErrDrained). Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close should be called once the worker pool is no longer necessary.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Documentation

Overview

Package workerpool implements a concurrency limiting worker pool. Worker routines are spawned on demand as tasks are submitted; up to the configured limit of concurrent workers.

When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional as it prevents from accumulating tasks which could grow unbounded. Therefore, it is the responsibility of the caller to queue up tasks if that's the intended behavior.

One caveat is that while the number of concurrently running workers is limited, task results are not and they accumulate until they are collected. Therefore, if a large number of tasks can be expected, the workerpool should be periodically drained (e.g. every 10k tasks).

Example
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Authors of Cilium

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	for i, n := 0, int64(1_000_000_000_000_000_000); n < 1_000_000_000_000_000_100; i, n = i+1, n+1 {
		n := n // https://golang.org/doc/faq#closures_and_goroutines
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(_ context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed (ErrClosed) or being drained
		// (ErrDrained). Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close should be called once the worker pool is no longer necessary.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrDraining is returned when an operation is not possible because
	// draining is in progress.
	ErrDraining = errors.New("drain operation in progress")
	// ErrClosed is returned when operations are attempted after a call to Close.
	ErrClosed = errors.New("worker pool is closed")
)

Functions

This section is empty.

Types

type Task

type Task interface {
	// String returns the task identifier.
	fmt.Stringer
	// Err returns the error resulting from processing the
	// unit of work.
	Err() error
}

Task is a unit of work.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool spawns, on demand, a number of worker routines to process submitted tasks concurrently. The number of concurrent routines never exceeds the specified limit.

func New

func New(n int) *WorkerPool

New creates a new pool of workers where at most n workers process submitted tasks concurrently. New panics if n ≤ 0.

func (*WorkerPool) Cap

func (wp *WorkerPool) Cap() int

Cap returns the concurrent workers capacity, see New().

func (*WorkerPool) Close

func (wp *WorkerPool) Close() error

Close closes the worker pool, rendering it unable to process new tasks. Close sends the cancellation signal to any running task and waits for all workers, if any, to return. Close will return ErrClosed if it has already been called.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain() ([]Task, error)

Drain waits until all tasks are completed. This operation prevents submitting new tasks to the worker pool. Drain returns the results of the tasks that have been processed. If a drain operation is already in progress, ErrDraining is returned. If the worker pool is closed, ErrClosed is returned.

func (*WorkerPool) Len added in v1.1.0

func (wp *WorkerPool) Len() int

Len returns the count of concurrent workers currently running.

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error

Submit submits f for processing by a worker. The given id is useful for identifying the task once it is completed. The task f must return when the context ctx is cancelled.

Submit blocks until a routine start processing the task.

If a drain operation is in progress, ErrDraining is returned and the task is not submitted for processing. If the worker pool is closed, ErrClosed is returned and the task is not submitted for processing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL