Documentation ¶
Overview ¶
Package workqueue is the Go implementation of a work queue built on top of a redis database. Implementations are also available in Python, Rust, Node.js (TypeScript) and Dotnet (C#).
For an overview of how the work queue works, its limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme at https://github.com/MeVitae/redis-work-queue/blob/main/README.md.
Setup ¶
    import (
        "github.com/redis/go-redis/v9"
        workqueue "github.com/mevitae/redis-work-queue/go"
    )

    db := redis.NewClient(&redis.Options{
        Addr: "your-redis-server:1234",
    })
    workQueue := workqueue.NewWorkQueue(workqueue.KeyPrefix("example_work_queue"))
Adding Items ¶
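    // From bytes:
    exampleItem := workqueue.NewItem([]byte("[1,2,3]"))
    // Or from JSON:
    jsonItem, err := workqueue.NewItemFromJSONData([]int{1, 2, 3})
    // ...
    // AddItem returns (bool, error); the bool reports whether the item was added.
    _, err = workQueue.AddItem(ctx, db, jsonItem)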
Completing work ¶
Please read the documentation on leasing and completing items at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item
    for {
        job, err := workQueue.Lease(ctx, db, true, 0, 5*time.Second)
        if err != nil {
            panic(err)
        }
        doSomeWork(job)
        _, err = workQueue.Complete(ctx, db, job)
        if err != nil {
            panic(err)
        }
    }
Handling errors ¶
Please read the documentation on handling errors at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#handling-errors
    for {
        job, err := workQueue.Lease(ctx, db, true, 0, 5*time.Second)
        if err != nil {
            panic(err)
        }
        err = doSomeWork(job)
        if err == nil {
            // Mark successful jobs as complete
            _, err = workQueue.Complete(ctx, db, job)
            if err != nil {
                panic(err)
            }
        } else if !shouldRetryAfter(err) {
            // Errors that shouldn't cause a retry should mark the job as complete so it isn't
            // tried again.
            logError(err)
            _, err = workQueue.Complete(ctx, db, job)
            if err != nil {
                panic(err)
            }
        } else {
            // Drop a job that should be retried - it will be returned to the work queue after
            // the (5 second) lease expires.
        }
    }
Index ¶
- func ItemDataJson[T any](item *Item) (data T, err error)
- type Item
- type KeyPrefix
- type WorkQueue
- func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item Item) (bool, error)
- func (workQueue *WorkQueue) AddUniqueItem(ctx context.Context, db *redis.Client, item Item) error
- func (workQueue *WorkQueue) AddUniqueItemToPipeline(ctx context.Context, pipeline redis.Pipeliner, item Item)
- func (workQueue *WorkQueue) Complete(ctx context.Context, db *redis.Client, item *Item) (bool, error)
- func (workQueue *WorkQueue) DeepClean(ctx context.Context, db *redis.Client) error
- func (workQueue *WorkQueue) Lease(ctx context.Context, db *redis.Client, block bool, timeout time.Duration, ...) (*Item, error)
- func (workQueue *WorkQueue) LightClean(ctx context.Context, db *redis.Client) error
- func (workQueue *WorkQueue) Processing(ctx context.Context, db *redis.Client) (int64, error)
- func (workQueue *WorkQueue) QueueLen(ctx context.Context, db *redis.Client) (int64, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ItemDataJson ¶
ItemDataJson returns the data from an item, parsed as JSON to type T.
Types ¶
type Item ¶
Item for a work queue. Each item has an ID and associated data.
func NewItemFromJSONData ¶
NewItemFromJSONData creates a new item with a random ID (a UUID). The data is the result of json.Marshal(data).
func (*Item) ParseJsonData ¶
ParseJsonData parses the item's data, as JSON, to v.
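As a rough illustration of the JSON helpers described above (NewItemFromJSONData stores the output of json.Marshal, and ItemDataJson and ParseJsonData decode it again), the following standard-library-only sketch performs the same round trip on a plain byte slice. The function decodeAs is a hypothetical stand-in written for this example; it is not the package's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeAs is a hypothetical stand-in for a generic JSON accessor: it parses
// raw JSON bytes into a value of type T.
func decodeAs[T any](raw []byte) (data T, err error) {
	err = json.Unmarshal(raw, &data)
	return data, err
}

func main() {
	// Marshalling a slice yields the same bytes as the literal "[1,2,3]".
	raw, err := json.Marshal([]int{1, 2, 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // [1,2,3]

	// Decoding recovers the original values.
	nums, err := decodeAs[[]int](raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(nums) // [1 2 3]
}
```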
type KeyPrefix ¶
type KeyPrefix string
KeyPrefix is a string which should be prefixed to an identifier to generate a database key.
Example ¶
    cv_key := KeyPrefix("cv:")
    // ...
    cv_id := "abcdef-123456"
    // Now, cv_key.of(cv_id) == "cv:abcdef-123456"
    // It can be used to access an item in a database, for example:
    cv_info := db.get(cv_key.of(cv_id))
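The example above is schematic. A self-contained sketch of the same idea, using a local type that mirrors the documented declaration `type KeyPrefix string`, might look like this. The type keyPrefix and its method of are hypothetical names chosen for this sketch; check the package itself for the exported method that performs the prefixing.

```go
package main

import "fmt"

// keyPrefix mirrors the documented declaration `type KeyPrefix string`.
type keyPrefix string

// of is a hypothetical helper that prepends the prefix to an identifier to
// produce a database key.
func (p keyPrefix) of(id string) string {
	return string(p) + id
}

func main() {
	cvKey := keyPrefix("cv:")
	cvID := "abcdef-123456"
	fmt.Println(cvKey.of(cvID)) // cv:abcdef-123456
}
```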
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
WorkQueue backed by a redis database.
func NewWorkQueue ¶
func (*WorkQueue) AddItem ¶
AddItem to the work queue.
If an item with the same ID already exists, this item is not added, and false is returned. Otherwise, the item is added and true is returned.
If you know the item ID is unique, and not already in the queue, use the optimised WorkQueue.AddUniqueItem instead.
func (*WorkQueue) AddUniqueItem ¶ added in v0.3.0
AddUniqueItem adds an item, which is known to have an ID not already in the queue, to the work queue.
This creates a pipeline and executes it on the database.
func (*WorkQueue) AddUniqueItemToPipeline ¶ added in v0.3.0
func (workQueue *WorkQueue) AddUniqueItemToPipeline(ctx context.Context, pipeline redis.Pipeliner, item Item)
AddUniqueItemToPipeline adds an item, which is known to have an ID not already in the queue, to the work queue. This adds the redis commands onto the pipeline passed.
Use WorkQueue.AddUniqueItem if you don't want to pass a pipeline directly.
func (*WorkQueue) Complete ¶
func (workQueue *WorkQueue) Complete(ctx context.Context, db *redis.Client, item *Item) (bool, error)
Complete marks a job as completed and removes it from the work queue. After Complete has been called (and returns true), no workers will receive this job again.
Complete returns a boolean indicating whether the job has been removed and this worker was the first worker to call Complete. So, while Lease might give the same job to multiple workers, Complete will return true for only one worker.
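To make the "true for only one worker" guarantee concrete, here is a minimal in-memory sketch of that behaviour. The type memQueue and its complete method are hypothetical stand-ins written for this illustration; they are an assumption about the observable behaviour only and say nothing about how the package implements Complete on redis.

```go
package main

import (
	"fmt"
	"sync"
)

// memQueue is a hypothetical in-memory stand-in for the queue's item store.
type memQueue struct {
	mu    sync.Mutex
	items map[string][]byte // item ID -> item data
}

// complete removes the item and reports whether this call was the one that
// removed it.
func (q *memQueue) complete(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.items[id]; !ok {
		return false // already completed by another worker
	}
	delete(q.items, id)
	return true
}

func main() {
	q := &memQueue{items: map[string][]byte{"job-1": []byte("[1,2,3]")}}

	// Two workers that both leased job-1 race to complete it.
	var wg sync.WaitGroup
	results := make([]bool, 2)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = q.complete("job-1")
		}(i)
	}
	wg.Wait()

	// Exactly one of the two calls reports true.
	fmt.Println(results[0] != results[1]) // true
}
```

A worker can use the returned boolean to run side effects that must happen once per job, such as publishing a result, only when its own call returned true.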
func (*WorkQueue) Lease ¶
func (workQueue *WorkQueue) Lease(
    ctx context.Context,
    db *redis.Client,
    block bool,
    timeout time.Duration,
    leaseDuration time.Duration,
) (*Item, error)
Lease requests a work lease from the work queue. This should be called by a worker to get work to complete. When completed, the WorkQueue.Complete method should be called.
If block is true, the function will return either when a job is leased or after timeout if timeout isn't 0.
If the job is not completed before the end of leaseDuration, another worker may pick up the same job. It is not a problem if a job is marked as done more than once.
If no job is available before the timeout, (nil, nil) is returned.
If you've not already done it, it's worth reading the documentation on leasing items at https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item
func (*WorkQueue) LightClean ¶ added in v0.3.0
func (*WorkQueue) Processing ¶
Processing returns the number of items being processed.