goconcurrentqueue

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2022 License: MIT Imports: 4 Imported by: 56

README

go.dev reference godoc reference version Build Status Go Report Card codecov CodeFactor Mentioned in Awesome Go

goconcurrentqueue - Concurrent safe queues

The package goconcurrentqueue offers a public interface Queue with methods for a queue. It comes with multiple Queue's concurrent-safe implementations, meaning they could be used concurrently by multiple goroutines without adding race conditions.

Topics

Installation

Execute

go get github.com/enriquebris/goconcurrentqueue

This package is compatible with all golang versions >= 1.7.x

Documentation

Visit goconcurrentqueue at go.dev

Classes diagram

goconcurrentqueue class diagram

Queues

FIFO

FIFO: concurrent-safe auto expandable queue.

pros
  • It is possible to enqueue as many items as needed.
  • Extra methods to get and remove enqueued items:
    • Get: returns an element's value and keeps the element at the queue
    • Remove: removes an element (using a given position) from the queue
cons
  • It is slightly slower than FixedFIFO.
FixedFIFO

FixedFIFO: concurrent-safe fixed capacity queue.

pros
  • FixedFIFO is, at least, 2x faster than FIFO in concurrent scenarios (multiple GR accessing the queue simultaneously).
cons
  • It has a fixed capacity meaning that no more items than this capacity could coexist at the same time.

Benchmarks FixedFIFO vs FIFO

The numbers for the following charts were obtained by running the benchmarks in a 2012 MacBook Pro (2.3 GHz Intel Core i7 - 16 GB 1600 MHz DDR3) with golang v1.12

Enqueue

Dequeue

Get started

FIFO queue simple usage

Live code - playground

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

type AnyStruct struct {
	Field1 string
	Field2 int
}

func main() {
	queue := goconcurrentqueue.NewFIFO()

	queue.Enqueue("any string value")
	queue.Enqueue(5)
	queue.Enqueue(AnyStruct{Field1: "hello world", Field2: 15})

	// will output: 3
	fmt.Printf("queue's length: %v\n", queue.GetLen())

	item, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
		return
	}

	// will output "any string value"
	fmt.Printf("dequeued item: %v\n", item)

	// will output: 2
	fmt.Printf("queue's length: %v\n", queue.GetLen())

}
Wait until an element gets enqueued

Live code - playground

package main

import (
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo = goconcurrentqueue.NewFIFO()
		done = make(chan struct{})
	)

	go func() {
		fmt.Println("1 - Waiting for next enqueued element")
		value, _ := fifo.DequeueOrWaitForNextElement()
		fmt.Printf("2 - Dequeued element: %v\n", value)

		done <- struct{}{}
	}()

	fmt.Println("3 - Go to sleep for 3 seconds")
	time.Sleep(3 * time.Second)

	fmt.Println("4 - Enqueue element")
	fifo.Enqueue(100)

	<-done
}

Wait until an element gets enqueued with timeout

Live code - playground

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		fifo = goconcurrentqueue.NewFIFO()
		ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
	)
	defer cancel()

	fmt.Println("1 - Waiting for next enqueued element")
	_, err := fifo.DequeueOrWaitForNextElementContext(ctx)
    
	if err != nil {
		fmt.Printf("2 - Failed waiting for new element: %v\n", err)
		return
	}
}

Dependency Inversion Principle using concurrent-safe queues

High level modules should not depend on low level modules. Both should depend on abstractions. Robert C. Martin

Live code - playground

package main

import (
	"fmt"

	"github.com/enriquebris/goconcurrentqueue"
)

func main() {
	var (
		queue          goconcurrentqueue.Queue
		dummyCondition = true
	)

	// decides which Queue's implementation is the best option for this scenario
	if dummyCondition {
		queue = goconcurrentqueue.NewFIFO()
	} else {
		queue = goconcurrentqueue.NewFixedFIFO(10)
	}

	fmt.Printf("queue's length: %v\n", queue.GetLen())
	workWithQueue(queue)
	fmt.Printf("queue's length: %v\n", queue.GetLen())
}

// workWithQueue uses a goconcurrentqueue.Queue to perform the work
func workWithQueue(queue goconcurrentqueue.Queue) error {
	// do some work

	// enqueue an item
	if err := queue.Enqueue("test value"); err != nil {
		return err
	}

	return nil
}

History

v0.7.0
  • Prevents FIFO.DequeueOrWaitForNextElement to keep waiting for a waitChan while internal queues contain items
v0.6.3
  • Prevents FIFO.DequeueOrWaitForNextElement to add useless wait channels
v0.6.2
  • Prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element
v0.6.1
  • FixedFifo.Enqueue prevents to get blocked trying to send the item over an invalid waitForNextElementChan channel
v0.6.0
  • Added DequeueOrWaitForNextElementContext()
v0.5.1
  • FIFO.DequeueOrWaitForNextElement() was modified to avoid deadlock when DequeueOrWaitForNextElement && Enqueue are invoked around the same time.
  • Added multiple goroutine unit testings for FIFO.DequeueOrWaitForNextElement()
v0.5.0
  • Added DequeueOrWaitForNextElement()
v0.4.0
  • Added QueueError (custom error)
v0.3.0
  • Added FixedFIFO queue's implementation (at least 2x faster than FIFO for multiple GRs)
  • Added benchmarks for both FIFO / FixedFIFO
  • Added GetCap() to Queue interface
  • Removed Get() and Remove() methods from Queue interface
v0.2.0
  • Added Lock/Unlock/IsLocked methods to control operations locking
v0.1.0
  • First In First Out (FIFO) queue added

Documentation

Index

Constants

View Source
const (
	QueueErrorCodeEmptyQueue            = "empty-queue"
	QueueErrorCodeLockedQueue           = "locked-queue"
	QueueErrorCodeIndexOutOfBounds      = "index-out-of-bounds"
	QueueErrorCodeFullCapacity          = "full-capacity"
	QueueErrorCodeInternalChannelClosed = "internal-channel-closed"
)
View Source
const (
	WaitForNextElementChanCapacity = 1000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type FIFO

type FIFO struct {
	// contains filtered or unexported fields
}

FIFO (First In First Out) concurrent queue

func NewFIFO

func NewFIFO() *FIFO

NewFIFO returns a new FIFO concurrent queue

func (*FIFO) Dequeue

func (st *FIFO) Dequeue() (interface{}, error)

Dequeue dequeues an element. Returns error if queue is locked or empty.

func (*FIFO) DequeueOrWaitForNextElement

func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error)

DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it. Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.

func (*FIFO) DequeueOrWaitForNextElementContext

func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error)

DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it. Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements. When the passed context expires this function exits and returns the context' error

func (*FIFO) Enqueue

func (st *FIFO) Enqueue(value interface{}) error

Enqueue enqueues an element. Returns error if queue is locked.

func (*FIFO) Get

func (st *FIFO) Get(index int) (interface{}, error)

Get returns an element's value and keeps the element at the queue

func (*FIFO) GetCap

func (st *FIFO) GetCap() int

GetCap returns the queue's capacity

func (*FIFO) GetLen

func (st *FIFO) GetLen() int

GetLen returns the number of enqueued elements

func (*FIFO) IsLocked

func (st *FIFO) IsLocked() bool

IsLocked returns true whether the queue is locked

func (*FIFO) Lock

func (st *FIFO) Lock()

Lock // Locks the queue. No enqueue/dequeue operations will be allowed after this point.

func (*FIFO) Remove

func (st *FIFO) Remove(index int) error

Remove removes an element from the queue

func (*FIFO) Unlock

func (st *FIFO) Unlock()

Unlock unlocks the queue

type FixedFIFO

type FixedFIFO struct {
	// contains filtered or unexported fields
}

Fixed capacity FIFO (First In First Out) concurrent queue

func NewFixedFIFO

func NewFixedFIFO(capacity int) *FixedFIFO

func (*FixedFIFO) Dequeue

func (st *FixedFIFO) Dequeue() (interface{}, error)

Dequeue dequeues an element. Returns error if: queue is locked, queue is empty or internal channel is closed.

func (*FixedFIFO) DequeueOrWaitForNextElement

func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error)

DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it. Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.

func (*FixedFIFO) DequeueOrWaitForNextElementContext

func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error)

DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it. Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements. When the passed context expires this function exits and returns the context' error

func (*FixedFIFO) Enqueue

func (st *FixedFIFO) Enqueue(value interface{}) error

Enqueue enqueues an element. Returns error if queue is locked or it is at full capacity.

func (*FixedFIFO) GetCap

func (st *FixedFIFO) GetCap() int

GetCap returns the queue's capacity

func (*FixedFIFO) GetLen

func (st *FixedFIFO) GetLen() int

GetLen returns queue's length (total enqueued elements)

func (*FixedFIFO) IsLocked

func (st *FixedFIFO) IsLocked() bool

func (*FixedFIFO) Lock

func (st *FixedFIFO) Lock()

func (*FixedFIFO) Unlock

func (st *FixedFIFO) Unlock()

type Queue

type Queue interface {
	// Enqueue element
	Enqueue(interface{}) error
	// Dequeue element
	Dequeue() (interface{}, error)
	// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
	// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
	DequeueOrWaitForNextElement() (interface{}, error)
	// DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
	// Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements.
	// When the passed context expires this function exits and returns the context' error
	DequeueOrWaitForNextElementContext(context.Context) (interface{}, error)
	// Get number of enqueued elements
	GetLen() int
	// Get queue's capacity
	GetCap() int

	// Lock the queue. No enqueue/dequeue/remove/get operations will be allowed after this point.
	Lock()
	// Unlock the queue.
	Unlock()
	// Return true whether the queue is locked
	IsLocked() bool
}

Queue interface with basic && common queue functions

type QueueError

type QueueError struct {
	// contains filtered or unexported fields
}

func NewQueueError

func NewQueueError(code string, message string) *QueueError

func (*QueueError) Code

func (st *QueueError) Code() string

func (*QueueError) Error

func (st *QueueError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL