package module
Version: v1.0.0-...-19a698a Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2019 License: LGPL-3.0 Imports: 5 Imported by: 119


worker - Utilities for handling long lived Go workers




View Source
const (

	// KeyState applies to a worker; possible values are "starting", "started",
	// "stopping", or "stopped". Or it might be something else, in distant
	// Reporter implementations; don't make assumptions.
	KeyState = "state"

	// KeyReport holds an arbitrary map of information returned by a manifold
	// Worker that is also a Reporter.
	KeyReport = "report"

	// KeyLastStart holds the time of when the worker was last started.
	KeyLastStart = "started"

The Key constants describe the constant features of an Engine's Report.

View Source
const DefaultRestartDelay = 3 * time.Second

DefaultRestartDelay holds the default length of time that a worker will wait between exiting and being restarted by a Runner.


View Source
var (
	ErrNotFound = errors.New("worker not found")
	ErrStopped  = errors.New("aborted waiting for worker")
	ErrDead     = errors.New("worker runner is not running")


func Dead

func Dead(worker Worker) <-chan struct{}

Dead returns a channel that will be closed when the supplied Worker has completed. If the worker implements

interface {Dead() <-chan struct{}}

then if the result of that method is non-nil, it will be returned.

Don't be too casual about calling Dead -- for example, in a standard select loop, `case <-worker.Dead(w):` will create one new goroutine per iteration, which is... untidy.

func Stop

func Stop(worker Worker) error

Stop kills the given Worker and waits for it to complete.


type Clock

type Clock interface {
	Now() time.Time
	After(time.Duration) <-chan time.Time

Clock represents the methods needed from the clock.

type Logger

type Logger interface {
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Errorf(string, ...interface{})

Logger represents the various logging methods used by the runner.

type Reporter

type Reporter interface {

	// Report returns a map describing the state of the receiver. It is expected
	// to be goroutine-safe.
	// It is polite and helpful to use the Key* constants and conventions defined
	// and described in this package, where appropriate, but that's for the
	// convenience of the humans that read the reports; we don't and shouldn't
	// have any code that depends on particular Report formats.
	Report() map[string]interface{}

Reporter defines an interface for extracting human-relevant information from a worker.

type Runner

type Runner struct {
	// contains filtered or unexported fields

Runner runs a set of workers, restarting them as necessary when they fail.

func NewRunner

func NewRunner(p RunnerParams) *Runner

NewRunner creates a new Runner. When a worker finishes, if its error is deemed fatal (determined by calling isFatal), all the other workers will be stopped and the runner itself will finish. Of all the fatal errors returned by the stopped workers, only the most important one, determined by calling moreImportant, will be returned from Runner.Wait. Non-fatal errors will not be returned.

The function isFatal(err) returns whether err is a fatal error. The function moreImportant(err0, err1) returns whether err0 is considered more important than err1.

func (*Runner) Kill

func (runner *Runner) Kill()

Kill implements Worker.Kill

func (*Runner) Report

func (runner *Runner) Report() map[string]interface{}

Report implements Reporter.

func (*Runner) StartWorker

func (runner *Runner) StartWorker(id string, startFunc func() (Worker, error)) error

StartWorker starts a worker running associated with the given id. The startFunc function will be called to create the worker; when the worker exits, it will be restarted as long as it does not return a fatal error.

If there is already a worker with the given id, nothing will be done.

StartWorker returns ErrDead if the runner is not running.

func (*Runner) StopWorker

func (runner *Runner) StopWorker(id string) error

StopWorker stops the worker associated with the given id. It does nothing if there is no such worker.

StopWorker returns ErrDead if the runner is not running.

func (*Runner) Wait

func (runner *Runner) Wait() error

Wait implements Worker.Wait

func (*Runner) Worker

func (runner *Runner) Worker(id string, stop <-chan struct{}) (Worker, error)

Worker returns the current worker for the given id. If a worker has been started with the given id but is not currently available, it will wait until it is available, stopping waiting if it receives a value on the stop channel.

If there is no worker started with the given id, Worker will return ErrNotFound. If it was stopped while waiting, Worker will return ErrStopped. If the runner has been killed while waiting, Worker will return ErrDead.

type RunnerParams

type RunnerParams struct {
	// IsFatal is called when a worker exits. If it returns
	// true, all the other workers
	// will be stopped and the runner itself will finish.
	// If IsFatal is nil, all errors will be treated as fatal.
	IsFatal func(error) bool

	// When the runner exits because one or more
	// workers have returned a fatal error, only the most important one,
	// will be returned. MoreImportant should report whether
	// err0 is more important than err1.
	// If MoreImportant is nil, the first error reported will be
	// returned.
	MoreImportant func(err0, err1 error) bool

	// RestartDelay holds the length of time the runner will
	// wait after a worker has exited with a non-fatal error
	// before it is restarted.
	// If this is zero, DefaultRestartDelay will be used.
	RestartDelay time.Duration

	// Clock is used for timekeeping. If it's nil, clock.WallClock
	// will be used.
	Clock Clock

	// Logger is used to provide an implementation for where the logging
	// messages go for the runner. If it's nil, no logging output.
	Logger Logger

RunnerParams holds the parameters for a NewRunner call.

type Worker

type Worker interface {
	// Kill asks the worker to stop and returns immediately.

	// Wait waits for the worker to complete and returns any
	// error encountered when it was running or stopping.
	Wait() error

Worker describes any type whose validity and/or activity is bounded in time. Most frequently, they will represent the duration of some task or tasks running on internal goroutines, but it's possible and rational to use them to represent any resource that might become invalid.

Worker implementations must be goroutine-safe.


Path Synopsis
Catacomb leverages tomb.Tomb to bind the lifetimes of, and track the errors of, a group of related workers.
Catacomb leverages tomb.Tomb to bind the lifetimes of, and track the errors of, a group of related workers.
Package dependency exists to address a general problem with shared resources and the management of their lifetimes.
Package dependency exists to address a general problem with shared resources and the management of their lifetimes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL