Documentation
¶
Overview ¶
Package boltqueue provides a persistent priority queue based on BoltDB (https://github.com/boltdb/bolt)
Priority Queue ¶
The PQueue type represents a priority queue. Messages may be inserted into the queue at a numeric priority. Higher numbered priorities take precedence over lower numbered ones. Messages are dequeued following priority order, then time ordering, with the oldest messages of the highest priority emerging first.
There is no practical limit on the number of priorities, but a smaller number will typically give better performance than a larger number.
File-backed Buffered Channel ¶
The IChan type represents an unbounded channel with one priority, backed by a PQueue. As with ordinary channels, messages are inserted into one end and received from the other end in the same order. The size of the channel's buffer is limited only by space on the fiing system.
The sending end provides a choice: messages can be directly inserted into the channel using the Send() and SendString() methods. Or they can be channel-communicated using SendEnd() to obtain a normal channel. The latter is a nicer abstraction but requires one extra goroutine. If you care more about performance, use Send() and SendString() instead.
You cannot use both methods on the same IChan (it will panic if you do). This is to keep shutdown behaviour predictable.
Index ¶
- type ErrorHandler
- type IChan
- type Message
- type PQueue
- func (b *PQueue) ApproxSize() int64
- func (b *PQueue) Close() error
- func (b *PQueue) Dequeue() (*Message, error)
- func (b *PQueue) DequeueString() (string, error)
- func (b *PQueue) DequeueValue() ([]byte, error)
- func (b *PQueue) Enqueue(priority uint, message *Message) error
- func (b *PQueue) EnqueueString(priority uint, value string) error
- func (b *PQueue) EnqueueValue(priority uint, value []byte) error
- func (b *PQueue) Requeue(priority uint, message *Message) error
- func (b *PQueue) Size(priority uint) (int, error)
- func (b *PQueue) TotalSize() (int64, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorHandler ¶
type ErrorHandler func(error)
type IChan ¶
type IChan struct {
// contains filtered or unexported fields
}
func NewIChan ¶
NewIChan creates a new file-backed infinite channel. It uses the specified filename to create a BoltDB database that implements the channel persistence. If the filename is a directory name ending with '/', a unique filename is generated and appended to it. The channel's buffer is limited only by space available on the filesystem.
func NewIChanOf ¶
NewIChanOf creates a new file-backed infinite channel from a BoltDB database. The channel's buffer is limited only by space available on the filesystem.
func (*IChan) Close ¶
Close closes the channel and its underlying queue. This is a direct function call unlike interacting with a channel end. Once SendEnd() has been used, you cannot then use this method too. You need instead to close the channel returned by SendEnd().
func (*IChan) ReceiveEnd ¶
ReceiveEnd gets the output end of the IChan. The result is the channel end, not the messages. This channel end should be used repeatedly until the channel is closed.
It is safe to share this channel between several goroutines; when this is done, a pseudo-random selection is made between them and only one goroutine receives each message (this is normal Go behaviour).
func (*IChan) Send ¶
Send sends a message via the channel. This is a direct function call unlike interacting with a channel end. Once SendEnd() has been used, you cannot then use this method too.
func (*IChan) SendEnd ¶
SendEnd gets the input end of the IChan. The first time this is called, a new goroutine is started that transfers messages into the IChan.
It is safe to share this channel between several goroutines; when this is done, a pseudo-random selection is made between them and only one goroutine receives each message (this is normal Go behaviour).
When you have finished, you muse close the channel (as is normal for Go channels), otherwise the resources will not be released cleanly.
If you prefer for there not to be one extra goroutine and don't want the simple channel abstraction, don't use this method but instead use Send(), SendString() and then Close().
func (*IChan) SendString ¶
SendString sends a message via the channel. This is a direct function call unlike interacting with a channel end. Once SendEnd() has been used, you cannot then use this method too.
func (*IChan) SetErrorHandler ¶
SetErrorHandler registers a function to handle errors at the receiving end.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents a message in the priority queue
func NewGobMessage ¶ added in v0.0.2
func NewGobMessage(value interface{}) *Message
NewGobMessage generates a new priority queue message from a value via gob encoding. Any error results in a panic (usually arising due to missing gob type registration).
func NewMessage ¶
NewMessage generates a new priority queue message from a string.
func NewMessagef ¶
NewMessagef generates a new priority queue message from a formatted string. Formatting is as per fmt.Sprintf.
func WrapBytes ¶
WrapBytes generates a new priority queue message. Do not modify the source value after submitting the message.
func (*Message) GobValue ¶ added in v0.0.2
GobValue returns the message's value using gob decoding. This unpacks the data from NewGobMessage.
type PQueue ¶
type PQueue struct { // When RetainOnClose is true, the database file will be preserved after Close() is called. // Normally, the file is deleted on Close(). RetainOnClose bool // contains filtered or unexported fields }
PQueue is a priority queue backed by a Bolt database on disk
func NewPQueue ¶
NewPQueue loads or creates a new PQueue with the given filename. If the filename is a directory name ending with '/', a unique filename is generated and appended to it. Specify the required range of priorities; available priorities are from 0 (lowest) to the specified number minus one.
func WrapDB ¶
WrapDB wraps an existing BoltDB. Specify the required range of priorities; available priorities are from 0 (lowest) to the specified number minus one.
func (*PQueue) ApproxSize ¶
ApproxSize returns the sum of the sizes of all the priority queues, approximately. If the queue size is changing rapidly, this figure will be inaccurate. However, obtaining this value is very quick.
func (*PQueue) Dequeue ¶
Dequeue removes the oldest, highest priority message from the queue and returns it. If there are no messages available, nil, nil will be returned.
func (*PQueue) DequeueString ¶
DequeueString removes the oldest, highest priority message from the queue and returns its value as a string.
func (*PQueue) DequeueValue ¶
DequeueValue removes the oldest, highest priority message from the queue and returns its byte slice.
func (*PQueue) EnqueueString ¶
EnqueueString adds a string value to the queue at a specified priority (0=lowest).
func (*PQueue) EnqueueValue ¶
EnqueueValue adds a byte slice value to the queue at a specified priority (0=lowest).
func (*PQueue) Requeue ¶
Requeue adds a message back into the queue, keeping its precedence. If added at the same priority, it should be among the first to dequeue. If added at a different priority, it will dequeue before newer messages of that priority.