workerpool

package module
v1.2.1
Published: May 18, 2020 License: MIT Imports: 3 Imported by: 0

README

Worker Pool

The worker pool library abstracts the setup around creating worker pools, so all you need to take care of is the actual business logic.

How it works

The work is evenly distributed to N workers, which process it in parallel. Successful responses are passed back through a success channel and errors through an error channel, and a callback can be specified to handle each. The actual logic of a worker is provided as a function which processes a single job and returns either a result or an error.
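
The library hides this wiring behind the WorkerPool type, but for orientation, the fan-out/fan-in pattern described above looks roughly like the sketch below. This is an illustration only, not the library's actual implementation; fanOut, jobCh, successCh, errCh and the parameter names are placeholders, and it assumes only the standard sync package.

// Illustrative fan-out/fan-in sketch of the mechanism described above.
// This is not the library's code; doWork stands in for the user-supplied
// worker function and onSuccess/onError for the callbacks.
func fanOut(workerCount int, jobs []interface{},
    doWork func(interface{}) (interface{}, error),
    onSuccess func(interface{}), onError func(error)) {

    jobCh := make(chan interface{})
    successCh := make(chan interface{})
    errCh := make(chan error)

    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() { // each worker drains the shared job channel
            defer wg.Done()
            for job := range jobCh {
                if result, err := doWork(job); err != nil {
                    errCh <- err
                } else {
                    successCh <- result
                }
            }
        }()
    }

    go func() { // feed the jobs and signal that no more are coming
        for _, job := range jobs {
            jobCh <- job
        }
        close(jobCh)
    }()

    go func() { // close the result channels once every worker has finished
        wg.Wait()
        close(successCh)
        close(errCh)
    }()

    for successCh != nil || errCh != nil { // dispatch outcomes to the callbacks
        select {
        case r, ok := <-successCh:
            if !ok {
                successCh = nil
            } else {
                onSuccess(r)
            }
        case err, ok := <-errCh:
            if !ok {
                errCh = nil
            } else {
                onError(err)
            }
        }
    }
}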

Usage

The worker pool is defined as follows:

func NewWorkerPool(
	onSuccess successFunc,
	onError errorFunc,
	doWork doWorkFunc,
) *WorkerPool
  • onSuccess is the callback which is called for each successful result.
  • onError is the callback which is called for each failed result.
  • doWork is the function which is called to process each job. This is the part which runs concurrently (the presumed shapes of these callback types are sketched below).
  • The number of workers which process jobs in parallel (workerCount) is passed to Work, not to NewWorkerPool.
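
The successFunc, errorFunc and doWorkFunc types themselves are unexported. Judging from the documented signature and the examples below, they presumably have roughly the following shapes (an inference, not part of the exported API):

type (
    successFunc func(result interface{})
    errorFunc   func(err error)
    doWorkFunc  func(job interface{}) (result interface{}, err error)
)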

Here is a short example:

ctx := context.Background()
successes := make([]int, 0)

workerPool := NewWorkerPool(
    func(result interface{}) { // On success
        r := result.(int)
        successes = append(successes, r)
    },
    func(err error) { // On error
        log.Println(err.Error())
    },
    func(job interface{}) (result interface{}, err error) { // Do work
        j := job.(int)
        if j > 4 {
            return nil, fmt.Errorf("number too big: %d", j)
        }
        return j, nil
    })

// Do the work
if err := workerPool.Work(
    ctx,
    3,                          // The number of workers which should work in parallel
    []int{1, 2, 3, 4, 5, 6, 7}, // The items to be processed
); err != nil {
    log.Println(err.Error())
}
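
Because Work receives the jobs as a plain interface{} and only requires it to hold a slice, the element type is up to you; the doWork function is responsible for asserting it back. Here is a small, hypothetical variation of the example above using strings (same imports as above, plus strings):

words := []string{"alpha", "beta", "gamma", "this one is far too long"}

pool := NewWorkerPool(
    func(result interface{}) { // On success
        fmt.Println("ok:", result.(string))
    },
    func(err error) { // On error
        log.Println(err.Error())
    },
    func(job interface{}) (result interface{}, err error) { // Do work
        w := job.(string)
        if len(w) > 10 {
            return nil, fmt.Errorf("word too long: %q", w)
        }
        return strings.ToUpper(w), nil
    })

if err := pool.Work(context.Background(), 2, words); err != nil {
    log.Println(err.Error())
}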
Cancelling the jobs

Sometimes you may want to stop processing early if, for example, enough results have been found. This can be done by canceling the context passed into the worker pool:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

successes := make([]int, 0)

workerPool := NewWorkerPool(
    func(result interface{}) { // On success
        r := result.(int)
        successes = append(successes, r)
        if len(successes) > 3 {
            cancel() // we have enough results
        }
    },
    func(err error) { // On error
        log.Println(err.Error())
    },
    func(job interface{}) (result interface{}, err error) { // Do work
        j := job.(int)
        if j > 4 {
            return nil, fmt.Errorf("number too big: %d", j)
        }
        return j, nil
    })

// Do the work
if err := workerPool.Work(
    ctx,
    3, // The number of workers which should work in parallel
    []int{1, 2, 3, 4, 5, 6, 7}, // The items to be processed
); err != nil {
    log.Println(err.Error())
}

In the code above, once more than 3 successes have been collected, the success callback cancels the context, which signals the worker pool to stop processing further jobs.
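
The documentation does not show how the halt is implemented. A common pattern, and presumably roughly what happens under the hood, is to stop handing out jobs once the context is done; reusing the placeholder names jobCh and jobs from the earlier sketch, that would look something like this (an assumption, not the actual source):

go func() { // feed jobs until either the slice is drained or the context is canceled
    defer close(jobCh)
    for _, job := range jobs {
        select {
        case jobCh <- job: // hand the next job to an idle worker
        case <-ctx.Done(): // cancel() was called: stop distributing work
            return
        }
    }
}()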

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool abstracts the setup around creating worker pools.

func NewWorkerPool

func NewWorkerPool(
	onSuccess successFunc,
	onError errorFunc,
	doWork doWorkFunc,
) *WorkerPool

NewWorkerPool creates a new WorkerPool instance with the given onSuccess, onError, and doWork callbacks.

func (*WorkerPool) Work

func (w *WorkerPool) Work(ctx context.Context, workerCount int, jobsSlice interface{}) error

Work spawns the workers, creates the concurrency-control channels, and then distributes the given jobs to the workers. When the given context is canceled, the work is halted. An error is returned if the given jobsSlice is not a slice.
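
The "not a slice" error suggests the jobs are unpacked via reflection. A check along the following lines would produce that behavior; this is an assumption about the implementation, not the actual source, and the error message is made up:

v := reflect.ValueOf(jobsSlice)
if v.Kind() != reflect.Slice {
    return fmt.Errorf("expected a slice, got %T", jobsSlice)
}
jobs := make([]interface{}, v.Len())
for i := range jobs {
    jobs[i] = v.Index(i).Interface() // copy each element out as an interface{}
}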
