gorkers

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2022 License: MIT Imports: 6 Imported by: 1

README

gorkers

Maintainability CodeQL GoCover Go Reference

Examples

Getting Started

Import
import (
    "github.com/guilhem/gorkers"
)
Create a worker function 👷

Starting from 0.2, gorkers use go1.18 generics ❤️

Define, your in and out type and create your worker:

work := func(ctx context.Context, in string, out chan<- int) error {
    // work iteration here
}
Create runner 🚶
runner := gorkers.NewRunner(ctx, work, numberOfWorkers, sizeOfBuffer)
  • numberOfWorkers is the number of parallel workers that can be running at the same time
  • sizeOfBuffer is the buffer size of input. If stopped, a runner can lose it's buffer.
Start runner 🏃
if err := runner.Start(); err != nil {
    // error management
}

.Start() can return an error if beforeFunc is in error.

Send work to worker
runner.Send("Hello World")

Send accepts an interface. So send it anything you want.

Wait for the worker to finish
runner.Wait()

.Wait() lock any new .Send() and block until all jobs are finished.

runner.Close()

Use .Close() to prevent any new job to be spawn and sending a context cancellation to any worker.

Stop on errors

StopWhenError is a special function to stop Runner when 1 task return an error.

runner.AfterFunc(gorkers.StopWhenError)
runner.Start()
Log errors

By default errors are ignored. To manage them you can use AfterFunc this way:

logf := func(ctx context.Context, in interface{}, err error) error {
    if err != nil {
        log.Printf("err: %s", err)
    }
    return nil
}
runner.AfterFunc(logf)

Working With Multiple Workers

Passing work form one worker to the next

By using the InFrom method you can tell workerTwo to accept output from workerOne

runnerOne := gorkers.NewRunner(ctx, work1, 100, 100)
runnerTwo := gorkers.NewRunner(ctx, work2, 100, 100).InFrom(workerOne)

runnerOne.Start()
runnerTwo.Start()

runnerOne.Wait().Stop()
runnerTwo.Wait().Stop()
Accepting output from multiple workers

It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.)

runnerOne := gorkers.gewRunner(ctx, NewMyWorker(), 100, 100)
runnerTwo := gorkers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100)
runnerThree := gorkers.NewRunner(ctx, NewMyWorkerThree(), 100, 100).InFrom(workerOne, workerTwo)

Options

Timeout

If your workers needs to stop at a deadline or you just need to have a timeout use the SetTimeout or SetDeadline methods. (These must be in place before setting the workers off to work.)

 // Setting a timeout of 2 seconds
 runner.SetWorkerTimeout(2 * time.Second)

.SetWorkerTimeout() is a timeout for a worker instance to finish.

Deadline
// Setting a deadline of 4 hours from now
runner.SetDeadline(time.Now().Add(4 \* time.Hour))

.SetDeadline() is a limit for runner to finish.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInputClosed = errors.New("input closed")
View Source
var ErrOutAlready = errors.New("out already set")

Functions

func StopWhenError added in v0.1.0

func StopWhenError[I any](ctx context.Context, in I, err error) error

Types

type AfterFunc

type AfterFunc[I, O any] func(ctx context.Context, in I, err error) error

type BeforeFunc

type BeforeFunc[I, O any] func(ctx context.Context) error

type Out added in v0.2.0

type Out[I any] interface {
	SetOut(c chan I) error
}

type Runner

type Runner[I, O any] struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner[I, O any](ctx context.Context, w WorkFunc[I, O], maxWorkers, buffer int64) *Runner[I, O]

NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.

func (*Runner[I, O]) AfterFunc

func (r *Runner[I, O]) AfterFunc(f AfterFunc[I, O]) *Runner[I, O]

AfterFunc Function to be run after worker has stopped. It can be used for logging and error management. input can be retreive with context value:

ctx.Value(workers.InputKey{})

⚠️ If an error is returned it stop Runner execution.

func (*Runner[I, O]) BeforeFunc

func (r *Runner[I, O]) BeforeFunc(f BeforeFunc[I, O]) *Runner[I, O]

BeforeFunc Function to be run before worker starts processing.

func (*Runner[I, O]) Fail added in v0.1.0

func (r *Runner[I, O]) Fail() bool

func (*Runner[I, O]) InFrom

func (r *Runner[I, O]) InFrom(w ...Out[I]) *Runner[I, O]

InFrom Set a worker to accept output from another worker(s).

func (*Runner[I, O]) Metrics added in v0.1.0

func (r *Runner[I, O]) Metrics() (send, ok, fail uint32)

func (*Runner[I, O]) Send

func (r *Runner[I, O]) Send(in I) error

Send Send an object to the worker for processing if context is not Done.

func (*Runner[I, O]) SetDeadline

func (r *Runner[I, O]) SetDeadline(t time.Time) *Runner[I, O]

SetDeadline allows a time to be set when the Runner should stop. ⚠️ Should only be called before Start

func (*Runner[I, O]) SetOut

func (r *Runner[I, O]) SetOut(c chan O) error

SetOut Allows the setting of a workers out channel, if not already set.

func (*Runner[I, O]) SetWorkerTimeout

func (r *Runner[I, O]) SetWorkerTimeout(duration time.Duration) *Runner[I, O]

SetWorkerTimeout allows a time duration to be set when the workers should stop. ⚠️ Should only be called before Start

func (*Runner[I, O]) Start

func (r *Runner[I, O]) Start() error

Start execute beforeFunc and launch worker processing.

func (*Runner[I, O]) Stop

func (r *Runner[I, O]) Stop() *Runner[I, O]

Stop Stops the processing of a worker and waits for workers to finish.

func (*Runner[I, O]) Wait

func (r *Runner[I, O]) Wait() *Runner[I, O]

Wait close the input channel and waits it to drain and process.

type WorkFunc

type WorkFunc[I, O any] func(ctx context.Context, in I, out chan<- O) error

WorkFunc get input and could put in outChan for followers. ⚠️ outChan could be closed if follower is stoped before producer. error returned can be process by afterFunc but will be ignored by default.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL