workers

Version: v0.3.0
Published: Jan 27, 2021 License: MIT Imports: 6 Imported by: 0

README

workers

A Go package for running a pool of workers that execute a job concurrently in the background.

Usage

Create a pool of workers by passing it a job, then start the pool.

package main

import (
    "context"
    "log"
    "time"

    "github.com/hmoragrega/workers"
)

func main() {
    job := workers.JobFunc(func(ctx context.Context) {
        // my job code
    })

    pool := workers.Must(workers.New(job))

    if err := pool.Start(); err != nil {
        log.Fatalf("cannot start pool: %v", err)
    }

    // Give the workers up to five seconds to finish when main returns.
    defer func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        if err := pool.Close(ctx); err != nil {
            log.Fatalf("cannot close pool: %v", err)
        }
    }()
}
Pool

A pool runs a single job through a number of concurrent workers.

By default, a pool starts with one worker and allows the number of workers to grow without limit.

A few configuration parameters tweak the pool behaviour:

type Config struct {
    // Min indicates the minimum number of workers that can run concurrently.
    // When 0 is given the minimum is defaulted to 1.
    Min int

    // Max indicates the maximum number of workers that can run concurrently.
    // the default "0" indicates an infinite number of workers.
    Max int

    // Initial indicates the initial number of workers that should be running.
    // When 0 is given the minimum is used.
    Initial int
}

To create a pool with a custom configuration, call NewWithConfig:

pool, err := workers.NewWithConfig(job, workers.Config{
   Min:     3,
   Max:     10,
   Initial: 5,
})
Starting the pool

The pool won't process any job until it is started:

if err := pool.Start(); err != nil {
    log.Println("cannot start the pool", err)
}

The operation will fail if:

  • the pool is not configured; New was not called
  • the pool is closed; Close was called
  • the pool is already running; Start was already called
Stopping the pool

Stopping the pool requests all the workers to stop and waits until all of them finish their ongoing jobs or the given context is cancelled.

if err := pool.Close(ctx); err != nil {
    log.Println("cannot close the pool", err)
}

Close will also wait for workers that were removed but are still executing their last job.

The operation will fail if:

  • the pool is not configured; New was not called
  • the pool is already closed; Close was already called
  • the pool is not running; Start was not called.

Alternatively, the helper CloseWIthTimeout (spelled with a capital "I" in this version's API) can be used; it takes a time.Duration instead of a context.

Adding workers

To add a new worker to the pool, call More:

if err := pool.More(); err != nil {
    log.Println("cannot add more workers", err)
}

The operation will fail if:

  • the maximum number of workers has been reached
  • the pool is not configured; New was not called
  • the pool is closed; Close was called
  • the pool is not running; Start was not called
Removing workers

To remove a worker you can use Less.

if err := pool.Less(); err != nil {
    log.Println("cannot remove more workers", err)
}

Less removes a worker from the pool, immediately reduces the number of running workers, and requests the worker to stop processing jobs, but it does not wait until the worker finally stops.

These stopping workers are still taken into account when closing the pool, which waits for them to finish.

The operation will fail if:

  • the minimum number of workers has been reached
  • the pool is not configured; New was not called
  • the pool is closed; Close was called
  • the pool is not running; Start was not called
Job

A job is a simple function that accepts only one parameter, the worker context.

// Job represents some work that needs to be done non-stop.
type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is a context; the job
	// should try to honor the context cancellation signal as soon
	// as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely.
	Do(ctx context.Context)
}

Simple jobs can use the helper JobFunc to comply with the interface:

// JobFunc is a helper function that is a job.
type JobFunc func(ctx context.Context)

There are two ways of extending the job functionality: job middleware and job wrappers.

Job Middleware

A middleware extends the capabilities of a job by wrapping it.

// Middleware is a function that wraps the job and can
// be used to extend the functionality of the pool.
type Middleware interface {
	Wrap(job Job) Job
}

The helper MiddlewareFunc can be used to wrap simple middleware functions:

// MiddlewareFunc is a function that implements the
// job middleware interface.
type MiddlewareFunc func(job Job) Job

Some examples of middleware:

  • Counter counts how many jobs start and finish.
  • Elapsed extends the counter middleware providing also:
    • the total amount of time.
    • the average time.
    • the time of the last executed job.
  • Wait adds a pause between a worker's jobs. (Jobs will still run concurrently if the pool has more than one worker.)
Job Wrapper

A job wrapper is a function that can transform and extend the job signature.

A common scenario that benefits from job wrappers is a job that may fail and return an error. We could, for example, retry the job a certain number of times.

As an exercise, let's log the job result with our favourite logging library using the "WithError" wrapper:

// jobLogger is a reusable logger wrapper for jobs.
jobLogger := func(jobName string) func(error) {
    return func(err error) {
        if err != nil {
            logger.Error("job failed", "job", jobName, "error", err)
            return
        }
        logger.Debug("job success", "job", jobName)
    }
}

job := func(ctx context.Context) error {
    err := someWorkThatCanFail()
    return err
}

pool := workers.Must(workers.New(
    wrapper.WithError(job, jobLogger("foo")),
))

Documentation

Variables

var (
	ErrPoolClosed     = errors.New("pool is closed")
	ErrPoolStarted    = errors.New("pool already started")
	ErrNotStarted     = errors.New("pool has not started")
	ErrNotConfigured  = errors.New("pool not configured")
	ErrInvalidMax     = errors.New("maximum workers must be equal or greater than minimum")
	ErrInvalidMin     = errors.New("minimum workers must be at least one")
	ErrInvalidInitial = errors.New("initial workers must match at least the minimum")
	ErrMinReached     = errors.New("minimum number of workers reached")
	ErrMaxReached     = errors.New("maximum number of workers reached")
)

Types

type Config

type Config struct {
	// Min indicates the minimum number of workers that can run concurrently.
	// When 0 is given the minimum is defaulted to 1.
	Min int

	// Max indicates the maximum number of workers that can run concurrently.
	// the default "0" indicates an infinite number of workers.
	Max int

	// Initial indicates the initial number of workers that should be running.
	// When 0 is given the minimum is used.
	Initial int
}

Config sets the number of workers that will be running in the pool.

type Job

type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is the worker context;
	// the job should try to honor the context cancellation signal
	// as soon as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely.
	Do(ctx context.Context)
}

Job represents some work that needs to be done non-stop.

type JobFunc added in v0.3.0

type JobFunc func(ctx context.Context)

JobFunc is a helper function type that implements Job.

func (JobFunc) Do added in v0.3.0

func (f JobFunc) Do(ctx context.Context)

Do executes the job work.

type Middleware added in v0.3.0

type Middleware interface {
	Wrap(job Job) Job
}

Middleware is a function that wraps the job and can be used to extend the functionality of the pool.

type MiddlewareFunc added in v0.3.0

type MiddlewareFunc func(job Job) Job

MiddlewareFunc is a function that implements the job middleware interface.

func (MiddlewareFunc) Wrap added in v0.3.0

func (f MiddlewareFunc) Wrap(job Job) Job

Wrap executes the middleware function wrapping the job.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of workers that can be started to run a job non-stop concurrently.

func Must

func Must(p *Pool, err error) *Pool

Must checks if the result of creating a pool has failed and if so, panics.

func New

func New(job Job, middlewares ...Middleware) (*Pool, error)

New creates a new pool with the default configuration.

It accepts an arbitrary number of job middlewares to run.

func NewWithConfig

func NewWithConfig(job Job, cfg Config, middlewares ...Middleware) (*Pool, error)

NewWithConfig creates a new pool with a specific configuration.

It accepts an arbitrary number of job middlewares to run.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close stops all the workers and closes the pool.

Only the first call to Close will shut down the pool; subsequent calls are ignored and return nil.

func (*Pool) CloseWIthTimeout

func (p *Pool) CloseWIthTimeout(timeout time.Duration) error

CloseWIthTimeout closes the pool, waiting up to the given timeout.

func (*Pool) Current

func (p *Pool) Current() int

Current returns the current number of workers.

More workers than this may still be executing jobs: a removed worker completes its last job before stopping, but it will eventually finish and will not process new jobs.

func (*Pool) Less

func (p *Pool) Less() error

Less removes one worker from the pool.

This call does not wait for the worker to finish its current job. If the pool is closed, though, the call to close it will wait for all removed workers to finish before returning.

func (*Pool) More

func (p *Pool) More() error

More starts a new worker in the pool.

func (*Pool) Start

func (p *Pool) Start() error

Start launches the workers and keeps them running until the pool is closed.
