boltqueue

Version: v0.0.2
Published: Mar 18, 2021 License: MIT Imports: 9 Imported by: 0

README

boltqueue

import "github.com/rickb777/boltqueue"

Package boltqueue provides a persistent priority queue (PQueue) that uses BBolt, a derivative of BoltDB, as its store.

Built on this, a channel implementation is provided that uses the BBolt backing store to provide a channel with persistent buffering on a large scale. This is intended to complement (not replace) standard message queuing technologies.

Priority Queue

The PQueue type represents a priority queue. Messages may be inserted into the queue at a numeric priority. Higher numbered priorities take precedence over lower numbered ones. Messages are dequeued following priority order, then time ordering, with the oldest messages of the highest priority emerging first.

IChan

The IChan channel provides a communication pipe with the same semantics as normal buffered Go channels. The message type is always []byte, however. Buffering uses a BoltDB file store. This allows the channel to be persistent and outside-of-memory, but will impair performance compared to an equivalent in-memory channel.

Licence : MIT

Documentation

Overview

Package boltqueue provides a persistent priority queue based on BBolt, a derivative of BoltDB (https://github.com/boltdb/bolt).

Priority Queue

The PQueue type represents a priority queue. Messages may be inserted into the queue at a numeric priority. Higher numbered priorities take precedence over lower numbered ones. Messages are dequeued following priority order, then time ordering, with the oldest messages of the highest priority emerging first.

There is no practical limit on the number of priorities, but a smaller number will typically give better performance than a larger number.
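The ordering rule above can be illustrated with a small in-memory model. This is only a sketch of the documented semantics, not the package's BBolt-backed implementation; the names modelQueue, enqueue, dequeue and drain are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// modelQueue is a hypothetical in-memory stand-in for a priority queue:
// one FIFO slice per priority level, index 0 being the lowest priority.
type modelQueue struct {
	levels [][]string
}

func newModelQueue(priorities uint) *modelQueue {
	return &modelQueue{levels: make([][]string, priorities)}
}

// enqueue appends value to the FIFO for the given priority.
// Valid priorities run from 0 to the configured number minus one.
func (q *modelQueue) enqueue(priority uint, value string) error {
	if priority >= uint(len(q.levels)) {
		return fmt.Errorf("priority %d out of range 0..%d", priority, len(q.levels)-1)
	}
	q.levels[priority] = append(q.levels[priority], value)
	return nil
}

// dequeue returns the oldest value at the highest non-empty priority.
// The boolean is false when the queue is empty.
func (q *modelQueue) dequeue() (string, bool) {
	for p := len(q.levels) - 1; p >= 0; p-- {
		if len(q.levels[p]) > 0 {
			v := q.levels[p][0]
			q.levels[p] = q.levels[p][1:]
			return v, true
		}
	}
	return "", false
}

// drain empties the queue and returns the values in dequeue order.
func drain(q *modelQueue) []string {
	var out []string
	for {
		v, ok := q.dequeue()
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	q := newModelQueue(3)
	q.enqueue(0, "low-1")
	q.enqueue(2, "high-1")
	q.enqueue(1, "mid-1")
	q.enqueue(2, "high-2")
	fmt.Println(drain(q)) // [high-1 high-2 mid-1 low-1]
}
```

Both messages at priority 2 come out first, in the order they were inserted, followed by the lower priorities.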

File-backed Buffered Channel

The IChan type represents an unbounded channel with one priority, backed by a PQueue. As with ordinary channels, messages are inserted into one end and received from the other end in the same order. The size of the channel's buffer is limited only by space on the filesystem.

The sending end provides a choice. Messages can be inserted directly into the channel using the Send() and SendString() methods, or they can be sent through a normal Go channel obtained from SendEnd(). The latter is a nicer abstraction but requires one extra goroutine. If you care more about performance, use Send() and SendString() instead.

You cannot use both methods on the same IChan (it will panic if you do). This is to keep shutdown behaviour predictable.
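The idea of an unbounded channel fed through an ordinary send end can be sketched in plain Go. The function below is a hypothetical in-memory model: it buffers in a slice, whereas IChan is described as buffering in a file, and it makes no claim about how the package itself is implemented. It does show why a channel-style send end costs one extra goroutine.

```go
package main

import "fmt"

// unbounded returns a send end and a receive end joined by a buffer that
// grows as needed. In-memory model only; the name is hypothetical.
func unbounded() (chan<- []byte, <-chan []byte) {
	in := make(chan []byte)
	out := make(chan []byte)
	go func(recv <-chan []byte) {
		defer close(out) // closing the send end eventually closes the receive end
		var buf [][]byte
		for recv != nil || len(buf) > 0 {
			// Offer the oldest buffered message only when there is one;
			// a nil channel in a select case is never chosen.
			var send chan []byte
			var head []byte
			if len(buf) > 0 {
				send = out
				head = buf[0]
			}
			select {
			case msg, ok := <-recv:
				if !ok {
					recv = nil // sender closed: stop receiving, keep draining
					continue
				}
				buf = append(buf, msg)
			case send <- head:
				buf = buf[1:]
			}
		}
	}(in)
	return in, out
}

func main() {
	in, out := unbounded()
	for i := 0; i < 5; i++ {
		in <- []byte(fmt.Sprintf("msg-%d", i)) // no receiver is needed yet
	}
	close(in)
	for msg := range out {
		fmt.Println(string(msg))
	}
}
```

Messages are received in the order they were sent, and the receive loop ends once the send end has been closed and the buffer has drained.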

Types

type ErrorHandler

type ErrorHandler func(error)

type IChan

type IChan struct {
	// contains filtered or unexported fields
}

func NewIChan

func NewIChan(filename string) (*IChan, error)

NewIChan creates a new file-backed infinite channel. It uses the specified filename to create a BoltDB database that implements the channel persistence. If the filename is a directory name ending with '/', a unique filename is generated and appended to it. The channel's buffer is limited only by space available on the filesystem.

func NewIChanOf

func NewIChanOf(pq *PQueue) *IChan

NewIChanOf creates a new file-backed infinite channel on top of an existing PQueue. The channel's buffer is limited only by space available on the filesystem.

func (*IChan) Close

func (c *IChan) Close() error

Close closes the channel and its underlying queue. This is a direct function call, unlike interacting with a channel end. Once SendEnd() has been used, this method must not be called; close the channel returned by SendEnd() instead.

func (*IChan) ReceiveEnd

func (c *IChan) ReceiveEnd() <-chan []byte

ReceiveEnd gets the output end of the IChan. The result is the channel end, not the messages. This channel end should be used repeatedly until the channel is closed.

It is safe to share this channel between several goroutines; when this is done, a pseudo-random selection is made between them and only one goroutine receives each message (this is normal Go behaviour).
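The sharing behaviour described above is ordinary Go channel behaviour and can be checked with the standard library alone. In this sketch a plain channel stands in for the one returned by ReceiveEnd(), and fanOut is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts the given number of workers on one shared receive channel
// and returns how many messages each worker handled.
func fanOut(ch <-chan []byte, workers int) []int {
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch { // ends when ch is closed and drained
				counts[id]++ // each worker writes only its own slot
			}
		}(w)
	}
	wg.Wait()
	return counts
}

func main() {
	ch := make(chan []byte) // stand-in for the channel returned by ReceiveEnd()
	go func() {
		for i := 0; i < 100; i++ {
			ch <- []byte("work")
		}
		close(ch)
	}()
	total := 0
	for _, n := range fanOut(ch, 4) {
		total += n
	}
	fmt.Println(total) // 100: every message was received exactly once
}
```

How the messages are split between the workers varies from run to run, but the total always equals the number sent.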

func (*IChan) Send

func (c *IChan) Send(value []byte) error

Send sends a message via the channel. This is a direct function call, unlike interacting with a channel end. Once SendEnd() has been used, this method must not be called.

func (*IChan) SendEnd

func (c *IChan) SendEnd() chan<- []byte

SendEnd gets the input end of the IChan. The first time this is called, a new goroutine is started that transfers messages into the IChan.

It is safe to share this channel between several sending goroutines (this is normal Go behaviour).

When you have finished, you must close the channel (as is normal for Go channels), otherwise the resources will not be released cleanly.

If you prefer for there not to be one extra goroutine and don't want the simple channel abstraction, don't use this method but instead use Send(), SendString() and then Close().

func (*IChan) SendString

func (c *IChan) SendString(value string) error

SendString sends a string message via the channel. This is a direct function call, unlike interacting with a channel end. Once SendEnd() has been used, this method must not be called.

func (*IChan) SetErrorHandler

func (c *IChan) SetErrorHandler(eh func(error))

SetErrorHandler registers a function to handle errors at the receiving end.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a message in the priority queue.

func NewGobMessage added in v0.0.2

func NewGobMessage(value interface{}) *Message

NewGobMessage generates a new priority queue message from a value via gob encoding. Any error results in a panic (usually arising due to missing gob type registration).

func NewMessage

func NewMessage(value string) *Message

NewMessage generates a new priority queue message from a string.

func NewMessagef

func NewMessagef(format string, arg ...interface{}) *Message

NewMessagef generates a new priority queue message from a formatted string. Formatting is as per fmt.Sprintf.

func WrapBytes

func WrapBytes(value []byte) *Message

WrapBytes generates a new priority queue message. Do not modify the source value after submitting the message.
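The warning reflects how Go slices share their backing array. The sketch below uses a hypothetical holder type rather than Message, and it does not establish whether WrapBytes copies its argument; it only shows what can go wrong when a wrapper retains the caller's slice, and how taking a copy avoids it.

```go
package main

import "fmt"

// holder is a hypothetical wrapper used only for this illustration.
type holder struct{ value []byte }

// wrap keeps the caller's slice without copying it.
func wrap(b []byte) *holder { return &holder{value: b} }

// wrapCopy takes a private copy, so later changes to b are not visible.
func wrapCopy(b []byte) *holder {
	return &holder{value: append([]byte(nil), b...)}
}

func main() {
	src := []byte("hello")
	shared := wrap(src)
	private := wrapCopy(src)

	src[0] = 'j' // modify the source after wrapping

	fmt.Println(string(shared.value))  // jello
	fmt.Println(string(private.value)) // hello
}
```

If the caller needs to reuse a buffer after wrapping it, copying the bytes first is the safe option.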

func (*Message) GobValue added in v0.0.2

func (m *Message) GobValue(v interface{}) error

GobValue returns the message's value using gob decoding. This unpacks the data from NewGobMessage.
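A gob round trip with the standard library looks like the sketch below. It assumes only that NewGobMessage and GobValue use encoding/gob, as their descriptions state; the job type and the encode and decode helpers are hypothetical and are not part of this package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// job is a hypothetical payload type used only for this illustration.
// gob encodes exported fields only.
type job struct {
	ID   int
	Name string
}

// encode gob-encodes v into a byte slice.
func encode(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode gob-decodes data into v, which must be a pointer.
func decode(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

func main() {
	data, err := encode(job{ID: 7, Name: "resize"})
	if err != nil {
		panic(err)
	}
	var got job
	if err := decode(data, &got); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got) // {ID:7 Name:resize}
}
```

As with GobValue, the destination is passed as a pointer so that the decoder can fill it in.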

func (*Message) Priority

func (m *Message) Priority() uint

Priority returns the priority the message had in the queue.

func (*Message) String

func (m *Message) String() string

String outputs the string representation of the message's value.

func (*Message) Value

func (m *Message) Value() []byte

Value returns the message's value. This is a mutable slice and you should not normally modify it.

type PQueue

type PQueue struct {
	// When RetainOnClose is true, the database file will be preserved after Close() is called.
	// Normally, the file is deleted on Close().
	RetainOnClose bool
	// contains filtered or unexported fields
}

PQueue is a priority queue backed by a Bolt database on disk.

func NewPQueue

func NewPQueue(filename string, priorities uint) (*PQueue, error)

NewPQueue loads or creates a new PQueue with the given filename. If the filename is a directory name ending with '/', a unique filename is generated and appended to it. Specify the required range of priorities; available priorities are from 0 (lowest) to the specified number minus one.

func WrapDB

func WrapDB(db *bbolt.DB, priorities uint) (*PQueue, error)

WrapDB wraps an existing BoltDB. Specify the required range of priorities; available priorities are from 0 (lowest) to the specified number minus one.

func (*PQueue) ApproxSize

func (b *PQueue) ApproxSize() int64

ApproxSize returns the sum of the sizes of all the priority queues, approximately. If the queue size is changing rapidly, this figure will be inaccurate. However, obtaining this value is very quick.

func (*PQueue) Close

func (b *PQueue) Close() error

Close closes the queue database.

func (*PQueue) Dequeue

func (b *PQueue) Dequeue() (*Message, error)

Dequeue removes the oldest, highest priority message from the queue and returns it. If there are no messages available, nil, nil will be returned.

func (*PQueue) DequeueString

func (b *PQueue) DequeueString() (string, error)

DequeueString removes the oldest, highest priority message from the queue and returns its value as a string.

func (*PQueue) DequeueValue

func (b *PQueue) DequeueValue() ([]byte, error)

DequeueValue removes the oldest, highest priority message from the queue and returns its byte slice.

func (*PQueue) Enqueue

func (b *PQueue) Enqueue(priority uint, message *Message) error

Enqueue adds a message to the queue at a specified priority (0=lowest).

func (*PQueue) EnqueueString

func (b *PQueue) EnqueueString(priority uint, value string) error

EnqueueString adds a string value to the queue at a specified priority (0=lowest).

func (*PQueue) EnqueueValue

func (b *PQueue) EnqueueValue(priority uint, value []byte) error

EnqueueValue adds a byte slice value to the queue at a specified priority (0=lowest).

func (*PQueue) Requeue

func (b *PQueue) Requeue(priority uint, message *Message) error

Requeue adds a message back into the queue, keeping its precedence. If added at the same priority, it should be among the first to dequeue. If added at a different priority, it will dequeue before newer messages of that priority.
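One way to picture "keeping its precedence" is a message that carries an ordering key from its first enqueue and is re-inserted by that key. The sketch below models a single priority level that way. The sequence number is an assumption made for illustration; the documentation does not say how Requeue achieves this, and item, level and insert are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// item carries a sequence number standing in for the time the message
// was first enqueued (an assumption for this sketch).
type item struct {
	seq   uint64
	value string
}

// level is one priority's queue, kept sorted by ascending seq (oldest first).
type level []item

// insert places it at the position given by its sequence number.
func (l level) insert(it item) level {
	i := sort.Search(len(l), func(i int) bool { return l[i].seq > it.seq })
	l = append(l, item{})
	copy(l[i+1:], l[i:])
	l[i] = it
	return l
}

// values lists the queued values in dequeue order.
func values(l level) []string {
	out := make([]string, len(l))
	for i, it := range l {
		out[i] = it.value
	}
	return out
}

func main() {
	var next uint64
	fresh := func(v string) item { next++; return item{seq: next, value: v} }

	var l level
	a := fresh("a")
	l = l.insert(a)
	l = l.insert(fresh("b"))

	l = l[1:]                // dequeue "a"
	l = l.insert(fresh("c")) // a newer message arrives meanwhile
	l = l.insert(a)          // requeue "a" with its original sequence number

	fmt.Println(values(l)) // [a b c]
}
```

Because the requeued message keeps its original key, it lands ahead of every message that was enqueued after it.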

func (*PQueue) Size

func (b *PQueue) Size(priority uint) (int, error)

Size returns the number of entries at the given priority (0=lowest).

func (*PQueue) TotalSize

func (b *PQueue) TotalSize() (int64, error)

TotalSize sums the sizes of all the priority queues.
