task

package
v0.2024.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2024 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package task defines Task, a unit of work which can be retried if it fails.

Index

Examples

Constants

View Source
const RetryUnlimited int = math.MaxInt

RetryUnlimited allows a Task to be restarted forever. Used as an argument to WithRetry.

Variables

View Source
var ErrCannotBeConfigured = errors.New("cannot be configured")

ErrCannotBeConfigured is returned when attempting to Configure an automation that does not implement Configurer.

View Source
var ErrCannotBeStopped = errors.New("cannot be stopped")

ErrCannotBeStopped is returned when attempting to Stop an automation that does not implement Stopper.

Functions

func Configurable

func Configurable(s any) bool

Configurable returns whether the given automation can be configured.

func Configure

func Configure(s any, configData []byte) error

Configure attempts to configure the given automation. If the automation does not implement Configurer then ErrCannotBeConfigured will be returned.

func Run

func Run(ctx context.Context, task Task, options ...Option) error

Run will run a Task to completion in a blocking fashion. By default, the Task is run once. Pass Options in order to add automatic retry with backoff, logging etc.

This is a convenience function that constructs a Runner and calls Runner.Step in a loop until the runner completes.

func Stop

func Stop(s any) error

Stop attempts to stop the given automation. If the automation does not implement Stopper then ErrCannotBeStopped will be returned.

func Stoppable

func Stoppable(s any) bool

Stoppable returns whether the given automation can be stopped.

Types

type Configurer

type Configurer interface {
	Configure(configData []byte) error
}

Configurer describes types that can be configured. Configuration data is represented as a []byte and the automation is expected to decode this data into an internally significant data structure.

type Group

type Group struct {
	// contains filtered or unexported fields
}

func (*Group) Spawn

func (g *Group) Spawn(ctx context.Context, tag string, task Task, options ...Option) *GroupTask

func (*Group) Tasks

func (g *Group) Tasks() map[string]*GroupTask

type GroupTask

type GroupTask struct {
	// contains filtered or unexported fields
}

func (*GroupTask) Cancel

func (gt *GroupTask) Cancel()

func (*GroupTask) State

func (gt *GroupTask) State() (state State, err error)

func (*GroupTask) Tag

func (gt *GroupTask) Tag() string

func (*GroupTask) Wait

func (gt *GroupTask) Wait(ctx context.Context) (done bool)

type Intermittent

type Intermittent struct {
	// contains filtered or unexported fields
}

Intermittent manages the lifecycle of a long-running background operation that needs to outlive a single context. A context can be attached to the Intermittent by calling Attach, which will start the background operation if it is not already running. Once all attached contexts are cancelled, the background operation will be stopped.

Attach is safe for use by multiple go routines.

func NewIntermittent

func NewIntermittent(operation StartFn) *Intermittent

func Poll

func Poll(action func(context.Context), interval time.Duration) *Intermittent

Poll creates a task that calls a function at a regular interval while it is running. The action will not be run until the returned Intermittent.Attach is called.

func (*Intermittent) Attach

func (t *Intermittent) Attach(listener context.Context) error

Attach adds ensures that the background task will remain running for at least as long as listener is not done. If the background task is not running it will be started.

Returns an error iff the background task is started by this call and when starting it returns an error.

type Lifecycle

type Lifecycle[C any] struct {
	// ApplyConfig is a function that gets called each time config is changed.
	ApplyConfig RunConfigFunc[C]
	Logger      *zap.Logger
	// ReadConfig converts bytes into C.
	// Defaults to json.Unmarshal.
	ReadConfig func(bytes []byte) (C, error)
	// contains filtered or unexported fields
}

Lifecycle manages the lifecycle of a driver as per task.Starter, task.Configurer, and task.Stopper. Embed Lifecycle into your own driver type and provide a RunConfigFunc that binds your driver based on config.

func NewLifecycle

func NewLifecycle[C any](runFunc RunConfigFunc[C]) *Lifecycle[C]

NewLifecycle creates a new Lifecycle that calls runFunc each time config is loaded.

func (*Lifecycle[C]) Configure

func (s *Lifecycle[C]) Configure(configData []byte) error

Configure instructs the driver to setup and announce any devices found in configData. configData should be an encoded JSON object matching config.Root.

Configure must not be called before Start, but once Started can be called concurrently.

func (*Lifecycle[C]) CurrentState

func (s *Lifecycle[C]) CurrentState() Status

func (*Lifecycle[C]) Start

func (s *Lifecycle[C]) Start(_ context.Context) error

Start makes this driver available to be configured. Call Stop when you're done with the driver to free up resources.

Start must be called before Configure. Once started Configure and Stop may be called from any go routine.

func (*Lifecycle[C]) Stop

func (s *Lifecycle[C]) Stop() error

Stop stops the driver and releases resources. Stop races with Start before Start has completed, but can be called concurrently once started.

func (*Lifecycle[C]) WaitForStateChange

func (s *Lifecycle[C]) WaitForStateChange(ctx context.Context, sourceState Status) error

type Next

type Next int

Next allows a task to specify a preferred retry behaviour.

const (
	// Normal mode of operation - Task is retried if it returns a non-nil error.
	Normal Next = iota
	// StopNow will prevent the task from being restarted, even if it returns a non-nil error.
	StopNow
	// RetryNow will restart the task immediately, without any delay. The backoff will also be reset.
	RetryNow
	// ResetBackoff will reset the Task restart delay to its starting value, and then continue as normal.
	ResetBackoff
)

func (Next) String

func (n Next) String() string

type Option

type Option func(o *Runner)

func WithBackoff

func WithBackoff(start time.Duration, max time.Duration) Option

WithBackoff adds exponential backoff when retrying tasks. The delay begins at start, and is capped at max. After each attempt, the delay increases by a factor of 1.5.

func WithErrorLogger

func WithErrorLogger(logger *zap.Logger) Option

WithErrorLogger will log to the provided logger every time the Task returns a non-nil error.

func WithRetry

func WithRetry(attempts int) Option

WithRetry places a limit on the number of times a Task may be restarted before we give up. By default, a Task will only be run once. Pass RetryUnlimited to retry forever.

func WithRetryDelay

func WithRetryDelay(delay time.Duration) Option

WithRetryDelay adds a fixed delay between a Task returning and the next attempt starting. By default, there is no delay and retries happen immediately.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout imposes a time limit on each individual invocation of the Task.

type RunConfigFunc

type RunConfigFunc[C any] func(ctx context.Context, cfg C) error

RunConfigFunc is called when a drivers config changes and should apply those changes. The ctx will be cancelled if the driver stops or the config is replaced. RunConfigFunc should only block while cfg is being applied, any long running tasks should run in separate go routines.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

A Runner allows the caller control over the invocation of a Task. Call Step to run the Task - the return values will tell you if and when to call Step again.

func NewRunner

func NewRunner(task Task, options ...Option) *Runner

func (*Runner) Step

func (r *Runner) Step(ctx context.Context) (err error, again bool, delay time.Duration)

Step will run this Runner's Task once. The error from the task is returned as err. If the task should be run again due to the applicable retry options, then the return value again will be true and the required delay before the next invocation is returned in delay.

Callers will generally want to call Step in a loop until again=false.

Example
n := 0
t := Task(func(ctx context.Context) (Next, error) {
	n++
	return Normal, errors.New("an error")
})

runner := NewRunner(t, WithRetry(3))
for {
	_, again, delay := runner.Step(context.Background())
	if !again {
		break
	}
	time.Sleep(delay)
}
fmt.Println(n)
Output:

3

type StartFn

type StartFn = func(init context.Context) (stop StopFn, err error)

StartFn is called by an Intermittent to start a background operation. The provided context.Context init can be used for initialisation - it is simply the context passed to Intermittent.Attach. However, it is not suitable for long-running background operations - create a long-running context using context.WithCancel(context.Background()). If starting the background operations succeeds, then a StartFn should return a stop function and err should be nil. The stop function will be called when the background operation needs to stop. If starting the background operation fails, then return a non-nil err; the value of the stop function is ignored.

type Starter

type Starter interface {
	// Start instructs the automation to start.
	// The ctx represents how long the type can spend starting before it should give up.
	Start(ctx context.Context) error
}

Starter describes types that can be started.

type State

type State int
const (
	StatePending State = iota + 1 // Task is waiting to start.
	StateRunning                  // Task is running now.
	StateDelay                    // Task is in its delay period before the next attempt.
	StateStopped                  // Task is not running, and won't run again.
)

func (State) String

func (s State) String() string

type Status

type Status string
const (
	StatusInactive Status = "inactive" // Stopped task. Status after calling Stop
	StatusLoading  Status = "loading"  // The task is loading configuration. Status while Configure is running
	StatusActive   Status = "active"   // The task has valid config and is serving requests. Status after calling Start
	StatusError    Status = "error"    // The task failed to start. If start fails, the driver will be in this state.
)

type StopFn

type StopFn = func()

type Stopper

type Stopper interface {
	// Stop instructs the automation to stop what it is doing.
	Stop() error
}

Stopper describes types that can be stopped.

type Task

type Task func(ctx context.Context) (Next, error)

A Task is a unit of work which can be restarted if it fails.

Directories

Path Synopsis
Package serviceapi implements gen.ServiceApi backed by a service.Map.
Package serviceapi implements gen.ServiceApi backed by a service.Map.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL