workers

v0.4.0
Warning

This package is not in the latest version of its module.

Published: Jan 30, 2021 License: MIT Imports: 5 Imported by: 0

README

workers

A Go package that runs a job concurrently in the background through a pool of workers.

Usage

Create a pool and start running a job.

package main

import (
    "context"
    "log"
    "time"

    "github.com/hmoragrega/workers"
)

func main() {
    // The zero value of a pool is used directly here.
    var pool workers.Pool

    job := workers.JobFunc(func(ctx context.Context) error {
        // my job code
        return nil
    })

    if err := pool.Start(job); err != nil {
        log.Fatal("cannot start pool: ", err)
    }

    defer func() {
        // Give the workers up to five seconds to finish.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        if err := pool.Close(ctx); err != nil {
            log.Fatal("cannot close pool: ", err)
        }
    }()
}

Pool

A pool runs a single job through a number of concurrent workers.

By default, a pool has one worker. Workers can be added without limit, or all of them removed to pause the job.

A few configuration parameters tweak the pool's behaviour:

type Config struct {
    // Min indicates the minimum number of workers that can run concurrently.
    // By default the pool can have 0 workers, effectively pausing it.
    Min int

    // Max indicates the maximum number of workers that can run concurrently.
    // The default "0" indicates an unlimited number of workers.
    Max int

    // Initial indicates the initial number of workers that should be running.
    // The default value is the greater of 1 and the given minimum.
    Initial int
}

To create a pool with a custom configuration, call NewWithConfig:

pool, err := workers.NewWithConfig(workers.Config{
   Min:     3,
   Max:     10,
   Initial: 5,
})
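
The defaults and limits described in the Config comments can be sketched as a small validation function. This is not the package's code: `config`, `normalize` and the lowercase error variables are local stand-ins that mirror the documented struct and error messages. Treating `Initial == 0` as "unset" is an assumption, and the sketch does not cover an Initial above Max, since the documentation does not say how that case is handled.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors.
var (
	errInvalidMin     = errors.New("negative number of minimum workers")
	errInvalidMax     = errors.New("the maximum is less than the minimum workers")
	errInvalidInitial = errors.New("the initial is less the minimum workers")
)

// config mirrors the documented Config struct.
type config struct {
	Min, Max, Initial int
}

// normalize applies the documented defaults and checks.
// Assumption: an Initial of 0 means "not set".
func normalize(cfg config) (config, error) {
	if cfg.Min < 0 {
		return cfg, errInvalidMin
	}
	// Max == 0 means there is no upper limit.
	if cfg.Max != 0 && cfg.Max < cfg.Min {
		return cfg, errInvalidMax
	}
	if cfg.Initial == 0 {
		// Default: the greater of 1 and the minimum.
		cfg.Initial = 1
		if cfg.Min > 1 {
			cfg.Initial = cfg.Min
		}
	}
	if cfg.Initial < cfg.Min {
		return cfg, errInvalidInitial
	}
	return cfg, nil
}

func main() {
	cfg, err := normalize(config{Min: 3, Max: 10})
	fmt.Println(cfg.Initial, err) // prints "3 <nil>"

	_, err = normalize(config{Min: 5, Max: 2})
	fmt.Println(err) // prints the "maximum is less than the minimum" error
}
```

With the real package, the equivalent failures would presumably surface as the error returned by NewWithConfig.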

Starting the pool

To start the pool give it a job to run:

if err := pool.Start(job); err != nil {
    log.Println("cannot start the pool", err)
}

The operation will fail if:

  • the pool has already been closed.
  • the pool is already running.

Stopping the pool

Stopping the pool requests all workers to stop and waits until each of them finishes its ongoing job or the given context is cancelled.

if err := pool.Close(ctx); err != nil {
    log.Println("cannot close the pool", err)
}

Close also waits for workers that were removed but are still executing their last job.

The operation will fail if:

  • the pool has already been closed.
  • the pool is not running.

Alternatively, the helper CloseWIthTimeout (spelled with a capital "I" in this version's API) takes a time.Duration instead of a context.
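
A timeout helper of this kind can be built on top of a Close(ctx) method with context.WithTimeout. The sketch below is an illustration, not the package's implementation: `closer`, `slowCloser` and `closeWithTimeout` are hypothetical names, and returning the context error when the deadline passes is an assumption about how a pool might behave.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// closer is a hypothetical stand-in for anything with a Close(ctx) method.
type closer interface {
	Close(ctx context.Context) error
}

// closeWithTimeout derives a context that expires after d and passes it to Close.
func closeWithTimeout(c closer, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return c.Close(ctx)
}

// slowCloser simulates workers that need some time to finish their jobs.
type slowCloser struct{ busy time.Duration }

func (s slowCloser) Close(ctx context.Context) error {
	select {
	case <-time.After(s.busy):
		return nil
	case <-ctx.Done():
		// Assumption: the context error is reported to the caller.
		return ctx.Err()
	}
}

// timedOut reports whether closing hit the deadline (durations in milliseconds).
func timedOut(busyMs, timeoutMs int) bool {
	err := closeWithTimeout(
		slowCloser{busy: time.Duration(busyMs) * time.Millisecond},
		time.Duration(timeoutMs)*time.Millisecond,
	)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(1000, 10)) // workers too slow: prints "true"
	fmt.Println(timedOut(1, 1000))  // workers finish in time: prints "false"
}
```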

Adding workers

To add a new worker to the pool, call More:

if err := pool.More(); err != nil {
    log.Println("cannot add more workers", err)
}

The operation will fail if:

  • the maximum number of workers has been reached
  • the pool has already been closed
  • the pool is not running

Removing workers

To remove a worker you can use Less.

if err := pool.Less(); err != nil {
    log.Println("cannot remove more workers", err)
}

Less removes a worker from the pool, immediately reducing the number of running workers, and asks that worker to stop processing jobs. It does not wait for the worker to actually stop.

Note: when closing, the pool waits until all jobs finish, including those of workers that were removed.

The operation will fail if:

  • the minimum number of workers has been reached
  • the pool has already been closed
  • the pool is not running
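
The failure conditions listed for Start, More and Less can be summarised as a small state model. This is a sketch for reasoning about the rules, not the package's code: `model` and the lowercase error variables are local stand-ins, and the order in which the conditions are checked (closed before not started) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors.
var (
	errPoolClosed  = errors.New("pool is closed")
	errPoolStarted = errors.New("pool already started")
	errNotStarted  = errors.New("pool has not started")
	errMinReached  = errors.New("minimum number of workers reached")
	errMaxReached  = errors.New("maximum number of workers reached")
)

// model tracks only the state needed to decide which error applies.
type model struct {
	started, closed   bool
	min, max, current int // max == 0 means unlimited
}

// running returns an error unless the pool is started and not closed.
func (m *model) running() error {
	if m.closed {
		return errPoolClosed
	}
	if !m.started {
		return errNotStarted
	}
	return nil
}

func (m *model) start(initial int) error {
	if m.closed {
		return errPoolClosed
	}
	if m.started {
		return errPoolStarted
	}
	m.started, m.current = true, initial
	return nil
}

func (m *model) more() error {
	if err := m.running(); err != nil {
		return err
	}
	if m.max != 0 && m.current >= m.max {
		return errMaxReached
	}
	m.current++
	return nil
}

func (m *model) less() error {
	if err := m.running(); err != nil {
		return err
	}
	if m.current <= m.min {
		return errMinReached
	}
	m.current--
	return nil
}

func main() {
	m := &model{min: 1, max: 2}
	fmt.Println(m.more())   // not started yet
	fmt.Println(m.start(1)) // <nil>
	fmt.Println(m.more())   // <nil>, two workers now
	fmt.Println(m.more())   // maximum reached
}
```

With the real package, callers would compare returned errors against the exported variables (for example with errors.Is) rather than these local copies.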

Job

A job represents a task that needs to be performed.

// Job represents some work that needs to be done.
type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is a context; the job
	// should try to honor the context cancellation signal as
	// soon as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely.
	Do(ctx context.Context) error
}
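
One way for a job to honor cancellation is to check the context between units of work. The sketch below declares a local copy of the Job interface so that it compiles on its own; `batchJob` is a hypothetical job type, and returning `ctx.Err()` on cancellation is a common convention rather than something this package is documented to require.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job mirrors the documented interface.
type Job interface {
	Do(ctx context.Context) error
}

// batchJob is a hypothetical job that processes items one by one
// and checks the context before each item.
type batchJob struct {
	items     int
	processed int
}

func (b *batchJob) Do(ctx context.Context) error {
	for i := 0; i < b.items; i++ {
		select {
		case <-ctx.Done():
			// Stop as soon as cancellation is observed.
			return ctx.Err()
		default:
		}
		b.processed++
	}
	return nil
}

// runCancelled runs the job with an already cancelled context and returns
// how many items were processed and whether context.Canceled was reported.
func runCancelled() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	job := &batchJob{items: 100}
	var j Job = job
	err := j.Do(ctx)
	return job.processed, errors.Is(err, context.Canceled)
}

func main() {
	n, cancelled := runCancelled()
	fmt.Println(n, cancelled) // prints "0 true"
}
```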

Simple jobs can use the helper JobFunc to comply with the interface:

// JobFunc is a helper function that is a job.
type JobFunc func(ctx context.Context) error

To extend the functionality of jobs you can use middlewares or wrappers.

Job Middleware

A middleware extends the capabilities of a job:

// Middleware is a function that wraps the job and can
// be used to extend the functionality of the pool.
type Middleware interface {
	Wrap(job Job) Job
}

The helper MiddlewareFunc can be used to wrap simple middleware functions:

// MiddlewareFunc is a function that implements the
// job middleware interface.
type MiddlewareFunc func(job Job) Job
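
JobFunc and MiddlewareFunc follow the function-adapter pattern familiar from http.HandlerFunc: a function type gets a method so that plain functions satisfy an interface. The sketch below redeclares the types locally to stay self-contained. The method bodies are the usual one-line adapters and are an assumption about the package's implementation, and `countCalls` is a hypothetical middleware.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types.
type Job interface {
	Do(ctx context.Context) error
}

type JobFunc func(ctx context.Context) error

// Do calls the function itself, which makes JobFunc a Job.
func (f JobFunc) Do(ctx context.Context) error { return f(ctx) }

type Middleware interface {
	Wrap(job Job) Job
}

type MiddlewareFunc func(job Job) Job

// Wrap calls the function itself, which makes MiddlewareFunc a Middleware.
func (f MiddlewareFunc) Wrap(job Job) Job { return f(job) }

// countCalls is a hypothetical middleware that increments *n before each
// execution. It is not safe for concurrent workers; a real counter would
// need atomic operations or a mutex.
func countCalls(n *int) Middleware {
	return MiddlewareFunc(func(job Job) Job {
		return JobFunc(func(ctx context.Context) error {
			*n++
			return job.Do(ctx)
		})
	})
}

// demo wraps a no-op job and runs it three times.
func demo() int {
	var calls int
	job := countCalls(&calls).Wrap(JobFunc(func(ctx context.Context) error {
		return nil
	}))
	for i := 0; i < 3; i++ {
		_ = job.Do(context.Background())
	}
	return calls
}

func main() {
	fmt.Println(demo()) // prints "3"
}
```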

Some examples of middleware:

  • Counter counts how many jobs start, finish and fail.
  • Elapsed extends the counter middleware, also providing:
    • the total amount of time.
    • the average time.
    • the time of the last executed job.
  • Retry retries failed jobs a certain number of times.
  • Wait adds a pause between job executions. (Jobs will still run concurrently if there is more than one worker.)
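
To give an idea of what such a middleware involves, here is a minimal retry sketch. It is not the package's Retry middleware, whose constructor and options are not shown in this document. `retry` is a hypothetical function built on local copies of the documented types, and it assumes that "retries" means additional attempts after the first failure.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented types.
type Job interface {
	Do(ctx context.Context) error
}

type JobFunc func(ctx context.Context) error

func (f JobFunc) Do(ctx context.Context) error { return f(ctx) }

type MiddlewareFunc func(job Job) Job

func (f MiddlewareFunc) Wrap(job Job) Job { return f(job) }

// retry re-runs a failed job up to `retries` additional times and
// gives up early if the context has been cancelled.
func retry(retries int) MiddlewareFunc {
	return func(job Job) Job {
		return JobFunc(func(ctx context.Context) error {
			var err error
			for attempt := 0; attempt <= retries; attempt++ {
				if err = job.Do(ctx); err == nil {
					return nil
				}
				if ctx.Err() != nil {
					break
				}
			}
			return err
		})
	}
}

// demo runs a job that fails twice before succeeding and reports
// how many times it was called and whether the final result was nil.
func demo(retries int) (calls int, ok bool) {
	flaky := JobFunc(func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	})
	err := retry(retries).Wrap(flaky).Do(context.Background())
	return calls, err == nil
}

func main() {
	fmt.Println(demo(3)) // prints "3 true"
	fmt.Println(demo(1)) // prints "2 false"
}
```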

As an exercise let's log the job result with our favourite logging library.

// jobLogger is a middleware that logs the result of a job
// with "debug" or "error" level depending on the result.
jobLogger := func(name string) workers.MiddlewareFunc {
	return func(job workers.Job) workers.Job {
		return workers.JobFunc(func(ctx context.Context) error {
			err := job.Do(ctx)

			// logger is a placeholder for your logging library.
			logger.Cond(err != nil, logger.Error, logger.Debug).
				Log("job executed", "job", name, "error", err)

			return err
		})
	}
}

pool := workers.Must(workers.New(jobLogger("my-job")))
pool.Start(workers.JobFunc(func(ctx context.Context) error {
	return someWork()
}))

If you need to add a middleware when starting the job rather than when creating the pool, the handy function Wrap applies a chain of middleware for you.

// Wrap is a helper to apply a chain of middleware to a job.
func Wrap(job Job, middlewares ...Middleware) Job

var pool workers.Pool

job := workers.JobFunc(func(ctx context.Context) (err error) {
    // work
    return err
})

pool.Start(workers.Wrap(job, jobLogger("my-job")))
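
A helper like this can be written as a loop over the middleware. The sketch below shows one plausible version using local copies of the documented types. The order in which the real Wrap applies middleware is not stated in this document; here each middleware wraps the result of the previous one, so the last one listed runs first. That ordering is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local copies of the documented types.
type Job interface {
	Do(ctx context.Context) error
}

type JobFunc func(ctx context.Context) error

func (f JobFunc) Do(ctx context.Context) error { return f(ctx) }

type Middleware interface {
	Wrap(job Job) Job
}

type MiddlewareFunc func(job Job) Job

func (f MiddlewareFunc) Wrap(job Job) Job { return f(job) }

// wrap applies each middleware in turn; the last one ends up outermost.
func wrap(job Job, middlewares ...Middleware) Job {
	for _, m := range middlewares {
		job = m.Wrap(job)
	}
	return job
}

// demo records the order in which two tagging middlewares and the job run.
func demo() string {
	var order []string
	tag := func(name string) Middleware {
		return MiddlewareFunc(func(job Job) Job {
			return JobFunc(func(ctx context.Context) error {
				order = append(order, name)
				return job.Do(ctx)
			})
		})
	}
	job := JobFunc(func(ctx context.Context) error {
		order = append(order, "job")
		return nil
	})
	_ = wrap(job, tag("a"), tag("b")).Do(context.Background())
	return strings.Join(order, ",")
}

func main() {
	fmt.Println(demo()) // prints "b,a,job"
}
```

If ordering matters for your middleware (for example logging around retries), it is worth verifying it against the actual package rather than relying on this sketch.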

Job Wrapper

A job wrapper can be used to change the signature of a job.

For example, your job may never return errors, yet still has to comply with the Job interface that requires it.

You could use the NoError wrapper to avoid having to return an error from your job function.

job := func(ctx context.Context) {
    // work
}

var pool workers.Pool 
pool.Start(workers.NoError(job))
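
A wrapper of this kind is a small adapter from one function signature to the Job interface. The sketch below shows how it could look. `noError` is a local, hypothetical equivalent built on copies of the documented types, and the exact signature of the package's NoError is inferred from the usage above rather than confirmed.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types.
type Job interface {
	Do(ctx context.Context) error
}

type JobFunc func(ctx context.Context) error

func (f JobFunc) Do(ctx context.Context) error { return f(ctx) }

// noError adapts a function without a return value into a Job
// that always reports success.
func noError(f func(ctx context.Context)) Job {
	return JobFunc(func(ctx context.Context) error {
		f(ctx)
		return nil
	})
}

// demo reports whether the wrapped function ran and whether Do returned nil.
func demo() (ran bool, errIsNil bool) {
	job := noError(func(ctx context.Context) {
		ran = true
	})
	err := job.Do(context.Background())
	return ran, err == nil
}

func main() {
	fmt.Println(demo()) // prints "true true"
}
```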

Documentation

Variables

var (
	// ErrPoolClosed is triggered when trying to start the pool, or to add
	// or remove workers, after closing it.
	ErrPoolClosed = errors.New("pool is closed")

	// ErrPoolStarted is triggered when trying to start the pool when it's
	// already running.
	ErrPoolStarted = errors.New("pool already started")

	// ErrNotStarted is returned when trying to add or remove workers from
	// the pool before starting it.
	ErrNotStarted = errors.New("pool has not started")

	// ErrInvalidMax is triggered when configuring a pool with an invalid
	// maximum number of workers.
	ErrInvalidMax = errors.New("the maximum is less than the minimum workers")

	// ErrInvalidMin is triggered when configuring a pool with an invalid
	// minimum number of workers.
	ErrInvalidMin = errors.New("negative number of minimum workers")

	// ErrInvalidInitial is triggered when configuring a pool with an invalid
	// initial number of workers.
	ErrInvalidInitial = errors.New("the initial is less the minimum workers")

	// ErrMinReached is triggered when trying to remove a worker when the
	// pool is already running at minimum capacity.
	ErrMinReached = errors.New("minimum number of workers reached")

	// ErrMaxReached is triggered when trying to add a worker when the
	// pool is already running at maximum capacity.
	ErrMaxReached = errors.New("maximum number of workers reached")
)

Types

type Config

type Config struct {
	// Min indicates the minimum number of workers that can run concurrently.
	// By default the pool can have 0 workers, effectively pausing it.
	Min int

	// Max indicates the maximum number of workers that can run concurrently.
	// The default "0" indicates an unlimited number of workers.
	Max int

	// Initial indicates the initial number of workers that should be running.
	// The default value is the greater of 1 and the given minimum.
	Initial int
}

Config configures the number of workers that will be running in the pool.

type Job

type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is the worker context;
	// the job should try to honor the context cancellation signal
	// as soon as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely
	Do(ctx context.Context) error
}

Job represents some work that needs to be done non-stop.

func Wrap added in v0.4.0

func Wrap(job Job, middlewares ...Middleware) Job

Wrap is a helper to apply a chain of middleware to a job.

type JobFunc added in v0.3.0

type JobFunc func(ctx context.Context) error

JobFunc is a helper function that is a job.

func (JobFunc) Do added in v0.3.0

func (f JobFunc) Do(ctx context.Context) error

Do executes the job work.

type Middleware added in v0.3.0

type Middleware interface {
	Wrap(job Job) Job
}

Middleware is a function that wraps the job and can be used to extend the functionality of the pool.

type MiddlewareFunc added in v0.3.0

type MiddlewareFunc func(job Job) Job

MiddlewareFunc is a function that implements the job middleware interface.

func (MiddlewareFunc) Wrap added in v0.3.0

func (f MiddlewareFunc) Wrap(job Job) Job

Wrap executes the middleware function wrapping the job.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of workers that can be started to run a job non-stop concurrently.

func Must

func Must(p *Pool, err error) *Pool

Must checks whether creating a pool has failed and, if so, panics.

func New

func New(middlewares ...Middleware) (*Pool, error)

New creates a new pool with the default configuration.

It accepts an arbitrary number of job middlewares to run.

func NewWithConfig

func NewWithConfig(cfg Config, middlewares ...Middleware) (*Pool, error)

NewWithConfig creates a new pool with a specific configuration.

It accepts an arbitrary number of job middlewares to run.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close stops all the workers and closes the pool.

Only the first call to Close shuts down the pool; subsequent calls are ignored and return nil.

func (*Pool) CloseWIthTimeout

func (p *Pool) CloseWIthTimeout(timeout time.Duration) error

CloseWIthTimeout closes the pool waiting for a certain amount of time.

func (*Pool) Current

func (p *Pool) Current() int

Current returns the current number of workers.

More workers than this may still be executing jobs: a removed worker completes its last job before stopping. Such workers eventually finish and do not process new jobs.

func (*Pool) Less

func (p *Pool) Less() error

Less removes a worker from the pool.

This call does not wait for the worker to finish its current job. When the pool is closed, however, the call to Close waits for all removed workers to finish before returning.

func (*Pool) More

func (p *Pool) More() error

More starts a new worker in the pool.

func (*Pool) Start

func (p *Pool) Start(job Job) error

Start launches the workers and keeps them running until the pool is closed.
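
The idea of running a job "non-stop" through several workers can be pictured as each worker calling the job in a loop until its context is cancelled. The sketch below illustrates that idea only. The package's internals are not shown in this document, and `runWorkers` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts n goroutines that call job repeatedly until ctx is
// cancelled, then waits for all of them to return.
func runWorkers(ctx context.Context, n int, job func(context.Context) error) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ctx.Err() == nil {
				_ = job(ctx) // errors are ignored in this sketch
			}
		}()
	}
	wg.Wait()
}

// demo runs four workers until the job has executed at least target times
// and returns the total number of executions.
func demo(target int64) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var executed int64
	runWorkers(ctx, 4, func(ctx context.Context) error {
		if atomic.AddInt64(&executed, 1) >= target {
			cancel() // stands in for closing the pool
		}
		return nil
	})
	return atomic.LoadInt64(&executed)
}

func main() {
	fmt.Println(demo(100) >= 100) // prints "true"
}
```

The total can slightly exceed the target because several workers may be mid-job when cancellation happens, which matches the documented note that workers finish their ongoing job before stopping.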
