Documentation ¶
Overview ¶
Copyright 2019 Chris Monson <shiblon@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Package entroq contains the main task queue client and data definitions. The client relies on a backend to implement the actual transactional functionality, the interface for which is also defined here.
Index ¶
- Constants
- func DefaultErrQMap(inbox string) string
- func IsCanceled(err error) bool
- func IsTimeout(err error) bool
- func NotifyModified(n Notifier, inserted, changed []*Task)
- func ProcessTime() time.Time
- func QueuesFromStats(stats map[string]*QueueStat, err error) (map[string]int, error)
- type Backend
- type BackendClaimFunc
- type BackendOpener
- type ChangeArg
- type ClaimOpt
- type ClaimQuery
- type DependencyError
- type DependencyHandler
- type EntroQ
- func (c *EntroQ) Claim(ctx context.Context, opts ...ClaimOpt) (*Task, error)
- func (c *EntroQ) Close() error
- func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, ...) (*Task, error)
- func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, ...) ([]*Task, error)
- func (c *EntroQ) ID() uuid.UUID
- func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error)
- func (c *EntroQ) NewWorker(qs ...string) *Worker
- func (c *EntroQ) QueueStats(ctx context.Context, opts ...QueuesOpt) (map[string]*QueueStat, error)
- func (c *EntroQ) Queues(ctx context.Context, opts ...QueuesOpt) (map[string]int, error)
- func (c *EntroQ) QueuesEmpty(ctx context.Context, opts ...QueuesOpt) (bool, error)
- func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) (result []*Task, err error)
- func (c *EntroQ) RenewFor(ctx context.Context, task *Task, duration time.Duration) (*Task, error)
- func (c *EntroQ) Tasks(ctx context.Context, queue string, opts ...TasksOpt) ([]*Task, error)
- func (c *EntroQ) Time(ctx context.Context) (time.Time, error)
- func (c *EntroQ) TryClaim(ctx context.Context, opts ...ClaimOpt) (*Task, error)
- func (c *EntroQ) WaitQueuesEmpty(ctx context.Context, opts ...QueuesOpt) error
- type ErrQMap
- type IDOption
- type InsertArg
- type Modification
- type ModifyArg
- func Changing(task *Task, changeArgs ...ChangeArg) ModifyArg
- func Deleting(id uuid.UUID, version int32, opts ...IDOption) ModifyArg
- func DependingOn(id uuid.UUID, version int32, opts ...IDOption) ModifyArg
- func Inserting(tds ...*TaskData) ModifyArg
- func InsertingInto(q string, insertArgs ...InsertArg) ModifyArg
- func ModifyAs(id uuid.UUID) ModifyArg
- func WithModification(src *Modification) ModifyArg
- type MoveTaskError
- type Notifier
- type NotifyWaiter
- type Option
- type QueueStat
- type QueuesOpt
- type QueuesQuery
- type RetryTaskError
- type Task
- func (t *Task) AsChange(args ...ChangeArg) ModifyArg
- func (t *Task) AsDeletion() ModifyArg
- func (t *Task) AsDependency() ModifyArg
- func (t *Task) Copy() *Task
- func (t *Task) CopyOmitValue() *Task
- func (t *Task) CopyWithValue(ok bool) *Task
- func (t *Task) Data() *TaskData
- func (t *Task) IDVersion() *TaskID
- func (t *Task) String() string
- type TaskData
- type TaskID
- type TasksOpt
- type TasksQuery
- type Waiter
- type Work
- type Worker
- type WorkerOption
Constants ¶
const (
	DefaultClaimPollTime = 30 * time.Second
	DefaultClaimDuration = 30 * time.Second
)
const DefaultRetryDelay = 30 * time.Second
DefaultRetryDelay is the amount by which to advance the arrival time when a worker task errors out as retryable.
Variables ¶
This section is empty.
Functions ¶
func DefaultErrQMap ¶
DefaultErrQMap appends "/err" to the inbox, and is the default behavior if no overriding error queue mapping options are provided.
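A minimal sketch of the mapping described above, using only the standard library. The name `errQueueFor` is hypothetical and stands in for the package's own function; only the "/err" suffix comes from the description.

```go
package main

import "fmt"

// errQueueFor is a hypothetical stand-in for the default mapping:
// it appends "/err" to the inbox name.
func errQueueFor(inbox string) string {
	return inbox + "/err"
}

func main() {
	fmt.Println(errQueueFor("/jobs/resize")) // prints "/jobs/resize/err"
}
```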
func IsCanceled ¶
IsCanceled indicates whether the error is a canceled error.
func NotifyModified ¶
NotifyModified takes inserted and changed tasks and notifies once per unique queue/ID pair.
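The deduplication described above can be sketched as follows. This is an illustration under assumptions, not the package's implementation: the `task`, `notifier`, and `recorder` types are trimmed, hypothetical stand-ins, and task IDs are plain strings so that the example needs only the standard library.

```go
package main

import "fmt"

// task holds only the fields this sketch needs (hypothetical, not the package's Task).
type task struct {
	Queue string
	ID    string
}

// notifier mirrors the single-method shape of the Notifier interface.
type notifier interface {
	Notify(key string)
}

// recorder is a notifier that remembers every key it was given.
type recorder struct{ keys []string }

func (r *recorder) Notify(key string) { r.keys = append(r.keys, key) }

// notifyOnce calls Notify with the queue name once for each unique queue/ID pair.
func notifyOnce(n notifier, inserted, changed []*task) {
	type pair struct{ queue, id string }
	seen := make(map[pair]bool)
	for _, t := range append(append([]*task{}, inserted...), changed...) {
		p := pair{t.Queue, t.ID}
		if seen[p] {
			continue
		}
		seen[p] = true
		n.Notify(t.Queue)
	}
}

func main() {
	r := &recorder{}
	a := &task{Queue: "q1", ID: "a"}
	// Task a appears in both lists but is only notified once.
	notifyOnce(r, []*task{a, {Queue: "q2", ID: "b"}}, []*task{a})
	fmt.Println(r.keys) // prints "[q1 q2]"
}
```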
func ProcessTime ¶
ProcessTime returns the time the calling process thinks it is, in UTC.
func QueuesFromStats ¶
QueuesFromStats can be used for converting the new QueueStats to the old Queues output, making it easier on backend implementers to just define one function (similar to how WaitTryClaim or PollTryClaim can make implementing Claim in terms of TryClaim easier).
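A rough sketch of such a conversion, assuming the old Queues output is simply each queue's Size. The `queueStat` type, `sizesFromStats`, and `errBackend` are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// queueStat is a trimmed, hypothetical copy of QueueStat with only the fields used here.
type queueStat struct {
	Name string
	Size int
}

// errBackend is a sample error used to show pass-through behavior.
var errBackend = errors.New("backend unavailable")

// sizesFromStats converts a stats map into a queue-size map. It accepts the
// error as a second argument so it can wrap a (stats, err) call directly.
func sizesFromStats(stats map[string]*queueStat, err error) (map[string]int, error) {
	if err != nil {
		return nil, err
	}
	sizes := make(map[string]int, len(stats))
	for name, s := range stats {
		sizes[name] = s.Size
	}
	return sizes, nil
}

func main() {
	stats := map[string]*queueStat{"inbox": {Name: "inbox", Size: 3}}
	fmt.Println(sizesFromStats(stats, nil)) // prints "map[inbox:3] <nil>"
}
```

Taking the error as a parameter lets a backend write its Queues method as a one-line wrapper around its QueueStats method.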
Types ¶
type Backend ¶
type Backend interface {
	// Queues returns a mapping from all known queues to their task counts.
	Queues(ctx context.Context, qq *QueuesQuery) (map[string]int, error)

	// QueueStats returns statistics for the specified queues query. Richer
	// than just calling Queues, as it can return more than just the size.
	QueueStats(ctx context.Context, qq *QueuesQuery) (map[string]*QueueStat, error)

	// Tasks retrieves all tasks from the given queue. If claimantID is
	// specified (non-zero), limits those tasks to those that are either
	// expired or belong to the given claimant. Otherwise returns them all.
	Tasks(ctx context.Context, tq *TasksQuery) ([]*Task, error)

	// TryClaim attempts to claim a task from the "top" (or close to it) of the
	// given queue. When claimed, a task is held for the duration specified
	// from the time of the claim. If claiming until a specific wall-clock time
	// is desired, the task should be immediately modified after it is claimed
	// to set At to a specific time. Returns a nil task and a nil error if
	// there is nothing to claim. Will fail with a DependencyError if a
	// specific task ID is requested but not present.
	TryClaim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Claim is a blocking version of TryClaim, attempting to claim a task
	// from a queue, and blocking until canceled or a task becomes available.
	//
	// Will fail with a DependencyError if a specific task ID is requested but
	// not present. Never returns both a nil task and a nil error
	// simultaneously: a failure to claim a task is an error (potentially just
	// a timeout).
	Claim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Modify attempts to atomically modify the task store, and only succeeds
	// if all dependencies are available and all mutations are either expired
	// or already owned by this claimant. The Modification type provides useful
	// functions for determining whether dependencies are good or bad. This
	// function is intended to return a DependencyError if the transaction could
	// not proceed because dependencies were missing or already claimed (and
	// not expired) by another claimant.
	Modify(ctx context.Context, mod *Modification) (inserted []*Task, changed []*Task, err error)

	// Time returns the time as the backend understands it, in UTC.
	Time(ctx context.Context) (time.Time, error)

	// Close closes any underlying connections. The backend is expected to take
	// ownership of all such connections, so this cleans them up.
	Close() error
}
Backend describes all of the functions that any backend has to implement to be used as the storage for task queues.
type BackendClaimFunc ¶
type BackendClaimFunc func(ctx context.Context, eq *ClaimQuery) (*Task, error)
BackendClaimFunc is a function that can make claims based on a ClaimQuery. It is a convenience type for backends to use.
type BackendOpener ¶
BackendOpener is a function that can open a connection to a backend. Creating a client with a specific backend is accomplished by passing one of these functions into New.
type ChangeArg ¶
type ChangeArg func(m *Modification, t *Task)
ChangeArg is an argument to the Changing function, which creates arguments for Modify. For example, to change the queue and set the expiry time of a task to 5 minutes in the future, you would do something like this:
cli.Modify(ctx, Changing(myTask, QueueTo("a new queue"), ArrivalTimeBy(5 * time.Minute)))
func ArrivalTimeBy ¶
ArrivalTimeBy sets the arrival time to a time in the future, by the given duration.
func ArrivalTimeTo ¶
ArrivalTimeTo sets a specific arrival time on a changed task in the Changing function.
func AttemptToNext ¶
func AttemptToNext() ChangeArg
AttemptToNext sets the Attempt field in Task to the next value (increments it).
func ErrToZero ¶
func ErrToZero() ChangeArg
ErrToZero sets the Err field to its zero value (clears the error).
type ClaimOpt ¶
type ClaimOpt func(*ClaimQuery)
ClaimOpt modifies limits on a task claim.
func ClaimAs ¶
ClaimAs sets the claimant ID for a claim operation. When not set, uses the internal default for this client.
func ClaimFor ¶
ClaimFor sets the duration of a successful claim (the amount of time from now when it expires).
func ClaimPollTime ¶
ClaimPollTime sets the polling time for a claim. Set to DefaultClaimPollTime if left at 0.
type ClaimQuery ¶
type ClaimQuery struct {
	Queues   []string      // Queues to attempt to claim from. Only one wins.
	Claimant uuid.UUID     // The ID of the process trying to make the claim.
	Duration time.Duration // How long the task should be claimed for if successful.
	PollTime time.Duration // Length of time between (possibly interruptible) sleep and polling.
}
ClaimQuery contains information necessary to attempt to make a claim on a task in a specific queue.
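ClaimOpt follows the functional-options pattern: each option is a function that mutates a ClaimQuery. The sketch below shows how options and defaults might be combined. It is an assumption-laden illustration: `claimQuery` omits the Claimant field to stay within the standard library, `fromQueues` is a hypothetical option, and the point at which defaults are applied is a guess rather than the package's documented behavior.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultClaimPollTime = 30 * time.Second
	defaultClaimDuration = 30 * time.Second
)

// claimQuery is a trimmed, hypothetical copy of ClaimQuery.
type claimQuery struct {
	Queues   []string
	Duration time.Duration
	PollTime time.Duration
}

// claimOpt mirrors the shape of ClaimOpt: a function that edits the query.
type claimOpt func(*claimQuery)

// fromQueues is a hypothetical option that names the queues to claim from.
func fromQueues(qs ...string) claimOpt {
	return func(q *claimQuery) { q.Queues = append(q.Queues, qs...) }
}

// claimFor sets how long a successful claim lasts.
func claimFor(d time.Duration) claimOpt {
	return func(q *claimQuery) { q.Duration = d }
}

// claimPollTime sets the polling interval.
func claimPollTime(d time.Duration) claimOpt {
	return func(q *claimQuery) { q.PollTime = d }
}

// newClaimQuery applies options in order, then fills in zero-valued fields.
func newClaimQuery(opts ...claimOpt) *claimQuery {
	q := &claimQuery{Duration: defaultClaimDuration}
	for _, opt := range opts {
		opt(q)
	}
	if q.PollTime == 0 {
		q.PollTime = defaultClaimPollTime
	}
	return q
}

func main() {
	q := newClaimQuery(fromQueues("inbox"), claimFor(time.Minute))
	fmt.Println(q.Queues, q.Duration, q.PollTime) // prints "[inbox] 1m0s 30s"
}
```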
type DependencyError ¶
type DependencyError struct {
	Inserts []*TaskID
	Depends []*TaskID
	Deletes []*TaskID
	Changes []*TaskID
	Claims  []*TaskID

	Message string
}
DependencyError is returned when a dependency is missing when modifying the task store.
func AsDependency ¶
func AsDependency(err error) (DependencyError, bool)
AsDependency indicates whether the given error is a dependency error.
func DependencyErrorf ¶
func DependencyErrorf(msg string, vals ...interface{}) DependencyError
DependencyErrorf creates a new dependency error with the given message.
func (DependencyError) Copy ¶
func (m DependencyError) Copy() DependencyError
Copy produces a new deep copy of this error type.
func (DependencyError) Error ¶
func (m DependencyError) Error() string
Error produces a helpful error string indicating what was missing.
func (DependencyError) HasClaims ¶
func (m DependencyError) HasClaims() bool
HasClaims indicates whether any of the tasks were claimed by another claimant and unexpired.
func (DependencyError) HasCollisions ¶
func (m DependencyError) HasCollisions() bool
HasCollisions indicates whether any of the inserted tasks collided with existing IDs.
func (DependencyError) HasMissing ¶
func (m DependencyError) HasMissing() bool
HasMissing indicates whether there was anything missing in this error.
func (DependencyError) OnlyClaims ¶
func (m DependencyError) OnlyClaims() bool
OnlyClaims indicates that the error was only related to claimants. Useful for backends to do "force" operations, making it easy to ignore this particular error.
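The predicates above can be pictured with a small stand-in type. This sketch is not the package's code: `depError` and `taskID` are hypothetical, IDs are strings, and the exact field groupings behind each predicate (for example, that "missing" covers Depends, Deletes, and Changes) are assumptions inferred from the field names. The `asDependency` helper uses the standard library's `errors.As`, which also finds wrapped errors.

```go
package main

import (
	"errors"
	"fmt"
)

type taskID struct {
	ID      string
	Version int32
}

// depError is a hypothetical, trimmed analogue of DependencyError.
type depError struct {
	Inserts, Depends, Deletes, Changes, Claims []*taskID
	Message                                    string
}

func (e depError) Error() string { return "dependency error: " + e.Message }

// Assumed meanings: missing = absent depends/deletes/changes,
// collisions = inserts whose IDs already exist, claims = held by someone else.
func (e depError) hasMissing() bool    { return len(e.Depends)+len(e.Deletes)+len(e.Changes) > 0 }
func (e depError) hasCollisions() bool { return len(e.Inserts) > 0 }
func (e depError) hasClaims() bool     { return len(e.Claims) > 0 }
func (e depError) onlyClaims() bool {
	return e.hasClaims() && !e.hasMissing() && !e.hasCollisions()
}

// asDependency reports whether err is, or wraps, a depError.
func asDependency(err error) (depError, bool) {
	var de depError
	ok := errors.As(err, &de)
	return de, ok
}

// sampleErr returns a wrapped dependency error caused only by a foreign claim.
func sampleErr() error {
	base := depError{Claims: []*taskID{{ID: "t1", Version: 3}}, Message: "claimed elsewhere"}
	return fmt.Errorf("modify failed: %w", base)
}

func main() {
	if de, ok := asDependency(sampleErr()); ok {
		fmt.Println(de.onlyClaims()) // prints "true"
	}
}
```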
type DependencyHandler ¶
type DependencyHandler func(err DependencyError) error
DependencyHandler is called (if set) when a worker run finishes with a dependency error. If it returns a non-nil error, that converts into a fatal error.
type EntroQ ¶
type EntroQ struct {
// contains filtered or unexported fields
}
EntroQ is a client interface for accessing the task queue.
func New ¶
New creates a new task client with the given backend implementation, for example, to use an in-memory implementation:
cli, err := New(ctx, mem.Opener())
func (*EntroQ) Claim ¶
Claim attempts to get the next unclaimed task from the given queues. It blocks until one becomes available or until the context is done. When it succeeds, it returns a task with the claimant set to the default, or to the value given in options, and an arrival time computed from the duration. The default duration if none is given is DefaultClaimDuration.
func (*EntroQ) DoWithRenew ¶
func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, f func(context.Context) error) (*Task, error)
DoWithRenew runs the provided function while keeping the given task lease renewed.
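The general technique (do the work in the foreground while a background goroutine periodically renews the lease) can be sketched with the standard library alone. Everything here is an assumption for illustration: `doWithRenew` takes a hypothetical `renew` callback in place of a real task renewal, and renewing every half lease is an arbitrary choice, not necessarily what the package does.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// doWithRenew runs work while calling renew roughly every lease/2.
// renew is a hypothetical callback standing in for a real lease renewal.
// lease must be large enough that lease/2 is positive.
func doWithRenew(ctx context.Context, lease time.Duration,
	renew func(context.Context) error, work func(context.Context) error) error {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	renewDone := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(lease / 2)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				renewDone <- nil
				return
			case <-ticker.C:
				if ctx.Err() != nil { // work already finished
					renewDone <- nil
					return
				}
				if err := renew(ctx); err != nil {
					cancel() // the lease is lost, so tell the work to stop
					renewDone <- err
					return
				}
			}
		}
	}()

	workErr := work(ctx)
	cancel() // stop renewing once the work is done
	if renewErr := <-renewDone; renewErr != nil {
		return renewErr
	}
	return workErr
}

// demo runs 100ms of fake work under a 20ms lease and reports the renewal count.
func demo() (int32, error) {
	var renewals int32
	err := doWithRenew(context.Background(), 20*time.Millisecond,
		func(context.Context) error { atomic.AddInt32(&renewals, 1); return nil },
		func(ctx context.Context) error {
			select {
			case <-time.After(100 * time.Millisecond):
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	return atomic.LoadInt32(&renewals), err
}

func main() {
	n, err := demo()
	fmt.Println("renewed at least once:", n > 0, "error:", err)
}
```

Canceling the work's context when renewal fails keeps the function from continuing to operate on a task it may no longer own.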
func (*EntroQ) DoWithRenewAll ¶
func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, f func(context.Context) error) ([]*Task, error)
DoWithRenewAll runs the provided function while keeping the leases of all given tasks renewed.
func (*EntroQ) ID ¶
ID returns the default claimant ID of this client. Used in "bare" calls, like Modify, Claim, etc. To change the ID per call (usually not needed, and can be dangerous), use the "As" calls, e.g., ModifyAs.
func (*EntroQ) Modify ¶
func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error)
Modify performs a batch modification, gated on the existence of all task IDs and versions specified. Tasks named in deletions, changes, and dependencies must be present. The transaction either succeeds entirely or fails entirely.
Returns the inserted and changed tasks, and an error if it could not proceed. If the error was due to missing dependencies, a DependencyError is returned, which can be checked for by calling AsDependency(err).
func (*EntroQ) NewWorker ¶
NewWorker is a convenience method on an EntroQ client to create a worker.
func (*EntroQ) QueueStats ¶
QueueStats returns a mapping from queue names to task stats.
func (*EntroQ) QueuesEmpty ¶
QueuesEmpty indicates whether the specified task queues are all empty. If no options are specified, returns an error.
func (*EntroQ) RenewAllFor ¶
func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) (result []*Task, err error)
RenewAllFor attempts to renew all given tasks' leases (update arrival times) for the given duration. Returns the new tasks.
func (*EntroQ) RenewFor ¶
RenewFor attempts to renew the given task's lease (update arrival time) for the given duration. Returns the new task.
func (*EntroQ) Time ¶
Time gets the time as the backend understands it, in UTC. Default is just time.Now().UTC().
func (*EntroQ) TryClaim ¶
TryClaim attempts one time to claim a task from the given queues. If there are no tasks, it returns a nil error *and* a nil task. This allows the caller to decide whether to retry. It can fail if certain (optional) dependency tasks are not present. This can be used, for example, to ensure that configuration tasks haven't changed.
type ErrQMap ¶
ErrQMap is a function that maps from an inbox name to its "move on error" error box name. If no mapping is found, a suitable default should be returned.
type IDOption ¶
type IDOption func(id *TaskID)
IDOption is an option for things that require task ID information. Allows additional ID-related metadata to be passed.
func WithIDQueue ¶
WithIDQueue specifies the queue for a particular task ID.
type InsertArg ¶
type InsertArg func(*Modification, *TaskData)
InsertArg is an argument to task insertion.
func WithArrivalTime ¶
WithArrivalTime changes the arrival time to a fixed moment during task insertion.
func WithArrivalTimeIn ¶
WithArrivalTimeIn computes the arrival time based on the duration from now, e.g.,
cli.Modify(ctx, InsertingInto("my queue", WithArrivalTimeIn(2 * time.Minute)))
func WithAttempt ¶
WithAttempt sets the number of attempts for this task. Usually not needed, handled automatically by the worker.
func WithErr ¶
WithErr sets the error field of a task during insertion. Usually not needed, as tasks are typically modified to add errors, not inserted with them.
func WithID ¶
WithID sets the task's ID for insertion. This is not normally needed, as the backend will assign a new, unique ID for this task if none is specified. There are cases where assigning an explicit insertion ID (always being careful that it is unique) can be useful, however.
For example, a not uncommon need is for a worker to do the following:
- Claim a task,
- Make database entries corresponding to downstream work,
- Insert tasks for the downstream work and delete claimed task.
If the database entries need to reference the tasks that have not yet been inserted (e.g., if they need to be used to get at the status of a task), it is not safe to simply update the database after insertion, as this introduces a race condition. If, for example, the following strategy is employed, then the task IDs may never make it into the database:
- Claim a task,
- Make database entries
- Insert tasks and delete claimed task
- Update database with new task IDs
In this event, it is entirely possible to successfully process the incoming task and create the outgoing tasks, then lose network connectivity and fail to add those IDs to the database. Now it is no longer possible to update the database appropriately: the task information is simply lost.
Instead, it is safe to do the following:
- Claim a task
- Make database entries, including with to-be-created task IDs
- Insert tasks with those IDs and delete claimed task.
This avoids the potential data loss condition entirely.
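The safe ordering above can be sketched as follows. This is a simplified illustration, not client code: the "database" is a map, `plannedTask` and `planDownstream` are hypothetical names, and IDs are random hex strings from `crypto/rand` rather than UUIDs. The point is only the order of operations: choose the IDs first, record them, then insert tasks under those same IDs.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns a random 128-bit identifier as hex. A real client would use a UUID.
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// plannedTask describes a task to insert with an explicit, pre-chosen ID.
type plannedTask struct {
	ID    string
	Queue string
	Value []byte
}

// planDownstream chooses an ID for each job, records it in db, and returns
// the tasks that should later be inserted under exactly those IDs.
func planDownstream(db map[string]string, queue string, jobs []string) ([]plannedTask, error) {
	tasks := make([]plannedTask, 0, len(jobs))
	for _, job := range jobs {
		id, err := newID()
		if err != nil {
			return nil, err
		}
		db[job] = id // the database entry references the task before it exists
		tasks = append(tasks, plannedTask{ID: id, Queue: queue, Value: []byte(job)})
	}
	// The caller then inserts these tasks and deletes the claimed task in one
	// atomic modification. If that fails, the database holds IDs of tasks that
	// were never created, which is detectable, instead of losing the IDs.
	return tasks, nil
}

func main() {
	db := map[string]string{}
	tasks, err := planDownstream(db, "downstream", []string{"job-1", "job-2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(tasks), db["job-1"] == tasks[0].ID) // prints "2 true"
}
```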
There are other workarounds for this situation, like using a two-step creation process and taking advantage of the ability to move tasks between queues without disturbing their ID (only their version). The need is common enough, however, that requiring extra worker logic just to get a task ID into the database is not warranted.
func WithSkipColliding ¶
WithSkipColliding sets the insert argument to allow itself to be removed if the only error encountered is an ID collision. This can help when it is desired to insert multiple tasks, but a previous subset was already inserted with similar IDs. Sometimes you want to specify a superset to "catch what we missed".
type Modification ¶
type Modification struct {
	Claimant uuid.UUID `json:"claimant"`

	Inserts []*TaskData `json:"inserts"`
	Changes []*Task     `json:"changes"`
	Deletes []*TaskID   `json:"deletes"`
	Depends []*TaskID   `json:"depends"`
	// contains filtered or unexported fields
}
Modification contains all of the information for a single batch modification in the task store.
func NewModification ¶
func NewModification(claimant uuid.UUID, modArgs ...ModifyArg) *Modification
NewModification creates a new modification: insertions, deletions, changes, and dependencies all together. When creating this for the purpose of passing to WithModification, set the claimant to uuid.Nil (it is ignored in that case).
func (*Modification) AllDependencies ¶
func (m *Modification) AllDependencies() (map[uuid.UUID]int32, error)
AllDependencies returns a dependency map from ID to version for tasks that must exist and match version numbers. It returns an error if there are duplicates. Changes, Deletions, Dependencies and Insertions with IDs must be disjoint sets.
When using this to query backend storage for the presence of tasks, note that it is safe to ignore the version if you use the DependencyError method to determine whether a transaction can proceed. That checks versions appropriate for all different types of modification.
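A simplified sketch of building such a dependency map while rejecting duplicates. The `modification` and `idVersion` types are hypothetical, IDs are strings, and insertions with explicit IDs are left out for brevity, so this covers only part of the disjointness rule described above.

```go
package main

import "fmt"

// idVersion pairs a task ID with the version it is expected to have.
type idVersion struct {
	ID      string
	Version int32
}

// modification is a trimmed, hypothetical analogue of Modification.
type modification struct {
	Changes []idVersion
	Deletes []idVersion
	Depends []idVersion
}

// allDependencies maps every referenced ID to its expected version and
// fails if the same ID appears more than once across the groups.
func (m *modification) allDependencies() (map[string]int32, error) {
	deps := make(map[string]int32)
	for _, group := range [][]idVersion{m.Changes, m.Deletes, m.Depends} {
		for _, iv := range group {
			if _, dup := deps[iv.ID]; dup {
				return nil, fmt.Errorf("duplicate task ID %q in modification", iv.ID)
			}
			deps[iv.ID] = iv.Version
		}
	}
	return deps, nil
}

func main() {
	m := &modification{
		Changes: []idVersion{{ID: "a", Version: 1}},
		Depends: []idVersion{{ID: "b", Version: 2}},
	}
	fmt.Println(m.allDependencies()) // prints "map[a:1 b:2] <nil>"
}
```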
func (*Modification) DependencyError ¶
func (m *Modification) DependencyError(found map[uuid.UUID]*Task) error
DependencyError returns a DependencyError if there are problems with the dependencies found in the backend, or nil if everything is fine. Problems include missing or claimed dependencies, both of which will block a modification.
func (*Modification) String ¶
func (m *Modification) String() string
String produces a friendly version of this modification.
type ModifyArg ¶
type ModifyArg func(m *Modification)
ModifyArg is an argument to the Modify function, which does batch modifications to the task store.
func Changing ¶
Changing adds a task update to a Modify call, e.g., to modify the queue a task belongs in:
cli.Modify(ctx, Changing(myTask, QueueTo("a different queue name")))
func Deleting ¶
Deleting adds a deletion to a Modify call, e.g.,
cli.Modify(ctx, Deleting(id, version))
func DependingOn ¶
DependingOn adds a dependency to a Modify call, e.g., to insert a task into "my queue" with data "hey", but only succeeding if a task with anotherID and someVersion exists:
cli.Modify(ctx, InsertingInto("my queue", WithValue([]byte("hey"))), DependingOn(anotherID, someVersion))
func Inserting ¶
Inserting creates an insert modification from TaskData:
cli.Modify(ctx, Inserting(&TaskData{
	Queue: "myqueue",
	At:    time.Now().Add(1 * time.Minute),
	Value: []byte("hi there"),
}))
Or, better still,
cli.Modify(ctx, InsertingInto("myqueue", WithArrivalTimeIn(1 * time.Minute), WithValue([]byte("hi there"))))
func InsertingInto ¶
InsertingInto creates an insert modification. Use like this:
cli.Modify(ctx, InsertingInto("my queue name", WithValue([]byte("hi there"))))
func ModifyAs ¶
ModifyAs sets the claimant ID for a particular modify call. Usually not needed, can be dangerous unless used with extreme care. The client default is used if this option is not provided.
func WithModification ¶
func WithModification(src *Modification) ModifyArg
WithModification returns a ModifyArg that merges the given Modification with whatever it is so far. Ignores Claimant field, and simply appends to all others.
type MoveTaskError ¶
type MoveTaskError struct {
Err error
}
MoveTaskError causes a task to be moved to a specified queue. This is useful when a non-fatal, task-specific error happens in a worker: instead of crashing the worker, the task is stashed in another queue, and the worker function can still handle the problem with an early error return. The error is added to the task.
func MoveTaskErrorf ¶
func MoveTaskErrorf(format string, values ...interface{}) *MoveTaskError
MoveTaskErrorf creates a MoveTaskError given a format string and values, just like fmt.Errorf.
func NewMoveTaskError ¶
func NewMoveTaskError(err error) *MoveTaskError
NewMoveTaskError creates a new MoveTaskError from the given error.
func (*MoveTaskError) Error ¶
func (e *MoveTaskError) Error() string
Error produces an error string.
type Notifier ¶
type Notifier interface {
	// Notify signals an event on the key. Wakes up one waiter, if any, or is
	// dropped if no waiters are waiting.
	Notify(key string)
}
Notifier can be notified on a given key (e.g., queue name).
type NotifyWaiter ¶
NotifyWaiter can wait for and notify events.
type Option ¶
type Option func(*EntroQ)
Option is an option that modifies how EntroQ clients are created.
func WithClaimantID ¶
WithClaimantID sets the default claimant ID for this client.
type QueueStat ¶
type QueueStat struct {
	Name      string `json:"name"`      // The queue name.
	Size      int    `json:"size"`      // The total number of tasks.
	Claimed   int    `json:"claimed"`   // The number of currently claimed tasks.
	Available int    `json:"available"` // The number of available tasks.

	MaxClaims int `json:"maxClaims"` // The maximum number of claims for a task in the queue.
}
QueueStat holds high-level information about a queue. Note that available + claimed may not add up to size. This is because a task can be unavailable (At in the future) without being claimed by anyone.
type QueuesOpt ¶
type QueuesOpt func(*QueuesQuery)
QueuesOpt modifies how queue requests are made.
func LimitQueues ¶
LimitQueues sets the limit on the number of queues that are returned.
func MatchExact ¶
MatchExact adds an allowable exact match for a queue listing.
func MatchPrefix ¶
MatchPrefix adds allowable prefix matches for a queue listing.
type QueuesQuery ¶
type QueuesQuery struct {
	// MatchPrefix specifies allowable prefix matches. If empty, limitations
	// are not set based on prefix matching. All prefix match conditions are ORed.
	// If both this and MatchExact are empty or nil, no limitations are set on
	// queue name: all will be returned.
	MatchPrefix []string

	// MatchExact specifies allowable exact matches. If empty, limitations are
	// not set on exact queue names.
	MatchExact []string

	// Limit specifies an upper bound on the number of queue names returned.
	Limit int
}
QueuesQuery modifies a queue listing request.
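One way a backend might interpret these fields is sketched below. It is an interpretation, not the package's implementation: the names `matches` and `listQueues` are hypothetical, exact and prefix conditions are assumed to be ORed with each other, and a Limit of zero or less is assumed to mean "no limit".

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a queue name passes the prefix/exact filters.
// With both filters empty, every name matches.
func matches(name string, prefixes, exacts []string) bool {
	if len(prefixes) == 0 && len(exacts) == 0 {
		return true
	}
	for _, e := range exacts {
		if name == e {
			return true
		}
	}
	for _, p := range prefixes {
		if strings.HasPrefix(name, p) {
			return true
		}
	}
	return false
}

// listQueues filters names in order and stops at limit (if positive).
func listQueues(all, prefixes, exacts []string, limit int) []string {
	var out []string
	for _, q := range all {
		if limit > 0 && len(out) >= limit {
			break
		}
		if matches(q, prefixes, exacts) {
			out = append(out, q)
		}
	}
	return out
}

func main() {
	all := []string{"/jobs/a", "/jobs/b", "/mail", "/other"}
	fmt.Println(listQueues(all, []string{"/jobs/"}, []string{"/mail"}, 0))
	// prints "[/jobs/a /jobs/b /mail]"
}
```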
type RetryTaskError ¶
type RetryTaskError struct {
Err error
}
RetryTaskError causes a task to be retried, incrementing its Attempt field and setting its Err to the text of the error. If MaxAttempts is positive and has been reached, then this behaves in the same way as a MoveTaskError.
func NewRetryTaskError ¶
func NewRetryTaskError(err error) *RetryTaskError
NewRetryTaskError creates a new RetryTaskError from the given error.
func RetryTaskErrorf ¶
func RetryTaskErrorf(format string, values ...interface{}) *RetryTaskError
RetryTaskErrorf creates a RetryTaskError in the same way that you would create an error with fmt.Errorf.
func (*RetryTaskError) Error ¶
func (e *RetryTaskError) Error() string
Error produces an error string.
type Task ¶
type Task struct {
	Queue string `json:"queue"`

	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	At       time.Time `json:"at"`
	Claimant uuid.UUID `json:"claimant"`
	Claims   int32     `json:"claims"`
	Value    []byte    `json:"value"`

	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`

	// FromQueue specifies the previous queue for a task that is moving to another queue.
	// Usually not present, can be used for change authorization (since two queues are in play, there).
	FromQueue string `json:"fromqueue,omitempty"`

	// Worker retry logic uses these fields when moving tasks and when retrying them.
	// It is left up to the consumer to determine how many attempts is too many
	// and to produce a suitable retry or move error.
	Attempt int32  `json:"attempt"`
	Err     string `json:"err"`
}
Task represents a unit of work, with a byte slice value payload. Note that Claims is the number of times a task has successfully been claimed. This is different than the version number, which increments for every modification, not just claims.
func PollTryClaim ¶
func PollTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc) (*Task, error)
PollTryClaim runs a loop in which the TryClaim function is called between sleeps with exponential backoff (up to a point). Backend implementations may choose to use this as their Claim implementation.
func WaitTryClaim ¶
func WaitTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc, w Waiter) (*Task, error)
WaitTryClaim runs a loop in which the TryClaim function is called, then if no tasks are available, the given wait function is used to attempt to wait for a task to become available on the queue.
The wait function should exit (more or less) immediately if the context is canceled, and should return a nil error if the wait was successful (something became available).
func (*Task) AsChange ¶
AsChange returns a ModifyArg that can be used in the Modify function, e.g.,
cli.Modify(ctx, task1.AsChange(ArrivalTimeBy(2 * time.Minute)))
The above is shorthand for
cli.Modify(ctx, Changing(task1, ArrivalTimeBy(2 * time.Minute)))
func (*Task) AsDeletion ¶
AsDeletion returns a ModifyArg that can be used in the Modify function, e.g.,
cli.Modify(ctx, task1.AsDeletion())
The above would cause the given task to be deleted, if it can be. It is shorthand for
cli.Modify(ctx, Deleting(task1.ID, task1.Version, WithIDQueue(task1.Queue)))
func (*Task) AsDependency ¶
AsDependency returns a ModifyArg that can be used to create a Modify dependency, e.g.,
cli.Modify(ctx, task.AsDependency())
That is shorthand for
cli.Modify(ctx, DependingOn(task.ID, task.Version, WithIDQueue(task.Queue)))
func (*Task) CopyOmitValue ¶
CopyOmitValue copies this task but leaves the value blank.
func (*Task) CopyWithValue ¶
CopyWithValue lets you specify whether the value should be copied.
type TaskData ¶
type TaskData struct {
	Queue string    `json:"queue"`
	At    time.Time `json:"at"`
	Value []byte    `json:"value"`

	// Attempt indicates which "attempt number" this task is on. Used by workers.
	Attempt int32 `json:"attempt"`

	// Err contains error information for this task. Used by workers.
	Err string `json:"err"`

	// ID is an optional task ID to be used for task insertion.
	// Default (uuid.Nil) causes the backend to assign one, and that is
	// sufficient for many cases. If you desire to make a database entry that
	// *references* a task, however, in that case it can make sense to specify
	// an explicit task ID for insertion. This allows a common workflow cycle
	//
	//	consume task -> db update -> insert tasks
	//
	// to be done safely, where the database update needs to refer to
	// to-be-inserted tasks.
	ID uuid.UUID `json:"id"`

	// These timings are here so that journaling can restore full state.
	// Usually they are blank, and there are no convenience methods to allow
	// them to be set. Leave them at default values in all cases.
	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`
	// contains filtered or unexported fields
}
TaskData contains just the data, not the identifier or metadata. Used for insertions.
type TaskID ¶
type TaskID struct {
	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	Queue string `json:"queue,omitempty"`
}
TaskID contains the identifying parts of a task. If IDs don't match (identifier and version together), then operations fail on those tasks.
Also contains the name of the queue in which this task resides. Can be omitted, as it does not affect functionality, but might be required for authorization, which is performed based on queue name. Present whenever using tasks as a source of IDs.
func (TaskID) AsDeletion ¶
AsDeletion produces an appropriate ModifyArg to delete the task with this ID.
func (TaskID) AsDependency ¶
AsDependency produces an appropriate ModifyArg to depend on this task ID.
type TasksOpt ¶
type TasksOpt func(*EntroQ, *TasksQuery)
TasksOpt is an option that can be passed into Tasks to control what it returns.
func LimitClaimant ¶
LimitClaimant only returns tasks with the given claimant, or expired tasks.
func LimitSelf ¶
func LimitSelf() TasksOpt
LimitSelf only returns self-claimed tasks or expired tasks.
func LimitTasks ¶
LimitTasks sets the limit on the number of tasks to return. A value <= 0 indicates "no limit".
func OmitValues ¶
func OmitValues() TasksOpt
OmitValues tells a tasks query to only return metadata, not values.
func WithTaskID ¶
WithTaskID adds a task ID to the set of IDs that can be returned in a task query. The default is "all that match other specs" if no IDs are specified. Note that versions are not part of the ID.
type TasksQuery ¶
type TasksQuery struct {
	Queue    string
	Claimant uuid.UUID
	Limit    int
	IDs      []uuid.UUID

	// OmitValues specifies that only metadata should be returned.
	// Backends are not required to honor this flag, though any
	// service receiving it in a request should ensure that values
	// are not passed over the wire.
	OmitValues bool
}
TasksQuery holds information for a tasks query.
type Waiter ¶
type Waiter interface {
	// Wait waits for an event on the given set of keys, calling cond after
	// poll intervals until one of them is notified, cond returns true, or the
	// context is canceled.
	//
	// If cond is nil, this function returns when the channel is notified,
	// the poll interval is exceeded, or the context is canceled. Only the last
	// event causes a non-nil error.
	//
	// If poll is 0, it can never be exceeded.
	//
	// A common use is to use poll=0 and cond=nil, causing this to simply wait
	// for a notification.
	Wait(ctx context.Context, keys []string, poll time.Duration, cond func() bool) error
}
Waiter can wait for an event on a given key (e.g., queue name).
type Work ¶
Work is a function that is called by Run. It does work for one task, then returns any necessary modifications.
If this function returns a MoveTaskError, the original task is moved into a queue specified by calling ErrQMap on the original queue name. This is useful for keeping track of failed tasks by moving them out of the way instead of deleting them or allowing them to be picked up again.
If this function returns a RetryTaskError, the original task has its attempt field incremented, the err field is updated to contain the text of the error, and the worker goes around again, leaving it to be reclaimed. If the maximum number of attempts has been reached, however, the error acts like a MoveTaskError, instead.
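The error-handling rules above amount to a small decision table, sketched here with stand-in types. The `decide` function and its string results are hypothetical, the treatment of other errors as fatal is an assumption, and the exact comparison used to detect "maximum attempts reached" is a guess; only the overall move/retry behavior comes from the descriptions above.

```go
package main

import (
	"errors"
	"fmt"
)

// moveTaskError and retryTaskError are stand-ins for the package's error types.
type moveTaskError struct{ Err error }

func (e *moveTaskError) Error() string { return e.Err.Error() }

type retryTaskError struct{ Err error }

func (e *retryTaskError) Error() string { return e.Err.Error() }

// moveErr and retryErr build sample errors for the demonstration.
func moveErr(msg string) error  { return &moveTaskError{Err: errors.New(msg)} }
func retryErr(msg string) error { return &retryTaskError{Err: errors.New(msg)} }

// decide returns what a worker might do after the work function returns err
// for a task that has already been attempted `attempt` times.
func decide(err error, attempt, maxAttempts int32) string {
	var mv *moveTaskError
	var rt *retryTaskError
	switch {
	case err == nil:
		return "apply modifications"
	case errors.As(err, &mv):
		return "move to error queue"
	case errors.As(err, &rt):
		// Assumed rule: the next attempt number must stay below MaxAttempts.
		if maxAttempts > 0 && attempt+1 >= maxAttempts {
			return "move to error queue"
		}
		return "retry"
	default:
		return "stop worker" // assumption: other errors are treated as fatal
	}
}

func main() {
	fmt.Println(decide(retryErr("flaky network"), 0, 3)) // prints "retry"
	fmt.Println(decide(retryErr("flaky network"), 2, 3)) // prints "move to error queue"
}
```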
type Worker ¶
type Worker struct {
	// Qs contains the queues to work on.
	Qs []string

	// ErrQMap maps an inbox to the queue tasks are moved to if a MoveTaskError
	// is returned from a worker's run function.
	ErrQMap ErrQMap

	// OnDepErr can hold a function to be called when a dependency error is
	// encountered. If it returns a non-nil error, it will become fatal.
	OnDepErr DependencyHandler

	// MaxAttempts indicates how many attempts are too many before a retryable
	// error becomes permanent and the task is moved to an error queue.
	MaxAttempts int32
	// contains filtered or unexported fields
}
Worker creates an iterator-like protocol for processing tasks in a queue, one at a time, in a loop. Each worker should only be accessed from a single goroutine. If multiple goroutines are desired, they should each use their own worker instance.
Example:
w := eqClient.NewWorker("queue_name")
err := w.Run(ctx, func(ctx context.Context, task *Task) ([]ModifyArg, error) {
	// Do stuff with the task.
	// It's safe to mark it for deletion, too. It is renewed in the background.
	// If renewal changed its version, that is rewritten before modification.
	return []ModifyArg{task.AsDeletion()}, nil
})
// Handle the error, which is nil if the context was canceled (but not if
// it timed out).
func NewWorker ¶
NewWorker creates a new worker that makes it easy to claim and operate on tasks in an endless loop.
func (*Worker) Run ¶
Run attempts to run the given function once per claimed task, in a loop, until the context is canceled or an unrecoverable error is encountered. The function can return modifications that should be done after it exits, and version numbers for claim renewals will be automatically updated.
func (*Worker) WithOpts ¶
func (w *Worker) WithOpts(opts ...WorkerOption) *Worker
WithOpts sets options on a newly-created worker.
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption can be passed to WithOpts to modify the worker.
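WorkerOption follows the functional-options pattern: each option is a function that mutates the worker it is given. The sketch below shows the mechanics with stand-in types. The lowercase `worker` struct and its fields are hypothetical, since the real Worker keeps some option-controlled state in unexported fields that this page does not show.

```go
package main

import (
	"fmt"
	"time"
)

// worker is a stand-in with hypothetical fields.
type worker struct {
	maxAttempts int32
	lease       time.Duration
}

// workerOption mirrors the documented shape: func(*Worker).
type workerOption func(*worker)

func withMaxAttempts(m int32) workerOption {
	return func(w *worker) { w.maxAttempts = m }
}

func withLease(d time.Duration) workerOption {
	return func(w *worker) { w.lease = d }
}

// withOpts applies each option in order and returns the worker for chaining.
func (w *worker) withOpts(opts ...workerOption) *worker {
	for _, o := range opts {
		o(w)
	}
	return w
}

func main() {
	w := (&worker{}).withOpts(withMaxAttempts(5), withLease(30*time.Second))
	fmt.Println(w.maxAttempts, w.lease)
}
```

With the real package, the equivalent call would presumably chain the documented functions, along the lines of `eqClient.NewWorker("queue_name").WithOpts(entroq.WithMaxAttempts(5), entroq.WithLease(30*time.Second))`.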
func WithBaseRetryDelay ¶
func WithBaseRetryDelay(d time.Duration) WorkerOption
WithBaseRetryDelay sets the base delay for a retried task (the first attempt). Without any backoff settings, this is used for every retry. When it is set, a task whose attempt is incremented also has its availability time moved to this amount from now.
func WithDependencyHandler ¶
func WithDependencyHandler(f DependencyHandler) WorkerOption
WithDependencyHandler sets a function to be called when a worker encounters a dependency error. If this function returns a non-nil error, the worker will exit.
Note that workers always exit on non-dependency errors, but usually treat dependency errors as things that can be retried. Specifying a handler for dependency errors allows different behavior as needed.
One possible use case for a dependency error handler is to reload a configuration task for the next round: if the task is depended on, but has been changed, the task can be retried, but configuration should also be reloaded, which could be done in a handler.
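The configuration-reload use case might look roughly like the following. This is a sketch under stated assumptions: the handler signature (a function taking the dependency error and returning an error) is assumed rather than taken from this page, and `depError`, `config`, `reloadOnDepErr`, and `handle` are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// depError stands in for DependencyError (hypothetical shape).
type depError struct{ msg string }

func (e depError) Error() string { return e.msg }

// dependencyHandler is the ASSUMED shape of DependencyHandler.
type dependencyHandler func(err depError) error

// errFatal represents any non-dependency error.
var errFatal = errors.New("not a dependency error")

// config is a hypothetical piece of worker state loaded from a config task.
type config struct{ stale bool }

// reloadOnDepErr marks the configuration stale so the next round reloads it,
// and returns nil so the worker keeps running.
func reloadOnDepErr(cfg *config) dependencyHandler {
	return func(err depError) error {
		cfg.stale = true
		return nil
	}
}

// handle mimics the worker-side rule described above: non-dependency errors
// are always fatal, while dependency errors go to the handler if one is set.
func handle(err error, h dependencyHandler) error {
	var de depError
	if !errors.As(err, &de) {
		return err
	}
	if h == nil {
		return nil // default: treat the dependency error as retryable
	}
	return h(de)
}

func main() {
	cfg := &config{}
	fmt.Println(handle(depError{"config task changed"}, reloadOnDepErr(cfg)), cfg.stale)
	fmt.Println(handle(errFatal, reloadOnDepErr(cfg)))
}
```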
func WithErrQMap ¶
func WithErrQMap(f ErrQMap) WorkerOption
WithErrQMap sets a function that maps from inbox queue names to error queue names. Defaults to DefaultErrQMap.
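As an illustration, a custom mapping could collect failures from every inbox under a shared prefix. This is a hedged sketch: the `errQMap` type below assumes ErrQMap has the same shape as DefaultErrQMap (a function from inbox name to queue name), and `sharedErrQ` and the "failed/" prefix are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// errQMap mirrors the ASSUMED shape of ErrQMap: inbox name in, error queue out.
type errQMap func(inbox string) string

// sharedErrQ is a hypothetical mapping that places every failed task under a
// single "failed/" prefix, keeping the original inbox name as a suffix.
func sharedErrQ(inbox string) string {
	return "failed/" + strings.TrimPrefix(inbox, "/")
}

func main() {
	var m errQMap = sharedErrQ
	fmt.Println(m("jobs/resize"))
}
```

A function like this would then be supplied to a worker through WithErrQMap in place of the default.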
func WithLease ¶
func WithLease(d time.Duration) WorkerOption
WithLease sets the frequency of task renewal. Tasks will be claimed for an amount of time slightly longer than this so that they have a chance of being renewed before expiring.
func WithMaxAttempts ¶
func WithMaxAttempts(m int32) WorkerOption
WithMaxAttempts sets the maximum attempts that are allowed before a RetryTaskError turns into a MoveTaskError (transparently). If this value is 0 (the default), then there is no maximum, and attempts can be incremented indefinitely without a move to an error queue.
Directories ¶
Path | Synopsis |
---|---|
backend | |
eqgrpc | Package eqgrpc provides a gRPC backend for EntroQ. This is the backend that is commonly used by clients of an EntroQ task service, set up thus: |
eqmem | Package eqmem implements an in-memory entroq that has fine-grained locking and can handle simultaneously stats/task listing and modifications to a large extent. |
eqpg | Package eqpg provides an entroq.Backend using PostgreSQL. |
cmd | |
eqmemsvc | Command eqmemsvc starts up an in-memory EntroQ gRPC service. |
eqmemsvc/cmd | Package cmd holds the commands for the eqmemsvc application. |
eqpgsvc | Command eqpgsvc starts a PostgreSQL-backed EntroQ gRPC service. |
contrib | |
mr | Package mr has a simple MapReduce implementation, one that does everything inside the task manager (no outside files). |
mrtest | Package mrtest is a test package tightly tied to the mr package, separated out to avoid import cycles when other tests want to use it. |
pkg | |
authz | Package authz contains standard data structures for representing permissions and authorization requests / responses. |
authz/opahttp | Package opahttp implements the authz.Authorizer using an Open Policy Agent (OPA). |
procworker | Package procworker implements a worker that reads a subprocess specification task, executes it, and puts results into an outbox. |
queues | Package queues contains helper functions for manipulation of queue names. |
qsvc | Package qsvc contains the service implementation for registering with gRPC. |
qtest | Package qtest contains standard testing routines for exercising various backends in similar ways. |
subq | Package subq abstracts the idea of subscribing to a particular queue so that changes can be immediately notified. |