queue

package module
v0.0.0-...-f2407cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2021 License: MIT Imports: 5 Imported by: 0

README

queue

This is a SQLite-backed persistent queue with:

  • Dependent tasks
  • Flexible scheduling (not before, etc.)
  • Max retries

Each task is composed of an (action, payload) tuple that is stored in SQlite. Payload is anything that can be marshaled to a blob via the TextMarshaler interface.


type MyTask struct {
	ThingToDo string
}

// MarshalText can be any encoding format, workers are responsible for knowing the payload encoding
func (m MyTask) MarshalText() ([]byte, error) {
	return json.Marshal(m)
}

taskID, err := queue.Enqueue("mytasks", MyTask{"do this thing"})
// error checking

payload, onSuccess, err := queue.Dequeue("mytasks")
processPayload(payload)

// everything ok, call onSuccess to delete from queue
// tasks will be requeued once they timeout if the processor crashes or errors out
onSuccess()

See the godoc for additional options.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultMaxTries int = 10

DefaultMaxTries permanently fails the task after this many failed tries

View Source
var DefaultTimeout time.Duration = 2 * time.Minute

DefaultTimeout is the default for enqueued items after which the task is assumed to have failed and will requeue for retry

View Source
var NoTask error = errors.New("no task")

NoTask is returned when dequeue is called and no task is available for that action

Functions

func Dequeue

func Dequeue(db DB, action string, opts ...Option) ([]byte, func() error, error)

Dequeue returns the oldest unclaimed task payload for action and a function to call on successful completion, achieving an exclusive lock on the task for a TTL. Dequeue will return nil and error of type NoTask when the queue is empty for that action.

func Enqueue

func Enqueue(db DB, action string, payload encoding.TextMarshaler, opts ...Option) (int64, error)

Enqueue creates a durable task record in the queue that will survive crashes and restarts

func Length

func Length(db DB, action string) (int, error)

Length returns the queue length for a particular action in a non-blocking read. Length can be used before Dequeue to prevent lock contention.

func MigrateDB

func MigrateDB(db DB) error

MigrateDB will apply SQL to migrate the database

func RequeueExpired

func RequeueExpired(db DB) (int, error)

RequeueExpired will re-add tasks to the queue when their TTL has expired. Note that you do not have to call this function manually unless you want to requeue immediately. Expired tasks are automatically requeued when a subsequent database operation is performed. For very low traffic queues, you may want to call this periodically to limit worst case delay before requeue.

Types

type DB

type DB interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	QueryRow(query string, args ...interface{}) *sql.Row
}

DB is the minimal interface to the persistence layer. It works with sqlx, standard database/sql, or other abstraction layers like ~scooter/db

type Option

type Option func(o *options)

func MaxTries

func MaxTries(t int) Option

MaxTries will fail the task after the specified number of tries. Set max tries to -1 to retry indefinitely.

func NotBefore

func NotBefore(t time.Time) Option

NotBefore will delay task execution until after time t

func OnSuccess

func OnSuccess(f func() error) Option

OnSuccess adds an additional function to be called after the task is successful. This can be used to queue a dependent task, finish clean up, or persist new state.

func Parent

func Parent(taskID int64) Option

Parent declares a dependency on another task. The task will not run until parent completes successfully.

func Timeout

func Timeout(d time.Duration) Option

Timeout adds a custom TTL for a task's exclusive lock before assuming failure and being eligible for requeue

type Task

type Task struct {
	ID       int64
	Action   string
	Payload  []byte
	Expires  int64
	Nbf      int64
	Tries    int
	MaxTries int `db:"max_tries"`
	Parent   sql.NullInt64
}

Task represents a task for a particular action with a payload stored in blob. Task processors are responsible for deserializing payloads for each action type. Successful tasks processors should call `OnSuccess` to remove the task from the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL