task

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2018 License: MIT Imports: 6 Imported by: 0

README

Task - package for repetitive tasks

Director observes workers and tell them what and when they should do. Workers do work.

Check more at godoc

Available parameters
Director params
  1. WithIdGenerator(func() string) - rule, how to give a name to a worker.
Worker params
  1. Id(string) sets name for worker.
  2. Repeat(int) execute worker n times. 0 by default. For negative values of n executes infinitely.
  3. Infinitely() execute worker infinitely.
  4. WithTimeout(time.Duration) set timeout for loop executions.
  5. WithTimeoutFunc(func(time.Time) time.Duration) set timeout rule for loop executions.
  6. WithArgs(...interface{}) sets args for first worker execution.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DirectorAlreadyWorks = errors.New("director is working already")
	DirectorNotWorks     = errors.New("director is not working yet")
)
View Source
var TimeoutError = errors.New("execution timeout")

Functions

This section is empty.

Types

type Director

type Director struct {
	// contains filtered or unexported fields
}

The main figure in working process.

func NewDirector

func NewDirector(options ...DirectorOption) *Director

NewDirector creates director, which observes workers. Director may take some options

func (*Director) Begin

func (d *Director) Begin(params ...WorkerParameter)

Begin begins workers executions. Panics, when call on already working director.

func (*Director) Wait

func (d *Director) Wait()

Wait waits until all workplaces done their work. Panics on calling not working director.

func (*Director) With

func (d *Director) With(worker Worker, params ...WorkerParameter) *Director

With attaches Worker to Director with given parameters, worker with parameters is a Workplace.

type DirectorOption

type DirectorOption func(*directorOptions)

Generic option for director.

func WithIdGenerator

func WithIdGenerator(generator func() string) DirectorOption

WithIdGenerator sets generator function that should generate unique ids for workers, when id for worker not set directly.

type TimeoutMiddleware

type TimeoutMiddleware struct {
	// contains filtered or unexported fields
}

func NewWorkerTimeout

func NewWorkerTimeout(next Worker, timeout time.Duration) *TimeoutMiddleware

func (*TimeoutMiddleware) Work

func (s *TimeoutMiddleware) Work(ctx context.Context, args ...interface{}) ([]interface{}, error)

type Worker

type Worker interface {
	Work(ctx context.Context, args ...interface{}) ([]interface{}, error)
}

Worker is an instance that should do some work. It takes execution context and any amount of arguments and returns any amount of results and error. Arguments comes from triggers or executions of previous worker in chain.

type WorkerParameter

type WorkerParameter func(*workerParams)

Generic parameter for worker.

func Every

func Every(duration time.Duration) WorkerParameter

Every adds duration to ticker list. Each Every applied as OR, so Every(time.Second), Every(time.Second*2) will start one worker each second and two workers each two seconds.

func Id

func Id(id string) WorkerParameter

Id sets name for worker. Random UUID by default.

func IgnoreWorkerErrors

func IgnoreWorkerErrors(ignore bool) WorkerParameter

IgnoreWorkerErrors sets flag, that continues worker execution if error was occurred.

False by default.

func Infinitely

func Infinitely() WorkerParameter

Infinitely is a shortcut for Repeat(-1).

func Next

func Next(ids ...string) WorkerParameter

Next adds worker names to 'next' list. After execution main worker director concurrently executes all workers in list with provided arguments taken from main worker results.

func NotifyError

func NotifyError(errCh chan<- error) WorkerParameter

NotifyError sends error to channel provided channel if any was occurred.

func PanicOnError

func PanicOnError(p bool) WorkerParameter

PanicOnError sets panic flag. if error was occurred, director will panic.

False by default.

func Repeat

func Repeat(n int) WorkerParameter

Repeat sets amount of executions for worker. For negative values of `n` worker will executes infinitely. 0 by default.

func TriggerOn

func TriggerOn(channels ...<-chan interface{}) WorkerParameter

TriggerOn adds channels to subscribe list. Execute worker, when something received from any channel.

func WithArgs

func WithArgs(args ...interface{}) WorkerParameter

WithArgs sets args for first worker execution. nil by default.

func WithContext

func WithContext(ctx context.Context) WorkerParameter

WithContext sets worker execution context. Director looks at context.Done() and when it triggers, director stops all new worker executions. It does not stops currently working workers, which were stated as next element in chain execution by Next() declaration. In this case, you should stop it by yourself.

context.Background by default.

func WithDelay

func WithDelay(duration time.Duration) WorkerParameter

WithDelay sets constant delay for loop executions. 0 by default.

func WithDelayFunc

func WithDelayFunc(delayFunc func(last time.Time) time.Duration) WorkerParameter

WithDelayFunc sets delay rule for loop executions.

type WorkplaceStatus

type WorkplaceStatus struct {
	// contains filtered or unexported fields
}

func NewWorkerStatus

func NewWorkerStatus(next Worker) *WorkplaceStatus

func (*WorkplaceStatus) Busy

func (s *WorkplaceStatus) Busy() bool

func (*WorkplaceStatus) Status

func (s *WorkplaceStatus) Status() (working, executed uint64)

func (*WorkplaceStatus) Work

func (s *WorkplaceStatus) Work(ctx context.Context, args ...interface{}) ([]interface{}, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL