Version: v0.1.20 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2021 License: BSD-3-Clause Imports: 2 Imported by: 0



Package bqueue implements several kinds of buffer queues, as a N-writer, 1-reader queue. By "buffer," we mean iobuf.Slice values. Each writer has a separate bounded queue to which it writes buffers. The queue also supports flow control.


// Create a new queue using one of the implementations
// (currently only bqueue/drrqueue).
q := drrqueue.New()

Reader API:

// Returns the next buffer in the queue, blocking until there is one
// available.  Returns with an error if <q> is closed:
_, buf, err := q.Get()

Writer API:

// Allocate a new Writer with the id, priority, and space for N elements.
w := q.New(id, priority, N)

// Add <buf> to the <w>.  Blocks until there is space in the Writer.
// Aborts if <cancel> is closed or contains a value.
err := w.Put(buf, cancel)

w.Release(N)  // Make the next N buffers available to q.Get().

The q.Release() method is used for rate limiting. Buffers can be added with q.Put(), but they are not passed to q.Get() until they are released.



This section is empty.


View Source
var (
	ErrBQueueIsClosed        = errors.New("bqueue: queue is closed")
	ErrWriterAlreadyExists   = errors.New("bqueue: writer already exists with this identifier")
	ErrWriterIsClosed        = errors.New("bqueue: writer is closed")
	ErrCantToggleFlowControl = errors.New("bqueue: can't turn on flow control when it is off")
	ErrCancelled             = errors.New("bqueue: operation was canceled")
	ErrTryAgain              = errors.New("bqueue: writer is not ready, try again")


This section is empty.


type FlushFunc

type FlushFunc func() error

FlushFunc is the type of flushing functions. See T.Get for more info.

type ID

type ID int64

ID is the type of Writer identifiers.

type Priority

type Priority uint // TODO(jyh): Change the dense requirement if we need it.

Priority is an integer priority. Smaller is greater priority.

For performance, priorities should be dense and start from 0. Some implementations like drrqueue have use space linear in the max priority.

type T

type T interface {
	String() string

	// Find returns the Writer with the specified ID.  Returns nil if there is
	// no such writer.
	Find(id ID) Writer

	// Get returns the next contents of the queue.  Get returns a Writer and an
	// array of elements dequeued from the Writer.  The number of elements
	// returned depends on the implementation (for example, drrqueue specifies a
	// cap on how many bytes can be dequeued per Writer per round-robin cycle).
	// In addition, multiple elements are returned so that iobuf.Coalesce() can
	// be used to coalesce the contents.
	// Get blocks until at least one element can be returned or the queue is
	// closed.  If non-nil, the <flush> function is called just before Get
	// blocks.
	// If a Writer is closed (the Writer's Close() method was called), then Get
	// returns the Writer with empty contents.  The caller should call
	// writer.Shutdown() to remove the Writer and prevent it from being returned
	// in subsequent calls.
	// It is not safe to call Get() concurrently.
	Get(flush FlushFunc) (Writer, []*iobuf.Slice, error)

	// NewWriter allocates a new Writer.
	NewWriter(id ID, p Priority, n int) (Writer, error)

T specifies a buffer queue. The NewWriter method is used to create new writer queues, and the Get method returns the next buffer to be served.

type Writer

type Writer interface {
	ID() ID

	// Close closes the Writer, without discarding the contents.  All Put
	// operations currently running may, or may not, add their values to the
	// Writer.  All Put operations that happen-after the Close will fail.

	// Shutdown closes the Writer as in Close and also discards the contents.
	// If removeWriter is true the writer will be removed from the
	// associated T's queue entirely, otherwise the now empty writer will
	// remain and eventually be returned by a T.Get.
	Shutdown(removeWriter bool)

	// IsClosed returns true if the Writer is closed.
	IsClosed() bool

	// IsDrained returns true if the Writer is closed and has no data
	IsDrained() bool

	// Put adds an element to the queue.  Put blocks until there is space in
	// the Writer.  The element is not made available to T.Get until it is
	// released with the Release method.  Returns an error if the queue is
	// closed or the operation is cancelled.
	Put(buf *iobuf.Slice, cancel <-chan struct{}) error

	// TryPut is like Put, but it is nonblocking.
	TryPut(buf *iobuf.Slice) error

	// Release allows the next <n> elements to be removed from the Writer and
	// passed to Get.  If <n> is negative, all messages are released and flow
	// control is no longer used.
	Release(n int) error

Writer represents a single writer queue. Writer queues are served according to the policy defined by the container queue T.

Source Files


Path Synopsis
Package drrqueue implements a deficit round-robin buffer queue.
Package drrqueue implements a deficit round-robin buffer queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL