axe

package
v0.34.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: MIT Imports: 14 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Await added in v0.33.1

func Await(store *coal.Store, timeout time.Duration, fns ...func() error) (int, error)

Await will await all jobs created during the execution of the callback. It will wait for at least one job to complete and return the number of processed jobs. If a job fails or is cancelled its reasons is returned as an error. A timeout may be provided to stop after some time.

func AwaitJob added in v0.33.1

func AwaitJob(store *coal.Store, timeout time.Duration, job Job) (int, error)

AwaitJob will enqueue the specified job and wait until it and all other jobs queued during its execution are finished. It will return the number of processed jobs. A timeout may be provided to stop after some time.

func Cancel added in v0.21.0

func Cancel(ctx context.Context, store *coal.Store, job Job, reason string) error

Cancel will cancel the specified job with the provided reason. Only jobs in the "dequeued" state can be cancelled.

func Complete added in v0.21.0

func Complete(ctx context.Context, store *coal.Store, job Job) error

Complete will complete the specified job. Only jobs in the "dequeued" state can be completed.

func Dequeue added in v0.21.0

func Dequeue(ctx context.Context, store *coal.Store, job Job, timeout time.Duration) (bool, int, error)

Dequeue will dequeue the specified job. The provided timeout will be set to allow the job to be dequeued if the worker failed to set its state. Only jobs in the "enqueued", "dequeued" (passed timeout) or "failed" state are dequeued. It will return whether a job has been dequeued.

func Enqueue

func Enqueue(ctx context.Context, store *coal.Store, job Job, delay, isolation time.Duration) (bool, error)

Enqueue will enqueue the specified job with the provided delay and isolation. It will return whether a job has been enqueued. If the context carries a transaction it must be associated with the specified store.

The job is labeled, no job is queued if there is already a job with the same label in the enqueued, dequeued or failed state. If isolation is non-zero the same rules applies also to unlabeled jobs in addition to that finished jobs must be older than the specified duration.

func Extend added in v0.34.0

func Extend(ctx context.Context, store *coal.Store, job Job, timeout time.Duration) error

Extend will extend the specified job by the provided duration.

func Fail added in v0.21.0

func Fail(ctx context.Context, store *coal.Store, job Job, reason string, delay time.Duration) error

Fail will fail the specified job with the provided reason. It may delay the job if requested. Only jobs in the "dequeued" state can be failed.

func Update added in v0.31.0

func Update(ctx context.Context, store *coal.Store, job Job, status string, progress float64) error

Update will update the specified job and set the provided execution status and progress.

Types

type Base added in v0.28.0

type Base struct {
	// The ID of the document.
	DocID coal.ID

	// The label of the job.
	Label string
}

Base can be embedded in a struct to turn it into a job.

func B added in v0.28.0

func B(label string) Base

B is a shorthand to construct a base with a label.

func (*Base) GetAccessor added in v0.28.0

func (b *Base) GetAccessor(v interface{}) *stick.Accessor

GetAccessor implements the Model interface.

func (*Base) GetBase added in v0.28.0

func (b *Base) GetBase() *Base

GetBase implements the Job interface.

func (*Base) ID added in v0.28.0

func (b *Base) ID() coal.ID

ID will return the job ID.

type Blueprint added in v0.22.1

type Blueprint struct {
	// The job to be enqueued.
	Job Job

	// The job delay. If specified, the job will not be dequeued until the
	// specified duration has passed.
	Delay time.Duration

	// The job isolation. If specified, the job will only be enqueued if no job
	// has been executed in the specified duration.
	Isolation time.Duration
}

Blueprint describes a queueable job.

type Context added in v0.20.0

type Context struct {
	// The context that is cancelled when the task timeout has been reached.
	//
	// Values: opentracing.Span, *xo.Tracer
	context.Context

	// The executed job.
	Job Job

	// The current attempt to execute the job.
	//
	// Usage: Read Only
	Attempt int

	// The task that executes this job.
	//
	// Usage: Read Only
	Task *Task

	// The queue this job was dequeued from.
	//
	// Usage: Read Only
	Queue *Queue

	// The current tracer.
	//
	// Usage: Read Only
	Tracer *xo.Tracer
	// contains filtered or unexported fields
}

Context holds and stores contextual data.

func (*Context) Extend added in v0.34.0

func (c *Context) Extend(timeout, lifetime time.Duration) error

Extend will extend the timeout and lifetime of the job.

func (*Context) Update added in v0.31.0

func (c *Context) Update(status string, progress float64) error

Update will update the job and set the provided execution status and progress.

type Error

type Error struct {
	Reason string
	Retry  bool
}

Error is used to control retry a cancellation. These errors are expected and are not forwarded to the reporter.

func E

func E(reason string, retry bool) *Error

E is a shorthand to construct an error. If retry is true the job will be retried and if false it will be cancelled. These settings take precedence over the tasks max attempts setting.

func (*Error) Error

func (c *Error) Error() string

Error implements the error interface.

type Event added in v0.28.0

type Event struct {
	coal.ItemBase `json:"-" bson:"-"`

	// The time when the event was reported.
	Timestamp time.Time `json:"timestamp"`

	// The new state of the job.
	State State `json:"state"`

	// The reason when failed or cancelled.
	Reason string `json:"reason"`
}

Event is logged during a job execution.

type Job

type Job interface {
	ID() coal.ID
	Validate() error
	GetBase() *Base
	GetAccessor(interface{}) *stick.Accessor
}

Job is a structure used to encode a job.

type Meta added in v0.28.0

type Meta struct {
	// The jobs type.
	Type reflect.Type

	// The jobs name.
	Name string

	// The used transfer coding.
	Coding stick.Coding

	// The accessor.
	Accessor *stick.Accessor
}

Meta contains meta information about a job.

func GetMeta added in v0.28.0

func GetMeta(job Job) *Meta

GetMeta will parse the jobs "axe" tag on the embedded axe.Base struct and return the meta object.

func (*Meta) Make added in v0.28.0

func (m *Meta) Make() Job

Make returns a pointer to a new zero initialized job e.g. *Increment.

type Model

type Model struct {
	coal.Base `json:"-" bson:",inline" coal:"jobs"`

	// The job name.
	Name string `json:"name"`

	// The job label.
	Label string `json:"label"`

	// The encoded job data.
	Data stick.Map `json:"data"`

	// The current state of the job.
	State State `json:"state"`

	// The time when the job was created.
	Created time.Time `json:"created-at" bson:"created_at"`

	// The time when the job is available for execution.
	Available time.Time `json:"available-at" bson:"available_at"`

	// The time when the last or current execution started.
	Started *time.Time `json:"started-at" bson:"started_at"`

	// The time when the last execution ended (completed, failed or cancelled).
	Ended *time.Time `json:"ended-at" bson:"ended_at"`

	// The time when the job was successfully executed (completed or cancelled).
	Finished *time.Time `json:"finished-at" bson:"finished_at"`

	// Attempts is incremented with each execution attempt.
	Attempts int `json:"attempts"`

	// The current execution status.
	Status string `json:":status"`

	// The execution progress.
	Progress float64 `json:"progress"`

	// The individual job events.
	Events []Event `json:"events"`
}

Model stores an executable job.

func (*Model) Validate added in v0.28.0

func (m *Model) Validate() error

Validate will validate the model.

type Options added in v0.28.0

type Options struct {
	// The store used to manage jobs.
	Store *coal.Store

	// The maximum amount of lag that should be applied to every dequeue attempt.
	//
	// By default, multiple workers compete with each other when getting jobs
	// from the same queue. An artificial lag limits multiple simultaneous
	// dequeue attempts and allows the worker with the smallest lag to dequeue
	// the job and inform the other workers to limit parallel dequeue attempts.
	//
	// Default: 100ms.
	MaxLag time.Duration

	// The duration after which a job may be returned again from the board.
	//
	// It may take some time until the board is updated with the new state of
	// the job after a dequeue. The block period prevents another worker from
	// simultaneously trying to dequeue the job. If the initial worker failed to
	// dequeue the job it will be available again after the defined period.
	//
	// Default: 10s.
	BlockPeriod time.Duration

	// The callback that is called with job errors.
	Reporter func(error)
}

Options defines queue options.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue manages job queueing.

func NewQueue

func NewQueue(options Options) *Queue

NewQueue creates and returns a new queue.

func (*Queue) Action added in v0.23.0

func (q *Queue) Action(methods []string, cb func(ctx *fire.Context) Blueprint) *fire.Action

Action is a factory to create an action that can be used to enqueue jobs.

func (*Queue) Add added in v0.21.0

func (q *Queue) Add(task *Task)

Add will add the specified task to the queue.

func (*Queue) Callback

func (q *Queue) Callback(matcher fire.Matcher, cb func(ctx *fire.Context) Blueprint) *fire.Callback

Callback is a factory to create callbacks that can be used to enqueue jobs during request processing.

func (*Queue) Close added in v0.21.0

func (q *Queue) Close()

Close will close the queue.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, job Job, delay, isolation time.Duration) (bool, error)

Enqueue will enqueue a job. If the context carries a transaction it must be associated with the store that is also used by the queue.

func (*Queue) Run added in v0.21.0

func (q *Queue) Run() chan struct{}

Run will start fetching jobs from the queue and execute them. It will return a channel that is closed once the queue has been synced and is available.

type State added in v0.28.0

type State string

State defines the states of a job.

const (
	Enqueued  State = "enqueued"
	Dequeued  State = "dequeued"
	Completed State = "completed"
	Failed    State = "failed"
	Cancelled State = "cancelled"
)

The available job states.

func (State) Valid added in v0.28.0

func (s State) Valid() bool

Valid returns whether the state is valid.

type Task

type Task struct {
	// The job this task should execute.
	Job Job

	// The callback that is called with jobs for execution. The handler may
	// return errors formatted with E to manually control the state of the job.
	Handler func(ctx *Context) error

	// The callback that is called once a job has been completed or cancelled.
	Notifier func(ctx *Context, cancelled bool, reason string) error

	// The number for spawned workers that dequeue and execute jobs in parallel.
	//
	// Default: 2.
	Workers int

	// The maximum attempts to complete a task. Zero means that the jobs is
	// retried forever. The error retry field will take precedence to this
	// setting and allow retry beyond the configured maximum.
	//
	// Default: 0
	MaxAttempts int

	// The rate at which a worker will request a job from the queue.
	//
	// Default: 100ms.
	Interval time.Duration

	// The minimal delay after a failed task is retried.
	//
	// Default: 1s.
	MinDelay time.Duration

	// The maximal delay after a failed task is retried.
	//
	// Default: 10m.
	MaxDelay time.Duration

	// The exponential increase of the delay after individual attempts.
	//
	// Default: 2.
	DelayFactor float64

	// Time after which the context of a job is cancelled and the execution
	// should be stopped. Should be several minutes less than timeout to prevent
	// race conditions.
	//
	// Default: 5m.
	Lifetime time.Duration

	// The time after which a task can be dequeued again in case the worker was
	// unable to set its state.
	//
	// Default: 10m.
	Timeout time.Duration

	// Set to let the system enqueue a job periodically every given interval.
	//
	// Default: 0.
	Periodicity time.Duration

	// The blueprint of the job that is periodically enqueued.
	//
	// Default: Blueprint{Name: Task.Name}.
	PeriodicJob Blueprint
}

Task describes work that is managed using a job queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL