Back to

Package entroq

Latest Go to latest

The latest major version is .

Published: Mar 6, 2020 | License: Apache-2.0 | Module:


Copyright 2019 Chris Monson <>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Package entroq contains the main task queue client and data definitions. The client relies on a backend to implement the actual transactional functionality, the interface for which is also defined here.



const (
	DefaultClaimPollTime = 30 * time.Second
	DefaultClaimDuration = 30 * time.Second

func DefaultErrQMap

func DefaultErrQMap(inbox string) string

DefaultErrQMap appends "/err" to the inbox, and is the default behavior if no overriding error queue mapping options are provided.

func IsCanceled

func IsCanceled(err error) bool

IsCanceled indicates whether the error is a canceled error.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout indicates whether the error is a timeout error.

func NotifyModified

func NotifyModified(n Notifier, inserted, changed []*Task)

NotifyModified takes inserted and changed tasks and notifies once per unique queue/ID pair.

func ProcessTime

func ProcessTime() time.Time

ProcessTime returns the time the calling process thinks it is, in UTC.

func QueuesFromStats

func QueuesFromStats(stats map[string]*QueueStat, err error) (map[string]int, error)

QueuesFromStats can be used for converting the new QueueStats to the old Queues output, making it easier on backend implementers to just define one function (similar to how WaitTryClaim or PollTryClaim can make implementing Claim in terms of TryClaim easier).

type Backend

type Backend interface {
	// Queues returns a mapping from all known queues to their task counts.
	Queues(ctx context.Context, qq *QueuesQuery) (map[string]int, error)

	// QueueStats returns statistics for the specified queues query. Richer
	// than just calling Queues, as it can return more than just the size.
	QueueStats(ctx context.Context, qq *QueuesQuery) (map[string]*QueueStat, error)

	// Tasks retrieves all tasks from the given queue. If claimantID is
	// specified (non-zero), limits those tasks to those that are either
	// expired or belong to the given claimant. Otherwise returns them all.
	Tasks(ctx context.Context, tq *TasksQuery) ([]*Task, error)

	// TryClaim attempts to claim a task from the "top" (or close to it) of the
	// given queue. When claimed, a task is held for the duration specified
	// from the time of the claim. If claiming until a specific wall-clock time
	// is desired, the task should be immediately modified after it is claimed
	// to set At to a specific time. Returns a nil task and a nil error if
	// there is nothing to claim. Will fail with a DependencyError is a
	// specific task ID is requested but not present.
	TryClaim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Claim is a blocking version of TryClaim, attempting to claim a task
	// from a queue, and blocking until canceled or a task becomes available.
	// Will fail with a DependencyError is a specific task ID is requested but
	// not present. Never returns both a nil task and a nil error
	// simultaneously: a failure to claim a task is an error (potentially just
	// a timeout).
	Claim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Modify attempts to atomically modify the task store, and only succeeds
	// if all dependencies are available and all mutations are either expired
	// or already owned by this claimant. The Modification type provides useful
	// functions for determining whether dependencies are good or bad. This
	// function is intended to return a DependencyError if the transaction could
	// not proceed because dependencies were missing or already claimed (and
	// not expired) by another claimant.
	Modify(ctx context.Context, mod *Modification) (inserted []*Task, changed []*Task, err error)

	// Time returns the time as the backend understands it, in UTC.
	Time(ctx context.Context) (time.Time, error)

	// Close closes any underlying connections. The backend is expected to take
	// ownership of all such connections, so this cleans them up.
	Close() error

Backend describes all of the functions that any backend has to implement to be used as the storage for task queues.

type BackendClaimFunc

type BackendClaimFunc func(ctx context.Context, eq *ClaimQuery) (*Task, error)

BackendClaimFunc is a function that can make claims based on a ClaimQuery. It is a convenience type for backends to use.

type BackendOpener

type BackendOpener func(ctx context.Context) (Backend, error)

BackendOpener is a function that can open a connection to a backend. Creating a client with a specific backend is accomplished by passing one of these functions into New.

type ChangeArg

type ChangeArg func(m *Modification, t *Task)

ChangeArg is an argument to the Changing function used to create arguments for Modify, e.g., to change the queue and set the expiry time of a task to 5 minutes in the future, you would do something like this:

      QueueTo("a new queue"),
	     ArrivalTimeBy(5 * time.Minute)))

func ArrivalTimeBy

func ArrivalTimeBy(d time.Duration) ChangeArg

ArrivalTimeBy sets the arrival time to a time in the future, by the given duration.

func ArrivalTimeTo

func ArrivalTimeTo(at time.Time) ChangeArg

ArrivalTimeTo sets a specific arrival time on a changed task in the Changing function.

func QueueTo

func QueueTo(q string) ChangeArg

QueueTo creates an option to modify a task's queue in the Changing function.

func ValueTo

func ValueTo(v []byte) ChangeArg

ValueTo sets the changing task's value to the given byte slice.

type ClaimOpt

type ClaimOpt func(*ClaimQuery)

ClaimOpt modifies limits on a task claim.

func ClaimAs

func ClaimAs(id uuid.UUID) ClaimOpt

ClaimAs sets the claimant ID for a claim operation. When not set, uses the internal default for this client.

func ClaimFor

func ClaimFor(duration time.Duration) ClaimOpt

ClaimFor sets the duration of a successful claim (the amount of time from now when it expires).

func ClaimPollTime

func ClaimPollTime(d time.Duration) ClaimOpt

ClaimPollTime sets the polling time for a claim. Set to DefaultClaimPollTime if left at 0.

func From

func From(qs ...string) ClaimOpt

From sets the queue(s) for a claim.

type ClaimQuery

type ClaimQuery struct {
	Queues   []string      // Queues to attempt to claim from. Only one wins.
	Claimant uuid.UUID     // The ID of the process trying to make the claim.
	Duration time.Duration // How long the task should be claimed for if successful.
	PollTime time.Duration // Length of time between (possibly interruptible) sleep and polling.

ClaimQuery contains information necessary to attempt to make a claim on a task in a specific queue.

type DependencyError

type DependencyError struct {
	Inserts []*TaskID
	Depends []*TaskID
	Deletes []*TaskID
	Changes []*TaskID

	Claims []*TaskID

	Message string

DependencyError is returned when a dependency is missing when modifying the task store.

func AsDependency

func AsDependency(err error) (DependencyError, bool)

AsDependency indicates whether the given error is a dependency error.

func DependencyErrorf

func DependencyErrorf(msg string, vals ...interface{}) DependencyError

DependencyErrorf creates a new dependency error with the given message.

func (DependencyError) Copy

func (m DependencyError) Copy() DependencyError

Copy produces a new deep copy of this error type.

func (DependencyError) Error

func (m DependencyError) Error() string

Error produces a helpful error string indicating what was missing.

func (DependencyError) HasClaims

func (m DependencyError) HasClaims() bool

HasClaims indicates whether any of the tasks were claimed by another claimant and unexpired.

func (DependencyError) HasCollisions

func (m DependencyError) HasCollisions() bool

HasCollisions indicates whether any of the inserted tasks collided with existing IDs.

func (DependencyError) HasMissing

func (m DependencyError) HasMissing() bool

HasMissing indicates whether there was anything missing in this error.

type DependencyHandler

type DependencyHandler func(err DependencyError) error

DependencyHandler is called (if set) when a worker run finishes with a dependency error. If it returns a non-nil error, that converts into a fatal error.

type EntroQ

type EntroQ struct {
	// contains filtered or unexported fields

EntroQ is a client interface for accessing the task queue.

func New

func New(ctx context.Context, opener BackendOpener, opts ...Option) (*EntroQ, error)

New creates a new task client with the given backend implementation.

cli := New(ctx, backend)

func (*EntroQ) Claim

func (c *EntroQ) Claim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

Claim attempts to get the next unclaimed task from the given queues. It blocks until one becomes available or until the context is done. When it succeeds, it returns a task with the claimant set to the default, or to the value given in options, and an arrival time computed from the duration. The default duration if none is given is DefaultClaimDuration.

func (*EntroQ) Close

func (c *EntroQ) Close() error

Close closes the underlying backend.

func (*EntroQ) DoWithRenew

func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, f func(context.Context) error) (*Task, error)

DoWithRenew runs the provided function while keeping the given task lease renewed.

func (*EntroQ) DoWithRenewAll

func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, f func(context.Context) error) ([]*Task, error)

DoWithRenewAll runs the provided function while keeping all given tasks leases renewed.

func (*EntroQ) ID

func (c *EntroQ) ID() uuid.UUID

ID returns the default claimant ID of this client. Used in "bare" calls, like Modify, Claim, etc. To change the ID per call (usually not needed, and can be dangerous), use the "As" calls, e.g., ModifyAs.

func (*EntroQ) Modify

func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error)

Modify allows a batch modification operation to be done, gated on the existence of all task IDs and versions specified. Deletions, Updates, and Dependencies must be present. The transaction all fails or all succeeds.

Returns all inserted task IDs, and an error if it could not proceed. If the error was due to missing dependencies, a *DependencyError is returned, which can be checked for by calling AsDependency(err).

func (*EntroQ) NewWorker

func (c *EntroQ) NewWorker(qs ...string) *Worker

NewWorker is a convenience method on an EntroQ client to create a worker.

func (*EntroQ) QueueStats

func (c *EntroQ) QueueStats(ctx context.Context, opts ...QueuesOpt) (map[string]*QueueStat, error)

QueueStats returns a mapping from queue names to task stats.

func (*EntroQ) Queues

func (c *EntroQ) Queues(ctx context.Context, opts ...QueuesOpt) (map[string]int, error)

Queues returns a mapping from all queue names to their task counts.

func (*EntroQ) QueuesEmpty

func (c *EntroQ) QueuesEmpty(ctx context.Context, opts ...QueuesOpt) (bool, error)

QueuesEmpty indicates whether the specified task queues are all empty. If no options are specified, returns an error.

func (*EntroQ) RenewAllFor

func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) ([]*Task, error)

RenewAllFor attempts to renew all given tasks' leases (update arrival times) for the given duration. Returns the new tasks.

func (*EntroQ) RenewFor

func (c *EntroQ) RenewFor(ctx context.Context, task *Task, duration time.Duration) (*Task, error)

RenewFor attempts to renew the given task's lease (update arrival time) for the given duration. Returns the new task.

func (*EntroQ) Tasks

func (c *EntroQ) Tasks(ctx context.Context, queue string, opts ...TasksOpt) ([]*Task, error)

Tasks returns a slice of all tasks in the given queue.

func (*EntroQ) Time

func (c *EntroQ) Time(ctx context.Context) (time.Time, error)

Time gets the time as the backend understands it, in UTC. Default is just time.Now().UTC().

func (*EntroQ) TryClaim

func (c *EntroQ) TryClaim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

TryClaim attempts one time to claim a task from the given queues. If there are no tasks, it returns a nil error *and* a nil task. This allows the caller to decide whether to retry. It can fail if certain (optional) dependency tasks are not present. This can be used, for example, to ensure that configuration tasks haven't changed.

func (*EntroQ) WaitQueuesEmpty

func (c *EntroQ) WaitQueuesEmpty(ctx context.Context, opts ...QueuesOpt) error

WaitQueuesEmpty does a poll-and-wait strategy to block until the queue query returns empty.

type ErrQMap

type ErrQMap func(inbox string) string

ErrQMap is a function that maps from an inbox name to its "move on error" error box name. If no mapping is found, a suitable default should be returned.

type ErrorTaskValue

type ErrorTaskValue struct {
	Task *Task  `json:"task"`
	Err  string `json:"err"`

ErrorTaskValue holds a task that is moved to an error queue, with an error message attached.

type InsertArg

type InsertArg func(*Modification, *TaskData)

InsertArg is an argument to task insertion.

func WithArrivalTime

func WithArrivalTime(at time.Time) InsertArg

WithArrivalTime changes the arrival time to a fixed moment during task insertion.

func WithArrivalTimeIn

func WithArrivalTimeIn(duration time.Duration) InsertArg

WithArrivalTimeIn computes the arrival time based on the duration from now, e.g.,

  InsertingInto("my queue",
    WithTimeIn(2 * time.Minute)))

func WithID

func WithID(id uuid.UUID) InsertArg

WithID sets the task's ID for insertion. This is not normally needed, as the backend will assign a new, unique ID for this task if none is specified. There are cases where assigning an explicit insertion ID (always being careful that it is unique) can be useful, however.

For example, a not uncommon need is for a worker to do the following:

- Claim a task,
- Make database entries corresponding to downstream work,
- Insert tasks for the downstream work and delete claimed task.

If the database entries need to reference the tasks that have not yet been inserted (e.g., if they need to be used to get at the status of a task), it is not safe to simply update the database after insertion, as this introduces a race condition. If, for example, the following strategy is employed, then the task IDs may never make it into the database:

- Claim a task,
- Make database entries
- Insert tasks and delete claimed task
- Update database with new task IDs

In this event, it is entirely possible to successfully process the incoming task and create the outgoing tasks, then lose network connectivity and fail to add those IDs to the databse. Now it is no longer possible to update the database appropriately: the task information is simply lost.

Instead, it is safe to do the following:

- Claim a task
- Make database entries, including with to-be-created task IDs
- Insert tasks with those IDs and delete claimed task.

This avoids the potential data loss condition entirely.

There are other workarounds for this situation, like using a two-step creation process and taking advantage of the ability to move tasks between queues without disturbing their ID (only their version), but this is not uncommon enough to warrant requiring the extra worker logic just to get a task ID into the database.

func WithSkipColliding

func WithSkipColliding(s bool) InsertArg

WithSkipColliding sets the insert argument to allow itself to be removed if the only error encountered is an ID collision. This can help when it is desired to insert multiple tasks, but a previous subset was already inserted with similar IDs. Sometimes you want to specify a superset to "catch what we missed".

func WithValue

func WithValue(value []byte) InsertArg

WithValue sets the task's byte slice value during insertion.

  InsertingInto("my queue",
    WithValue([]byte("hi there"))))

type Modification

type Modification struct {
	Claimant uuid.UUID

	Inserts []*TaskData
	Changes []*Task
	Deletes []*TaskID
	Depends []*TaskID
	// contains filtered or unexported fields

Modification contains all of the information for a single batch modification in the task store.

func NewModification

func NewModification(claimant uuid.UUID, modArgs ...ModifyArg) *Modification

NewModification creates a new modification: insertions, deletions, changes, and dependencies all together. When creating this for the purpose of passing to WithModification, set the claimant to uuid.Nil (it is ignored in that case).

func (*Modification) AllDependencies

func (m *Modification) AllDependencies() (map[uuid.UUID]int32, error)

AllDependencies returns a dependency map from ID to version for tasks that must exist and match version numbers. It returns an error if there are duplicates. Changes, Deletions, Dependencies and Insertions with IDs must be disjoint sets.

When using this to query backend storage for the presence of tasks, note that it is safe to ignore the version if you use the DependencyError method to determine whether a transaction can proceed. That checks versions appropriate for all different types of modification.

func (*Modification) DependencyError

func (m *Modification) DependencyError(found map[uuid.UUID]*Task) error

DependencyError returns a DependencyError if there are problems with the dependencies found in the backend, or nil if everything is fine. Problems include missing or claimed dependencies, both of which will block a modification.

type ModifyArg

type ModifyArg func(m *Modification)

ModifyArg is an argument to the Modify function, which does batch modifications to the task store.

func Changing

func Changing(task *Task, changeArgs ...ChangeArg) ModifyArg

Changing adds a task update to a Modify call, e.g., to modify the queue a task belongs in:

cli.Modify(ctx, Changing(myTask, QueueTo("a different queue name")))

func Deleting

func Deleting(id uuid.UUID, version int32) ModifyArg

Deleting adds a deletion to a Modify call, e.g.,

cli.Modify(ctx, Deleting(id, version))

func DependingOn

func DependingOn(id uuid.UUID, version int32) ModifyArg

DependingOn adds a dependency to a Modify call, e.g., to insert a task into "my queue" with data "hey", but only succeeding if a task with anotherID and someVersion exists:

  InsertingInto("my queue",
    DependingOn(anotherID, someVersion))

func Inserting

func Inserting(tds ...*TaskData) ModifyArg

Inserting creates an insert modification from TaskData:

		Queue: "myqueue",
		At:    time.Now.Add(1 * time.Minute),
		Value: []byte("hi there"),

Or, better still,

	    WithArrivalTimeIn(1 * time.Minute),
	    WithValue([]byte("hi there"))))

func InsertingInto

func InsertingInto(q string, insertArgs ...InsertArg) ModifyArg

InsertingInto creates an insert modification. Use like this:

cli.Modify(InsertingInto("my queue name", WithValue([]byte("hi there"))))

func ModifyAs

func ModifyAs(id uuid.UUID) ModifyArg

ModifyAs sets the claimant ID for a particular modify call. Usually not needed, can be dangerous unless used with extreme care. The client default is used if this option is not provided.

func WithModification

func WithModification(src *Modification) ModifyArg

WithModification returns a ModifyArg that merges the given Modification with whatever it is so far. Ignores Claimant field, and simply appends to all others.

type MoveTaskError

type MoveTaskError struct {
	Err error

MoveTaskError causes a task to be completely serialized, wrapped in a larger JSON object with error information, and moved to a specified queue. This can be useful when non-fatal task-specific errors happen in a worker and we want to stash them somewhere instead of just causing the worker to crash, but allows us to handle that as an early error return.

func AsMoveTaskError

func AsMoveTaskError(err error) (*MoveTaskError, bool)

AsMoveTaskError returns the underlying error and true iff the underlying error indicates a worker task should be moved to the error queue instead o causing the worker to exit.

func NewMoveTaskError

func NewMoveTaskError(err error) *MoveTaskError

NewMoveTaskError creates a new MoveTaskError from the given error.

func (*MoveTaskError) Error

func (e *MoveTaskError) Error() string

Error produces an error string.

type Notifier

type Notifier interface {
	// Notify signals an event on the key. Wakes up one waiter, if any, or is
	// dropped if no waiters are waiting.
	Notify(key string)

Notifier can be notified on a given key (e.g., queue name);

type NotifyWaiter

type NotifyWaiter interface {

NotifyWaiter can wait for and notify events.

type Option

type Option func(*EntroQ)

Option is an option that modifies how EntroQ clients are created.

func WithClaimantID

func WithClaimantID(id uuid.UUID) Option

WithClaimantID sets the default claimaint ID for this client.

type QueueStat

type QueueStat struct {
	Name      string `json:"name"`      // The queue name.
	Size      int    `json:"size"`      // The total number of tasks.
	Claimed   int    `json:"claimed"`   // The number of currently claimed tasks.
	Available int    `json:"available"` // The number of available tasks.

	MaxClaims int `json:"maxClaims"` // The maximum number of claims for a task in the queue.

QueueStat holds high-level information about a queue. Note that available + claimed may not add up to size. This is because a task can be unavailable (AT in the future) without being claimed by anyone.

type QueuesOpt

type QueuesOpt func(*QueuesQuery)

QueuesOpt modifies how queue requests are made.

func LimitQueues

func LimitQueues(limit int) QueuesOpt

LimitQueues sets the limit on the number of queues that are returned.

func MatchExact

func MatchExact(matches ...string) QueuesOpt

MatchExact adds an allowable exact match for a queue listing.

func MatchPrefix

func MatchPrefix(prefixes ...string) QueuesOpt

MatchPrefix adds allowable prefix matches for a queue listing.

type QueuesQuery

type QueuesQuery struct {
	// MatchPrefix specifies allowable prefix matches. If empty, limitations
	// are not set based on prefix matching. All prefix match conditions are ORed.
	// If both this and MatchExact are empty or nil, no limitations are set on
	// queue name: all will be returned.
	MatchPrefix []string

	// MatchExact specifies allowable exact matches. If empty, limitations are
	// not set on exact queue names.
	MatchExact []string

	// Limit specifies an upper bound on the number of queue names returned.
	Limit int

QueuesQuery modifies a queue listing request.

type Task

type Task struct {
	Queue string `json:"queue"`

	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	At       time.Time `json:"at"`
	Claimant uuid.UUID `json:"claimant"`
	Claims   int32     `json:"claims"`
	Value    []byte    `json:"value"`

	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`

Task represents a unit of work, with a byte slice value payload. Note that Claims is the number of times a task has successfully been claimed. This is different than the version number, which increments for every modification, not just claims.

func PollTryClaim

func PollTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc) (*Task, error)

PollTryClaim runs a loop in which the TryClaim function is called between sleeps with exponential backoff (up to a point). Backend implementations may choose to use this as their Claim implementation.

func WaitTryClaim

func WaitTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc, w Waiter) (*Task, error)

WaitTryClaim runs a loop in which the TryClaim function is called, then if no tasks are available, the given wait function is used to attempt to wait for a task to become available on the queue.

The wait function should exit (more or less) immediately if the context is canceled, and should return a nil error if the wait was successful (something became available).

func (*Task) AsChange

func (t *Task) AsChange(args ...ChangeArg) ModifyArg

AsChange returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsChange(ArrivalTimeBy(2 * time.Minute)))

The above is shorthand for

cli.Modify(ctx, Changing(task1, ArrivalTimeBy(2 * time.Minute)))

func (*Task) AsDeletion

func (t *Task) AsDeletion() ModifyArg

AsDeletion returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsDeletion())

The above would cause the given task to be deleted, if it can be. It is shorthand for

cli.Modify(ctx, Deleting(task1.ID(), task1.Version()))

func (*Task) AsDependency

func (t *Task) AsDependency() ModifyArg

AsDependency returns a ModifyArg that can be used to create a Modify dependency, e.g.,

cli.Modify(ctx, task.AsDependency())

That is shorthand for

cli.Modify(ctx, DependingOn(task.ID(), task.Version()))

func (*Task) Copy

func (t *Task) Copy() *Task

Copy copies this task's data and everything.

func (*Task) CopyOmitValue

func (t *Task) CopyOmitValue() *Task

CopyOmitValue copies this task but leaves the value blank.

func (*Task) Data

func (t *Task) Data() *TaskData

Data returns the data for this task.

func (*Task) IDVersion

func (t *Task) IDVersion() *TaskID

ID returns a Task ID from this task.

func (*Task) String

func (t *Task) String() string

String returns a useful representation of this task.

type TaskData

type TaskData struct {
	Queue string
	At    time.Time
	Value []byte

	// ID is an optional task ID to be used for task insertion.
	// Default (uuid.Nil) causes the backend to assign one, and that is
	// sufficient for many cases. If you desire to make a database entry that
	// *references* a task, however, in that case it can make sense to specify
	// an explicit task ID for insertion. This allows a common workflow cycle
	// 	consume task -> db update -> insert tasks
	// to be done safely, where the database update needs to refer to
	// to-be-inserted tasks.
	ID uuid.UUID
	// contains filtered or unexported fields

TaskData contains just the data, not the identifier or metadata. Used for insertions.

func (*TaskData) String

func (t *TaskData) String() string

String returns a string representation of the task data, excluding the value.

type TaskID

type TaskID struct {
	ID      uuid.UUID
	Version int32

TaskID contains the identifying parts of a task. If IDs don't match (identifier and version together), then operations fail on those tasks.

func (TaskID) AsDeletion

func (t TaskID) AsDeletion() ModifyArg

AsDeletion produces an appropriate ModifyArg to delete the task with this ID.

func (TaskID) AsDependency

func (t TaskID) AsDependency() ModifyArg

AsDependency produces an appropriate ModifyArg to depend on this task ID.

func (TaskID) String

func (t TaskID) String() string

String produces the id:version string representation.

type TasksOpt

type TasksOpt func(*EntroQ, *TasksQuery)

TasksOpt is an option that can be passed into Tasks to control what it returns.

func LimitClaimant

func LimitClaimant(id uuid.UUID) TasksOpt

LimitClaimant only returns tasks with the given claimant, or expired tasks.

func LimitSelf

func LimitSelf() TasksOpt

LimitSelf only returns self-claimed tasks or expired tasks.

func LimitTasks

func LimitTasks(limit int) TasksOpt

LimitTasks sets the limit on the number of tasks to return. A value <= 0 indicates "no limit".

func OmitValues

func OmitValues() TasksOpt

OmitValues tells a tasks query to only return metadata, not values.

func WithTaskID

func WithTaskID(ids ...uuid.UUID) TasksOpt

WithTaskID adds a task ID to the set of IDs that can be returned in a task query. The default is "all that match other specs" if no IDs are specified. Note that versions are not part of the ID.

type TasksQuery

type TasksQuery struct {
	Queue    string
	Claimant uuid.UUID
	Limit    int
	IDs      []uuid.UUID

	// OmitValues specifies that only metadata should be returned.
	// Backends are not required to honor this flag, though any
	// service receiving it in a request should ensure that values
	// are not passed over the wire.
	OmitValues bool

TasksQuery holds information for a tasks query.

type Waiter

type Waiter interface {
	// Wait waits for an event on the given set of keys, calling cond after
	// poll intervals until one of them is notified, cond returns true, or the
	// context is canceled.
	// If cond is nil, this function returns when the channel is notified,
	// the poll interval is exceeded, or the context is canceled. Only the last
	// event causes a non-nil error.
	// If poll is 0, it can never be exceeded.
	// A common use is to use poll=0 and cond=nil, causing this to simply wait
	// for a notification.
	Wait(ctx context.Context, keys []string, poll time.Duration, cond func() bool) error

Waiter can wait for an event on a given key (e.g., queue name).

type Work

type Work func(ctx context.Context, task *Task) ([]ModifyArg, error)

Work is a function that is called by Run. It does work for one task, then returns any necessary modifications.

If this function returns a MoveTaskError, the original task is moved into a queue specified by calling ErrQMap on the original queue name. This is useful for keeping track of failed tasks by moving them out of the way instead of deleting them or allowing them to be picked up again.

type Worker

type Worker struct {
	// Qs contains the queues to work on.
	Qs []string

	// ErrQMap maps an inbox to the queue tasks are moved to if a MoveTaskError
	// is returned from a worker's run function.
	ErrQMap ErrQMap

	// OnDepErr can hold a function to be called when a dependency error is
	// encountered. if it returns a non-nil error, it will become fatal.
	OnDepErr DependencyHandler
	// contains filtered or unexported fields

Worker creates an iterator-like protocol for processing tasks in a queue, one at a time, in a loop. Each worker should only be accessed from a single goroutine. If multiple goroutines are desired, they should each use their own worker instance.


w := eqClient.NewWorker("queue_name")
err := w.Run(ctx, func(ctx context.Context, task *Task) ([]ModifyArg, error) {
	// Do stuff with the task.
	// It's safe to mark it for deletion, too. It is renewed in the background.
	// If renewal changed its version, that is rewritten before modification.
	return []ModifyArg{task.AsDeletion()}, nil
// Handle the error, which is nil if the context was canceled (but not if
// it timed out).

func NewWorker

func NewWorker(eq *EntroQ, qs ...string) *Worker

NewWorker creates a new worker that makes it easy to claim and operate on tasks in an endless loop.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, f Work) (err error)

Run attempts to run the given function once per each claimed task, in a loop, until the context is canceled or an unrecoverable error is encountered. The function can return modifications that should be done after it exits, and version numbers for claim renewals will be automatically updated.

func (*Worker) WithOpts

func (w *Worker) WithOpts(opts ...WorkerOption) *Worker

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption can be passed to AnalyticWorker to modify the worker

func WithDependencyHandler

func WithDependencyHandler(f DependencyHandler) WorkerOption

WithDependencyHandler sets a function to be called when a worker encounters a dependency error. If this function returns a non-nil error, the worker will exit.

Note that workers always exit on non-dependency errors, but usually treat dependency errors as things that can be retried. Specifying a handler for dependency errors allows different behavior as needed.

One possible use case for a dependency error handler is to reload a configuration task for the next round: if the task is depended on, but has been changed, the task can be retried, but configuration should also be reloaded, which could be done in a handler.

func WithErrQMap

func WithErrQMap(f ErrQMap) WorkerOption

WithErrQMap sets a function that maps from inbox queue names to error queue names. Defaults to DefaultErrQMap.

func WithLease

func WithLease(d time.Duration) WorkerOption

WithLease sets the frequency of task renewal. Tasks will be claimed for an amount of time slightly longer than this so that they have a chance of being renewed before expiring.

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier