Documentation ¶
Index ¶
- Variables
- func Shovel(src, dst *Queue) (int, error)
- type Item
- type Items
- type Key
- type Options
- type Queue
- func (q *Queue) Clear() error
- func (q *Queue) Close() error
- func (q *Queue) DeleteLowerThan(key Key) (int, error)
- func (q *Queue) Len() int
- func (q *Queue) Move(n int, dst Items, dstQueue *Queue, fn ReadFn) error
- func (q *Queue) Peek(n int, dst Items, fn ReadFn) error
- func (q *Queue) Pop(n int, dst Items, fn ReadFn) error
- func (q *Queue) Push(items Items) error
- func (q *Queue) Sync() error
- type ReadFn
Variables ¶
var DefaultBucketFunc = bucket.DefaultBucketFunc
DefaultBucketFunc assumes that `key` is a nanosecond Unix timestamp and divides data into buckets of (roughly) 2 minutes.
var DefaultOptions = bucket.DefaultOptions
DefaultOptions gives you a set of options that is good enough to try some experiments. Your mileage can vary a lot with different settings, so make sure to do some benchmarking!
var FixedSizeBucketFunc = bucket.FixedSizeBucketFunc
FixedSizeBucketFunc returns a BucketFunc that divides the key space into equally sized buckets, each spanning `n` keys. This can also be used to create time-based buckets: if you use nanosecond-based keys and pass time.Minute, you get buckets with a size of one minute.
var ShiftBucketFunc = bucket.ShiftBucketFunc
ShiftBucketFunc creates a fast BucketFunc that divides data into buckets by masking the `shift` least significant bits of the key. With a shift of 37 you get buckets of roughly 2 minutes (if your keys are nanosecond timestamps). If you want to calculate the bucket size for a given shift, use this formula: (2 ** shift) / (60 * 1e9) = minutes
Functions ¶
func Shovel ¶
Shovel moves items from `src` to `dst`. The `src` queue will be completely drained afterwards. For speed reasons, this assumes that the `dst` queue uses the same bucket func as the `src` queue. If you cannot guarantee this, you should implement a naive Shovel() that just uses Pop/Push.
This function can be used if you want to change options like the BucketFunc or if you intend to have several queues that are connected by some logic. Examples of the latter case would be a "dead letter queue" where you put failed calculations for later recalculation, or a queue for unacknowledged items.
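A naive Shovel built only on Pop and Push could look roughly like the sketch below. Everything here is a stand-in so that the example runs on its own: the types mirror the documented signatures, while `popPusher`, `naiveShovel` and the in-memory `memQueue` are hypothetical names that do not exist in the package. What happens to the source items when Push fails inside the callback depends on the real Pop implementation and is not covered here.

```go
package main

import (
	"fmt"
	"sort"
)

// Local stand-ins for the package's types (assumptions of this sketch).
type Key int64
type Item struct {
	Key  Key
	Blob []byte
}
type Items []Item
type ReadFn func(items Items) error

// popPusher is a hypothetical interface with the two methods used below.
type popPusher interface {
	Pop(n int, dst Items, fn ReadFn) error
	Push(items Items) error
}

// naiveShovel drains src into dst and returns the number of moved items.
// It works regardless of which bucket func each queue uses.
func naiveShovel(src, dst popPusher) (int, error) {
	moved := 0
	err := src.Pop(-1, nil, func(items Items) error {
		// Push inside the callback, while the items are still valid.
		if err := dst.Push(items); err != nil {
			return err
		}
		moved += len(items)
		return nil
	})
	return moved, err
}

// memQueue is a tiny in-memory queue, only here to exercise the sketch.
type memQueue struct{ items Items }

func (q *memQueue) Push(items Items) error {
	for _, it := range items {
		blob := append([]byte(nil), it.Blob...) // copy, do not alias
		q.items = append(q.items, Item{Key: it.Key, Blob: blob})
	}
	sort.SliceStable(q.items, func(i, j int) bool {
		return q.items[i].Key < q.items[j].Key
	})
	return nil
}

func (q *memQueue) Pop(n int, dst Items, fn ReadFn) error {
	if n < 0 || n > len(q.items) {
		n = len(q.items)
	}
	if n == 0 {
		return nil // empty queue: fn is not called
	}
	batch := append(dst[:0], q.items[:n]...)
	if err := fn(batch); err != nil {
		return err
	}
	q.items = q.items[n:]
	return nil
}

func main() {
	src, dst := &memQueue{}, &memQueue{}
	_ = src.Push(Items{
		{Key: 2, Blob: []byte("b")},
		{Key: 1, Blob: []byte("a")},
		{Key: 3, Blob: []byte("c")},
	})
	n, err := naiveShovel(src, dst)
	fmt.Println(n, err, len(src.items), len(dst.items)) // prints "3 <nil> 0 3"
}
```

The trade-off compared to the built-in Shovel is speed: every item passes through the regular Push path of the destination queue.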
Types ¶
type Options ¶
Options gives you some knobs to configure the queue. Read the documentation of the individual options carefully, as some of them can only be set on the first call to Open().
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the high-level API to the priority queue.
Example ¶
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

// Push some items to it:
pushItems := make(Items, 0, 10)
for idx := 0; idx < 10; idx++ {
	pushItems = append(pushItems, Item{
		Key:  Key(idx),
		Blob: []byte(fmt.Sprintf("key_%d", idx)),
	})
}

_ = queue.Push(pushItems)

// Retrieve the same items again:
_ = queue.Pop(10, nil, func(popItems Items) error {
	// Just for example purposes, check if they match:
	if reflect.DeepEqual(pushItems, popItems) {
		fmt.Println("They match! :)")
	} else {
		fmt.Println("They do not match! :(")
	}

	return nil
})
Output: They match! :)
func Open ¶
Open tries to open the priority queue structure in `dir`. If `dir` does not exist, then a new, empty priority queue is created. The behavior of the queue can be fine-tuned with `opts`.
func (*Queue) Close ¶
Close should always be called, and its error checked, when you are done using the queue. Close might still flush out some data, depending on the sync mode you configured.
func (*Queue) DeleteLowerThan ¶
DeleteLowerThan deletes all items with a key lower than `key`.
func (*Queue) Len ¶
Len returns the number of items in the queue. NOTE: This gets more expensive the more buckets you have, so you probably should not call it in a hot loop.
func (*Queue) Move ¶
Move works like Pop, but it pushes the popped items to `dstQueue` immediately. This implementation is safer than one that is built on the external API, as it deletes the popped data only when the push was successful. Please read the documentation of Pop() too.
func (*Queue) Peek ¶
Peek works like Pop, but does not delete the items in the queue. Please read the documentation of Pop() too.
func (*Queue) Pop ¶
Pop fetches up to `n` items from the queue. It will call the supplied `fn` one or several times until either `n` is reached or the queue is empty. If the queue is empty before calling Pop(), then `fn` is not called. If `n` is negative, then as many items as possible are returned until the queue is empty.
The `dst` argument can be used to pass a preallocated slice that the queue appends to. This can be done to avoid allocations. If you do not care about that, you can simply pass nil.
You should NEVER use the supplied items outside of `fn`, as they are directly sliced from an mmap(2) region. Accessing them outside will almost certainly lead to a crash. If you need them outside (e.g. for appending to a slice), then you can use the Copy() function of Items.
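The following sketch illustrates why the copy matters. It does not use the real queue: `popFromScratch` is a hypothetical stand-in for Pop that hands out blobs pointing into a shared buffer and wipes that buffer once `fn` returns, and the `Copy` method defined here is a local approximation of the Copy() function mentioned above. With the real queue the memory is mmap-backed, so a stale access may crash instead of reading zeroes.

```go
package main

import "fmt"

// Local stand-ins for the package's types (assumptions of this sketch).
type Key int64
type Item struct {
	Key  Key
	Blob []byte
}
type Items []Item

// Copy returns a deep copy that no longer shares memory with the source.
// It approximates the documented Copy() function of Items.
func (items Items) Copy() Items {
	out := make(Items, len(items))
	for i, it := range items {
		out[i] = Item{Key: it.Key, Blob: append([]byte(nil), it.Blob...)}
	}
	return out
}

// popFromScratch (hypothetical) mimics Pop: the blobs passed to fn are
// only valid during the call, afterwards the backing buffer is wiped.
func popFromScratch(fn func(Items) error) error {
	buf := []byte("alphabeta")
	items := Items{{Key: 1, Blob: buf[:5]}, {Key: 2, Blob: buf[5:]}}
	err := fn(items)
	for i := range buf {
		buf[i] = 0 // simulate the memory becoming invalid
	}
	return err
}

// collect keeps the popped items in two ways: copied and not copied.
func collect() (kept, leaked Items) {
	_ = popFromScratch(func(items Items) error {
		kept = append(kept, items.Copy()...) // safe: owns its memory
		leaked = append(leaked, items...)    // WRONG: still aliases the buffer
		return nil
	})
	return kept, leaked
}

func main() {
	kept, leaked := collect()
	fmt.Println(string(kept[0].Blob) == "alpha")   // the copy survived
	fmt.Println(string(leaked[0].Blob) == "alpha") // the alias did not
}
```

Copying costs an allocation per blob, so it is best done only for the items that really need to outlive the callback.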