async

package
v1.20220411.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2022 License: MIT Imports: 6 Imported by: 6

Documentation

Overview

Package async provides synchronization primitives and background workers. This is a core package that is used by a lot of other packages.

Index

Constants

View Source
const (
	LatchStopped  int32 = 0
	LatchStarting int32 = 1
	LatchResuming int32 = 2
	LatchStarted  int32 = 3
	LatchActive   int32 = 4
	LatchPausing  int32 = 5
	LatchPaused   int32 = 6
	LatchStopping int32 = 7
)

Latch states

View Source
const (
	DefaultQueueMaxWork        = 1 << 10
	DefaultInterval            = 500 * time.Millisecond
	DefaultShutdownGracePeriod = 10 * time.Second
)

Constants

Variables

View Source
var (
	ErrCannotStart  ex.Class = "cannot start; already started"
	ErrCannotStop   ex.Class = "cannot stop; already stopped"
	ErrCannotCancel ex.Class = "cannot cancel; already canceled"
)

Errors

Functions

func Recover added in v1.20201204.1

func Recover(action func() error, errors chan error)

Recover runs an action and passes any errors to the given errors channel.

This call blocks, if you need it to be backgrounded, you should call it like:

go Recover(action, errors)
<-errors

func RecoverGroup added in v1.20201204.1

func RecoverGroup(action func() error, errors chan error, wg *sync.WaitGroup)

RecoverGroup runs a recovery against a specific wait group with an error collector. It calls Recover internally.

Types

type Actioner added in v1.20210615.7

type Actioner interface {
	Action(context.Context, interface{}) (interface{}, error)
}

Actioner is a type that can be used as a tracked action.

type ActionerFunc added in v1.20210615.7

type ActionerFunc func(context.Context, interface{}) (interface{}, error)

ActionerFunc is a function that implements action.

func (ActionerFunc) Action added in v1.20210615.7

func (af ActionerFunc) Action(ctx context.Context, args interface{}) (interface{}, error)

Action implements actioner for the function.

type Batch added in v1.20201204.1

type Batch struct {
	Action            WorkAction
	SkipRecoverPanics bool
	Parallelism       int
	Work              chan interface{}
	Errors            chan error
}

Batch is a batch of work executed by a fixed count of workers.

func NewBatch added in v1.20201204.1

func NewBatch(work chan interface{}, action WorkAction, options ...BatchOption) *Batch

NewBatch creates a new batch processor. Batch processes are a known quantity of work that needs to be processed in parallel.

func (*Batch) Process added in v1.20201204.1

func (b *Batch) Process(ctx context.Context)

Process executes the action for all the work items.

type BatchOption added in v1.20201204.1

type BatchOption func(*Batch)

BatchOption is an option for the batch worker.

func OptBatchErrors added in v1.20201204.1

func OptBatchErrors(errors chan error) BatchOption

OptBatchErrors sets the batch worker error return channel.

func OptBatchParallelism added in v1.20201204.1

func OptBatchParallelism(parallelism int) BatchOption

OptBatchParallelism sets the batch worker parallelism, or the number of workers to create.

func OptBatchSkipRecoverPanics added in v1.20210908.5

func OptBatchSkipRecoverPanics(skipRecoverPanics bool) BatchOption

OptBatchSkipRecoverPanics sets the batch worker to throw (or to recover) panics.

type Checker added in v1.20210615.7

type Checker interface {
	Check(context.Context) error
}

Checker is a type that can be checked for SLA status.

type CheckerFunc added in v1.20210615.7

type CheckerFunc func(context.Context) error

CheckerFunc implements Checker.

func (CheckerFunc) Check added in v1.20210615.7

func (cf CheckerFunc) Check(ctx context.Context) error

Check implements Checker.

type ContextAction added in v1.20201204.1

type ContextAction func(ctx context.Context) error

ContextAction is an action that is given a context and returns an error.

type Errors added in v1.20210908.5

type Errors chan error

Errors is a channel for errors

func (Errors) All added in v1.20210908.5

func (e Errors) All() error

All returns all the non-nil errors in the channel as a multi-error.

func (Errors) First added in v1.20210908.5

func (e Errors) First() error

First returns the first (non-nil) error.

type Future added in v1.20210306.2

type Future struct {
	// contains filtered or unexported fields
}

Future is a deferred action.

func Await added in v1.20210306.2

func Await(ctx context.Context, action ContextAction) *Future

Await returns a new future.

func (*Future) Cancel added in v1.20210306.2

func (f *Future) Cancel() error

Cancel quits the future.

func (*Future) Complete added in v1.20210306.2

func (f *Future) Complete() error

Complete blocks on the future completing.

func (*Future) Finished added in v1.20210306.2

func (f *Future) Finished() <-chan error

Finished returns a channel that signals it is finished.

type Interceptor added in v1.20210615.7

type Interceptor interface {
	Intercept(action Actioner) Actioner
}

Interceptor returns an actioner for a given actioner.

func Interceptors added in v1.20210615.7

func Interceptors(interceptors ...Interceptor) Interceptor

Interceptors chains calls to interceptors as a single interceptor.

type InterceptorFunc added in v1.20210615.7

type InterceptorFunc func(Actioner) Actioner

InterceptorFunc is a function that implements action.

func (InterceptorFunc) Intercept added in v1.20210615.7

func (fn InterceptorFunc) Intercept(action Actioner) Actioner

Intercept implements Interceptor for the function.

type Interval

type Interval struct {
	*Latch
	Context     context.Context
	Interval    time.Duration
	Action      ContextAction
	Delay       time.Duration
	StopOnError bool
	Errors      chan error
}

Interval is a background worker that performs an action on an interval.

func NewInterval

func NewInterval(action ContextAction, interval time.Duration, options ...IntervalOption) *Interval

NewInterval returns a new worker that runs an action on an interval.

Example:

iw := NewInterval(func(ctx context.Context) error { return nil }, 500*time.Millisecond)
go iw.Start()
<-iw.Started()

func (*Interval) Dispatch added in v1.20201204.1

func (i *Interval) Dispatch() (err error)

Dispatch is the main dispatch loop.

func (*Interval) Start

func (i *Interval) Start() error

Start starts the worker.

This will start the internal ticker, with a default initial delay of the given interval, and will return an ErrCannotStart if the interval worker is already started.

This call will block.

func (*Interval) Stop

func (i *Interval) Stop() error

Stop stops the worker.

type IntervalOption added in v1.20201204.1

type IntervalOption func(*Interval)

IntervalOption is an option for the interval worker.

func OptIntervalContext added in v1.20201204.1

func OptIntervalContext(ctx context.Context) IntervalOption

OptIntervalContext sets the interval worker context.

func OptIntervalDelay added in v1.20201204.1

func OptIntervalDelay(d time.Duration) IntervalOption

OptIntervalDelay sets the interval worker start delay.

func OptIntervalErrors added in v1.20201204.1

func OptIntervalErrors(errors chan error) IntervalOption

OptIntervalErrors sets the interval worker start error channel.

func OptIntervalStopOnError added in v1.20210306.2

func OptIntervalStopOnError(stopOnError bool) IntervalOption

OptIntervalStopOnError sets if the interval worker should stop on action error.

type Latch

type Latch struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Latch is a helper to coordinate goroutine lifecycles, specifically waiting for goroutines to start and end.

The lifecycle is generally as follows:

0 - stopped - goto 1
1 - starting - goto 2
2 - started - goto 3
3 - stopping - goto 0

Control flow is coordinated with chan struct{}, which acts as a semaphore but can only alert (1) listener as it is buffered.

In order to start a `stopped` latch, you must call `.Reset()` first to initialize channels.

func NewLatch

func NewLatch() *Latch

NewLatch creates a new latch.

func (*Latch) CanStart

func (l *Latch) CanStart() bool

CanStart returns if the latch can start.

func (*Latch) CanStop

func (l *Latch) CanStop() bool

CanStop returns if the latch can stop.

func (*Latch) IsStarted added in v1.20201204.1

func (l *Latch) IsStarted() bool

IsStarted returns if the latch state is LatchStarted.

func (*Latch) IsStarting

func (l *Latch) IsStarting() bool

IsStarting returns if the latch state is LatchStarting

func (*Latch) IsStopped

func (l *Latch) IsStopped() (isStopped bool)

IsStopped returns if the latch state is LatchStopped.

func (*Latch) IsStopping

func (l *Latch) IsStopping() bool

IsStopping returns if the latch state is LatchStopping.

func (*Latch) NotifyStarted

func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})

NotifyStarted returns the started signal. It is used to coordinate the transition from starting -> started. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStarting

func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})

NotifyStarting returns the starting signal. It is used to coordinate the transition from stopped -> starting. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStopped

func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})

NotifyStopped returns the stopped signal. It is used to coordinate the transition from stopping -> stopped. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStopping

func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})

NotifyStopping returns the should stop signal. It is used to trigger the transition from running -> stopping -> stopped. There can only be (1) effective listener at a time for these events.

func (*Latch) Reset

func (l *Latch) Reset()

Reset resets the latch.

func (*Latch) Started

func (l *Latch) Started()

Started signals that the latch is started and has entered the `IsStarted` state.

func (*Latch) Starting

func (l *Latch) Starting()

Starting signals the latch is starting. This is typically done before you kick off a goroutine.

func (*Latch) Stopped

func (l *Latch) Stopped()

Stopped signals the latch has stopped.

func (*Latch) Stopping

func (l *Latch) Stopping()

Stopping signals the latch to stop. It could also be thought of as `SignalStopping`.

func (*Latch) WaitStarted added in v1.20201204.1

func (l *Latch) WaitStarted()

WaitStarted triggers `Starting` and waits for the `Started` signal.

func (*Latch) WaitStopped added in v1.20201204.1

func (l *Latch) WaitStopped()

WaitStopped triggers `Stopping` and waits for the `Stopped` signal.

type NoopActioner added in v1.20210615.7

type NoopActioner struct{}

NoopActioner is an actioner type that does nothing.

func (NoopActioner) Action added in v1.20210615.7

func (n NoopActioner) Action(_ context.Context, _ interface{}) (interface{}, error)

Action implements actioner

type Queue added in v1.20201204.1

type Queue struct {
	*Latch

	Action              WorkAction
	Context             context.Context
	Errors              chan error
	Parallelism         int
	MaxWork             int
	ShutdownGracePeriod time.Duration

	// these will typically be set by Start
	AvailableWorkers chan *Worker
	Workers          []*Worker
	Work             chan interface{}
}

Queue is a queue with multiple workers.

func NewQueue

func NewQueue(action WorkAction, options ...QueueOption) *Queue

NewQueue returns a new parallel queue.

func (*Queue) Background added in v1.20201204.1

func (q *Queue) Background() context.Context

Background returns a background context.

func (*Queue) Close added in v1.20201204.1

func (q *Queue) Close() error

Close stops the queue. Any work left in the queue will be discarded.

func (*Queue) Dispatch added in v1.20201204.1

func (q *Queue) Dispatch()

Dispatch processes work items in a loop.

func (*Queue) Enqueue added in v1.20201204.1

func (q *Queue) Enqueue(obj interface{})

Enqueue adds an item to the work queue.

func (*Queue) ReturnWorker added in v1.20201204.1

func (q *Queue) ReturnWorker(ctx context.Context, worker *Worker) error

ReturnWorker returns a given worker to the worker queue.

func (*Queue) Start added in v1.20201204.1

func (q *Queue) Start() error

Start starts the queue and its workers. This call blocks.

func (*Queue) Stop added in v1.20201204.1

func (q *Queue) Stop() error

Stop stops the queue and processes any remaining items.

type QueueOption added in v1.20201204.1

type QueueOption func(*Queue)

QueueOption is an option for the queue worker.

func OptQueueContext added in v1.20201204.1

func OptQueueContext(ctx context.Context) QueueOption

OptQueueContext sets the queue worker context.

func OptQueueErrors added in v1.20201204.1

func OptQueueErrors(errors chan error) QueueOption

OptQueueErrors sets the queue worker start error channel.

func OptQueueMaxWork added in v1.20201204.1

func OptQueueMaxWork(maxWork int) QueueOption

OptQueueMaxWork sets the queue worker max work.

func OptQueueParallelism added in v1.20201204.1

func OptQueueParallelism(parallelism int) QueueOption

OptQueueParallelism sets the queue worker parallelism.

type SignalGroup added in v1.20210306.2

type SignalGroup struct {
	// contains filtered or unexported fields
}

SignalGroup is a wait group but it awaits on a signal.

func (*SignalGroup) Add added in v1.20210306.2

func (sg *SignalGroup) Add(delta int)

Add adds delta.

func (*SignalGroup) Done added in v1.20210306.2

func (sg *SignalGroup) Done()

Done decrements delta.

func (*SignalGroup) Wait added in v1.20210306.2

func (sg *SignalGroup) Wait() <-chan struct{}

Wait returns a channel you can select from.

type WorkAction added in v1.20201204.1

type WorkAction func(context.Context, interface{}) error

WorkAction is an action handler for a queue.

type Worker added in v1.20201204.1

type Worker struct {
	*Latch

	Context   context.Context
	Action    WorkAction
	Finalizer WorkerFinalizer

	SkipRecoverPanics bool
	Errors            chan error
	Work              chan interface{}
}

Worker is a worker that is pushed work over a channel. It is used by other work distribution types (i.e. queue and batch) but can also be used independently.

func NewWorker added in v1.20201204.1

func NewWorker(action WorkAction) *Worker

NewWorker creates a new worker.

func (*Worker) Background added in v1.20201204.1

func (w *Worker) Background() context.Context

Background returns the queue worker background context.

func (*Worker) Dispatch added in v1.20201204.1

func (w *Worker) Dispatch()

Dispatch starts the listen loop for work.

func (*Worker) Enqueue added in v1.20201204.1

func (w *Worker) Enqueue(obj interface{})

Enqueue adds an item to the work queue.

func (*Worker) Execute added in v1.20201204.1

func (w *Worker) Execute(ctx context.Context, workItem interface{})

Execute invokes the action and recovers panics.

func (*Worker) HandleError added in v1.20201204.1

func (w *Worker) HandleError(err error)

HandleError sends a non-nil err to the error collector if one is provided.

func (*Worker) NotifyStarted added in v1.20201204.1

func (w *Worker) NotifyStarted() <-chan struct{}

NotifyStarted returns the underlying latch signal.

func (*Worker) NotifyStopped added in v1.20201204.1

func (w *Worker) NotifyStopped() <-chan struct{}

NotifyStopped returns the underlying latch signal.

func (*Worker) Start added in v1.20201204.1

func (w *Worker) Start() error

Start starts the worker with a given context.

func (*Worker) Stop added in v1.20201204.1

func (w *Worker) Stop() error

Stop stops the worker.

If there is an item left in the work channel it will be processed synchronously.

func (*Worker) StopContext added in v1.20210215.2

func (w *Worker) StopContext(ctx context.Context)

StopContext stops the worker in a given cancellation context.

type WorkerFinalizer added in v1.20201204.1

type WorkerFinalizer func(context.Context, *Worker) error

WorkerFinalizer is an action handler for a queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL