workqueue

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: MIT Imports: 7 Imported by: 4

Documentation

Overview

Package workqueue is the Go implementation of a work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#).

For an overview of how the work queue works, it’s limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme at https://github.com/MeVitae/redis-work-queue/blob/main/README.md.

Setup

import (
    "github.com/redis/go-redis/v9"
    workqueue "github.com/mevitae/redis-work-queue/go"
)

db := redis.NewClient(&redis.Options{
    Addr: "your-redis-server:1234",
})
workQueue := workqueue.NewWorkQueue(workqueue.KeyPrefix("example_work_queue"))

Adding Items

// From bytes:
exampleItem := NewItem([]byte("[1,2,3]"))
// Or from JSON:
jsonItem, err := NewItemFromJSONData([1, 2, 3])
// ...
err = workQueue.AddItem(ctx, db, jsonItem)

Completing work

Please read the documentation on leasing and completing items at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item

for (;;) {
   job, err := workQueue.Lease(ctx, db, true, 0, 5*time.Second)
   if err != nil { panic(err) }
   doSomeWork(job)
   _, err = workQueue.Complete(ctx, db, job)
   if err != nil { panic(err) }
}

Handling errors

Please read the documentation on handling errors at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#handling-errors

for (;;) {
   job, err := workQueue.Lease(ctx, db, true, 0, 5*time.Second)
   if err != nil { panic(err) }
   err = doSomeWork(job)
   if err == nil {
       // Mark successful jobs as complete
       _, err = workQueue.Complete(ctx, db, job)
       if err != nil { panic(err) }
   } else if !shouldRetryAfter(err) {
       // Errors that shouldn't cause a retry should mark the job as complete so it isn't tried
       // again.
       logError(err)
       _, err = workQueue.Complete(ctx, db, job)
       if err != nil { panic(err) }
   } else {
       // Drop a job that should be retried - it will be returned to the work queue after the
       // (5 second) lease expires.
   }
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ItemDataJson

func ItemDataJson[T any](item *Item) (data T, err error)

ItemDataJson returns the data from an item, parsed as JSON to type T.

Types

type Item

type Item struct {
	ID   string `json:"id"`
	Data []byte `json:"data"`
}

Item for a work queue. Each item has an ID and associated data.

func NewItem

func NewItem(data []byte) Item

NewItem creates a new item with a random ID (a UUID).

func NewItemFromJSONData

func NewItemFromJSONData(data any) (Item, error)

NewItemFromJSONData creates a new item with a random ID (a UUID). The data is the result of json.Marshal(data).

func (*Item) ParseJsonData

func (item *Item) ParseJsonData(v any) error

ParseJsonData parses the item's data, as JSON, to v.

type KeyPrefix

type KeyPrefix string

KeyPrefix is a string which should be prefixed to an identifier to generate a database key.

Example

cv_key := KeyPrefix("cv:")
// ...
cv_id := "abcdef-123456"
// Now, cv_id == "cv:abcdef-123456"
// It can be used to access an item in a database, for example:
cv_info := db.get(cv_key.of(cv_id))

func (KeyPrefix) Concat

func (prefix KeyPrefix) Concat(other string) KeyPrefix

Concat other onto prefix and return the result as a KeyPrefix.

func (KeyPrefix) Of

func (prefix KeyPrefix) Of(name string) string

Of returns the result of prefixing prefix onto name.

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue backed by a redis database.

func NewWorkQueue

func NewWorkQueue(name KeyPrefix) WorkQueue

func (*WorkQueue) AddItem

func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item Item) (bool, error)

AddItem to the work queue.

If an item with the same ID already exists, this item is not added, and false is returned. Otherwise, if the item is added true is returned.

If you know the item ID is unique, and not already in the queue, use the optimised WorkQueue.AddUniqueItem instead.

func (*WorkQueue) AddUniqueItem added in v0.3.0

func (workQueue *WorkQueue) AddUniqueItem(ctx context.Context, db *redis.Client, item Item) error

AddItem, which is known to have an ID not already in the queue, to the work queue.

This creates a pipeline and executes it on the database.

func (*WorkQueue) AddUniqueItemToPipeline added in v0.3.0

func (workQueue *WorkQueue) AddUniqueItemToPipeline(ctx context.Context, pipeline redis.Pipeliner, item Item)

AddItemToPipeline adds an item, which is known to have an ID not already in the queue, to the work queue. This adds the redis commands onto the pipeline passed.

Use WorkQueue.AddUniqueItem if you don't want to pass a pipeline directly.

func (*WorkQueue) Complete

func (workQueue *WorkQueue) Complete(ctx context.Context, db *redis.Client, item *Item) (bool, error)

Complete marks a job as completed and remove it from the work queue. After Complete has been called (and returns true), no workers will receive this job again.

Complete returns a boolean indicating if *the job has been removed* **and** *this worker was the first worker to call Complete*. So, while lease might give the same job to multiple workers, complete will return true for only one worker.

func (*WorkQueue) DeepClean added in v0.3.0

func (workQueue *WorkQueue) DeepClean(ctx context.Context, db *redis.Client) error

func (*WorkQueue) Lease

func (workQueue *WorkQueue) Lease(
	ctx context.Context,
	db *redis.Client,
	block bool,
	timeout time.Duration,
	leaseDuration time.Duration,
) (*Item, error)

Request a work lease the work queue. This should be called by a worker to get work to complete. When completed, the WorkQueue.Complete method should be called.

If block is true, the function will return either when a job is leased or after timeout if timeout isn't 0.

If the job is not completed before the end of lease_duration, another worker may pick up the same job. It is not a problem if a job is marked as done more than once.

If no job is available before the timeout, (nil, nil) is returned.

If you've not already done it, it's worth reading the documentation on leasing items at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item

func (*WorkQueue) LightClean added in v0.3.0

func (workQueue *WorkQueue) LightClean(ctx context.Context, db *redis.Client) error

func (*WorkQueue) Processing

func (workQueue *WorkQueue) Processing(ctx context.Context, db *redis.Client) (int64, error)

Processing returns the number of items being processed.

func (*WorkQueue) QueueLen

func (workQueue *WorkQueue) QueueLen(ctx context.Context, db *redis.Client) (int64, error)

Return the length of the work queue (not including items being processed, see WorkQueue.Processing).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL