Published: Apr 30, 2016 License: BSD-3-Clause



A producer/consumer queue is a concurrent bounded buffer supporting multiple concurrent producers and consumers, with timeouts. The queue can be closed from either end, by the producer and/or the consumer. When closed, the contents are discarded, and subsequent operations return an error.

Note: the main reason to use a producer/consumer queue instead of a channel is to allow the consumer to close the channel. This queue can be used for many-to-many communication with multiple producers and/or multiple consumers. Any of the producers and any of the consumers are allowed to close the queue.



var (
	ErrQueueIsClosed = errors.New("queue is closed")
	ErrCancelled     = errors.New("operation was canceled")
	ErrTryAgain      = errors.New("operation failed, try again")


type T

type T struct {
	// contains filtered or unexported fields

T is a producer/consumer queue. It fulfills the same purpose as a Go channel, the main advantage is that the Put() operation does not panic, even after the queue is closed. The main disadvantage is that the T can't be used in a select operation.

func New

func New(maxSize int) *T

New(size) returns a producer/consumer queue with maximum <size> elements.

func (*T) Close

func (q *T) Close()

Close() closes the queue, without discarding the contents. All Put*() operations currently running may, or may not, add their values to the queue. All Put*() operations that happen-after the Close() will fail.

func (*T) Get

func (q *T) Get(cancel <-chan struct{}) (interface{}, error)

Get(cancel) returns the next item from the queue, or an error if the queue is closed or the operation is cancelled.

func (*T) Put

func (q *T) Put(item interface{}, cancel <-chan struct{}) error

Put(item, cancel) adds an item to the queue, or returns an error if the queue is closed or the operation is cancelled. The <cancel> channel may be nil, in which case the operation can't be cancelled.

func (*T) Shutdown

func (q *T) Shutdown()

Shutdown() closes the queue and discards all contents. Any concurrent Get() and Put() operations might exchange values, but all operations that happen-after the Shutdown() will fail.

func (*T) TryPut

func (q *T) TryPut(item interface{}) error

TryPut attempts to add an item to the queue. If the queue is full, ErrTryAgain is returned immediately, without blocking. If the queue is closed, ErrQueueIsClosed is returned.

