queue

package
v0.32.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2023 License: GPL-3.0 Imports: 8 Imported by: 1

Documentation

Overview

Package queue provides an in-memory queue structure suitable for the safe and low latency implementation of a real job queue.

It's like beanstalkd, but faster, with the ability to query the queue for desired items, reject duplicates, and wait on dependencies.

Like beanstalkd, when you add items to the queue, they move between different sub-queues:

Items start in the delay queue. After the item's delay time, they automatically move to the ready queue. From there you can Reserve() an item to get the highest priority (or for those with equal priority, the oldest - fifo) one which switches it from the ready queue to the run queue. Items can also have dependencies, in which case they start in the dependency queue and only move to the ready queue (bypassing the delay queue) once all its dependencies have been Remove()d from the queue. Items can also belong to a reservation group, in which case you can Reserve() an item in a desired group.

In the run queue the item starts a time-to-release (ttr) countdown; when that runs out the item is placed back on the ready queue. This is to handle a process Reserving an item but then crashing before it deals with the item; with it back on the ready queue, some other process can pick it up.

To stop it going back to the ready queue you either Remove() the item (you dealt with the item successfully), Touch() it to give yourself more time to handle the item, or you Bury() the item (the item can't be dealt with until the user takes some action). When you know you have a transient problem preventing you from handling the item right now, you can manually Release() the item back to the delay queue.

    import "github.com/VertebrateResequencing/wr/queue"
    q = queue.New("myQueue")
    q.SetReadyAddedCallback(func(queuename string, allitemdata []interface{}) {
        for _, item := range allitemdata {
            // cast item to the original type, then arrange to do something now
            // you know that the item is ready to be processed
        }
    })

    // add an item to the queue
    ttr := 30 * time.Second
    item, err := q.Add("uuid1", "", "item data1", 0, 0 * time.Second, ttr)
    item, err := q.Add("uuid2", "group", "item data2", 0, 0 * time.Second, ttr)

    // get it back out
    item, err = queue.Get("uuid1")

    // reserve the next item with no group
    item, err = queue.Reserve("", 0)

    // or reserve the next item in a particular group
	item, err = queue.Reserve("group", 0)

	// or reserve even if there are no items in the queue right now, waiting
	// until something gets added or otherwise becomes ready
	item, err = queue.Reserve("group", 1 * time.Second)

    // queue.Touch() every < ttr seconds if you might take longer than ttr to
    // process the item

    // say you successfully handled the item
    item.Remove()

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueClosed   = errors.New("queue closed")
	ErrNothingReady  = errors.New("ready queue is empty")
	ErrAlreadyExists = errors.New("already exists")
	ErrNotFound      = errors.New("not found")
	ErrNotReady      = errors.New("not ready")
	ErrNotRunning    = errors.New("not running")
	ErrNotBuried     = errors.New("not buried")
)

queue has some typical errors

Functions

This section is empty.

Types

type ChangedCallback added in v0.9.0

type ChangedCallback func(from, to SubQueue, data []interface{})

ChangedCallback is used as a callback to know when items change sub-queues, telling you what item.Data() moved from which sub-queue to which other sub- queue. For new items in the queue, `from` will be SubQueueNew, and for items leaving the queue, `to` will be SubQueueRemoved.

type Error

type Error struct {
	Queue string // the queue's Name
	Op    string // name of the method
	Item  string // the item's key
	Err   error  // one of our Err vars
}

Error records an error and the operation, item and queue that caused it.

func (Error) Error

func (e Error) Error() string

type Item

type Item struct {
	Key          string
	ReserveGroup string
	// contains filtered or unexported fields
}

Item holds the information about each item in our queue, and has thread-safe functions to update properties as we switch between sub-queues.

func (*Item) ChangedKey added in v0.18.0

func (item *Item) ChangedKey(old, new string)

ChangedKey updates this item by changing its Key if old matches it, or by updating the key in any dependencies of this item.

func (*Item) Data

func (item *Item) Data() interface{}

Data returns the data of this item.

func (*Item) Dependencies added in v0.2.0

func (item *Item) Dependencies() []string

Dependencies returns the keys of the other items we are dependent upon. Note, do not add these back during a queue.Update(), or you could end up adding back dependencies that already got resolved, leaving you in a permanent dependent state; use UnresolvedDependencies() for that purpose instead.

func (*Item) ReadyAt added in v0.9.1

func (item *Item) ReadyAt() time.Time

ReadyAt is a thread-safe way of getting just the readyAt of an item, when you don't need all of the other information from Stats().

func (*Item) ReleaseAt added in v0.9.1

func (item *Item) ReleaseAt() time.Time

ReleaseAt is a thread-safe way of getting just the releaseAt of an item, when you don't need all of the other information from Stats().

func (*Item) SetData added in v0.21.0

func (item *Item) SetData(data interface{})

SetData stores new data in this item.

func (*Item) State added in v0.9.1

func (item *Item) State() ItemState

State is a thread-safe way of getting just the state of an item, when you don't need all of the other information from Stats().

func (*Item) Stats

func (item *Item) Stats() *ItemStats

Stats returns some information about the item.

func (*Item) UnresolvedDependencies added in v0.3.0

func (item *Item) UnresolvedDependencies() []string

UnresolvedDependencies returns the keys of the other items we are still dependent upon.

type ItemDef

type ItemDef struct {
	Key          string
	ReserveGroup string
	Data         interface{}
	Priority     uint8 // highest priority is 255
	Delay        time.Duration
	TTR          time.Duration
	StartQueue   SubQueue // blank, or one of SubQueueRun or SubQueueBury
	Dependencies []string
}

ItemDef makes it possible to supply a slice of Add() args to AddMany().

type ItemState added in v0.9.0

type ItemState string

ItemState is how we describe the possible item states.

const (
	ItemStateDelay     ItemState = "delay"
	ItemStateReady     ItemState = "ready"
	ItemStateRun       ItemState = "run"
	ItemStateBury      ItemState = "bury"
	ItemStateDependent ItemState = "dependent"
	ItemStateRemoved   ItemState = "removed"
)

ItemState* constants represent all the possible item states.

type ItemStats

type ItemStats struct {
	State     ItemState
	Age       time.Duration
	Remaining time.Duration
	Delay     time.Duration
	TTR       time.Duration
	Reserves  uint32
	Timeouts  uint32
	Releases  uint32
	Buries    uint32
	Kicks     uint32
	Priority  uint8
	Size      uint8
}

ItemStats holds information about the Item's state. Remaining is the time remaining in the current sub-queue. This will be a duration of zero for all but the delay and run states. In the delay state it tells you how long before it can be reserved, and in the run state it tells you how long before it will be released automatically.

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields
}

Queue is a synchronized map of items that can shift to different sub-queues, automatically depending on their delay or ttr expiring, or manually by calling certain methods.

func New

func New(ctx context.Context, name string) *Queue

New is a helper to create instance of the Queue struct.

func (*Queue) Add

func (queue *Queue) Add(ctx context.Context, key string, reserveGroup string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, startQueue SubQueue, deps ...[]string) (*Item, error)

Add is a thread-safe way to add new items to the queue.

After delay they will switch to the ready sub-queue from where they can be Reserve()d. Once reserved, they have ttr to Remove() the item, otherwise it gets released back to the ready sub-queue.

The priority determines which item will be next to be Reserve()d, with priority 255 (the max) items coming before lower priority ones (with 0 being the lowest). Items with the same priority number are Reserve()d on a fifo basis.

reserveGroup can be left as an empty string, but specifying it then lets you provide the same to Reserve() to get the next item with the given reserveGroup.

startQueue should normally be supplied as an empty string, meaning the item will start in the delay or ready sub-queue as described above. For the purpose of recovering a queue following a crash, however, you can supply either SubQueueRun or SubQueueBury to start the item in one of those sub-queues. If the item has unmet dependencies, startQueue is ignored.

The final argument to Add() is an optional slice of item ids on which this item depends: this item will first enter the dependency sub-queue and only transfer to the ready sub-queue when items with these ids get Remove()d from the queue.

Add() returns an item, which may have already existed (in which case, nothing was actually added or changed).

func (*Queue) AddMany

func (queue *Queue) AddMany(ctx context.Context, items []*ItemDef) (added, dups int, err error)

AddMany is like Add(), except that you supply a slice of *ItemDef, and it returns the number that were actually added and the number of items that were not added because they were duplicates of items already in the queue. If an error occurs, nothing will have been added.

func (*Queue) AddWithSize added in v0.20.0

func (queue *Queue) AddWithSize(ctx context.Context, key string, reserveGroup string, data interface{}, priority uint8, size uint8, delay time.Duration, ttr time.Duration, startQueue SubQueue, deps ...[]string) (*Item, error)

AddWithSize is like Add(), but the item also gets a "size" property. Size alters the way priority is handled. For items with the same priority, the next to be Reserve()d will be the item with the highest size. If they also have the same size, then they will be Reserve()d in fifo order.

func (*Queue) AllItems

func (queue *Queue) AllItems() []*Item

AllItems returns the items in the queue. NB: You should NOT do anything to these items - use for read-only purposes.

func (*Queue) Bury

func (queue *Queue) Bury(key string) error

Bury is a thread-safe way to switch an item in the run sub-queue to the bury sub-queue, for when the item can't be dealt with ever, at least until the user takes some action and changes something.

func (*Queue) ChangeKey added in v0.18.0

func (queue *Queue) ChangeKey(old, new string) error

ChangeKey is a thread-safe way to change the key an item can be found with using Get() (and also ensures any dependencies involving the old key will continue to work). If an item already exists in the queue with the new key, this will fail.

func (*Queue) Destroy

func (queue *Queue) Destroy() error

Destroy shuts down a queue, destroying any contents. You can't do anything useful with it after that.

func (*Queue) Get

func (queue *Queue) Get(key string) (*Item, error)

Get is a thread-safe way to get an item by the key you used to Add() it.

func (*Queue) GetRunningData

func (queue *Queue) GetRunningData() []interface{}

GetRunningData gets all the item.Data() of items currently in the run sub- queue.

func (*Queue) HasDependents added in v0.2.0

func (queue *Queue) HasDependents(key string) (bool, error)

HasDependents tells you if the item with the given key has any other items depending upon it. You'd want to check this before Remove()ing this item if you're removing it because it was undesired as opposed to complete, as Remove() always triggers dependent items to become ready.

func (*Queue) Kick

func (queue *Queue) Kick(ctx context.Context, key string) error

Kick is a thread-safe way to switch an item in the bury sub-queue to the ready sub-queue, for when a previously buried item can now be handled.

func (*Queue) Release

func (queue *Queue) Release(ctx context.Context, key string) error

Release is a thread-safe way to switch an item in the run sub-queue to the delay sub-queue, for when the item should be dealt with later, not now.

func (*Queue) Remove

func (queue *Queue) Remove(ctx context.Context, key string) error

Remove is a thread-safe way to remove an item from the queue.

func (*Queue) Reserve

func (queue *Queue) Reserve(reserveGroup string, wait time.Duration) (*Item, error)

Reserve is a thread-safe way to get the highest priority (or for those with equal priority, the oldest (by time since the item was first Add()ed) item in the queue, switching it from the ready sub-queue to the run sub-queue, and in so doing starting its ttr countdown.

If reserveGroup is not blank, you will get the next item that was added with the given ReserveGroup (conversely, if your items were added with ReserveGroups but you don't supply one here, you will not get an item).

If wait is greater than 0, we will wait for up to that much time for an item to appear in the ready sub-queue, if at least 1 isn't already there. If after this time there is still nothing in the ready sub-queue, no item and a ErrNothingReady error is returned.

You need to Remove() the item when you're done with it. If you're still doing something and ttr is approaching, Touch() it, otherwise it will be assumed you died and the item will be released back to the ready sub-queue automatically, to be handled by someone else that gets it from a Reserve() call. If you know you can't handle it right now, but someone else might be able to later, you can manually call Release(), which moves it to the delay sub-queue.

func (*Queue) SetChangedCallback

func (queue *Queue) SetChangedCallback(callback ChangedCallback)

SetChangedCallback sets a callback that will be called when items move from one sub-queue to another. The callback receives the name of the moved-from sub-queue ('new' in the case of entering the queue for the first time), the name of the moved-to sub-queue ('removed' in the case of the item being removed from the queue), and a slice of item.Data() of everything that moved in this way. The callback will be initiated in a go routine.

func (*Queue) SetDelay

func (queue *Queue) SetDelay(key string, delay time.Duration) error

SetDelay is a thread-safe way to change the delay of an item.

func (*Queue) SetReadyAddedCallback

func (queue *Queue) SetReadyAddedCallback(callback ReadyAddedCallback)

SetReadyAddedCallback sets a callback that will be called when new items have been added to the ready sub-queue. The callback will receive the name of the queue, and a slice of the Data properties of every item currently in the ready sub-queue. The callback will be initiated in a go routine.

Note that we will wait for the callback to finish running before calling it again. If new items enter the ready sub-queue while your callback is still running, you will only know about them when your callback is called again, immediately after the previous call completes.

func (*Queue) SetReserveGroup added in v0.8.0

func (queue *Queue) SetReserveGroup(key string, newGroup string) error

SetReserveGroup is a thread-safe way to change the ReserveGroup of an item.

func (*Queue) SetTTRCallback added in v0.9.0

func (queue *Queue) SetTTRCallback(callback TTRCallback)

SetTTRCallback sets a callback that will be called when an item in the run sub-queue hits its TTR. The callback receives an item's data and should return the sub-queue the item should be moved to. If you don't set this, the default will be to move all items to the ready sub-queue.

func (*Queue) Stats

func (queue *Queue) Stats() *Stats

Stats returns information about the number of items in the queue and each sub-queue.

func (*Queue) Touch

func (queue *Queue) Touch(key string) error

Touch is a thread-safe way to extend the amount of time a Reserve()d item is allowed to run.

func (*Queue) TriggerReadyAddedCallback

func (queue *Queue) TriggerReadyAddedCallback(ctx context.Context)

TriggerReadyAddedCallback allows you to manually trigger your readyAddedCallback at times when no new items have been added to the ready queue. It will receive the current set of ready item data.

func (*Queue) Update

func (queue *Queue) Update(ctx context.Context, key string, reserveGroup string, data interface{}, priority uint8, delay time.Duration, ttr time.Duration, deps ...[]string) error

Update is a thread-safe way to change the data, ReserveGroup, priority, delay, ttr or dependencies of an item. You must supply all of these as per Add() - just supply the old values of those you are not changing (except for dependencies, which remain optional). The old values can be found by getting the item with Get() (giving you item.Key, item.ReserveGroup, item.Data() and item.UnresolvedDependencies()), and then calling item.Stats() to get stats.Priority, stats.Delay and stats.TTR.

type ReadyAddedCallback added in v0.9.0

type ReadyAddedCallback func(queuename string, allitemdata []interface{})

ReadyAddedCallback is used as a callback to know when new items have been added to the ready sub-queue, getting /all/ items in the ready sub-queue.

type Stats

type Stats struct {
	Items     int
	Delayed   int
	Ready     int
	Running   int
	Buried    int
	Dependant int
}

Stats holds information about the Queue's state.

type SubQueue added in v0.9.0

type SubQueue string

SubQueue is how we name the sub-queues of a Queue.

const (
	SubQueueNew       SubQueue = "new"
	SubQueueDelay     SubQueue = "delay"
	SubQueueReady     SubQueue = "ready"
	SubQueueRun       SubQueue = "run"
	SubQueueBury      SubQueue = "bury"
	SubQueueDependent SubQueue = "dependent"
	SubQueueRemoved   SubQueue = "removed"
)

SubQueue* constants represent all the possible sub-queues. For use in changedCallback(), there are also the fake sub-queues representing items new to the queue and items removed from the queue.

type TTRCallback added in v0.9.0

type TTRCallback func(data interface{}) SubQueue

TTRCallback is used as a callback to decide which sub-queue an item should move to when a an item in the run sub-queue hits its TTR, based on that item's data. Valid return values are SubQueueDelay, SubQueueReady and SubQueueBury. SubQueueRun can be used to avoid changing subqueue. Other values will be treated as SubQueueReady).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL