Documentation

Overview

    Package queue includes a regular queue and a priority queue. These queues rely on waitgroups to pause listening threads on empty queues until a message is received. If any thread calls Dispose on the queue, all listeners return immediately with an error. Any subsequent put to the queue returns an error, as opposed to panicking as a send on a closed channel does. Queues grow without bound, unlike channels, which can be buffered but block a thread that attempts to put to a full channel.
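For comparison, the channel behavior mentioned above can be observed with the standard library alone. The following is a minimal sketch; the helper name sendPanics is hypothetical and does not belong to this package. It only shows that a send on a closed channel panics, which is the situation the queues report as an error instead.

```go
package main

import "fmt"

// sendPanics reports whether sending a value on ch panics.
// It is a hypothetical helper used only for this illustration.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		// A send on a closed channel panics; recover turns that
		// panic into a plain boolean result.
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch)
	fmt.Println(sendPanics(ch)) // true
}
```

A caller of the queue types would check the returned error rather than rely on recover.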

    Recently added is a lockless ring buffer using the same basic C design as found here:

    http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

    The design is modified for use with Go and adds dispose semantics that make it possible to release blocked threads. This works for both puts and gets: either returns an error if it is blocked when the buffer is disposed. This could serve as a signal to kill a goroutine. All thread safety is achieved using CAS operations, making this buffer pretty quick.
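To make the CAS-based approach more concrete, here is a simplified, non-blocking sketch of a bounded MPMC queue in the style of the linked design. It is not this package's implementation: the names ring, node, newRing, offer, and poll are hypothetical, dispose semantics and blocking are left out, and the size is assumed to be a power of two.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node pairs a slot with a sequence number that tells producers and
// consumers whose turn it is to use the slot.
type node struct {
	seq  uint64
	data interface{}
}

// ring is a hypothetical bounded MPMC queue; size must be a power of two.
type ring struct {
	enqueue uint64 // next position to write
	dequeue uint64 // next position to read
	mask    uint64
	nodes   []node
}

func newRing(size uint64) *ring {
	r := &ring{mask: size - 1, nodes: make([]node, size)}
	for i := range r.nodes {
		r.nodes[i].seq = uint64(i)
	}
	return r
}

// offer adds v if there is space and reports whether it succeeded.
func (r *ring) offer(v interface{}) bool {
	pos := atomic.LoadUint64(&r.enqueue)
	for {
		n := &r.nodes[pos&r.mask]
		diff := int64(atomic.LoadUint64(&n.seq)) - int64(pos)
		if diff == 0 {
			// The slot is free for this position; claim it with CAS.
			if atomic.CompareAndSwapUint64(&r.enqueue, pos, pos+1) {
				n.data = v
				atomic.StoreUint64(&n.seq, pos+1) // publish to consumers
				return true
			}
		} else if diff < 0 {
			return false // the slot still holds an unread item: full
		}
		pos = atomic.LoadUint64(&r.enqueue)
	}
}

// poll removes the next item if one is available.
func (r *ring) poll() (interface{}, bool) {
	pos := atomic.LoadUint64(&r.dequeue)
	for {
		n := &r.nodes[pos&r.mask]
		diff := int64(atomic.LoadUint64(&n.seq)) - int64(pos+1)
		if diff == 0 {
			if atomic.CompareAndSwapUint64(&r.dequeue, pos, pos+1) {
				v := n.data
				n.data = nil
				// Mark the slot free for the producer one lap ahead.
				atomic.StoreUint64(&n.seq, pos+r.mask+1)
				return v, true
			}
		} else if diff < 0 {
			return nil, false // nothing published at this position: empty
		}
		pos = atomic.LoadUint64(&r.dequeue)
	}
}

func main() {
	r := newRing(2)
	fmt.Println(r.offer("a"), r.offer("b"), r.offer("c")) // true true false
	v, ok := r.poll()
	fmt.Println(v, ok) // a true
}
```

Each slot carries its own sequence number, so producers and consumers only contend on the two position counters and never take a lock.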

    Benchmarks:

    BenchmarkPriorityQueue-8        2000000      782 ns/op
    BenchmarkQueue-8                2000000      671 ns/op
    BenchmarkChannel-8              1000000     2083 ns/op
    BenchmarkQueuePut-8               20000    84299 ns/op
    BenchmarkQueueGet-8               20000    80753 ns/op
    BenchmarkExecuteInParallel-8      20000    68891 ns/op
    BenchmarkRBLifeCycle-8         10000000      177 ns/op
    BenchmarkRBPut-8               30000000     58.1 ns/op
    BenchmarkRBGet-8               50000000     26.8 ns/op

    TODO: We really need a Fibonacci heap for the priority queue.

    TODO: Unify the types of queue to the same interface.

    Variables

    var (
    	// ErrDisposed is returned when an operation is performed on a disposed
    	// queue.
    	ErrDisposed = errors.New(`queue: disposed`)
    
    	// ErrTimeout is returned when an applicable queue operation times out.
    	ErrTimeout = errors.New(`queue: poll timed out`)
    
    	// ErrEmptyQueue is returned when an operation that cannot be applied
    	// to an empty queue is called on one.
    	ErrEmptyQueue = errors.New(`queue: empty queue`)
    )

    Functions

    func ExecuteInParallel

    func ExecuteInParallel(q *Queue, fn func(interface{}))

      ExecuteInParallel calls the provided function, in parallel, with each item in the queue until the queue is exhausted. Once the queue is exhausted, execution is complete and all goroutines exit. The queue is disposed in the process, so it cannot be used again.
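The general pattern described above can be sketched with the standard library only. This is an illustration under stated assumptions, not the package's implementation: it drains a plain slice instead of a *Queue, and the function name executeInParallel and the choice of one worker per CPU are hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// executeInParallel calls fn once for every element of items, spreading
// the calls across one goroutine per CPU. It returns when all items have
// been processed and every worker goroutine has exited.
func executeInParallel(items []interface{}, fn func(interface{})) {
	next := int64(-1) // index of the most recently claimed item
	var wg sync.WaitGroup
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Atomically claim the next index so that no item
				// is handed to two workers.
				i := atomic.AddInt64(&next, 1)
				if i >= int64(len(items)) {
					return // exhausted: this worker exits
				}
				fn(items[i])
			}
		}()
	}
	wg.Wait()
}

func main() {
	items := []interface{}{1, 2, 3, 4}
	var sum int64
	executeInParallel(items, func(v interface{}) {
		atomic.AddInt64(&sum, int64(v.(int)))
	})
	fmt.Println(sum) // 10
}
```

Because fn runs concurrently, it must itself be safe to call from multiple goroutines, as the atomic add in main illustrates.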

      Types

      type Item

      type Item interface {
      	// Compare returns an int that can be used to determine
      	// ordering in the priority queue.  Assuming the queue
      	// is in ascending order, this should follow > logic:
      	// return 1 to indicate this object is greater than the
      	// other, 0 to indicate equality, and -1 to indicate
      	// less than the other.
      	Compare(other Item) int
      }

        Item is an item that can be added to the priority queue.
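As an illustration of the Compare contract, here is a minimal sketch of a type that satisfies the interface. To keep the example self-contained, the Item interface is redeclared locally with the same method as documented above; the type name priority is hypothetical, and the sketch assumes that only values of that one type are ever compared with each other.

```go
package main

import "fmt"

// Item is redeclared here, with the documented method, so that the
// example compiles without importing the package.
type Item interface {
	Compare(other Item) int
}

// priority is a hypothetical item ordered by its integer value.
type priority int

// Compare returns 1 if p is greater than other, -1 if it is less,
// and 0 if the two are equal. It panics if other is not a priority.
func (p priority) Compare(other Item) int {
	o := other.(priority)
	switch {
	case p > o:
		return 1
	case p < o:
		return -1
	default:
		return 0
	}
}

func main() {
	fmt.Println(priority(3).Compare(priority(5))) // -1
	fmt.Println(priority(5).Compare(priority(5))) // 0
}
```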

        type PriorityQueue

        type PriorityQueue struct {
        	// contains filtered or unexported fields
        }

          PriorityQueue is similar to Queue, except that it takes items that implement the Item interface and adds them to the queue in priority order.

          func NewPriorityQueue

          func NewPriorityQueue(hint int, allowDuplicates bool) *PriorityQueue

            NewPriorityQueue is the constructor for a priority queue.

            func (*PriorityQueue) Dispose

            func (pq *PriorityQueue) Dispose()

              Dispose prevents any further reads from or writes to this queue and frees available resources.

              func (*PriorityQueue) Disposed

              func (pq *PriorityQueue) Disposed() bool

                Disposed returns a bool indicating if this queue has been disposed.

                func (*PriorityQueue) Empty

                func (pq *PriorityQueue) Empty() bool

                  Empty returns a bool indicating whether the queue has no items left.

                  func (*PriorityQueue) Get

                  func (pq *PriorityQueue) Get(number int) ([]Item, error)

                    Get retrieves items from the queue. If the queue is empty, this call blocks until the next item is added to the queue. The call attempts to retrieve the given number of items.

                    func (*PriorityQueue) Len

                    func (pq *PriorityQueue) Len() int

                      Len returns a number indicating how many items are in the queue.

                      func (*PriorityQueue) Peek

                      func (pq *PriorityQueue) Peek() Item

                        Peek will look at the next item without removing it from the queue.

                        func (*PriorityQueue) Put

                        func (pq *PriorityQueue) Put(items ...Item) error

                          Put adds items to the queue.

                          type Queue

                          type Queue struct {
                          	// contains filtered or unexported fields
                          }

                            Queue is the struct responsible for tracking the state of the queue.

                            func New

                            func New(hint int64) *Queue

                              New is a constructor for a new threadsafe queue.

                              func (*Queue) Dispose

                              func (q *Queue) Dispose() []interface{}

                                Dispose disposes of this queue and returns the items that were disposed. Any subsequent calls to Get or Put will return an error.

                                func (*Queue) Disposed

                                func (q *Queue) Disposed() bool

                                  Disposed returns a bool indicating whether Dispose has been called on this queue.

                                  func (*Queue) Empty

                                  func (q *Queue) Empty() bool

                                    Empty returns a bool indicating whether this queue is empty.

                                    func (*Queue) Get

                                    func (q *Queue) Get(number int64) ([]interface{}, error)

                                      Get retrieves items from the queue. If there are some items in the queue, Get returns up to the number of items passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue.

                                      func (*Queue) Len

                                      func (q *Queue) Len() int64

                                        Len returns the number of items in this queue.

                                        func (*Queue) Peek

                                        func (q *Queue) Peek() (interface{}, error)

                                          Peek returns the first item in the queue by value without modifying the queue.

                                          func (*Queue) Poll

                                          func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)

                                            Poll retrieves items from the queue. If there are some items in the queue, Poll will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue or the provided timeout is reached. A non-positive timeout will block until items are added. If a timeout occurs, ErrTimeout is returned.
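The timeout rules described above can be sketched with a channel and a timer. This is a swapped-in technique for illustration only, not how the package implements Poll: the function poll, the error errTimeout, and the use of a channel in place of a Queue are all assumptions made for this example, and it returns a single item rather than a batch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical stand-in for the package's ErrTimeout.
var errTimeout = errors.New("poll timed out")

// poll waits for one item from ch. A non-positive timeout blocks until
// an item arrives; otherwise errTimeout is returned once the timeout
// elapses without an item.
func poll(ch <-chan interface{}, timeout time.Duration) (interface{}, error) {
	if timeout <= 0 {
		return <-ch, nil // block indefinitely
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case v := <-ch:
		return v, nil
	case <-timer.C:
		return nil, errTimeout
	}
}

func main() {
	ch := make(chan interface{}, 1)
	_, err := poll(ch, 10*time.Millisecond)
	fmt.Println(err == errTimeout) // true

	ch <- "job"
	v, err := poll(ch, time.Second)
	fmt.Println(v, err) // job <nil>
}
```

A caller would typically treat the timeout error as a signal to retry or to check for shutdown, and any other error as fatal for that consumer.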

                                            func (*Queue) Put

                                            func (q *Queue) Put(items ...interface{}) error

                                              Put will add the specified items to the queue.

                                              func (*Queue) TakeUntil

                                              func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error)

                                                TakeUntil takes a function and returns a list of items that match the checker until the checker returns false. This does not wait if there are no items in the queue.
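The take-until behavior can be illustrated on a plain slice. This is a sketch of the semantics as described, not the package's code: the function name takeUntil is hypothetical, and it returns the remaining items explicitly because a slice, unlike the queue, has no internal state to update.

```go
package main

import "fmt"

// takeUntil returns the leading items for which checker returns true,
// stopping at the first item for which it returns false. The second
// result holds the items that were left behind. An empty input yields
// empty results without waiting.
func takeUntil(items []interface{}, checker func(interface{}) bool) (taken, rest []interface{}) {
	i := 0
	for i < len(items) && checker(items[i]) {
		i++
	}
	return items[:i], items[i:]
}

func main() {
	items := []interface{}{1, 2, 5, 3}
	small := func(v interface{}) bool { return v.(int) < 4 }
	taken, rest := takeUntil(items, small)
	fmt.Println(taken, rest) // [1 2] [5 3]
}
```

Note that the trailing 3 stays behind even though it would pass the check, because taking stops at the first item that fails it.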

                                                type RingBuffer

                                                type RingBuffer struct {
                                                	// contains filtered or unexported fields
                                                }

                                                  RingBuffer is an MPMC buffer that achieves thread safety with CAS operations only. A Put on a full buffer blocks until an item is retrieved, and a Get on an empty buffer blocks until an item is put. Calling Dispose on the RingBuffer will unblock any blocked threads with an error. This buffer is similar to the buffer described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue with some minor additions.

                                                  func NewRingBuffer

                                                  func NewRingBuffer(size uint64) *RingBuffer

                                                    NewRingBuffer will allocate, initialize, and return a ring buffer with the specified size.

                                                    func (*RingBuffer) Cap

                                                    func (rb *RingBuffer) Cap() uint64

                                                      Cap returns the capacity of this ring buffer.

                                                      func (*RingBuffer) Dispose

                                                      func (rb *RingBuffer) Dispose()

                                                        Dispose will dispose of this queue and free any blocked threads in the Put and/or Get methods. Calling those methods on a disposed queue will return an error.

                                                        func (*RingBuffer) Get

                                                        func (rb *RingBuffer) Get() (interface{}, error)

                                                          Get will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.

                                                          func (*RingBuffer) IsDisposed

                                                          func (rb *RingBuffer) IsDisposed() bool

                                                            IsDisposed will return a bool indicating if this queue has been disposed.

                                                            func (*RingBuffer) Len

                                                            func (rb *RingBuffer) Len() uint64

                                                              Len returns the number of items in the queue.

                                                              func (*RingBuffer) Offer

                                                              func (rb *RingBuffer) Offer(item interface{}) (bool, error)

                                                                Offer adds the provided item to the queue if there is space. If the queue is full, this call will return false. An error will be returned if the queue is disposed.

                                                                func (*RingBuffer) Poll

                                                                func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error)

                                                                  Poll will return the next item in the queue. This call will block if the queue is empty. This call will unblock when an item is added to the queue, Dispose is called on the queue, or the timeout is reached. An error will be returned if the queue is disposed or a timeout occurs. A non-positive timeout will block indefinitely.

                                                                  func (*RingBuffer) Put

                                                                  func (rb *RingBuffer) Put(item interface{}) error

                                                                    Put adds the provided item to the queue. If the queue is full, this call will block until an item is removed from the queue or Dispose is called on the queue. An error will be returned if the queue is disposed.