Version: v0.7.3
Published: Jun 21, 2021 | License: Apache-2.0 | Imports: 10 | Imported by: 10



A task queue with strong competing-consumer semantics and transactional updates.

See here for an article explaining how this might fit into your system:


The Go components of this package can be found in the online documentation for the entrogo.com/entroq Go package.

Pronounced "Entro-Q" ("Entro-Queue"), as in the letter that comes after "Entro-P". We aim to take the next step away from parallel systems chaos. It is also the descendant of github.com/shiblon/taskstore, an earlier and less robust attempt at the same idea.

It is designed to be as simple and modular as possible, doing one thing well. It is not a pubsub system, a database, or a generic RPC mechanism. It is only a competing-consumer work queue, and will only ever be that. As such, it has also been designed to do that one thing really well.


A Docker container is available on Docker Hub as shiblon/entroq. You can use this to start an EntroQ service, then talk to it using the provided Go or Python libraries. It exposes port 37706 by default.

The default service that runs uses an in-memory work queue, coupled with an optional write-ahead log for persistence and fault tolerance. There is also a PostgreSQL-backed version that can be chosen.

If you merely want an in-process work queue for Go, you can just use the in-memory implementation of the library without any server at all. For other languages, you should use the service and the gRPC-based language-specific library to talk to it.

There is also a command-line client that you can use to talk to the EntroQ service:

go install entrogo.com/entroq/cmd/eqc@latest
eqc --help

You can then run eqc to talk to an EntroQ service (such as one started in the shiblon/entroq container from Docker Hub).

There is also a Python-based command line, installable via pip:

python3 -m pip install git+https://github.com/shiblon/entroq
python3 -m entroq --help


EntroQ supports precisely two atomic mutating operations:

  • Claim an available task from one of possibly several queues, or
  • Update a set of tasks (delete, change, insert), optionally depending on the passive existence of other task versions.

There are a few read-only accessors, as well, such as a way to list tasks within a queue, a way to list queues with their sizes, etc. These read-only operations do not have any transactional properties, and are best-effort snapshots of queue state. Every effort has been made to ensure that these read-only operations do not cause starvation of fundamental operations.

Both Claim and Modify change the version number of every task they affect. Any time any task is mutated, its version increments. Thus, if one process manages to mutate a task, any other process that was working on it will find that it is not available, and will fail. This is the key concept behind the "commit once" semantics of EntroQ.

Unlike many pub/sub systems that are used as competing consumer queues, this eliminates the possibility of work getting dropped after delivery, or work being committed more than once. Some additional detail on this approach is given below, but the fundamental principle is this:

Work commits should never be lost or duplicated.

A task, at its most basic, is defined by these traits:

  • Queue Name
  • Globally Unique ID
  • Version
  • Arrival Time
  • Value

Tasks also hold other information, such as the ID of the client that currently holds a claim, creation and modification times, and the number of times the task has been claimed, but these are implementation details.


EntroQ can hold multiple "queues", though these do not exist as first-class concepts in the system. A queue only exists so long as there are tasks that want to be in it. If no tasks are in a particular queue, that queue does not exist. In database terms, you can think of the queue as being a property of the task, not the other way around. Indeed, in the PostgreSQL backend, it is simply a column in a tasks table.

Because task IDs are durable, and only the version increments during mutation, it is possible to track a single task through multiple modifications. Any attempted mutation, however, will fail if both the ID and version don't match the task to be mutated. The ID of the task causing a modification failure is returned with the error.

This allows for a robust competing-consumer worker setup, where workers are guaranteed to not accidentally clobber each other's tasks.


A Claim occurs on one or more queues. A randomly chosen task among those that have an Arrival Time (spelled at) in the past is returned if any are available. Otherwise this call blocks. There is a similar call TryClaim that is non-blocking (and returns a nil value if no task could be claimed immediately), if that is desired.

When more than one queue is specified in a Claim operation, only one task will be returned from one of those queues. This allows a worker to be a fan-in consumer, fairly pulling tasks from multiple queues. If a "fast lane" is desired for a particular worker, this can be achieved by simply having more than one queue that it claims from. Tasks will be pulled fairly from multiple queues, and thus the shortest will be consumed earlier than any longer ones. This is how things tend to work in amusement parks, for example. More complex priority schemes have been considered, but tend to be fraught with peril and unintended consequences.

When successfully claiming a task, the following happen atomically:

  • Its version is incremented, and
  • Its arrival time is advanced to a point in the future.

Incrementing the version ensures that any previous claimants will fail to update that task: the version number will not match. Setting the arrival time in the future gives the new claimant time to do the work, or to establish a cycle of "at updates", where it renews the lease on the task until it is finished. It also allows the network time to return the task to the claimant.

Note: best practice is to keep the initial claim time relatively low, then rely on the claimant to update the arrival time periodically. This allows for situations like network partitions to be recovered from relatively quickly: a new worker will more quickly pick up a task that missed a 30-second renewal window than one that was reserved for 15 minutes with a claimant that died early in its lease.


The Modify call can do multiple modifications at once, in an atomic transaction. There are four kinds of modifications that can be made:

  • Insert: add one or more tasks to desired queues.
  • Delete: delete one or more tasks.
  • Change: change the value, queue, or arrival time of a given task.
  • Depend: require passive presence of one or more tasks and versions.

If any of the dependencies are not present (tasks to be deleted, changed, or depended upon, or tasks to be inserted when a colliding ID is specified), then the entire operation fails with an error indicating exactly which tasks were not present in the requested version or had colliding identifiers in the case of insertion.

Because work is not lost until explicitly acknowledged (deleted), it is usually safe to simply abandon work when receiving a dependency error, and grab another task to work on. EntroQ has been designed to avoid starving any queue with tasks that might have inherent data that causes crashes or bugs in workers. These tasks will stick around and be retried periodically, but meanwhile others will go ahead of them because ready tasks are selected at random from each queue.
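The all-or-nothing behavior of a modification can be sketched as follows. This is a minimal model, assuming a store that maps task IDs to versions; the names taskID, depError, store, and modify are hypothetical and cover only deletions and passive dependencies.

```go
package main

import (
	"errors"
	"fmt"
)

// taskID is a hypothetical ID/version pair identifying one task revision.
type taskID struct {
	ID      string
	Version int
}

// depError lists every requested revision that was not present.
type depError struct{ Missing []taskID }

func (e *depError) Error() string {
	return fmt.Sprintf("dependencies missing: %v", e.Missing)
}

// store maps each task ID to its current version.
type store map[string]int

// modify checks every requested ID/version pair first. If any is absent,
// nothing changes and all failures are reported; otherwise deletions apply.
func (s store) modify(deletes, depends []taskID) error {
	var missing []taskID
	for _, want := range append(append([]taskID{}, deletes...), depends...) {
		if v, ok := s[want.ID]; !ok || v != want.Version {
			missing = append(missing, want)
		}
	}
	if len(missing) > 0 {
		return &depError{Missing: missing}
	}
	for _, d := range deletes {
		delete(s, d.ID)
	}
	return nil
}

func main() {
	s := store{"a": 2, "b": 1}
	err := s.modify([]taskID{{"a", 1}}, []taskID{{"b", 1}}) // stale version of "a"
	var de *depError
	fmt.Println(errors.As(err, &de), len(s)) // true 2
	err = s.modify([]taskID{{"a", 2}}, []taskID{{"b", 1}})
	fmt.Println(err, len(s)) // <nil> 1
}
```

A worker that receives the dependency error in the first call can drop its work and claim something else, since the store is unchanged.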


Once you create an EntroQ client instance, you can use it to create what is called a "worker". A worker is essentially a claim-renew-modify loop on one or more queues. These can be run in goroutines (or threads, or your language's equivalent). Creating a worker using the standard library allows you to focus on writing only the logic that happens once a task has been acquired. In the background the claimed task is renewed for as long as the worker code is running. Good worker code is available in Go, and less-good-but-reasonable code for workers is provided in contrib/py. The principles are straightforward to implement in any language that can speak gRPC.

The code that a worker runs is responsible for doing something with the claimed task, then returning the intended modifications that should happen when it is successful. For modification of the claimed task, the standard worker code handles updating the version number in case that task has been renewed in the background (and thus had its version increment while the work was being done).

This is a very common way of using EntroQ: stand up an EntroQ service, then start up one or more workers responsible for handling the flow of tasks through your system. Initial injection of tasks can easily be done with the provided libraries or command-line clients.

Rather than design a pipeline, it makes sense to have workers that are responsible for doing small tasks, and one or more worker types that are responsible for implementing a pipeline state machine. In its simplest form, you can create a "trampoline" worker that handles responses to a single global queue and pushes them into individual task queues depending on contents and disposition.

Pipelines are very brittle ideas and should generally be avoided. In a pipeline that grows over time, the complexity of each component increases exponentially with the number of possible input and output types. A trampoline, on the other hand, allows every worker to be "single-purpose", encoding state transitions in one place instead of spreading them across the entirety of a microservice architecture.
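As a rough sketch of the trampoline idea, the routing decision can live in a single function that maps a worker's response to the next queue. The response type, the stage names, and the queue names below are all invented for illustration; a real trampoline worker would claim from the global queue and apply this decision in a Modify call.

```go
package main

import "fmt"

// response is a hypothetical message a worker posts to the global queue.
type response struct {
	Stage string // which step just finished
	OK    bool   // whether that step succeeded
}

// nextQueue encodes every state transition in one place.
func nextQueue(r response) string {
	if !r.OK {
		return "/pipeline/failed"
	}
	switch r.Stage {
	case "download":
		return "/pipeline/transcode"
	case "transcode":
		return "/pipeline/publish"
	case "publish":
		return "/pipeline/done"
	default:
		return "/pipeline/unknown"
	}
}

func main() {
	for _, r := range []response{{"download", true}, {"transcode", false}} {
		fmt.Println(r.Stage, "->", nextQueue(r))
	}
}
```

Adding a new stage then means adding one case here, while each single-purpose worker stays unaware of what comes before or after it.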

Commit Once, Maybe Work Twice

Tasks in EntroQ can only be acknowledged (deleted or moved) once. It is possible for more than one claimant (or "worker") to be performing the same task at the same time, but only one of them will succeed in deleting that task. Because the other will fail to mutate its task, it will also know to discard any work that it did.

Thus, it's possible for the actual work to be done more than once, but that work will only be durably recorded once. Because of this, idempotent worker design is still important, and some helpful principles are described below.

Meanwhile, to give some more detail about how two workers might end up working on the same task, consider an "Early and Slow" (ES) worker and a "Late and Quick" (LQ) worker. "Early and Slow" is the first to claim a particular task, but then takes a long time getting it done. This delay may be due to a network partition, or a slow disk on a machine, memory pressure, or process restarts. Whatever the reason, ES claims a task, then doesn't acknowledge it before the deadline.

While ES is busy working on its task, but not acknowledging or renewing it, "Late and Quick" (LQ) successfully claims the task after its arrival time is reached. If ES tries to commit its task while LQ is still working, ES holds an earlier version of the task and the commit fails. LQ then finishes and succeeds, because it holds the most recent version of the task, which is the one represented in the queue.

This also works if LQ finishes before ES does. In that case, when ES finally tries to commit its work, the task is no longer in the queue: it was deleted when LQ finished it and requested deletion.

These semantics, where a claim is a mutating event, and every alteration to a task changes its version, make it safe for multiple workers to attempt to do the same work without the danger of it being committed (reported) twice.
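The ES and LQ scenario can be replayed in a few lines. This is a toy model, assuming a queue that tracks only the current version of each task; the queueTask and queue types and the claim and commit methods are hypothetical.

```go
package main

import "fmt"

// queueTask is a hypothetical minimal task revision: an ID and a version.
type queueTask struct {
	ID      string
	Version int
}

// queue holds the current version of each task by ID.
type queue map[string]int

// claim increments the version and returns the revision the claimant holds.
func (q queue) claim(id string) queueTask {
	q[id]++
	return queueTask{ID: id, Version: q[id]}
}

// commit deletes the task only if the caller holds the current revision.
func (q queue) commit(t queueTask) bool {
	if v, ok := q[t.ID]; !ok || v != t.Version {
		return false
	}
	delete(q, t.ID)
	return true
}

func main() {
	q := queue{"job": 0}
	es := q.claim("job") // Early and Slow claims first, then stalls.
	lq := q.claim("job") // The lease expires; Late and Quick claims it.

	fmt.Println(q.commit(es)) // false: ES holds a stale version
	fmt.Println(q.commit(lq)) // true: LQ holds the current version
	fmt.Println(q.commit(es)) // false: the task is already gone
}
```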

Safe Work

It is possible to abuse the ID/version distinction, by asking EntroQ to tell you about a given task ID, then overriding the claimant ID and task version. This is, in fact, how "force delete" works in the provided command-line client. If you wish to mutate a task, you should have first claimed it. Otherwise you have no guarantee that the operation is safe. Merely reading it (e.g., using the Tasks call) is not sufficient to guarantee safe mutation.

If you feel the need to use these facilities in normal worker code, however, that should be a sign that the design is wrong. Only in extreme cases, like manual intervention, should these ever be used. As a general rule,

Only work on claimed tasks, and never override the claimant or version.

If you need to force something, do it from the command-line client, where a human exercising judgement is involved, rather than in a worker. Even then, be sure you understand the potential consequences to your system.

To further ensure safety when using a competing-consumer work queue like EntroQ, it is important to adhere to a few simple principles:

  • All outside mutations should be idempotent, and
  • Any output files should be uniquely named every time.

The first principle of idempotence allows things like database writes to be done safely by multiple workers (remembering the ES vs. LQ concept above). As an example, instead of incrementing a value in a database, it should simply be set to its final value. Sometimes an increment is really what you want, in which case you can make that operation idempotent by storing the final value in the task itself so that the worker simply records that. That way, no matter how many workers make the change, they make it to the same value. The principle is this:

Use stable, absolute values in mutations, not relative updates.
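A small sketch makes the difference concrete. The counters type below is a hypothetical stand-in for an external database, and the two workers stand for ES and LQ both performing the side effect of a single task.

```go
package main

import "fmt"

// counters is a stand-in for an external database table.
type counters map[string]int

// applyRelative is NOT idempotent: running it twice double-counts.
func (c counters) applyRelative(key string, delta int) { c[key] += delta }

// applyAbsolute is idempotent: the final value travels with the task.
func (c counters) applyAbsolute(key string, final int) { c[key] = final }

func main() {
	rel := counters{"orders": 10}
	abs := counters{"orders": 10}

	// Two workers both perform the side effect for the same task.
	for worker := 0; worker < 2; worker++ {
		rel.applyRelative("orders", 1)
		abs.applyAbsolute("orders", 11)
	}
	fmt.Println(rel["orders"], abs["orders"]) // 12 11
}
```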

The second principle of unique file names applies to every worker that attempts to write anything. Each worker, even those working on the same task, should generate a random or time-based token in the file name for anything that it writes to the file system. While this can generate garbage that needs to be collected later, it also guarantees that partial writes do not corrupt complete ones. File system semantics are quite different from database semantics, and having uniquely-named outputs for every write helps to guarantee that corruption is avoided.

In short, when there is any likelihood of a file being written by more than one process,

Choose garbage collection over write efficiency.

Thankfully, adhering to these safety principles is usually pretty easy, resulting in great benefits to system stability and trustworthiness.


To create a Go program that makes use of EntroQ, use the entroq.New function and hand it a BackendOpener. An "opener" is a function that can, given a context and suitable parameters, create a backend for the client to use as its implementation.

To create a Python client, you can use the entroq package, which always speaks gRPC to a backend EntroQ service.

In Go, there are three primary backend implementations:

  • In-memory,
  • PostgreSQL, and
  • A gRPC client.

In-Memory Backend

The eqmem backend allows your EntroQ instance to work completely in-process. You can use exactly the same library calls to talk to it as you would if it were somewhere on the network, making it easy to start in memory and progress to database or networked implementations later as needed.

The following is a short example of how to create an in-memory work queue:

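package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/backend/eqmem" // import paths assumed from the module layout
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, eqmem.Opener())
  if err != nil {
    log.Fatalf("open in-memory EntroQ: %v", err)
  }
  defer eq.Close()

  // Use eq.Modify, eq.Insert, eq.Claim, etc., probably in goroutines.
}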

The memory backend contains a write-ahead log implementation for persistence. See the code documentation for how to set parameters to specify the journal directory and when journals should be rotated.

The EntroQ Docker Hub Image defaults to using an in-memory implementation backed by a journal with periodic snapshots. See the volume mounts in the relevant Dockerfile to know how to mount your own data directories into a running container. The default container starts a gRPC service using this journal-backed in-memory implementation.

gRPC Backend

The grpc backend is somewhat special. It converts an entroq.EntroQ client into a gRPC client that can talk to the provided qsvc implementation, described below.

This allows you to stand up a gRPC endpoint in front of your "real" persistent backend, giving you authentication management and all of the other goodies that gRPC provides on the server side, all exposed via protocol buffers and the standard gRPC service interface.

All clients would then use the grpc backend to connect to this service, again with gRPC authentication and all else that it provides. This is the preferred way to use the EntroQ client library. In fact, the eqc command-line client is really just a gRPC client that can be used to speak to the default Docker container mentioned earlier.

As a basic example of how to set up a gRPC-based EntroQ client:

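package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/backend/eqgrpc" // import paths assumed from the module layout
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, eqgrpc.Opener(":37706"))
  if err != nil {
    log.Fatalf("open gRPC EntroQ client: %v", err)
  }
  defer eq.Close()

  // Use eq.Modify, eq.Insert, eq.Claim, etc., probably in goroutines.
}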

The opener accepts a host name and a number of other gRPC-related optional parameters, including mTLS parameters and other familiar gRPC controls.

PostgreSQL Backend

The pg backend uses a PostgreSQL database. This is a performant, persistent backend option that is suitable for heavy loads (though if your load on this system is truly heavy, you might have gotten your task granularity wrong).

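package main

import (
  "context"
  "log"

  "entrogo.com/entroq"
  "entrogo.com/entroq/backend/eqpg" // import paths assumed from the module layout
)

func main() {
  ctx := context.Background()
  eq, err := entroq.New(ctx, eqpg.Opener(":5432", eqpg.WithDB("postgres"), eqpg.WithUsername("myuser")))
  if err != nil {
    log.Fatalf("open PostgreSQL EntroQ: %v", err)
  }
  defer eq.Close()
  // The opener above supports other postgres-related parameters, as well.

  // Use eq.Modify, eq.Insert, eq.Claim, etc., probably in goroutines.
}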

This backend is highly PostgreSQL-specific, as it requires the ability to issue a SELECT ... FOR UPDATE SKIP LOCKED query in order to performantly claim tasks. MySQL has similar support, so a similar backend could be written for it if desired.

Unfortunately, CockroachDB does not contain support for the necessary SQL statements, even though it speaks the PostgreSQL wire format. It cannot be used in place of PostgreSQL without implementing an entirely new backend (not impossible, just not done).

Starting a PostgreSQL Instance

If you wish to start an EntroQ service backed by PostgreSQL, you can run the database and EntroQ service in containers on the same Docker network fairly easily.

Note that no matter how you run things, there is no need to create any tables in your database. The EntroQ service checks for the existence of a tasks table and creates it if it is not present. It is also carefully set up to update older tables when newer versions are deployed.

The eqc command-line utility is included in the entroq container, so you can play around with it using docker exec.

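If you have a running container started from the v0.7 image, for example, you can run

docker exec <container> eqc --help

where <container> is the name or ID of that running container (docker exec takes a container, not an image).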

An example docker-compose file is shown below that should give you the idea of how they interoperate.

Note that we use /tmp for the example below. This is not recommended in production for obvious reasons, but should illuminate the way things fit together.

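# Keys lost in extraction are reconstructed here from the compose file schema.
version: "3"
services:
  database:
    image: "postgres:12"
    deploy:
      restart_policy:
        condition: any
    restart: always
    volumes:
      - /tmp/postgres/data:/var/lib/postgresql/data

  queue:  # service name assumed; the original key was lost
    image: "shiblon/entroq:v0.7"
    depends_on:
      - database
    deploy:
      restart_policy:
        condition: any
    restart: always
    ports:
      - 37706:37706
    command:
      - "pg"
      - "--dbaddr=database:5432"
      - "--attempts=10"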

This starts up PostgreSQL and EntroQ, where EntroQ will make multiple attempts to connect to the database before giving up, allowing PostgreSQL some time to get its act together.


The qsvc directory contains the gRPC service implementation that is found in the Docker container shiblon/entroq. This service exposes the endpoints found in proto. Working services using various backends are found in the cmd directories, e.g.,

  • cmd/eqmemsvc
  • cmd/eqpgsvc

You can build any of these and start them on desired ports and with desired backend connections based on flag settings.

There is no service backend for grpc, though one could conceivably make sense as a sort of proxy. But in that case you should really just use a standard gRPC proxy. There are very good reasons to not build your own gRPC proxy, no matter how convenient it might seem given the architecture.

When using one of these services, this is your main server and should be treated as a singleton. Clients should use the grpc backend to connect to it.

Does making the server a singleton affect performance? Yes, of course, you can't scale a singleton, but in practice if you are hitting a work queue that hard you likely have a granularity problem in your design. Additionally, a single process like this can easily handle thousands of workers.

Python Implementation

In contrib/py there is an implementation of a gRPC client in Python, including a basic CLI. You can use this to talk to a gRPC EntroQ service, and it includes a basic worker implementation so that you can relatively easily replace uses of Celery in Python.

The Python implementation is made to be pip-installable directly from github:

$ python3 -m pip install git+https://github.com/shiblon/entroq

This creates the entroq module and supporting protocol buffers beneath it. Use of these can be seen in the command-line client, implemented in __main__.py.


One of the most complete Go examples, which is also used as a stress test, is a very naive implementation of MapReduce in contrib/mr. If you look in there, you will see numerous idiomatic uses of the competing queue concept, complete with workers, queue-empty tests used as part of system semantics, queues assigned to specific processes, and others.


EntroQ, when run by itself, doesn't do any authorization. If you simply include the library into a process, and access the backends directly (not the gRPC backend), then authorization is, in fact, not possible: you just have access to everything.

If you do want to include authorization, however, there's good news: the gRPC service does allow authorization, and there is an OPA-based implementation of it ready to go and available for both in-memory and postgres backends.

To use the OPA-HTTP strategy (where gRPC service request authorization is sent to the Open Policy Agent to get authorization approval or failure messages), you can specify the --authz=opahttp flag on the command line for the various services you can run.

Note that this means that you would need to have a working OPA instance with appropriate packages running at a location that you can specify.

How it Works

The eqc client has the ability to accept an authorization token, which it passes through gRPC in the standard Authorization: Bearer <token> HTTP header. If the OPA HTTP authorization strategy is enabled in the service flags, the server then packages up this header into a request, along with the desired actions on the desired queues (e.g., a claim on an inbox queue), and sends that request along to OPA.
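For illustration, here is a minimal sketch of pulling the token out of a header value in the Bearer form mentioned above. The bearerToken helper is hypothetical and not part of EntroQ; it matches the scheme name case-sensitively and does no validation of the token itself.

```go
package main

import (
	"fmt"
	"strings"
)

// bearerToken extracts the token from an Authorization header value of the
// form "Bearer <token>". The second result is false when the value has a
// different scheme or an empty token.
func bearerToken(header string) (string, bool) {
	const prefix = "Bearer "
	if !strings.HasPrefix(header, prefix) {
		return "", false
	}
	tok := strings.TrimSpace(strings.TrimPrefix(header, prefix))
	return tok, tok != ""
}

func main() {
	tok, ok := bearerToken("Bearer abc.def.ghi")
	fmt.Println(tok, ok) // abc.def.ghi true

	_, ok = bearerToken("Basic dXNlcjpwdw==")
	fmt.Println(ok) // false
}
```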

The authorization token is passed in two places: within the request itself, and in the standard Authorization HTTP header. This gives you some flexibility: you can use the OPA system authorization to get an input.identity created for you, or you can just unpack the input fields and do that by hand, bypassing the OPA internal authorization and just focusing on getting answers about your specific query.

OPA must then have an entroq.authz package that produces a result shaped like the authz.AuthzError type, defined in the authz package.

The opadata directory contains configurations that work in precisely this way, but it is important to understand the delineation of responsibilities first:

  • EntroQ client:

    • Sends the authorization token in an Authorization header when asked.
  • EntroQ service:

    • Forwards the Authorization header to OPA with a request representing desired queues and actions.
    • Unpacks the OPA response and allows or disallows the request, accordingly.
    • Packages up any unauthorized responses into structured errors for the client, if structure is desired.
  • OPA:

    • Unpacks the authorization token to get user information, if any.
    • Produces a set of "permitted queues and actions" that can be matched against the request.
    • Compares and produces either "allow" or a set of "failed" queues and actions, with error messages.
  • Some other system:

    • Does authentication and generates tokens.

Of note: there are two critical responsibilities that EntroQ does not participate in at all:

  • Generation of authorization tokens (from an authentication process), and
  • Interpretation of authorization tokens.

Another system must be used for authentication and production of valid tokens for a user. EntroQ has zero opinions on that matter.

Furthermore, OPA only inspects the authorization token; it does not produce one.

Because EntroQ is particular about what it sends as "input" and what it receives as a "document" (in OPA parlance), some core OPA packages are already provided for you, under authz/opadata. These files should be used without alteration in any OPA configuration that you ultimately use. They contain methods for comparing queue specs, and the entroq.authz package in particular ensures that data is both properly shaped and has proper error semantics for a reply.

The system user (deployer) is responsible for providing the following values:

  • entroq.permissions.allowed_queues: a set of queue specifications shaped like Queue in authz, and
  • entroq.user.username: a string containing a username, can be empty or undefined.

Example configurations that are not terribly secure in how users are determined (e.g., no JWT validation) are found in authz/opadata/conf/example, and policy data in the shape understood by those example files is found in authz/opadata/policy/example.

All of these have associated tests that can be run in the standard way, or you can invoke them using go test inside the authz directory.

These examples are used in contrib/opa-compose, where a docker-compose.yaml file shows an example of how you might set up an EntroQ and OPA instance side by side, using simple JWT tokens to hold sub claims with usernames.

The basic idea is this:

  • Define entroq.user.username such that the username is safely pulled from whatever kind of token your system needs.
  • Define entroq.permissions.allowed_queues to contain all queue specifications that are relevant for the user you get from entroq.user.
  • Define policy in whatever way you prefer (there are many possibilities of how to provide "data" to OPA - we chose, for our example, to provide it as an entroq.policy package, but you may choose to use a data service, push documents directly into OPA, etc.).

After that, the core files and EntroQ itself do the rest. You just have to have valid tokens, which you will need to get from somewhere, and OPA will need to know enough to unpack and validate them (e.g., it might need the signing key).



Copyright 2019 Chris Monson <shiblon@gmail.com>

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at


Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Package entroq contains the main task queue client and data definitions. The client relies on a backend to implement the actual transactional functionality, the interface for which is also defined here.



const (
	DefaultClaimPollTime = 30 * time.Second
	DefaultClaimDuration = 30 * time.Second
)

const DefaultRetryDelay = 30 * time.Second

DefaultRetryDelay is the amount by which to advance the arrival time when a worker task errors out as retryable.




func DefaultErrQMap added in v0.3.0

func DefaultErrQMap(inbox string) string

DefaultErrQMap appends "/err" to the inbox, and is the default behavior if no overriding error queue mapping options are provided.
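The documented behavior amounts to a one-line mapping. The errQueueFor function below is a local reimplementation for illustration only, not the package's source, and the queue name is made up.

```go
package main

import "fmt"

// errQueueFor mirrors the documented behavior of DefaultErrQMap: the error
// queue name is the inbox name with "/err" appended.
func errQueueFor(inbox string) string {
	return inbox + "/err"
}

func main() {
	fmt.Println(errQueueFor("/jobs/resize")) // /jobs/resize/err
}
```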

func IsCanceled

func IsCanceled(err error) bool

IsCanceled indicates whether the error is a canceled error.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout indicates whether the error is a timeout error.

func NotifyModified

func NotifyModified(n Notifier, inserted, changed []*Task)

NotifyModified takes inserted and changed tasks and notifies once per unique queue/ID pair.
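The "once per unique queue/ID pair" part can be sketched without the Notifier itself. The modTask type and uniquePairs function below are hypothetical; they show only the deduplication step, assuming a notification would then be sent for each returned pair.

```go
package main

import "fmt"

// modTask is a hypothetical stand-in carrying only the fields needed here.
type modTask struct {
	Queue string
	ID    string
}

// uniquePairs returns each queue/ID pair once, in first-seen order, across
// both inserted and changed tasks.
func uniquePairs(inserted, changed []modTask) []modTask {
	seen := make(map[modTask]bool)
	var out []modTask
	for _, list := range [][]modTask{inserted, changed} {
		for _, t := range list {
			if !seen[t] {
				seen[t] = true
				out = append(out, t)
			}
		}
	}
	return out
}

func main() {
	ins := []modTask{{"inbox", "1"}, {"inbox", "2"}}
	chg := []modTask{{"inbox", "1"}, {"done", "3"}}
	fmt.Println(len(uniquePairs(ins, chg))) // 3
}
```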

func ProcessTime added in v0.3.0

func ProcessTime() time.Time

ProcessTime returns the time the calling process thinks it is, in UTC.

func QueuesFromStats added in v0.3.3

func QueuesFromStats(stats map[string]*QueueStat, err error) (map[string]int, error)

QueuesFromStats can be used for converting the new QueueStats to the old Queues output, making it easier on backend implementers to just define one function (similar to how WaitTryClaim or PollTryClaim can make implementing Claim in terms of TryClaim easier).


type Backend

type Backend interface {
	// Queues returns a mapping from all known queues to their task counts.
	Queues(ctx context.Context, qq *QueuesQuery) (map[string]int, error)

	// QueueStats returns statistics for the specified queues query. Richer
	// than just calling Queues, as it can return more than just the size.
	QueueStats(ctx context.Context, qq *QueuesQuery) (map[string]*QueueStat, error)

	// Tasks retrieves all tasks from the given queue. If claimantID is
	// specified (non-zero), limits those tasks to those that are either
	// expired or belong to the given claimant. Otherwise returns them all.
	Tasks(ctx context.Context, tq *TasksQuery) ([]*Task, error)

	// TryClaim attempts to claim a task from the "top" (or close to it) of the
	// given queue. When claimed, a task is held for the duration specified
	// from the time of the claim. If claiming until a specific wall-clock time
	// is desired, the task should be immediately modified after it is claimed
	// to set At to a specific time. Returns a nil task and a nil error if
	// there is nothing to claim. Will fail with a DependencyError if a
	// specific task ID is requested but not present.
	TryClaim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Claim is a blocking version of TryClaim, attempting to claim a task
	// from a queue, and blocking until canceled or a task becomes available.
	// Will fail with a DependencyError if a specific task ID is requested but
	// not present. Never returns both a nil task and a nil error
	// simultaneously: a failure to claim a task is an error (potentially just
	// a timeout).
	Claim(ctx context.Context, cq *ClaimQuery) (*Task, error)

	// Modify attempts to atomically modify the task store, and only succeeds
	// if all dependencies are available and all mutations are either expired
	// or already owned by this claimant. The Modification type provides useful
	// functions for determining whether dependencies are good or bad. This
	// function is intended to return a DependencyError if the transaction could
	// not proceed because dependencies were missing or already claimed (and
	// not expired) by another claimant.
	Modify(ctx context.Context, mod *Modification) (inserted []*Task, changed []*Task, err error)

	// Time returns the time as the backend understands it, in UTC.
	Time(ctx context.Context) (time.Time, error)

	// Close closes any underlying connections. The backend is expected to take
	// ownership of all such connections, so this cleans them up.
	Close() error
}

Backend describes all of the functions that any backend has to implement to be used as the storage for task queues.

type BackendClaimFunc

type BackendClaimFunc func(ctx context.Context, eq *ClaimQuery) (*Task, error)

BackendClaimFunc is a function that can make claims based on a ClaimQuery. It is a convenience type for backends to use.

type BackendOpener

type BackendOpener func(ctx context.Context) (Backend, error)

BackendOpener is a function that can open a connection to a backend. Creating a client with a specific backend is accomplished by passing one of these functions into New.

type ChangeArg

type ChangeArg func(m *Modification, t *Task)

ChangeArg is an argument to the Changing function, which in turn creates arguments for Modify. For example, to change the queue of a task and set its expiry time to 5 minutes in the future, you would do something like this:

	cli.Modify(ctx,
		Changing(myTask,
			QueueTo("a new queue"),
			ArrivalTimeBy(5 * time.Minute)))

func ArrivalTimeBy

func ArrivalTimeBy(d time.Duration) ChangeArg

ArrivalTimeBy sets the arrival time to a time in the future, by the given duration.

func ArrivalTimeTo

func ArrivalTimeTo(at time.Time) ChangeArg

ArrivalTimeTo sets a specific arrival time on a changed task in the Changing function.

func AttemptToNext added in v0.4.3

func AttemptToNext() ChangeArg

AttemptToNext sets the Attempt field in Task to the next value (increments it).

func AttemptToZero added in v0.4.3

func AttemptToZero() ChangeArg

AttemptToZero resets the Attempt field to zero.

func ErrTo added in v0.4.3

func ErrTo(e string) ChangeArg

ErrTo sets the Err field in the task.

func ErrToZero added in v0.4.3

func ErrToZero() ChangeArg

ErrToZero sets the Err field to its zero value (clears the error).

func QueueTo

func QueueTo(q string) ChangeArg

QueueTo creates an option to modify a task's queue in the Changing function.

func ValueTo

func ValueTo(v []byte) ChangeArg

ValueTo sets the changing task's value to the given byte slice.

type ClaimOpt

type ClaimOpt func(*ClaimQuery)

ClaimOpt modifies limits on a task claim.

func ClaimAs

func ClaimAs(id uuid.UUID) ClaimOpt

ClaimAs sets the claimant ID for a claim operation. When not set, uses the internal default for this client.

func ClaimFor added in v0.3.0

func ClaimFor(duration time.Duration) ClaimOpt

ClaimFor sets the duration of a successful claim (the amount of time from now when it expires).

func ClaimPollTime

func ClaimPollTime(d time.Duration) ClaimOpt

ClaimPollTime sets the polling time for a claim. If left at 0, DefaultClaimPollTime is used.

func From added in v0.3.0

func From(qs ...string) ClaimOpt

From sets the queue(s) for a claim.

type ClaimQuery

type ClaimQuery struct {
	Queues   []string      // Queues to attempt to claim from. Only one wins.
	Claimant uuid.UUID     // The ID of the process trying to make the claim.
	Duration time.Duration // How long the task should be claimed for if successful.
	PollTime time.Duration // Length of time between (possibly interruptible) sleep and polling.
}

ClaimQuery contains information necessary to attempt to make a claim on a task in a specific queue.

type DependencyError

type DependencyError struct {
	Inserts []*TaskID
	Depends []*TaskID
	Deletes []*TaskID
	Changes []*TaskID

	Claims []*TaskID

	Message string
}

DependencyError is returned when a dependency is missing when modifying the task store.

func AsDependency

func AsDependency(err error) (DependencyError, bool)

AsDependency indicates whether the given error is a dependency error.

func DependencyErrorf

func DependencyErrorf(msg string, vals ...interface{}) DependencyError

DependencyErrorf creates a new dependency error with the given message.

func (DependencyError) Copy

Copy produces a new deep copy of this error type.

func (DependencyError) Error

func (m DependencyError) Error() string

Error produces a helpful error string indicating what was missing.

func (DependencyError) HasClaims

func (m DependencyError) HasClaims() bool

HasClaims indicates whether any of the tasks were claimed by another claimant and unexpired.

func (DependencyError) HasCollisions

func (m DependencyError) HasCollisions() bool

HasCollisions indicates whether any of the inserted tasks collided with existing IDs.

func (DependencyError) HasMissing

func (m DependencyError) HasMissing() bool

HasMissing indicates whether there was anything missing in this error.

func (DependencyError) OnlyClaims added in v0.6.1

func (m DependencyError) OnlyClaims() bool

OnlyClaims indicates that the error was only related to claimants. Useful for backends to do "force" operations, making it easy to ignore this particular error.

type DependencyHandler added in v0.3.17

type DependencyHandler func(err DependencyError) error

DependencyHandler is called (if set) when a worker run finishes with a dependency error. If it returns a non-nil error, that converts into a fatal error.

type EntroQ

type EntroQ struct {
	// contains filtered or unexported fields
}

EntroQ is a client interface for accessing the task queue.

func New

func New(ctx context.Context, opener BackendOpener, opts ...Option) (*EntroQ, error)

New creates a new task client with the given backend implementation, for example, to use an in-memory implementation:

cli, err := New(ctx, mem.Opener())

func (*EntroQ) Claim

func (c *EntroQ) Claim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

Claim attempts to get the next unclaimed task from the given queues. It blocks until one becomes available or until the context is done. When it succeeds, it returns a task with the claimant set to the default, or to the value given in options, and an arrival time computed from the duration. The default duration if none is given is DefaultClaimDuration.

func (*EntroQ) Close

func (c *EntroQ) Close() error

Close closes the underlying backend.

func (*EntroQ) DoWithRenew

func (c *EntroQ) DoWithRenew(ctx context.Context, task *Task, lease time.Duration, f func(context.Context) error) (*Task, error)

DoWithRenew runs the provided function while keeping the given task lease renewed.

func (*EntroQ) DoWithRenewAll

func (c *EntroQ) DoWithRenewAll(ctx context.Context, tasks []*Task, lease time.Duration, f func(context.Context) error) ([]*Task, error)

DoWithRenewAll runs the provided function while keeping the leases of all given tasks renewed.

func (*EntroQ) ID

func (c *EntroQ) ID() uuid.UUID

ID returns the default claimant ID of this client. Used in "bare" calls, like Modify, Claim, etc. To change the ID per call (usually not needed, and can be dangerous), use the "As" calls, e.g., ModifyAs.

func (*EntroQ) Modify

func (c *EntroQ) Modify(ctx context.Context, modArgs ...ModifyArg) (inserted []*Task, changed []*Task, err error)

Modify performs a batch modification, gated on the existence of all task IDs and versions specified. Tasks named in deletions, updates, and dependencies must all be present. The transaction either succeeds entirely or fails entirely.

Returns the inserted and changed tasks, and an error if it could not proceed. If the error was due to missing dependencies, a DependencyError is returned, which can be checked for by calling AsDependency(err).

func (*EntroQ) NewWorker

func (c *EntroQ) NewWorker(qs ...string) *Worker

NewWorker is a convenience method on an EntroQ client to create a worker.

func (*EntroQ) QueueStats added in v0.3.3

func (c *EntroQ) QueueStats(ctx context.Context, opts ...QueuesOpt) (map[string]*QueueStat, error)

QueueStats returns a mapping from queue names to task stats.

func (*EntroQ) Queues

func (c *EntroQ) Queues(ctx context.Context, opts ...QueuesOpt) (map[string]int, error)

Queues returns a mapping from all queue names to their task counts.

func (*EntroQ) QueuesEmpty

func (c *EntroQ) QueuesEmpty(ctx context.Context, opts ...QueuesOpt) (bool, error)

QueuesEmpty indicates whether the specified task queues are all empty. If no options are specified, returns an error.

func (*EntroQ) RenewAllFor

func (c *EntroQ) RenewAllFor(ctx context.Context, tasks []*Task, duration time.Duration) (result []*Task, err error)

RenewAllFor attempts to renew all given tasks' leases (update arrival times) for the given duration. Returns the new tasks.

func (*EntroQ) RenewFor

func (c *EntroQ) RenewFor(ctx context.Context, task *Task, duration time.Duration) (*Task, error)

RenewFor attempts to renew the given task's lease (update arrival time) for the given duration. Returns the new task.

func (*EntroQ) Tasks

func (c *EntroQ) Tasks(ctx context.Context, queue string, opts ...TasksOpt) ([]*Task, error)

Tasks returns a slice of all tasks in the given queue.

func (*EntroQ) Time added in v0.3.0

func (c *EntroQ) Time(ctx context.Context) (time.Time, error)

Time gets the time as the backend understands it, in UTC. Default is just time.Now().UTC().

func (*EntroQ) TryClaim

func (c *EntroQ) TryClaim(ctx context.Context, opts ...ClaimOpt) (*Task, error)

TryClaim attempts one time to claim a task from the given queues. If there are no tasks, it returns a nil error *and* a nil task. This allows the caller to decide whether to retry. It can fail if certain (optional) dependency tasks are not present. This can be used, for example, to ensure that configuration tasks haven't changed.

func (*EntroQ) WaitQueuesEmpty added in v0.3.0

func (c *EntroQ) WaitQueuesEmpty(ctx context.Context, opts ...QueuesOpt) error

WaitQueuesEmpty does a poll-and-wait strategy to block until the queue query returns empty.

type ErrQMap added in v0.3.0

type ErrQMap func(inbox string) string

ErrQMap is a function that maps from an inbox name to its "move on error" error box name. If no mapping is found, a suitable default should be returned.

type IDOption added in v0.5.1

type IDOption func(id *TaskID)

IDOption is an option for things that require task ID information. Allows additional ID-related metadata to be passed.

func WithIDQueue added in v0.5.1

func WithIDQueue(q string) IDOption

WithIDQueue specifies the queue for a particular task ID.

type InsertArg

type InsertArg func(*Modification, *TaskData)

InsertArg is an argument to task insertion.

func WithArrivalTime

func WithArrivalTime(at time.Time) InsertArg

WithArrivalTime changes the arrival time to a fixed moment during task insertion.

func WithArrivalTimeIn

func WithArrivalTimeIn(duration time.Duration) InsertArg

WithArrivalTimeIn computes the arrival time based on the duration from now, e.g.,

  cli.Modify(ctx,
    InsertingInto("my queue",
      WithArrivalTimeIn(2 * time.Minute)))

func WithAttempt added in v0.4.3

func WithAttempt(value int32) InsertArg

WithAttempt sets the number of attempts for this task. Usually not needed, handled automatically by the worker.

func WithErr added in v0.4.3

func WithErr(value string) InsertArg

WithErr sets the error field of a task during insertion. Usually not needed, as tasks are typically modified to add errors, not inserted with them.

func WithID

func WithID(id uuid.UUID) InsertArg

WithID sets the task's ID for insertion. This is not normally needed, as the backend will assign a new, unique ID for this task if none is specified. There are cases where assigning an explicit insertion ID (always being careful that it is unique) can be useful, however.

For example, a not uncommon need is for a worker to do the following:

- Claim a task,
- Make database entries corresponding to downstream work,
- Insert tasks for the downstream work and delete claimed task.

If the database entries need to reference the tasks that have not yet been inserted (e.g., if they need to be used to get at the status of a task), it is not safe to simply update the database after insertion, as this introduces a race condition. If, for example, the following strategy is employed, then the task IDs may never make it into the database:

- Claim a task,
- Make database entries
- Insert tasks and delete claimed task
- Update database with new task IDs

In this event, it is entirely possible to successfully process the incoming task and create the outgoing tasks, then lose network connectivity and fail to add those IDs to the database. Now it is no longer possible to update the database appropriately: the task information is simply lost.

Instead, it is safe to do the following:

- Claim a task
- Make database entries, including with to-be-created task IDs
- Insert tasks with those IDs and delete claimed task.

This avoids the potential data loss condition entirely.
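The safe ordering above can be sketched as follows. This is a simplified model under stated assumptions: the "database" is a plain map, IDs are random hex strings rather than uuid.UUID values, and `newID` and `planDownstream` are hypothetical helpers. The point is only that IDs are chosen and recorded before any task insertion happens.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns a random 128-bit identifier as hex. A real worker would
// generate a UUID and pass it to WithID instead.
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// planDownstream picks task IDs up front and records them in db (a stand-in
// for a real database) before any task is inserted. It returns the IDs that
// the later insert-and-delete modification should use.
func planDownstream(db map[string]string, jobs []string) ([]string, error) {
	ids := make([]string, 0, len(jobs))
	for _, job := range jobs {
		id, err := newID()
		if err != nil {
			return nil, err
		}
		db[job] = id // the database row already points at the future task
		ids = append(ids, id)
	}
	return ids, nil
}

func main() {
	db := map[string]string{}
	ids, err := planDownstream(db, []string{"resize", "thumbnail"})
	if err != nil {
		panic(err)
	}
	// Next step (not shown): a single Modify call that inserts tasks with
	// these IDs and deletes the claimed task. If that call fails, the claimed
	// task is still present and can be retried.
	fmt.Println(len(ids), db["resize"] == ids[0])
}
```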

There are other workarounds for this situation, like using a two-step creation process and taking advantage of the ability to move tasks between queues without disturbing their ID (only their version). However, this need is common enough that requiring extra worker logic just to get a task ID into the database is not warranted.

func WithSkipColliding added in v0.2.0

func WithSkipColliding(s bool) InsertArg

WithSkipColliding sets the insert argument to allow itself to be removed if the only error encountered is an ID collision. This can help when it is desired to insert multiple tasks, but a previous subset was already inserted with similar IDs. Sometimes you want to specify a superset to "catch what we missed".

func WithValue

func WithValue(value []byte) InsertArg

WithValue sets the task's byte slice value during insertion.

  cli.Modify(ctx,
    InsertingInto("my queue",
      WithValue([]byte("hi there"))))

type Modification

type Modification struct {
	Claimant uuid.UUID `json:"claimant"`

	Inserts []*TaskData `json:"inserts"`
	Changes []*Task     `json:"changes"`
	Deletes []*TaskID   `json:"deletes"`
	Depends []*TaskID   `json:"depends"`
	// contains filtered or unexported fields
}

Modification contains all of the information for a single batch modification in the task store.

func NewModification

func NewModification(claimant uuid.UUID, modArgs ...ModifyArg) *Modification

NewModification creates a new modification: insertions, deletions, changes, and dependencies all together. When creating this for the purpose of passing to WithModification, set the claimant to uuid.Nil (it is ignored in that case).

func (*Modification) AllDependencies

func (m *Modification) AllDependencies() (map[uuid.UUID]int32, error)

AllDependencies returns a dependency map from ID to version for tasks that must exist and match version numbers. It returns an error if there are duplicates. Changes, Deletions, Dependencies and Insertions with IDs must be disjoint sets.

When using this to query backend storage for the presence of tasks, note that it is safe to ignore the version if you use the DependencyError method to determine whether a transaction can proceed. That checks versions appropriate for all different types of modification.
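To make the disjointness requirement concrete, here is a minimal sketch of building an ID-to-version map while rejecting duplicates. The `taskID` type uses string IDs and `allDeps` is a hypothetical helper, and insertions with explicit IDs are left out for brevity, so this illustrates the idea rather than the library's actual method.

```go
package main

import "fmt"

// taskID is a local stand-in for TaskID with a string identifier.
type taskID struct {
	ID      string
	Version int32
}

// allDeps (hypothetical) merges several ID sets (for example changes,
// deletes, and depends) and rejects any ID that appears more than once.
func allDeps(groups ...[]taskID) (map[string]int32, error) {
	deps := make(map[string]int32)
	for _, g := range groups {
		for _, t := range g {
			if _, dup := deps[t.ID]; dup {
				return nil, fmt.Errorf("task %s appears more than once in the modification", t.ID)
			}
			deps[t.ID] = t.Version
		}
	}
	return deps, nil
}

func main() {
	changes := []taskID{{ID: "a", Version: 3}}
	deletes := []taskID{{ID: "b", Version: 1}}
	depends := []taskID{{ID: "a", Version: 3}} // overlaps with changes

	_, err := allDeps(changes, deletes, depends)
	fmt.Println(err)
}
```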

func (*Modification) DependencyError

func (m *Modification) DependencyError(found map[uuid.UUID]*Task) error

DependencyError returns a DependencyError if there are problems with the dependencies found in the backend, or nil if everything is fine. Problems include missing or claimed dependencies, both of which will block a modification.

func (*Modification) String added in v0.6.1

func (m *Modification) String() string

String produces a friendly version of this modification.

type ModifyArg

type ModifyArg func(m *Modification)

ModifyArg is an argument to the Modify function, which does batch modifications to the task store.

func Changing

func Changing(task *Task, changeArgs ...ChangeArg) ModifyArg

Changing adds a task update to a Modify call, e.g., to modify the queue a task belongs in:

cli.Modify(ctx, Changing(myTask, QueueTo("a different queue name")))

func Deleting

func Deleting(id uuid.UUID, version int32, opts ...IDOption) ModifyArg

Deleting adds a deletion to a Modify call, e.g.,

cli.Modify(ctx, Deleting(id, version))

func DependingOn

func DependingOn(id uuid.UUID, version int32, opts ...IDOption) ModifyArg

DependingOn adds a dependency to a Modify call, e.g., to insert a task into "my queue" with data "hey", but only succeeding if a task with anotherID and someVersion exists:

  cli.Modify(ctx,
    InsertingInto("my queue",
      WithValue([]byte("hey"))),
    DependingOn(anotherID, someVersion))

func Inserting

func Inserting(tds ...*TaskData) ModifyArg

Inserting creates an insert modification from TaskData:

	cli.Modify(ctx,
		Inserting(&TaskData{
			Queue: "myqueue",
			At:    time.Now().Add(1 * time.Minute),
			Value: []byte("hi there"),
		}))

Or, better still,

	cli.Modify(ctx,
		InsertingInto("myqueue",
			WithArrivalTimeIn(1 * time.Minute),
			WithValue([]byte("hi there"))))

func InsertingInto

func InsertingInto(q string, insertArgs ...InsertArg) ModifyArg

InsertingInto creates an insert modification. Use like this:

cli.Modify(ctx, InsertingInto("my queue name", WithValue([]byte("hi there"))))

func ModifyAs

func ModifyAs(id uuid.UUID) ModifyArg

ModifyAs sets the claimant ID for a particular modify call. Usually not needed, can be dangerous unless used with extreme care. The client default is used if this option is not provided.

func WithModification

func WithModification(src *Modification) ModifyArg

WithModification returns a ModifyArg that merges the given Modification with whatever it is so far. Ignores Claimant field, and simply appends to all others.

type MoveTaskError added in v0.2.7

type MoveTaskError struct {
	Err error
}

MoveTaskError causes a task to be moved to a specified queue. This is useful when a non-fatal, task-specific error happens in a worker: rather than crashing the worker, the task is stashed in another queue, and the work function can signal this with an early error return. The error is added to the task.

func MoveTaskErrorf added in v0.7.1

func MoveTaskErrorf(format string, values ...interface{}) *MoveTaskError

MoveTaskErrorf creates a MoveTaskError given a format string and values, just like fmt.Errorf.

func NewMoveTaskError added in v0.2.7

func NewMoveTaskError(err error) *MoveTaskError

NewMoveTaskError creates a new MoveTaskError from the given error.

func (*MoveTaskError) Error added in v0.2.7

func (e *MoveTaskError) Error() string

Error produces an error string.

type Notifier

type Notifier interface {
	// Notify signals an event on the key. Wakes up one waiter, if any, or is
	// dropped if no waiters are waiting.
	Notify(key string)
}

Notifier can be notified on a given key (e.g., queue name).

type NotifyWaiter

type NotifyWaiter interface {

NotifyWaiter can wait for and notify events.

type Option

type Option func(*EntroQ)

Option is an option that modifies how EntroQ clients are created.

func WithClaimantID

func WithClaimantID(id uuid.UUID) Option

WithClaimantID sets the default claimant ID for this client.

type QueueStat added in v0.3.3

type QueueStat struct {
	Name      string `json:"name"`      // The queue name.
	Size      int    `json:"size"`      // The total number of tasks.
	Claimed   int    `json:"claimed"`   // The number of currently claimed tasks.
	Available int    `json:"available"` // The number of available tasks.

	MaxClaims int `json:"maxClaims"` // The maximum number of claims for a task in the queue.
}

QueueStat holds high-level information about a queue. Note that Available + Claimed may not add up to Size. This is because a task can be unavailable (At in the future) without being claimed by anyone.

type QueuesOpt

type QueuesOpt func(*QueuesQuery)

QueuesOpt modifies how queue requests are made.

func LimitQueues

func LimitQueues(limit int) QueuesOpt

LimitQueues sets the limit on the number of queues that are returned.

func MatchExact

func MatchExact(matches ...string) QueuesOpt

MatchExact adds an allowable exact match for a queue listing.

func MatchPrefix

func MatchPrefix(prefixes ...string) QueuesOpt

MatchPrefix adds allowable prefix matches for a queue listing.

type QueuesQuery

type QueuesQuery struct {
	// MatchPrefix specifies allowable prefix matches. If empty, limitations
	// are not set based on prefix matching. All prefix match conditions are ORed.
	// If both this and MatchExact are empty or nil, no limitations are set on
	// queue name: all will be returned.
	MatchPrefix []string

	// MatchExact specifies allowable exact matches. If empty, limitations are
	// not set on exact queue names.
	MatchExact []string

	// Limit specifies an upper bound on the number of queue names returned.
	Limit int
}

QueuesQuery modifies a queue listing request.
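The matching rules in the field comments can be read as a simple predicate over queue names. The following sketch implements one plausible reading with a local stand-in type. It assumes that exact and prefix conditions are combined with OR when both are given, which the comments above do not state explicitly, and the `matches` method is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// queuesQuery is a local stand-in with only the matching fields.
type queuesQuery struct {
	MatchPrefix []string
	MatchExact  []string
}

// matches reports whether a queue name passes the query's filters.
// With no filters at all, every name matches.
func (q queuesQuery) matches(name string) bool {
	if len(q.MatchPrefix) == 0 && len(q.MatchExact) == 0 {
		return true
	}
	for _, e := range q.MatchExact {
		if name == e {
			return true
		}
	}
	for _, p := range q.MatchPrefix {
		if strings.HasPrefix(name, p) {
			return true
		}
	}
	return false
}

func main() {
	q := queuesQuery{MatchPrefix: []string{"jobs/"}, MatchExact: []string{"inbox"}}
	for _, name := range []string{"jobs/resize", "inbox", "inbox/err"} {
		fmt.Println(name, q.matches(name))
	}
}
```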

type RetryTaskError added in v0.4.3

type RetryTaskError struct {
	Err error
}

RetryTaskError causes a task to be retried, incrementing its Attempt field and setting its Err to the text of the error. If MaxAttempts is positive and has been reached, then this behaves in the same way as a MoveTaskError.

func NewRetryTaskError added in v0.4.3

func NewRetryTaskError(err error) *RetryTaskError

NewRetryTaskError creates a new RetryTaskError from the given error.

func RetryTaskErrorf added in v0.7.1

func RetryTaskErrorf(format string, values ...interface{}) *RetryTaskError

RetryTaskErrorf creates a RetryTaskError in the same way that you would create an error with fmt.Errorf.

func (*RetryTaskError) Error added in v0.4.3

func (e *RetryTaskError) Error() string

Error produces an error string.

type Task

type Task struct {
	Queue string `json:"queue"`

	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	At       time.Time `json:"at"`
	Claimant uuid.UUID `json:"claimant"`
	Claims   int32     `json:"claims"`
	Value    []byte    `json:"value"`

	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`

	// FromQueue specifies the previous queue for a task that is moving to another queue.
	// Usually not present, can be used for change authorization (since two queues are in play, there).
	FromQueue string `json:"fromqueue,omitempty"`

	// Worker retry logic uses these fields when moving tasks and when retrying them.
	// It is left up to the consumer to determine how many attempts is too many
	// and to produce a suitable retry or move error.
	Attempt int32  `json:"attempt"`
	Err     string `json:"err"`
}

Task represents a unit of work, with a byte slice value payload. Note that Claims is the number of times a task has successfully been claimed. This is different from the version number, which increments for every modification, not just claims.

func PollTryClaim

func PollTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc) (*Task, error)

PollTryClaim runs a loop in which the TryClaim function is called between sleeps with exponential backoff (up to a point). Backend implementations may choose to use this as their Claim implementation.

func WaitTryClaim

func WaitTryClaim(ctx context.Context, eq *ClaimQuery, tc BackendClaimFunc, w Waiter) (*Task, error)

WaitTryClaim runs a loop in which the TryClaim function is called, then if no tasks are available, the given wait function is used to attempt to wait for a task to become available on the queue.

The wait function should exit (more or less) immediately if the context is canceled, and should return a nil error if the wait was successful (something became available).

func (*Task) AsChange

func (t *Task) AsChange(args ...ChangeArg) ModifyArg

AsChange returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsChange(ArrivalTimeBy(2 * time.Minute)))

The above is shorthand for

cli.Modify(ctx, Changing(task1, ArrivalTimeBy(2 * time.Minute)))

func (*Task) AsDeletion

func (t *Task) AsDeletion() ModifyArg

AsDeletion returns a ModifyArg that can be used in the Modify function, e.g.,

cli.Modify(ctx, task1.AsDeletion())

The above would cause the given task to be deleted, if it can be. It is shorthand for

cli.Modify(ctx, Deleting(task1.ID, task1.Version, WithIDQueue(task1.Queue)))

func (*Task) AsDependency

func (t *Task) AsDependency() ModifyArg

AsDependency returns a ModifyArg that can be used to create a Modify dependency, e.g.,

cli.Modify(ctx, task.AsDependency())

That is shorthand for

cli.Modify(ctx, DependingOn(task.ID, task.Version, WithIDQueue(task.Queue)))

func (*Task) Copy

func (t *Task) Copy() *Task

Copy copies this task's data and everything.

func (*Task) CopyOmitValue added in v0.3.9

func (t *Task) CopyOmitValue() *Task

CopyOmitValue copies this task but leaves the value blank.

func (*Task) CopyWithValue added in v0.6.1

func (t *Task) CopyWithValue(ok bool) *Task

CopyWithValue lets you specify whether the value should be copied.

func (*Task) Data

func (t *Task) Data() *TaskData

Data returns the data for this task.

func (*Task) IDVersion

func (t *Task) IDVersion() *TaskID

IDVersion returns a TaskID (ID and version) from this task.

func (*Task) String

func (t *Task) String() string

String returns a useful representation of this task.

type TaskData

type TaskData struct {
	Queue string    `json:"queue"`
	At    time.Time `json:"at"`
	Value []byte    `json:"value"`

	// Attempt indicates which "attempt number" this task is on. Used by workers.
	Attempt int32 `json:"attempt"`

	// Err contains error information for this task. Used by workers.
	Err string `json:"err"`

	// ID is an optional task ID to be used for task insertion.
	// Default (uuid.Nil) causes the backend to assign one, and that is
	// sufficient for many cases. If you desire to make a database entry that
	// *references* a task, however, in that case it can make sense to specify
	// an explicit task ID for insertion. This allows a common workflow cycle
	// 	consume task -> db update -> insert tasks
	// to be done safely, where the database update needs to refer to
	// to-be-inserted tasks.
	ID uuid.UUID `json:"id"`

	// These timings are here so that journaling can restore full state.
	// Usually they are blank, and there are no convenience methods to allow
	// them to be set. Leave them at default values in all cases.
	Created  time.Time `json:"created"`
	Modified time.Time `json:"modified"`
	// contains filtered or unexported fields
}

TaskData contains just the data, not the identifier or metadata. Used for insertions.

func (*TaskData) String added in v0.2.0

func (t *TaskData) String() string

String returns a string representation of the task data, excluding the value.

type TaskID

type TaskID struct {
	ID      uuid.UUID `json:"id"`
	Version int32     `json:"version"`

	Queue string `json:"queue,omitempty"`
}

TaskID contains the identifying parts of a task. If IDs don't match (identifier and version together), then operations fail on those tasks.

Also contains the name of the queue in which this task resides. Can be omitted, as it does not affect functionality, but might be required for authorization, which is performed based on queue name. Present whenever using tasks as a source of IDs.

func NewTaskID added in v0.5.1

func NewTaskID(id uuid.UUID, version int32, opts ...IDOption) *TaskID

NewTaskID creates a new TaskID with given options.

func (TaskID) AsDeletion added in v0.3.4

func (t TaskID) AsDeletion() ModifyArg

AsDeletion produces an appropriate ModifyArg to delete the task with this ID.

func (TaskID) AsDependency added in v0.3.17

func (t TaskID) AsDependency() ModifyArg

AsDependency produces an appropriate ModifyArg to depend on this task ID.

func (TaskID) String

func (t TaskID) String() string

String produces the id:version string representation.

type TasksOpt

type TasksOpt func(*EntroQ, *TasksQuery)

TasksOpt is an option that can be passed into Tasks to control what it returns.

func LimitClaimant

func LimitClaimant(id uuid.UUID) TasksOpt

LimitClaimant only returns tasks with the given claimant, or expired tasks.

func LimitSelf

func LimitSelf() TasksOpt

LimitSelf only returns self-claimed tasks or expired tasks.

func LimitTasks

func LimitTasks(limit int) TasksOpt

LimitTasks sets the limit on the number of tasks to return. A value <= 0 indicates "no limit".

func OmitValues added in v0.3.9

func OmitValues() TasksOpt

OmitValues tells a tasks query to only return metadata, not values.

func WithTaskID

func WithTaskID(ids ...uuid.UUID) TasksOpt

WithTaskID adds a task ID to the set of IDs that can be returned in a task query. The default is "all that match other specs" if no IDs are specified. Note that versions are not part of the ID.

type TasksQuery

type TasksQuery struct {
	Queue    string
	Claimant uuid.UUID
	Limit    int
	IDs      []uuid.UUID

	// OmitValues specifies that only metadata should be returned.
	// Backends are not required to honor this flag, though any
	// service receiving it in a request should ensure that values
	// are not passed over the wire.
	OmitValues bool
}

TasksQuery holds information for a tasks query.

type Waiter

type Waiter interface {
	// Wait waits for an event on the given set of keys, calling cond after
	// poll intervals until one of them is notified, cond returns true, or the
	// context is canceled.
	// If cond is nil, this function returns when the channel is notified,
	// the poll interval is exceeded, or the context is canceled. Only the last
	// event causes a non-nil error.
	// If poll is 0, it can never be exceeded.
	// A common use is to use poll=0 and cond=nil, causing this to simply wait
	// for a notification.
	Wait(ctx context.Context, keys []string, poll time.Duration, cond func() bool) error
}

Waiter can wait for an event on a given key (e.g., queue name).

type Work added in v0.3.0

type Work func(ctx context.Context, task *Task) ([]ModifyArg, error)

Work is a function that is called by Run. It does work for one task, then returns any necessary modifications.

If this function returns a MoveTaskError, the original task is moved into a queue specified by calling ErrQMap on the original queue name. This is useful for keeping track of failed tasks by moving them out of the way instead of deleting them or allowing them to be picked up again.

If this function returns a RetryTaskError, the original task has its attempt field incremented, the err field is updated to contain the text of the error, and the worker goes around again, leaving it to be reclaimed. If the maximum number of attempts has been reached, however, the error acts like a MoveTaskError, instead.
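The retry-or-move decision described above can be summarized as a small function. This sketch makes several assumptions that the text does not pin down: the limit counts as reached when the incremented attempt is greater than or equal to the maximum, a maximum of 0 or less means no limit, and the error queue name is formed by appending "/err". The `afterRetryError` name is hypothetical.

```go
package main

import "fmt"

// afterRetryError decides where a task goes after its work function returns
// a retryable error. attempt is the task's current Attempt value, and errQ
// maps an inbox name to its error queue (same shape as ErrQMap).
// It returns the destination queue and whether the task was moved.
func afterRetryError(queue string, attempt, maxAttempts int32, errQ func(string) string) (string, bool) {
	next := attempt + 1
	// Assumption: "reached" means the incremented attempt hits the maximum.
	if maxAttempts > 0 && next >= maxAttempts {
		return errQ(queue), true
	}
	return queue, false // stays put, to be claimed again later
}

func main() {
	// Assumed error-queue naming, for illustration only.
	errQ := func(inbox string) string { return inbox + "/err" }

	for attempt := int32(0); attempt < 3; attempt++ {
		q, moved := afterRetryError("inbox", attempt, 3, errQ)
		fmt.Println(attempt, q, moved)
	}
}
```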

type Worker

type Worker struct {
	// Qs contains the queues to work on.
	Qs []string

	// ErrQMap maps an inbox to the queue tasks are moved to if a MoveTaskError
	// is returned from a worker's run function.
	ErrQMap ErrQMap

	// OnDepErr can hold a function to be called when a dependency error is
	// encountered. If it returns a non-nil error, it will become fatal.
	OnDepErr DependencyHandler

	// MaxAttempts indicates how many attempts are too many before a retryable
	// error becomes permanent and the task is moved to an error queue.
	MaxAttempts int32
	// contains filtered or unexported fields
}

Worker creates an iterator-like protocol for processing tasks in a queue, one at a time, in a loop. Each worker should only be accessed from a single goroutine. If multiple goroutines are desired, they should each use their own worker instance.


w := eqClient.NewWorker("queue_name")
err := w.Run(ctx, func(ctx context.Context, task *Task) ([]ModifyArg, error) {
	// Do stuff with the task.
	// It's safe to mark it for deletion, too. It is renewed in the background.
	// If renewal changed its version, that is rewritten before modification.
	return []ModifyArg{task.AsDeletion()}, nil
})
// Handle the error, which is nil if the context was canceled (but not if
// it timed out).

func NewWorker added in v0.3.7

func NewWorker(eq *EntroQ, qs ...string) *Worker

NewWorker creates a new worker that makes it easy to claim and operate on tasks in an endless loop.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, f Work) (err error)

Run attempts to run the given function once per claimed task, in a loop, until the context is canceled or an unrecoverable error is encountered. The function can return modifications that should be done after it exits, and version numbers for claim renewals will be automatically updated.

func (*Worker) WithOpts added in v0.3.0

func (w *Worker) WithOpts(opts ...WorkerOption) *Worker

WithOpts sets options on a newly-created worker.

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption can be passed to WithOpts to modify the worker.

func WithBaseRetryDelay added in v0.4.3

func WithBaseRetryDelay(d time.Duration) WorkerOption

WithBaseRetryDelay sets the base delay for a retried task (the first attempt). Without any backoff settings, this is used for every retry. When used, the task is modified when its attempt is incremented to have its availability time set to this amount from now.

func WithDependencyHandler added in v0.3.17

func WithDependencyHandler(f DependencyHandler) WorkerOption

WithDependencyHandler sets a function to be called when a worker encounters a dependency error. If this function returns a non-nil error, the worker will exit.

Note that workers always exit on non-dependency errors, but usually treat dependency errors as things that can be retried. Specifying a handler for dependency errors allows different behavior as needed.

One possible use case for a dependency error handler is to reload a configuration task for the next round: if the task is depended on, but has been changed, the task can be retried, but configuration should also be reloaded, which could be done in a handler.

func WithErrQMap added in v0.3.0

func WithErrQMap(f ErrQMap) WorkerOption

WithErrQMap sets a function that maps from inbox queue names to error queue names. Defaults to DefaultErrQMap.
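A custom mapping can be as small as a closure over a naming rule. The sketch below assumes the mapping has the shape func(inbox string) string, which matches the description above but is not confirmed by this page; errQMap, withSuffix, and the "/failed" suffix are hypothetical, and nothing here describes what DefaultErrQMap actually returns.

```go
package main

import "fmt"

// errQMap is a stand-in for ErrQMap: it maps an inbox queue name to the
// name of the queue that receives that inbox's failed tasks.
type errQMap func(inbox string) string

// withSuffix builds a mapping that appends a fixed suffix to the inbox
// name. The suffix is chosen by the caller and is purely illustrative.
func withSuffix(suffix string) errQMap {
	return func(inbox string) string { return inbox + suffix }
}

func main() {
	m := withSuffix("/failed")
	fmt.Println(m("/jobs/resize")) // prints "/jobs/resize/failed"
}
```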

func WithLease

func WithLease(d time.Duration) WorkerOption

WithLease sets the interval between task renewals. Tasks are claimed for slightly longer than this interval so that they have a chance of being renewed before the claim expires.

func WithMaxAttempts added in v0.4.3

func WithMaxAttempts(m int32) WorkerOption

WithMaxAttempts sets the maximum attempts that are allowed before a RetryTaskError turns into a MoveTaskError (transparently). If this value is 0 (the default), then there is no maximum, and attempts can be incremented indefinitely without a move to an error queue.


Path Synopsis
Package eqgrpc provides a gRPC backend for EntroQ. This is the backend that is commonly used by clients of an EntroQ task service, set up thus:

	Server: qsvc -> entroq library -> some backend (e.g., pg)
	Client: entroq library -> grpc backend

You can start, for example, a postgres-backed QSvc like this (or just use pg/svc):

	ctx := context.Background()
	svc, err := qsvc.New(ctx, pg.Opener(dbHostPort)) // Other options available, too.

Package eqmem implements an in-memory entroq that has fine-grained locking and can handle simultaneously stats/task listing and modifications to a large extent.
Package eqpg provides an entroq.Backend using PostgreSQL.
Command eqmemsvc starts up an in-memory EntroQ gRPC service.
Package cmd holds the commands for the eqmemsvc application.
Command eqpgsvc starts a PostgreSQL-backed EntroQ gRPC service.
Package mr has a simple MapReduce implementation, one that does everything inside the task manager (no outside files).
Package mrtest is a test package tightly tied to the mr package, separated out to avoid import cycles when other tests want to use it.
Package authz contains standard data structures for representing permissions and authorization requests / responses.
Package opahttp implements the authz.Authorizer using an Open Policy Agent (OPA).
Package procworker implements a worker that reads a subprocess specification task, executes it, and puts results into an outbox.
Package queues contains helper functions for manipulation of queue names.
Package qsvc contains the service implementation for registering with gRPC.
Package qtest contains standard testing routines for exercising various backends in similar ways.
Package subq abstracts the idea of subscribing to a particular queue so that changes can be immediately notified.
