workers

v0.8.0
Published: Mar 1, 2021 License: MIT Imports: 5 Imported by: 0

README

workers

A Go package that runs a pool of workers to execute a job concurrently in the background.

Usage

Create a pool and start running a job.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/hmoragrega/workers"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)

	go func() {
		<-stop
		cancel()
	}()

	job := workers.JobFunc(func(ctx context.Context) error {
		// my job code 
		return nil
	})

	var pool workers.Pool

	if err := pool.Run(ctx, job); err != nil {
		log.Fatalf("job pool failed: %v", err)
	}
}
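
The signal-handling goroutine above can also be expressed with the standard library's signal.NotifyContext (available since Go 1.16). The sketch below is not part of the package; newInterruptContext is a hypothetical helper name, and the program cancels the context itself so that it terminates without waiting for a real signal.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
)

// newInterruptContext (hypothetical helper) returns a context that is
// cancelled when the process receives os.Interrupt, plus a stop function
// that cancels the context and releases the signal handler.
func newInterruptContext() (context.Context, context.CancelFunc) {
	return signal.NotifyContext(context.Background(), os.Interrupt)
}

func main() {
	ctx, stop := newInterruptContext()
	defer stop()

	// In the example above, ctx would be passed to pool.Run(ctx, job).
	// Here we stop right away so the sketch finishes on its own.
	stop()
	<-ctx.Done()
	log.Println("context finished:", ctx.Err())
}
```

With this variant, the stop channel and the extra goroutine from the usage example are no longer needed.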
Pool

A pool runs a single job through a number of concurrent workers.

By default, a pool has one worker. You can add workers without limit, or remove them all to pause the job.

A few configuration parameters tweak the pool's behaviour:

type Config struct {
	// Min indicates the minimum number of workers that can run concurrently.
	// By default the pool can have 0 workers, pausing it effectively.
	Min int

	// Max indicates the maximum number of workers that can run concurrently.
	// the default "0" indicates an infinite number of workers.
	Max int

	// Initial indicates the initial number of workers that should be running.
	// The default value will be the greater of 1 and the given minimum.
	Initial int

	// StopOnErrors indicates whether the pool should stop when a job returns an error.
	StopOnErrors bool
}

To create a pool with a tweaked config, call NewWithConfig:

pool := workers.Must(workers.NewWithConfig(workers.Config{
	Min:     3,
	Max:     10,
	Initial: 5,
}))
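
The field comments and the package's error variables (ErrInvalidMin, ErrInvalidMax, ErrInvalidInitial) suggest a set of validation rules. The following is a stand-alone sketch of those rules, not the package's implementation: Config is redeclared locally, normalize is a hypothetical helper, and treating Initial == 0 as "use the default" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the fields documented above (local copy for this sketch).
type Config struct {
	Min, Max, Initial int
	StopOnErrors      bool
}

// Local stand-ins for the package's error variables.
var (
	errInvalidMin     = errors.New("negative number of minimum workers")
	errInvalidMax     = errors.New("the maximum is less than the minimum workers")
	errInvalidInitial = errors.New("the initial is less the minimum workers")
)

// normalize (hypothetical) applies the documented default for Initial and
// the checks implied by the error descriptions.
func normalize(cfg Config) (Config, error) {
	if cfg.Min < 0 {
		return cfg, errInvalidMin
	}
	// Max == 0 means "no upper bound", so it is only checked when set.
	if cfg.Max != 0 && cfg.Max < cfg.Min {
		return cfg, errInvalidMax
	}
	// Assumption: a zero Initial selects the default, the greater of 1 and Min.
	if cfg.Initial == 0 {
		cfg.Initial = 1
		if cfg.Min > 1 {
			cfg.Initial = cfg.Min
		}
	}
	if cfg.Initial < cfg.Min {
		return cfg, errInvalidInitial
	}
	return cfg, nil
}

func main() {
	cfg, err := normalize(Config{Min: 3, Max: 10})
	fmt.Printf("%+v %v\n", cfg, err)

	_, err = normalize(Config{Min: 5, Max: 2})
	fmt.Println(err)
}
```

The sketch does not check Initial against Max, because the documentation above does not say how the package handles that case.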
Running non-stop

If you want to keep the pool running in a blocking call you can call Run or RunWithBuilder.

In this mode the pool will run until the given context is terminated.

The pool then will be closed without a timeout.

var pool workers.Pool

if err := pool.Run(ctx, job); err != nil {
	log.Println("pool ended with error", err)
}

Alternatively, you can start and stop the pool yourself for fine-grained control of its life cycle.

Starting the pool

To start the pool give it a job (or a job builder) to run:

if err := pool.Start(job); err != nil {
	log.Println("cannot start the pool", err)
}

The operation will fail if:

  • the pool has already been closed.
  • the pool is already running.
Stopping the pool

Stopping the pool requests all the workers to stop and waits until all of them finish their ongoing jobs, or until the given context is cancelled.

if err := pool.Close(ctx); err != nil {
	log.Println("cannot close the pool", err)
}

Close will also wait for workers that were removed but are still executing their last job.

The operation will fail if:

  • the pool has already been closed.
  • the pool is not running.

Alternatively, CloseWithTimeout can be used as a helper, passing a time.Duration.

Adding workers

To add a new worker to the pool, call More:

if err := pool.More(); err != nil {
	log.Println("cannot add more workers", err)
}

The operation will fail if:

  • the maximum number of workers has been reached
  • the pool has already been closed
  • the pool is not running
Removing workers

To remove a worker you can use Less.

if err := pool.Less(); err != nil {
	log.Println("cannot remove more workers", err)
}

Less removes a worker from the pool, immediately reducing the number of running workers, and requests that the worker stop processing jobs. It does not wait for the worker to finish.

Note: when closing, the pool will wait until all jobs finish, including those of workers that were removed.

The operation will fail if:

  • the minimum number of workers has been reached
  • the pool has already been closed
  • the pool is not running
Job

A job represents a task that needs to be performed.

// Job represents some work that needs to be done.
type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is a context; the job
	// should try to honor the context cancellation signal as soon
	// as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely.
	Do(ctx context.Context) error
}

Simple jobs can use the helper JobFunc to comply with the interface:

// JobFunc is a helper function that is a job.
type JobFunc func(ctx context.Context) error

To extend the functionality of jobs you can use builders, middlewares or wrappers.

Builder Jobs

Sometimes you want to share data across job executions within the same worker.

For this you can use a job builder, which gives every worker that joins the pool its own job.

NOTE: in this case, each job is executed by a single worker and never runs concurrently with itself, making it much easier to avoid data races.

// JobBuilder is a job that needs to be built during
// the initialization for each worker.
type JobBuilder interface {
	// New generates a new job for a new worker.
	New() Job
}

You will have to call the StartWithBuilder method to start the pool with a job builder.

var (
	numberOfWorkers = 3
	workSlots       = make([]int, numberOfWorkers)
	workerID        int32
)

builder := workers.JobBuilderFunc(func() workers.Job {
	// Each worker gets a unique, 1-based ID when its job is built.
	id := atomic.AddInt32(&workerID, 1)

	var count int
	return workers.JobFunc(func(ctx context.Context) error {
		// count is not shared across worker goroutines
		// no need to protect it against data races
		count++
		// same for the slice, since each worker
		// updates only its own index.
		workSlots[id-1] = count
		return nil
	})
})

pool.StartWithBuilder(builder)
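
The per-worker isolation can be reproduced without the package. The sketch below uses only the standard library and drops the context parameter for brevity; job, builder, runWorkers and countPerWorker are hypothetical names, and runWorkers is a simplified stand-in for what a pool does with a builder, not its actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// job and builder are local stand-ins for the package's Job and JobBuilder.
type job func() error
type builder func() job

// runWorkers starts n goroutines; each one builds its own job and runs it
// `rounds` times sequentially, as a single worker would.
func runWorkers(n, rounds int, b builder) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			j := b() // one job per worker
			for r := 0; r < rounds; r++ {
				_ = j()
			}
		}()
	}
	wg.Wait()
}

// countPerWorker returns, for each worker, how many times its job ran.
func countPerWorker(n, rounds int) []int {
	var (
		mu   sync.Mutex
		next int
	)
	slots := make([]int, n)

	b := builder(func() job {
		// The builder may be called from several goroutines, so the ID
		// assignment is the only part that needs synchronization.
		mu.Lock()
		id := next
		next++
		mu.Unlock()

		count := 0 // private to this worker's job
		return func() error {
			count++
			slots[id] = count // each worker writes only its own slot
			return nil
		}
	})

	runWorkers(n, rounds, b)
	return slots
}

func main() {
	fmt.Println(countPerWorker(3, 100)) // prints: [100 100 100]
}
```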
Job Middleware

A middleware lets you extend a job's capabilities:

// Middleware is a function that wraps the job and can
// be used to extend the functionality of the pool.
type Middleware interface {
	Wrap(job Job) Job
}

The helper MiddlewareFunc can be used to wrap simple middleware functions:

// MiddlewareFunc is a function that implements the
// job middleware interface.
type MiddlewareFunc func(job Job) Job

Some examples of middleware:

  • Counter counts how many jobs start, finish and fail.
  • Elapsed extends the counter middleware, also providing:
    • the total amount of time.
    • the average time.
    • the time of the last executed job.
  • Retry retries failed jobs a certain number of times.
  • Wait adds a pause between job executions. (Jobs will still run concurrently if there is more than one worker.)

As an exercise let's log the job result with our favourite logging library.

// loggerMiddleware is a middleware that logs the result of a job
// with "debug" or "error" level depending on the result.
loggerMiddleware := func(name string) workers.MiddlewareFunc {
	return func(job workers.Job) workers.Job {
		return workers.JobFunc(func(ctx context.Context) error {
			err := job.Do(ctx)

			logger.Cond(err != nil, logger.Error, logger.Debug).
				Log("job executed", "job", name, "error", err)

			return err
		})
	}
}

pool := workers.New(loggerMiddleware("my-job"))
pool.Start(workers.JobFunc(func(ctx context.Context) error {
	return someWork()
}))

If you need to add middleware when starting the job instead of when creating the pool, the handy little function Wrap will apply it for you.

// Wrap is a helper to apply a chain of middleware to a job.
func Wrap(job Job, middlewares ...Middleware) Job

var pool workers.Pool

job := workers.JobFunc(func(ctx context.Context) (err error) {
	// work
	return err
})

pool.Start(workers.Wrap(job, loggerMiddleware("my-job")))
Job Wrapper

A job wrapper can be used to change the signature of a job.

For example, your job may never return errors, yet still has to comply with the Job interface that requires it.

You could use the NoError wrapper to avoid having to return an error from your job function.

job := func(ctx context.Context) {
	// work
}

var pool workers.Pool 
pool.Start(workers.NoError(job))

Documentation

Constants

This section is empty.

Variables

var (
	// ErrPoolClosed is triggered when trying to start the pool, or to add
	// or remove workers from it, after closing it.
	ErrPoolClosed = errors.New("pool is closed")

	// ErrPoolStarted is triggered when trying to start the pool when it's
	// already running.
	ErrPoolStarted = errors.New("pool already started")

	// ErrNotStarted is returned when trying to add or remove workers from
	// the pool before starting it.
	ErrNotStarted = errors.New("pool has not started")

	// ErrInvalidMax is triggered when configuring a pool with an invalid
	// maximum number of workers.
	ErrInvalidMax = errors.New("the maximum is less than the minimum workers")

	// ErrInvalidMin is triggered when configuring a pool with an invalid
	// minimum number of workers.
	ErrInvalidMin = errors.New("negative number of minimum workers")

	// ErrInvalidInitial is triggered when configuring a pool with an invalid
	// initial number of workers.
	ErrInvalidInitial = errors.New("the initial is less the minimum workers")

	// ErrMinReached is triggered when trying to remove a worker when the
	// pool is already running at minimum capacity.
	ErrMinReached = errors.New("minimum number of workers reached")

	// ErrMaxReached is triggered when trying to add a worker when the
	// pool is already running at maximum capacity.
	ErrMaxReached = errors.New("maximum number of workers reached")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// Min indicates the minimum number of workers that can run concurrently.
	// By default the pool can have 0 workers, pausing it effectively.
	Min int

	// Max indicates the maximum number of workers that can run concurrently.
	// the default "0" indicates an infinite number of workers.
	Max int

	// Initial indicates the initial number of workers that should be running.
	// The default value will be the greater of 1 and the given minimum.
	Initial int

	// StopOnErrors indicates whether the pool should stop when a job returns an error.
	StopOnErrors bool
}

Config configures the number of workers that will be running in the pool.

type Job

type Job interface {
	// Do executes the job.
	//
	// The only parameter it receives is the worker context;
	// the job should try to honor the context cancellation signal
	// as soon as possible.
	//
	// The context will be cancelled when removing workers from
	// the pool or stopping the pool completely
	Do(ctx context.Context) error
}

Job represents some work that needs to be done non-stop.

func Wrap added in v0.4.0

func Wrap(job Job, middlewares ...Middleware) Job

Wrap is a helper to apply a chain of middleware to a job.

type JobBuilder added in v0.6.0

type JobBuilder interface {
	// New generates a new job for a new worker.
	New() Job
}

JobBuilder is a job that needs to be built during the initialization for each worker.

type JobBuilderFunc added in v0.6.0

type JobBuilderFunc func() Job

JobBuilderFunc is a helper to use functions as job builders.

func (JobBuilderFunc) New added in v0.6.0

func (f JobBuilderFunc) New() Job

New builds the new job using the underlying function.

type JobFunc added in v0.3.0

type JobFunc func(ctx context.Context) error

JobFunc is a helper function that is a job.

func (JobFunc) Do added in v0.3.0

func (f JobFunc) Do(ctx context.Context) error

Do executes the job work.

type Middleware added in v0.3.0

type Middleware interface {
	Wrap(job Job) Job
}

Middleware is a function that wraps the job and can be used to extend the functionality of the pool.

type MiddlewareFunc added in v0.3.0

type MiddlewareFunc func(job Job) Job

MiddlewareFunc is a function that implements the job middleware interface.

func (MiddlewareFunc) Wrap added in v0.3.0

func (f MiddlewareFunc) Wrap(job Job) Job

Wrap executes the middleware function wrapping the job.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of workers that can be started to run a job non-stop concurrently.

func Must

func Must(p *Pool, err error) *Pool

Must checks if the result of creating a pool has failed and if so, panics.

func New

func New(middlewares ...Middleware) *Pool

New creates a new pool with the default configuration.

It accepts an arbitrary number of job middlewares to run.

func NewWithConfig

func NewWithConfig(cfg Config, middlewares ...Middleware) (*Pool, error)

NewWithConfig creates a new pool with a specific configuration.

It accepts an arbitrary number of job middlewares to run.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context) error

Close stops all the workers and closes the pool.

Only the first call to Close will shut down the pool; subsequent calls will be ignored and return nil.

func (*Pool) CloseWithTimeout added in v0.6.0

func (p *Pool) CloseWithTimeout(timeout time.Duration) error

CloseWithTimeout closes the pool waiting for a certain amount of time.

func (*Pool) Current

func (p *Pool) Current() int

Current returns the current number of workers.

There may be more workers still executing jobs, because a removed worker completes its last job first, but they will eventually finish and stop processing new jobs.

func (*Pool) Less

func (p *Pool) Less() error

Less removes one worker from the pool.

This call does not wait for the worker to finish its current job. If the pool is later closed, though, the call to close it will wait for all removed workers to finish before returning.

func (*Pool) More

func (p *Pool) More() error

More starts a new worker in the pool.

func (*Pool) Run added in v0.7.0

func (p *Pool) Run(ctx context.Context, job Job) error

Run is a blocking call that will start the pool and keep it running until the context is terminated.

The pool will then be stopped without a timeout.

func (*Pool) RunWithBuilder added in v0.7.0

func (p *Pool) RunWithBuilder(ctx context.Context, jobBuilder JobBuilder) error

RunWithBuilder is a blocking call that will start the pool with a job builder and keep it running until the context is terminated.

The pool will then be stopped without a timeout.

func (*Pool) Start

func (p *Pool) Start(job Job) error

Start launches the workers and keeps them running until the pool is closed.

func (*Pool) StartWithBuilder added in v0.7.0

func (p *Pool) StartWithBuilder(jobBuilder JobBuilder) error

StartWithBuilder launches the workers and keeps them running until the pool is closed.
