entroq

package module
v0.4.3
Published: Feb 16, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

README

EntroQ

A task queue with strong competing-consumer semantics and transactional updates.

Pronounced "Entro-Q" ("Entro-Queue"), as in the letter than comes after "Entro-P". We aim to take the next step away from parallel systems chaos. It is also the descendent of github.com/shiblon/taskstore, an earlier and less robust attempt at the same idea.

It is designed to be as simple and modular as possible, doing one thing well. It is not a pubsub system, a database, or a generic RPC mechanism. It is only a competing-consumer work queue, and will only ever be that. As such, it has also been designed to do that one thing really well.

Use

A Docker container is available on Docker Hub as shiblon/entroq. You can use this to start an EntroQ service, then talk to it using the provided Go or Python libraries. It exposes port 37706 by default.

If you merely want a single-process in-memory work queue for Go, you can just use the in-memory implementation of the library without any server at all. For other languages, you should use the service and the gRPC-based language-specific library to talk to it.

There is also a command-line client that you can use to talk to the EntroQ service:

go install entrogo.com/entroq/cmd/eqc
eqc --help

You can then run eqc to talk to an EntroQ service (such as one started in the shiblon/entroq container).

There is also a Python-based command line, installable via pip:

python3 -m pip install git+https://github.com/shiblon/entroq
python3 -m entroq --help

Concepts

EntroQ supports precisely two atomic mutating operations:

  • Claim an available task from one or more queues, or
  • Update a set of tasks (delete, change, insert), optionally depending on other task versions.

There are a few read-only accessors, as well, such as a way to list tasks within a queue, a way to list queues with their sizes, etc. These read-only operations do not have any transactional properties, and are best-effort snapshots of queue state.
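
A rough sketch of these accessors (assuming eq is a connected *entroq.EntroQ client, ctx is a context.Context, and the queue names are illustrative):

// List queue sizes for queues whose names start with a prefix.
sizes, err := eq.Queues(ctx, entroq.MatchPrefix("jobs/"))
if err != nil {
  log.Fatalf("Error listing queues: %v", err)
}
for q, n := range sizes {
  fmt.Printf("%s: %d tasks\n", q, n)
}

// Metadata-only listing of a single queue (values omitted); a best-effort snapshot.
tasks, err := eq.Tasks(ctx, "jobs/inbox", entroq.OmitValues())
if err != nil {
  log.Fatalf("Error listing tasks: %v", err)
}
fmt.Printf("found %d tasks\n", len(tasks))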

Both Claim and Modify change the version number of every task they affect. Any time any task is mutated, its version increments. Thus, if one process manages to mutate a task, any other process that was working on it will find that it is not available, and will fail. This is the key concept behind the "commit once" semantics of EntroQ.

Unlike many pub/sub systems that are used as competing consumer queues, this eliminates the possibility of work getting dropped after delivery, or work being committed more than once. Some additional detail on this approach is given below, but the fundamental principle is this:

Work commits should never be lost or duplicated.

Tasks

A task, at its most basic, is defined by these traits:

  • Queue Name
  • Globally Unique ID
  • Version
  • Arrival Time
  • Value

Tasks also hold other information, such as the ID of the client that currently holds a claim on a task, when it was created, when it was modified, and how many times it has been claimed, but these are implementation details.

Queues

EntroQ can hold multiple "queues", though these do not exist as first-class concepts in the system. A queue only exists so long as there are tasks that want to be in it. If no tasks are in a particular queue, that queue does not exist. In database terms, you can think of the queue as being a property of the task, not the other way around. Indeed, in the Postgres backend, it is simply a column in a tasks table.

Because task IDs are durable, and only the version increments during mutation, it is possible to track a single task through multiple modifications. Any attempted mutation, however, will fail if both the ID and version don't match the task to be mutated. The ID of the task causing a modification failure is returned with the error.

This allows for a robust competing-consumer worker setup, where workers are guaranteed to not accidentally clobber each other's tasks.

Claims

A Claim occurs on one or more queues. A randomly chosen task among those that have an Arrival Time (spelled at) in the past is returned if any are available. Otherwise this call blocks. There is a similar call TryClaim that is non-blocking (and returns a nil value if no task could be claimed immediately), if that is desired.

When more than one queue is specified in a Claim operation, only one task will be returned from one of those queues. This allows a worker to be a fan-in consumer, fairly pulling tasks from multiple queues.

When successfully claiming a task, the following happen atomically:

  • Its version is incremented, and
  • Its arrival time is advanced to a point in the future.

Incrementing the version ensures that any previous claimants will fail to update that task: the version number will not match. Setting the arrival time in the future gives the new claimant time to do the work, or to establish a cycle of "at updates", where it renews the lease on the task until it is finished. It also allows time for the claimed task to be returned to the claimant over the network.

Note: best practice is to keep the initial claim time relatively low, then rely on the claimant to update the arrival time periodically. This allows for situations like network partitions to be recovered from relatively quickly: a new worker will more quickly pick up a task that missed a 30-second renewal window than one that was reserved for 15 minutes with a claimant that died early in its lease.
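
A sketch of that claim-and-renew pattern (assuming eq is a connected *entroq.EntroQ client; the queue name and durations are illustrative):

// Claim with a short lease, then let DoWithRenew keep it renewed while working.
task, err := eq.Claim(ctx, entroq.From("work"), entroq.ClaimFor(30*time.Second))
if err != nil {
  log.Fatalf("Error claiming: %v", err)
}
renewed, err := eq.DoWithRenew(ctx, task, 30*time.Second, func(ctx context.Context) error {
  // Do the actual work here; the lease is renewed in the background.
  return nil
})
if err != nil {
  log.Fatalf("Error working: %v", err)
}
// Commit exactly once by deleting the (renewed) claimed task.
if _, _, err := eq.Modify(ctx, renewed.AsDeletion()); err != nil {
  log.Fatalf("Error committing: %v", err)
}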

Modify

The Modify call can do multiple modifications at once, in an atomic transaction. There are four kinds of modifications that can be made:

  • Insert: add one or more tasks to desired queues.
  • Delete: delete one or more tasks.
  • Change: change the value, queue, or arrival time of a given task.
  • Depend: require presence of one or more tasks and versions.

If any of the dependencies are not present (tasks to be deleted, changed, or depended upon, or tasks to be inserted when a colliding ID is specified), then the entire operation fails with an error indicating exactly which tasks were not present in the requested version or had colliding identifiers in the case of insertion.
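
A sketch of a single atomic Modify call combining several of these (assuming eq is a client, task is a previously claimed *entroq.Task, and configID/configVersion identify a task that must exist unchanged; all names are illustrative):

if _, _, err := eq.Modify(ctx,
  task.AsDeletion(),                           // delete the claimed task
  entroq.InsertingInto("downstream",           // insert a follow-on task
    entroq.WithValue([]byte("next step"))),
  entroq.DependingOn(configID, configVersion), // require this task/version to exist
); err != nil {
  // On a dependency failure, nothing is applied and the error says what was missing.
  log.Fatalf("Error modifying: %v", err)
}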

Workers

Once you create an EntroQ client instance, you can use it to create what is called a "worker". A worker is essentially a claim-renew-modify loop on one or more queues. These can be run in goroutines (or threads, or your language's equivalent). Creating a worker using the standard library allows you to focus on writing only the logic that happens once a task has been acquired. In the background the claimed task is renewed for as long as the worker code is running.

The code that a worker runs is responsible for doing something with the claimed task, then returning the intended modifications that should happen when it is successful. For modification of the claimed task, the standard worker code handles updating the version number in case that task has been renewed in the background (and thus had its version incremented while the work was being done).

This is a very common way of using EntroQ: stand up an EntroQ service, then start up one or more workers responsible for handling the flow of tasks through your system. Initial injection of tasks can easily be done with the provided libraries or command-line clients.
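
A minimal worker sketch (assuming eq is a connected *entroq.EntroQ client; the queue name is illustrative):

w := eq.NewWorker("inbox")
if err := w.Run(ctx, func(ctx context.Context, task *entroq.Task) ([]entroq.ModifyArg, error) {
  // Process task.Value here; the claim is renewed in the background.
  // Returning the task as a deletion commits the work exactly once.
  return []entroq.ModifyArg{task.AsDeletion()}, nil
}); err != nil {
  log.Fatalf("Worker exited: %v", err)
}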

Commit Once, Maybe Work Twice

Tasks in EntroQ can only be acknowledged (deleted or moved) once. It is possible for more than one claimant (or "worker") to be performing the same tasks at the same time, but only one of them will succeed in deleting that task. Because the other will fail to mutate its task, it will also know to discard any work that it did.

Thus, it's possible for the actual work to be done more than once, but that work will only be durably recorded once. Because of this, idempotent worker design is still important, and some helpful principles are described below.

Meanwhile, to give some more detail about how two workers might end up working on the same task, consider an "Early and Slow" (ES) worker and a "Late and Quick" (LQ) worker. "Early and Slow" is the first to claim a particular task, but then takes a long time getting it done. This delay may be due to a network partition, a slow disk, memory pressure, or a process restart. Whatever the reason, ES claims a task, then doesn't acknowledge it before the deadline.

While ES is busy working on its task, but not acknowledging or renewing it, "Late and Quick" (LQ) successfully claims the task after its arrival time is reached. If, while LQ is working on it, ES tries to commit its task, ES has an earlier version of the task and the commit fails. LQ then finishes and succeeds, because it holds the most recent version of the task, which is the one represented in the queue.

This also works if LQ finishes before ES does, in which case ES tries to finally commit its work, but the task that is in the queue is no longer there: it has been deleted because LQ finished it and requested deletion.

These semantics, where a claim is a mutating event, and every alteration to a task changes its version, make it safe for multiple workers to attempt to do the same work without the danger of it being committed (reported) twice.

Safe Work

It is possible to abuse the ID/version distinction, by asking EntroQ to tell you about a given task ID, then overriding the claimant ID and task version. This is, in fact, how "force delete" works in the provided command-line client. If you wish to mutate a task, you should have first claimed it. Otherwise you have no guarantee that the operation is safe. Merely reading it (e.g., using the Tasks call) is not sufficient to guarantee safe mutation.

If you feel the need to use these facilities in normal worker code, however, that should be a sign that the design is wrong. Only in extreme cases, like manual intervention, should these ever be used. As a general rule,

Only work on claimed tasks, and never override the claimant or version.

If you need to force something, you probably want to use the command-line client, not a worker, and then you should be sure of the potential consequences to your system.

To further ensure safety when using a competing-consumer work queue like EntroQ, it is important to adhere to a few simple principles:

  • All outside mutations should be idempotent, and
  • Any output files should be uniquely named every time.

The first principle of idempotence allows things like database writes to be done safely by multiple workers (remembering the ES vs. LQ concept above). As an example, instead of incrementing a value in a database, it should simply be set to its final value. Sometimes an increment is really what you want, in which case you can make that operation idempotent by storing the final value in the task itself so that the worker simply records it. That way, no matter how many workers make the change, they set it to the same value. The principle is this:

Use stable, absolute values in mutations, not relative updates.

The second principle of unique file names applies to every worker that attempts to write anything. Each worker, even those working on the same task, should generate a random or time-based token in the file name for anything that it writes to the file system. While this can generate garbage that needs to be collected later, it also guarantees that partial writes do not corrupt complete ones. File system semantics are quite different from database semantics, and having uniquely-named outputs for every write helps to guarantee that corruption is avoided.

In short, when there is any likelihood of a file being written by more than one process,

Choose garbage collection over write efficiency.

Thankfully, adhering to these safety principles is usually pretty easy, resulting in great benefits to system stability and trustworthiness.

Backends

To create a Go program that makes use of EntroQ, use the entroq.New function and hand it a BackendOpener. An "opener" is a function that can, given a context and suitable parameters, create a backend for the client to use as its implementation.

To create a Python client, you can use the entroq package, which always speaks gRPC to a backend EntroQ service.

In Go, there are three primary backend implementations:

  • In-memory,
  • PostgreSQL, and
  • A gRPC client.

In-Memory Backend

The mem backend allows your EntroQ instance to work completely in-process. You can use exactly the same library calls to talk to it as you would if it were somewhere on the network, making it easy to start in memory and progress to persistent and/or networked implementations later as needed.

The following is a short example of how to create an in-memory work queue:

package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/mem"
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, mem.Opener())
  if err != nil {
    log.Fatalf("Error opening in-memory backend: %v", err)
  }
  defer eq.Close()

  // Use eq.Modify, eq.Claim, eq.TryClaim, etc., probably in goroutines.
}

gRPC Backend

The grpc backend is somewhat special. It converts an entroq.EntroQ client into a gRPC client that can talk to the provided qsvc implementation, described below.

This allows you to stand up a gRPC endpoint in front of your "real" persistent backend, giving you authentication management and all of the other goodies that gRPC provides on the server side, all exposed via protocol buffers and the standard gRPC service interface.

All clients would then use the grpc backend to connect to this service, again with gRPC authentication and all else that it provides. This is the preferred way to use the EntroQ client library.

As a basic example of how to set up a gRPC-based EntroQ client:

package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/grpc"
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, grpc.Opener(":37706"))
  if err != nil {
    log.Fatalf("Error opening gRPC backend: %v", err)
  }
  defer eq.Close()

  // Use eq.Modify, eq.Claim, eq.TryClaim, etc., probably in goroutines.
}

The opener accepts a host name and a number of other gRPC-related optional parameters, including mTLS parameters and other familiar gRPC controls.

PostgreSQL Backend

The pg backend uses a PostgreSQL database. This is a performant, persistent backend option.

package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/pg"
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, pg.Opener(":5432", pg.WithDB("postgres"), pg.WithUsername("myuser")))
  // The opener supports other postgres-related parameters, as well.
  if err != nil {
    log.Fatalf("Error opening PostgreSQL backend: %v", err)
  }
  defer eq.Close()

  // Use eq.Modify, eq.Claim, eq.TryClaim, etc., probably in goroutines.
}

This backend is highly PostgreSQL-specific, as it requires the ability to issue a SELECT ... FOR UPDATE SKIP LOCKED query in order to performantly claim tasks. MySQL has similar support, so a similar backend could be written for it if desired.

Unfortunately, CockroachDB does not contain support for the necessary SQL statements, even though it speaks the PostgreSQL wire format. It cannot be used in place of PostgreSQL without implementing an entirely new backend (not impossible, just not done).

Starting a PostgreSQL Instance

If you wish to start an EntroQ service backed by PostgreSQL, you have two easy options: run a container with the database and the EntroQ service both inside of it, or run the database and EntroQ service separately.

Note that no matter how you run things, there is no need to create any tables in your database. The EntroQ service checks for the existence of a tasks table and creates it if it is not present.

If running the service and database in the same container, you can choose one of the images at Docker Hub with the tag prefix pg-. For example, you might choose to run

shiblon/entroq:pg-v0.3

This starts a container with both postgres and the EntroQ service running next to one another, communicating via the container's local network. The base image is a PostgreSQL image, so any environment variables you would normally use to configure the database are available to you. You can also mount a filesystem at /var/lib/postgresql/data to get persistence across container restarts. The EntroQ service is exposed on port 37706.

The eqc command-line utility is included in the entroq container, so you can play around with it using docker exec. If the container's name is stored in $container:

docker exec $container eqc --help

If you prefer to have the EntroQ service and database running in separate containers, an example docker-compose file is shown below that should give you the idea of how they interoperate.

Note that we use /tmp for the example below. This is not recommended in production for obvious reasons, but it should illuminate the way things fit together. Also note that the image name does not include the pg- tag prefix in this example: an image containing a database is not needed in this case.

version: "3"
services:
  database:
    image: "postgres:12"
    deploy:
      restart_policy:
        condition: any
    restart: always
    volumes:
      - /tmp/postgres/data:/var/lib/postgresql/data

  queue:
    image: "shiblon/entroq:v0.3"
    depends_on:
      - database
    deploy:
      restart_policy:
        condition: any
    restart: always
    ports:
      - 37706:37706
    command:
      - "pg"
      - "--dbaddr=database:5432"
      - "--attempts=10"

This starts up PostgreSQL and EntroQ, where EntroQ will make multiple attempts to connect to the database before giving up, allowing PostgreSQL some time to get its act together.

QSvc

The qsvc directory contains the gRPC service implementation that is found in the Docker container shiblon/entroq. This service exposes the endpoints found in proto. Working services using various backends are found in the cmd directories, e.g.,

  • cmd/eqmemsvc
  • cmd/eqpgsvc

You can build any of these and start them on desired ports and with desired backend connections based on flag settings.

There is no service backend for grpc, though one could conceivably make sense as a sort of proxy. But in that case you should really just use a standard gRPC proxy.

When using one of these services, this is your main server and should be treated as a singleton. Clients should use the grpc backend to connect to it.

Does making the server a singleton affect performance? Yes, of course: you can't scale a singleton. In practice, though, if you are hitting a work queue that hard, you likely have a granularity problem in your design.

Python Implementation

In contrib/py there is an implementation of a gRPC client in Python, including a basic CLI. You can use this to talk to a gRPC EntroQ service, and it includes a basic worker implementation so that you can relatively easily replace uses of Celery in Python.

The Python implementation is made to be pip-installable directly from github:

$ python3 -m pip install git+https://github.com/shiblon/entroq

This creates the entroq module and supporting protocol buffers beneath it. Use of these can be seen in the command-line client, implemented in __main__.py.

Examples

One of the most complete Go examples, also used as a stress test, is a naive implementation of MapReduce in contrib/mr. If you look in there, you will see numerous idiomatic uses of the competing-consumer queue concept, complete with workers, queue-empty tests used as part of system semantics, queues assigned to specific processes, and more.

Documentation

Overview

Copyright 2019 Chris Monson <shiblon@gmail.com>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Package entroq contains the main task queue client and data definitions. The client relies on a backend to implement the actual transactional functionality, the interface for which is also defined here.

Index

Constants

const (
	DefaultClaimPollTime = 30 * time.Second
	DefaultClaimDuration = 30 * time.Second
)

const DefaultRetryDelay = 30 * time.Second

Variables

This section is empty.

Functions

func DefaultErrQMap added in v0.3.0

func DefaultErrQMap(inbox string) string

DefaultErrQMap appends "/err" to the inbox, and is the default behavior if no overriding error queue mapping options are provided.

func IsCanceled

func IsCanceled(err error) bool

IsCanceled indicates whether the error is a canceled error.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout indicates whether the error is a timeout error.

func NotifyModified

func NotifyModified(n Notifier, inserted, changed []*Task)

NotifyModified takes inserted and changed tasks and notifies once per unique queue/ID pair.

func ProcessTime added in v0.3.0

func ProcessTime() time.Time

ProcessTime returns the time the calling process thinks it is, in UTC.

func QueuesFromStats added in v0.3.3

func QueuesFromStats(stats map[string]*QueueStat, err error) (map[string]int, error)

QueuesFromStats can be used for converting the new QueueStats to the old Queues output, making it easier on backend implementers to just define one function (similar to how WaitTryClaim or PollTryClaim can make implementing Claim in terms of TryClaim easier).
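
For example, a backend implementer might define Queues in terms of QueueStats roughly like this (myBackend is a hypothetical backend type that already implements QueueStats):

func (b *myBackend) Queues(ctx context.Context, qq *entroq.QueuesQuery) (map[string]int, error) {
	// Delegate to QueueStats and convert its richer output into simple counts.
	return entroq.QueuesFromStats(b.QueueStats(ctx, qq))
}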

Types

type Backend

type Backend interface {
	// Queues returns a mapping from all known queues to their task counts.
	Queues(ctx context.Context, qq *QueuesQuery) (map[string]int, error)

	// QueueStats returns statistics for the specified queues query. Richer
	// than just calling Queues, as it can return more than just the size.
	QueueStats(ctx context.Context, qq *QueuesQuery) (map[string]*QueueStat, error)

	// Tasks retrieves all tasks from the given queue. If claimantID is
	// specified (non-zero), limits those tasks to those that are either
	// expired or belong to the given claimant. Otherwise returns them all.
	Tasks(ctx context.Context, tq *TasksQuery) ([]*Task, error)

	// TryClaim attempts to claim a task from the "top" (or close to it) of the
	// given queue. When claimed, a task is held for the duration specified
	// from the time of the claim. If claiming until a specific wall-clock time
	// is desired, the task should be immediately modified after it is claimed
	// to set At to a specific time. Returns a nil task and a nil error if
	// there is nothing to claim. Will fail with a DependencyError if a
	// specific task ID is requested but not present.
	TryClaim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Claim is a blocking version of TryClaim, attempting to claim a task
	// from a queue, and blocking until canceled or a task becomes available.
	//
	// Will fail with a DependencyError if a specific task ID is requested but
	// not present. Never returns both a nil task and a nil error
	// simultaneously: a failure to claim a task is an error (potentially just
	// a timeout).
	Claim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Modify attempts to atomically modify the task store, and only succeeds
	// if all dependencies are available and all mutations are either expired
	// or already owned by this claimant. The Modification type provides useful
	// functions for determining whether dependencies are good or bad. This
	// function is intended to return a DependencyError if the transaction could
	// not proceed because dependencies were missing or already claimed (and
	// not expired) by another claimant.
	Modify(ctx context.Context, mod *Modification) (inserted []*Task, changed []*Task, err error)

	// Time returns the time as the backend understands it, in UTC.
	Time(ctx context.Context) (time.Time, error)

	// Close closes any underlying connections. The backend is expected to take
	// ownership of all such connections, so this cleans them up.
	Close() error
}

Backend describes all of the functions that any backend has to implement to be used as the storage for task queues.

type BackendClaimFunc

type BackendClaimFunc func(ctx context.Context, eq *ClaimQuery) (*Task, error)

BackendClaimFunc is a function that can make claims based on a ClaimQuery. It is a convenience type for backends to use.

type BackendOpener

type BackendOpener func(ctx context.Context) (Backend, error)

BackendOpener is a function that can open a connection to a backend. Creating a client with a specific backend is accomplished by passing one of these functions into New.

type ChangeArg

type ChangeArg func(m *Modification, t *Task)

ChangeArg is an argument to the Changing function used to create arguments for Modify, e.g., to change the queue and set the expiry time of a task to 5 minutes in the future, you would do something like this:

  cli.Modify(ctx,
    Changing(myTask,
      QueueTo("a new queue"),
      ArrivalTimeBy(5 * time.Minute)))

func ArrivalTimeBy

func ArrivalTimeBy(d time.Duration) ChangeArg

ArrivalTimeBy sets the arrival time to a time in the future, by the given duration.

func ArrivalTimeTo

func ArrivalTimeTo(at time.Time) ChangeArg

ArrivalTimeTo sets a specific arrival time on a changed task in the Changing function.

func AttemptToNext added in v0.4.3

func AttemptToNext() ChangeArg

AttemptToNext sets the Attempt field in Task to the next value (increments it).

func AttemptToZero added in v0.4.3

func AttemptToZero() ChangeArg

AttemptToZero resets the Attempt field to zero.

func ErrTo added in v0.4.3

func ErrTo(e string) ChangeArg

ErrTo sets the Err field in the task.

func ErrToZero added in v0.4.3

func ErrToZero() ChangeArg

ErrToZero sets the Err field to its zero value (clears the error).

func QueueTo

func QueueTo(q string) ChangeArg

QueueTo creates an option to modify a task's queue in the Changing function.

func ValueTo

func ValueTo(v []byte) ChangeArg

ValueTo sets the changing task's value to the given byte slice.

type ClaimOpt

type ClaimOpt func(*ClaimQuery)

ClaimOpt modifies limits on a task claim.

func ClaimAs

func ClaimAs(id uuid.UUID) ClaimOpt

ClaimAs sets the claimant ID for a claim operation. When not set, uses the internal default for this client.

func ClaimFor added in v0.3.0

func ClaimFor(duration time.Duration) ClaimOpt

ClaimFor sets the duration of a successful claim (the amount of time from now when it expires).

func ClaimPollTime

func ClaimPollTime(d time.Duration) ClaimOpt

ClaimPollTime sets the polling time for a claim. Set to DefaultClaimPollTime if left at 0.

func From added in v0.3.0

func From(qs ...string) ClaimOpt

From sets the queue(s) for a claim.

type ClaimQuery

type ClaimQuery struct {
	Queues   []string      // Queues to attempt to claim from. Only one wins.
	Claimant uuid.UUID     // The ID of the process trying to make the claim.
	Duration time.Duration // How long the task should be claimed for if successful.
	PollTime time.Duration // Length of time between (possibly interruptible) sleep and polling.
}

ClaimQuery contains information necessary to attempt to make a claim on a task in a specific queue.

type DependencyError

type DependencyError struct {
	Inserts []*TaskID
	Depends []*TaskID
	Deletes []*TaskID
	Changes []*TaskID

	Claims []*TaskID

	Message string
}

DependencyError is returned when a dependency is missing when modifying the task store.

func AsDependency

func AsDependency(err error) (DependencyError, bool)

AsDependency indicates whether the given error is a dependency error.

func DependencyErrorf

func DependencyErrorf(msg string, vals ...interface{}) DependencyError

DependencyErrorf creates a new dependency error with the given message.

func (DependencyError) Copy

Copy produces a new deep copy of this error type.

func (DependencyError) Error

func (m DependencyError) Error() string

Error produces a helpful error string indicating what was missing.

func (DependencyError) HasClaims

func (m DependencyError) HasClaims() bool

HasClaims indicates whether any of the tasks were claimed by another claimant and unexpired.

func (DependencyError) HasCollisions

func (m DependencyError) HasCollisions() bool

HasCollisions indicates whether any of the inserted tasks collided with existing IDs.

func (DependencyError) HasMissing

func (m DependencyError) HasMissing() bool

HasMissing indicates whether there was anything missing in this error.

type DependencyHandler added in v0.3.17

type DependencyHandler func(err DependencyError) error

DependencyHandler is called (if set) when a worker run finishes with a dependency error. If it returns a non-nil error, that converts into a fatal error.

type EntroQ

type EntroQ struct {
	// contains filtered or unexported fields
}

EntroQ is a client interface for accessing the task queue.

func New

func New(ctx context.Context, opener BackendOpener, opts ...Option) (*EntroQ, error)

New creates a new task client with the given backend implementation, for example, to use an in-memory implementation:

cli, err := New(ctx, mem.Opener())

func (*EntroQ) Claim

func (c *EntroQ) Claim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

Claim attempts to get the next unclaimed task from the given queues. It blocks until one becomes available or until the context is done. When it succeeds, it returns a task with the claimant set to the default, or to the value given in options, and an arrival time computed from the duration. The default duration if none is given is DefaultClaimDuration.
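
For example, a blocking claim from two queues with a two-minute lease might look like this (a sketch; the queue names are illustrative):

task, err := cli.Claim(ctx, From("inbox", "retries"), ClaimFor(2*time.Minute))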

func (*EntroQ) Close

func (c *EntroQ) Close() error

Close closes the underlying backend.

func (*EntroQ) DoWithRenew

func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, f func(context.Context) error) (*Task, error)

DoWithRenew runs the provided function while keeping the given task lease renewed.

func (*EntroQ) DoWithRenewAll

func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, f func(context.Context) error) ([]*Task, error)

DoWithRenewAll runs the provided function while keeping all given tasks leases renewed.

func (*EntroQ) ID

func (c *EntroQ) ID() uuid.UUID

ID returns the default claimant ID of this client. Used in "bare" calls, like Modify, Claim, etc. To change the ID per call (usually not needed, and can be dangerous), use the "As" calls, e.g., ModifyAs.

func (*EntroQ) Modify

func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error)

Modify allows a batch modification operation to be done, gated on the existence of all task IDs and versions specified. Deletions, Updates, and Dependencies must be present. The transaction either entirely succeeds or entirely fails.

Returns all inserted task IDs, and an error if it could not proceed. If the error was due to missing dependencies, a *DependencyError is returned, which can be checked for by calling AsDependency(err).
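
For example, a caller might distinguish dependency failures from other errors like this (a sketch inside a function that returns an error; task is a previously claimed task):

if _, _, err := cli.Modify(ctx, task.AsDeletion()); err != nil {
	if dep, ok := AsDependency(err); ok {
		// Another claimant beat us to this task; discard local work.
		log.Printf("lost task: %v", dep)
		return nil
	}
	return err
}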

func (*EntroQ) NewWorker

func (c *EntroQ) NewWorker(qs ...string) *Worker

NewWorker is a convenience method on an EntroQ client to create a worker.

func (*EntroQ) QueueStats added in v0.3.3

func (c *EntroQ) QueueStats(ctx context.Context, opts ...QueuesOpt) (map[string]*QueueStat, error)

QueueStats returns a mapping from queue names to task stats.

func (*EntroQ) Queues

func (c *EntroQ) Queues(ctx context.Context, opts ...QueuesOpt) (map[string]int, error)

Queues returns a mapping from all queue names to their task counts.

func (*EntroQ) QueuesEmpty

func (c *EntroQ) QueuesEmpty(ctx context.Context, opts ...QueuesOpt) (bool, error)

QueuesEmpty indicates whether the specified task queues are all empty. If no options are specified, returns an error.

func (*EntroQ) RenewAllFor

func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) ([]*Task, error)

RenewAllFor attempts to renew all given tasks' leases (update arrival times) for the given duration. Returns the new tasks.

func (*EntroQ) RenewFor

func (c *EntroQ) RenewFor(ctx context.Context, task *Task, duration time.Duration) (*Task, error)

RenewFor attempts to renew the given task's lease (update arrival time) for the given duration. Returns the new task.

func (*EntroQ) Tasks

func (c *EntroQ) Tasks(ctx context.Context, queue string, opts ...TasksOpt) ([]*Task, error)

Tasks returns a slice of all tasks in the given queue.

func (*EntroQ) Time added in v0.3.0

func (c *EntroQ) Time(ctx context.Context) (time.Time, error)

Time gets the time as the backend understands it, in UTC. Default is just time.Now().UTC().

func (*EntroQ) TryClaim

func (c *EntroQ) TryClaim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

TryClaim attempts one time to claim a task from the given queues. If there are no tasks, it returns a nil error *and* a nil task. This allows the caller to decide whether to retry. It can fail if certain (optional) dependency tasks are not present. This can be used, for example, to ensure that configuration tasks haven't changed.
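
For example (a sketch inside a function that returns an error; the queue name is illustrative):

task, err := cli.TryClaim(ctx, From("inbox"))
if err != nil {
	return err
}
if task == nil {
	// Nothing is available right now; the caller decides whether to retry or move on.
	return nil
}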

func (*EntroQ) WaitQueuesEmpty added in v0.3.0

func (c *EntroQ) WaitQueuesEmpty(ctx context.Context, opts ...QueuesOpt) error

WaitQueuesEmpty does a poll-and-wait strategy to block until the queue query returns empty.

type ErrQMap added in v0.3.0

type ErrQMap func(inbox string) string

ErrQMap is a function that maps from an inbox name to its "move on error" error box name. If no mapping is found, a suitable default should be returned.

type ErrorTaskValue added in v0.2.7

type ErrorTaskValue struct {
	Task *Task  `json:"task"`
	Err  string `json:"err"`
}

ErrorTaskValue holds a task that is moved to an error queue, with an error message attached.

type InsertArg

type InsertArg func(*Modification, *TaskData)

InsertArg is an argument to task insertion.

func WithArrivalTime

func WithArrivalTime(at time.Time) InsertArg

WithArrivalTime changes the arrival time to a fixed moment during task insertion.

func WithArrivalTimeIn

func WithArrivalTimeIn(duration time.Duration) InsertArg

WithArrivalTimeIn computes the arrival time based on the duration from now, e.g.,

cli.Modify(ctx,
  InsertingInto("my queue",
    WithArrivalTimeIn(2 * time.Minute)))

func WithAttempt added in v0.4.3

func WithAttempt(value int32) InsertArg

WithAttempt sets the number of attempts for this task. Usually not needed, handled automatically by the worker.

func WithErr added in v0.4.3

func WithErr(value string) InsertArg

WithErr sets the error field of a task during insertion. Usually not needed, as tasks are typically modified to add errors, not inserted with them.

func WithID

func WithID(id uuid.UUID) InsertArg

WithID sets the task's ID for insertion. This is not normally needed, as the backend will assign a new, unique ID for this task if none is specified. There are cases where assigning an explicit insertion ID (always being careful that it is unique) can be useful, however.

For example, a not uncommon need is for a worker to do the following:

  • Claim a task,
  • Make database entries corresponding to downstream work,
  • Insert tasks for the downstream work and delete claimed task.

If the database entries need to reference the tasks that have not yet been inserted (e.g., if they need to be used to get at the status of a task), it is not safe to simply update the database after insertion, as this introduces a race condition. If, for example, the following strategy is employed, then the task IDs may never make it into the database:

  • Claim a task,
  • Make database entries
  • Insert tasks and delete claimed task
  • Update database with new task IDs

In this event, it is entirely possible to successfully process the incoming task and create the outgoing tasks, then lose network connectivity and fail to add those IDs to the database. Now it is no longer possible to update the database appropriately: the task information is simply lost.

Instead, it is safe to do the following:

  • Claim a task
  • Make database entries, including with to-be-created task IDs
  • Insert tasks with those IDs and delete claimed task.

This avoids the potential data loss condition entirely.

There are other workarounds for this situation, like using a two-step creation process and taking advantage of the ability to move tasks between queues without disturbing their ID (only their version), but this is not uncommon enough to warrant requiring the extra worker logic just to get a task ID into the database.
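
A sketch of the safe ordering above (claimed is the claimed task; recordDownstreamWork and payload are hypothetical application pieces):

newID := uuid.New()
if err := recordDownstreamWork(ctx, newID); err != nil { // hypothetical database write
	return err
}
if _, _, err := cli.Modify(ctx,
	claimed.AsDeletion(),
	InsertingInto("downstream",
		WithID(newID),
		WithValue(payload))); err != nil {
	return err
}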

func WithSkipColliding added in v0.2.0

func WithSkipColliding(s bool) InsertArg

WithSkipColliding sets the insert argument to allow itself to be removed if the only error encountered is an ID collision. This can help when it is desired to insert multiple tasks, but a previous subset was already inserted with similar IDs. Sometimes you want to specify a superset to "catch what we missed".

func WithValue

func WithValue(value []byte) InsertArg

WithValue sets the task's byte slice value during insertion.

cli.Modify(ctx,
  InsertingInto("my queue",
    WithValue([]byte("hi there"))))

type Modification

type Modification struct {
	Claimant uuid.UUID

	Inserts []*TaskData
	Changes []*Task
	Deletes []*TaskID
	Depends []*TaskID
	// contains filtered or unexported fields
}

Modification contains all of the information for a single batch modification in the task store.

func NewModification

func NewModification(claimant uuid.UUID, modArgs ...ModifyArg) *Modification

NewModification creates a new modification: insertions, deletions, changes, and dependencies all together. When creating this for the purpose of passing to WithModification, set the claimant to uuid.Nil (it is ignored in that case).

func (*Modification) AllDependencies

func (m *Modification) AllDependencies() (map[uuid.UUID]int32, error)

AllDependencies returns a dependency map from ID to version for tasks that must exist and match version numbers. It returns an error if there are duplicates. Changes, Deletions, Dependencies and Insertions with IDs must be disjoint sets.

When using this to query backend storage for the presence of tasks, note that it is safe to ignore the version if you use the DependencyError method to determine whether a transaction can proceed. That checks versions appropriate for all different types of modification.

func (*Modification) DependencyError

func (m *Modification) DependencyError(found map[uuid.UUID]*Task) error

DependencyError returns a DependencyError if there are problems with the dependencies found in the backend, or nil if everything is fine. Problems include missing or claimed dependencies, both of which will block a modification.

type ModifyArg

type ModifyArg func(m *Modification)

ModifyArg is an argument to the Modify function, which does batch modifications to the task store.

func Changing

func Changing(task *Task, changeArgs ...ChangeArg) ModifyArg

Changing adds a task update to a Modify call, e.g., to modify the queue a task belongs in:

cli.Modify(ctx, Changing(myTask, QueueTo("a different queue name")))

func Deleting

func Deleting(id uuid.UUID, version int32) ModifyArg

Deleting adds a deletion to a Modify call, e.g.,

cli.Modify(ctx, Deleting(id, version))

func DependingOn

func DependingOn(id uuid.UUID, version int32) ModifyArg

DependingOn adds a dependency to a Modify call, e.g., to insert a task into "my queue" with data "hey", but only succeeding if a task with anotherID and someVersion exists:

cli.Modify(ctx,
  InsertingInto("my queue",
    WithValue([]byte("hey"))),
    DependingOn(anotherID, someVersion))

func Inserting

func Inserting(tds ...*TaskData) ModifyArg

Inserting creates an insert modification from TaskData:

cli.Modify(ctx,
	Inserting(&TaskData{
		Queue: "myqueue",
		At:    time.Now().Add(1 * time.Minute),
		Value: []byte("hi there"),
	}))

Or, better still,

cli.Modify(ctx,
	InsertingInto("myqueue",
	    WithArrivalTimeIn(1 * time.Minute),
	    WithValue([]byte("hi there"))))

func InsertingInto

func InsertingInto(q string, insertArgs ...InsertArg) ModifyArg

InsertingInto creates an insert modification. Use like this:

cli.Modify(InsertingInto("my queue name", WithValue([]byte("hi there"))))

func ModifyAs

func ModifyAs(id uuid.UUID) ModifyArg

ModifyAs sets the claimant ID for a particular modify call. Usually not needed, can be dangerous unless used with extreme care. The client default is used if this option is not provided.

func WithModification

func WithModification(src *Modification) ModifyArg

WithModification returns a ModifyArg that merges the given Modification with whatever it is so far. Ignores Claimant field, and simply appends to all others.

type MoveTaskError added in v0.2.7

type MoveTaskError struct {
	Err     error
	Renewed []*Task
}

MoveTaskError causes a task to be completely serialized, wrapped in a larger JSON object with error information, and moved to a specified queue. This can be useful when non-fatal task-specific errors happen in a worker and we want to stash them somewhere instead of just causing the worker to crash, but allows us to handle that as an early error return.

func AsMoveTaskError added in v0.2.7

func AsMoveTaskError(err error) (*MoveTaskError, bool)

AsMoveTaskError returns the underlying error and true iff the underlying error indicates a worker task should be moved to the error queue instead of causing the worker to exit.

func NewMoveTaskError added in v0.2.7

func NewMoveTaskError(err error) *MoveTaskError

NewMoveTaskError creates a new MoveTaskError from the given error.

func (*MoveTaskError) Error added in v0.2.7

func (e *MoveTaskError) Error() string

Error produces an error string.

func (*MoveTaskError) SetRenewedTask added in v0.4.3

func (e *MoveTaskError) SetRenewedTask(t ...*Task)

SetRenewedTask allows upstream callers (like DoWithRenew) to set the renewed task in the error itself, so that version skew may be overcome when doing things like moving or retrying a task (incrementing attempts, etc.).

type Notifier

type Notifier interface {
	// Notify signals an event on the key. Wakes up one waiter, if any, or is
	// dropped if no waiters are waiting.
	Notify(key string)
}

Notifier can be notified on a given key (e.g., queue name).

type NotifyWaiter

type NotifyWaiter interface {
	Notifier
	Waiter
}

NotifyWaiter can wait for and notify events.

type Option

type Option func(*EntroQ)

Option is an option that modifies how EntroQ clients are created.

func WithClaimantID

func WithClaimantID(id uuid.UUID) Option

WithClaimantID sets the default claimant ID for this client.

type QueueStat added in v0.3.3

type QueueStat struct {
	Name      string `json:"name"`      // The queue name.
	Size      int    `json:"size"`      // The total number of tasks.
	Claimed   int    `json:"claimed"`   // The number of currently claimed tasks.
	Available int    `json:"available"` // The number of available tasks.

	MaxClaims int `json:"maxClaims"` // The maximum number of claims for a task in the queue.
}

QueueStat holds high-level information about a queue. Note that available + claimed may not add up to size. This is because a task can be unavailable (AT in the future) without being claimed by anyone.

type QueuesOpt

type QueuesOpt func(*QueuesQuery)

QueuesOpt modifies how queue requests are made.

func LimitQueues

func LimitQueues(limit int) QueuesOpt

LimitQueues sets the limit on the number of queues that are returned.

func MatchExact

func MatchExact(matches ...string) QueuesOpt

MatchExact adds an allowable exact match for a queue listing.

func MatchPrefix

func MatchPrefix(prefixes ...string) QueuesOpt

MatchPrefix adds allowable prefix matches for a queue listing.

type QueuesQuery

type QueuesQuery struct {
	// MatchPrefix specifies allowable prefix matches. If empty, limitations
	// are not set based on prefix matching. All prefix match conditions are ORed.
	// If both this and MatchExact are empty or nil, no limitations are set on
	// queue name: all will be returned.
	MatchPrefix []string

	// MatchExact specifies allowable exact matches. If empty, limitations are
	// not set on exact queue names.
	MatchExact []string

	// Limit specifies an upper bound on the number of queue names returned.
	Limit int
}

QueuesQuery modifies a queue listing request.

type RetryTaskError added in v0.4.3

type RetryTaskError struct {
	Err     error
	Renewed []*Task
}

RetryTaskError causes a task to be retried, incrementing its Attempt field and setting its Err to the text of the error. If MaxAttempts is positive and has been reached, this behaves in the same way as a MoveTaskError.

func AsRetryTaskError added in v0.4.3

func AsRetryTaskError(err error) (*RetryTaskError, bool)

AsRetryTaskError returns the underlying error and true iff the underlying error is a retry error.

func NewRetryTaskError added in v0.4.3

func NewRetryTaskError(err error) *RetryTaskError

NewRetryTaskError creates a new RetryTaskError from the given error.

func (*RetryTaskError) Error added in v0.4.3

func (e *RetryTaskError) Error() string

Error produces an error string.

func (*RetryTaskError) SetRenewedTask added in v0.4.3

func (e *RetryTaskError) SetRenewedTask(t ...*Task)

SetRenewedTask allows upstream callers (like DoWithRenew) to set the renewed task in the error itself, so that version skew may be overcome when doing things like moving or retrying a task (incrementing attempts, etc.).

type SetRenewedTasker added in v0.4.3

type SetRenewedTasker interface {
	SetRenewedTask(...*Task)
}

SetRenewedTasker matches errors that contain information about the "latest version" of a task. This is used, for example, in DoWithRenewAll. If an error matching this interface is passed back from its "do" function, that error contains information about the final renewed task(s), which will contain the latest version information. This can be used to do final modifications on the task, which would otherwise fail because the original task has new "renewed" versions over time.

type Task

type Task struct {
	Queue string `json:"queue"`

	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	At       time.Time `json:"at"`
	Claimant uuid.UUID `json:"claimant"`
	Claims   int32     `json:"claims"`
	Value    []byte    `json:"value"`

	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`

	// Worker retry logic uses these fields when moving tasks and when retrying them.
	// It is left up to the consumer to determine how many attempts is too many
	// and to produce a suitable retry or move error.
	Attempt int32  `json:"attempt"`
	Err     string `json:"err"`
}

Task represents a unit of work, with a byte slice value payload. Note that Claims is the number of times a task has successfully been claimed. This is different than the version number, which increments for every modification, not just claims.

func PollTryClaim

func PollTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc) (*Task, error)

PollTryClaim runs a loop in which the TryClaim function is called between sleeps with exponential backoff (up to a point). Backend implementations may choose to use this as their Claim implementation.

func WaitTryClaim

func WaitTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc, w Waiter) (*Task, error)

WaitTryClaim runs a loop in which the TryClaim function is called, then if no tasks are available, the given wait function is used to attempt to wait for a task to become available on the queue.

The wait function should exit (more or less) immediately if the context is canceled, and should return a nil error if the wait was successful (something became available).

func (*Task) AsChange

func (t *Task) AsChange(args ...ChangeArg) ModifyArg

AsChange returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsChange(ArrivalTimeBy(2 * time.Minute)))

The above is shorthand for

cli.Modify(ctx, Changing(task1, ArrivalTimeBy(2 * time.Minute)))

func (*Task) AsDeletion

func (t *Task) AsDeletion() ModifyArg

AsDeletion returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsDeletion())

The above would cause the given task to be deleted, if it can be. It is shorthand for

cli.Modify(ctx, Deleting(task1.ID, task1.Version))

func (*Task) AsDependency

func (t *Task) AsDependency() ModifyArg

AsDependency returns a ModifyArg that can be used to create a Modify dependency, e.g.,

cli.Modify(ctx, task.AsDependency())

That is shorthand for

cli.Modify(ctx, DependingOn(task.ID, task.Version))

func (*Task) Copy

func (t *Task) Copy() *Task

Copy copies this task's data and everything.

func (*Task) CopyOmitValue added in v0.3.9

func (t *Task) CopyOmitValue() *Task

CopyOmitValue copies this task but leaves the value blank.

func (*Task) Data

func (t *Task) Data() *TaskData

Data returns the data for this task.

func (*Task) IDVersion

func (t *Task) IDVersion() *TaskID

IDVersion returns a TaskID (ID and version) from this task.

func (*Task) String

func (t *Task) String() string

String returns a useful representation of this task.

type TaskData

type TaskData struct {
	Queue string    `json:"queue"`
	At    time.Time `json:"at"`
	Value []byte    `json:"value"`

	// These can be changed by the user, so are part of the task data.
	Attempt int32  `json:"attempt"`
	Err     string `json:"err"`

	// ID is an optional task ID to be used for task insertion.
	// Default (uuid.Nil) causes the backend to assign one, and that is
	// sufficient for many cases. If you desire to make a database entry that
	// *references* a task, however, in that case it can make sense to specify
	// an explicit task ID for insertion. This allows a common workflow cycle
	//
	// 	consume task -> db update -> insert tasks
	//
	// to be done safely, where the database update needs to refer to
	// to-be-inserted tasks.
	ID uuid.UUID `json:"id"`
	// contains filtered or unexported fields
}

TaskData contains just the data, not the identifier or metadata. Used for insertions.

func (*TaskData) String added in v0.2.0

func (t *TaskData) String() string

String returns a string representation of the task data, excluding the value.

type TaskID

type TaskID struct {
	ID      uuid.UUID
	Version int32
}

TaskID contains the identifying parts of a task. If IDs don't match (identifier and version together), then operations fail on those tasks.

func (TaskID) AsDeletion added in v0.3.4

func (t TaskID) AsDeletion() ModifyArg

AsDeletion produces an appropriate ModifyArg to delete the task with this ID.

func (TaskID) AsDependency added in v0.3.17

func (t TaskID) AsDependency() ModifyArg

AsDependency produces an appropriate ModifyArg to depend on this task ID.

func (TaskID) String

func (t TaskID) String() string

String produces the id:version string representation.

type TasksOpt

type TasksOpt func(*EntroQ, *TasksQuery)

TasksOpt is an option that can be passed into Tasks to control what it returns.

func LimitClaimant

func LimitClaimant(id uuid.UUID) TasksOpt

LimitClaimant only returns tasks with the given claimant, or expired tasks.

func LimitSelf

func LimitSelf() TasksOpt

LimitSelf only returns self-claimed tasks or expired tasks.

func LimitTasks

func LimitTasks(limit int) TasksOpt

LimitTasks sets the limit on the number of tasks to return. A value <= 0 indicates "no limit".

func OmitValues added in v0.3.9

func OmitValues() TasksOpt

OmitValues tells a tasks query to only return metadata, not values.

func WithTaskID

func WithTaskID(ids ...uuid.UUID) TasksOpt

WithTaskID adds a task ID to the set of IDs that can be returned in a task query. The default is "all that match other specs" if no IDs are specified. Note that versions are not part of the ID.

type TasksQuery

type TasksQuery struct {
	Queue    string
	Claimant uuid.UUID
	Limit    int
	IDs      []uuid.UUID

	// OmitValues specifies that only metadata should be returned.
	// Backends are not required to honor this flag, though any
	// service receiving it in a request should ensure that values
	// are not passed over the wire.
	OmitValues bool
}

TasksQuery holds information for a tasks query.

type Waiter

type Waiter interface {
	// Wait waits for an event on the given set of keys, calling cond after
	// poll intervals until one of them is notified, cond returns true, or the
	// context is canceled.
	//
	// If cond is nil, this function returns when the channel is notified,
	// the poll interval is exceeded, or the context is canceled. Only the last
	// event causes a non-nil error.
	//
	// If poll is 0, it can never be exceeded.
	//
	// A common use is to use poll=0 and cond=nil, causing this to simply wait
	// for a notification.
	Wait(ctx context.Context, keys []string, poll time.Duration, cond func() bool) error
}

Waiter can wait for an event on a given key (e.g., queue name).

type Work added in v0.3.0

type Work func(ctx context.Context, task *Task) ([]ModifyArg, error)

Work is a function that is called by Run. It does work for one task, then returns any necessary modifications.

If this function returns a MoveTaskError, the original task is moved into a queue specified by calling ErrQMap on the original queue name. This is useful for keeping track of failed tasks by moving them out of the way instead of deleting them or allowing them to be picked up again.

If this function returns a RetryTaskError, the original task has its attempt field incremented, the err field is updated to contain the text of the error, and the worker goes around again, leaving it to be reclaimed. If the maximum number of attempts has been reached, however, the error acts like a MoveTaskError, instead.
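
A sketch of a Work function that uses these errors (process is a hypothetical application function):

func work(ctx context.Context, task *Task) ([]ModifyArg, error) {
	if err := process(ctx, task.Value); err != nil {
		// Transient failure: increment Attempt, record the error text, and let the
		// task be claimed again (or moved to the error queue once MaxAttempts is hit).
		return nil, NewRetryTaskError(err)
	}
	return []ModifyArg{task.AsDeletion()}, nil
}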

type Worker

type Worker struct {
	// Qs contains the queues to work on.
	Qs []string

	// ErrQMap maps an inbox to the queue tasks are moved to if a MoveTaskError
	// is returned from a worker's run function.
	ErrQMap ErrQMap

	// OnDepErr can hold a function to be called when a dependency error is
	// encountered. If it returns a non-nil error, it will become fatal.
	OnDepErr DependencyHandler

	// MaxAttempts indicates how many attempts are too many before a retryable
	// error becomes permanent and the task is moved to an error queue.
	MaxAttempts int32
	// contains filtered or unexported fields
}

Worker creates an iterator-like protocol for processing tasks in a queue, one at a time, in a loop. Each worker should only be accessed from a single goroutine. If multiple goroutines are desired, they should each use their own worker instance.

Example:

w := eqClient.NewWorker("queue_name")
err := w.Run(ctx, func(ctx context.Context, task *Task) ([]ModifyArg, error) {
	// Do stuff with the task.
	// It's safe to mark it for deletion, too. It is renewed in the background.
	// If renewal changed its version, that is rewritten before modification.
	return []ModifyArg{task.AsDeletion()}, nil
})
// Handle the error, which is nil if the context was canceled (but not if
// it timed out).

func NewWorker added in v0.3.7

func NewWorker(eq *EntroQ, qs ...string) *Worker

NewWorker creates a new worker that makes it easy to claim and operate on tasks in an endless loop.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, f Work) (err error)

Run attempts to run the given function once per each claimed task, in a loop, until the context is canceled or an unrecoverable error is encountered. The function can return modifications that should be done after it exits, and version numbers for claim renewals will be automatically updated.

func (*Worker) WithOpts added in v0.3.0

func (w *Worker) WithOpts(opts ...WorkerOption) *Worker

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption can be passed when creating or configuring a Worker to modify its behavior.

func WithBaseRetryDelay added in v0.4.3

func WithBaseRetryDelay(d time.Duration) WorkerOption

WithBaseRetryDelay sets the base delay for a retried task (the first attempt). Without any backoff settings, this is used for every retry. When used, the task is modified when its attempt is incremented so that its availability time is set this far in the future.

func WithDependencyHandler added in v0.3.17

func WithDependencyHandler(f DependencyHandler) WorkerOption

WithDependencyHandler sets a function to be called when a worker encounters a dependency error. If this function returns a non-nil error, the worker will exit.

Note that workers always exit on non-dependency errors, but usually treat dependency errors as things that can be retried. Specifying a handler for dependency errors allows different behavior as needed.

One possible use case for a dependency error handler is to reload a configuration task for the next round: if the task is depended on, but has been changed, the task can be retried, but configuration should also be reloaded, which could be done in a handler.
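
For example (a sketch; reloadConfig is a hypothetical application function):

w := cli.NewWorker("inbox").WithOpts(
	WithDependencyHandler(func(err DependencyError) error {
		reloadConfig() // pick up the changed configuration task for the next round
		return nil     // not fatal; the claimed task is simply retried
	}),
)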

func WithErrQMap added in v0.3.0

func WithErrQMap(f ErrQMap) WorkerOption

WithErrQMap sets a function that maps from inbox queue names to error queue names. Defaults to DefaultErrQMap.

func WithLease

func WithLease(d time.Duration) WorkerOption

WithLease sets the frequency of task renewal. Tasks will be claimed for an amount of time slightly longer than this so that they have a chance of being renewed before expiring.

func WithMaxAttempts added in v0.4.3

func WithMaxAttempts(m int32) WorkerOption

WithMaxAttempts sets the maximum attempts that are allowed before a RetryTaskError turns into a MoveTaskError (transparently). If this value is 0 (the default), then there is no maximum, and attempts can be incremented indefinitely without a move to an error queue.
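
For example, combining this with a base retry delay (a sketch):

w := cli.NewWorker("inbox").WithOpts(
	WithMaxAttempts(5),
	WithBaseRetryDelay(10*time.Second),
)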

func WithWrappedMove added in v0.4.3

func WithWrappedMove(on bool) WorkerOption

WithWrappedMove changes the behavior of a MoveTaskError to wrap the entire task in a brand new error task, where the old task is serialized into bytes and stored as the new task's value. The default is to use Attempt and Err to store the necessary data in the existing task, instead.

Directories

cmd
  eqc
  eqmemsvc
    Command eqmemsvc starts up an in-memory EntroQ gRPC service.
  eqmemsvc/cmd
    Package cmd holds the commands for the eqmemsvc application.
  eqpgsvc
    Command eqpgsvc starts a PostgreSQL-backed EntroQ gRPC service.
contrib
  mr
    Package mr has a simple MapReduce implementation, one that does everything inside the task manager (no outside files).
  mrtest
    Package mrtest is a test package tightly tied to the mr package, separated out to avoid import cycles when other tests want to use it.
  pkg/procworker
    Package procworker implements a worker that reads a subprocess specification task, executes it, and puts results into an outbox.
grpc
  Package grpc provides a gRPC backend for EntroQ. This is the backend that is commonly used by clients of an EntroQ task service (server: qsvc -> entroq library -> some backend, e.g., pg; client: entroq library -> grpc backend).
mem
  Package mem is an in-memory implementation of an EntroQ backend.
pg
  Package pg provides an entroq.Backend using PostgreSQL.
qsvc
  Package qsvc contains the service implementation for registering with gRPC.
qtest
  Package qtest contains standard testing routines for exercising various backends in similar ways.
queues
  Package queues contains helper functions for manipulation of queue names.
subq
  Package subq abstracts the idea of subscribing to a particular queue so that changes can be immediately notified.
