goqueue

package module
v1.2.3
Published: Sep 30, 2023 License: MIT Imports: 5 Imported by: 7

README

github.com/antonio-alexander/go-queue

go-queue is a FIFO data structure intended as a functional replacement for channels. Its opinion is that channels are good for synchronously signaling that data is available, but not good at communicating that data. go-queue provides a number of APIs that completely separate this signaling from the destructive (or non-destructive) reading of data. I think go-queue should be used in situations where a channel is long-lived and/or you require non-destructive access to data while maintaining FIFO order.

Here are some common situations where go-queue functionality would be advantageous over channels:

  • If you want to "peek" at the data at the head of the queue to perform work on it before removing it from the queue (e.g. if the attempted "work" on that item failed, you've already removed it from the channel so you can't put it back)
  • If you want to put an item at the front of the queue when there are items in the queue
  • If you want to remove all data from the queue at once (e.g. this is almost always necessary for high throughput, so that the consumer can run faster than the producer)
  • You're using the producer/consumer pattern and you want to avoid polling (you can use a select case with a time.After or ticker, but you're still forced to "poll" whether the channel has data in it)
  • You have need of a channel, but your business logic needs the channel's size to grow at runtime (I think this is a code smell, but I've dealt with stranger things)
  • You want to know how many items are in the channel

Queue interfaces

go-queue is separated into a high-level/common Go module, github.com/antonio-alexander/go-queue, where the interfaces (described below) and tests are defined so they can be imported/used by anyone attempting to implement those interfaces.

If it's not obvious, the goal of this separation of interface ownership is that anyone using queues depends on the interface, not the implementation.

Keep in mind that some of these functions depend on the underlying implementation; for example, overflow and capacity will have different output depending on whether the queue is finite or infinite.

Owner, similar to GarbageCollecter, defines functions that operate on the underlying pointer. The Close() function will ready the underlying pointer for garbage collection and return any items that remain in the queue.

type Owner interface {
    Close() (items []interface{})
}

GarbageCollecter can be used to perform a kind of defragmentation of memory. Because the queue implementations are backed by a slice, depending on how the data is stored within that slice (e.g. NOT as a pointer), periodic destruction and re-creation of the slice can allow garbage collection.

type GarbageCollecter interface {
    GarbageCollect()
}

Dequeuer can be used to destructively remove one or more items from the queue; underflow will be true if the queue is empty. In the event the queue is empty, the items and flush outputs will have a length of zero. Once an item is removed, it loses its place in the FIFO and its order can't be guaranteed.

type Dequeuer interface {
    Dequeue() (item interface{}, underflow bool)
    DequeueMultiple(n int) (items []interface{})
    Flush() (items []interface{})
}
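The semantics above can be sketched with a minimal, non-concurrent, slice-backed stand-in (an illustrative assumption, not the go-queue implementation itself):

    package main

    import "fmt"

    // sliceQueue is a minimal stand-in used only to illustrate the
    // Dequeuer semantics; it is NOT the go-queue implementation.
    type sliceQueue struct{ items []interface{} }

    func (q *sliceQueue) Dequeue() (interface{}, bool) {
        if len(q.items) == 0 {
            return nil, true // underflow: the queue is empty
        }
        item := q.items[0]
        q.items = q.items[1:]
        return item, false
    }

    func (q *sliceQueue) DequeueMultiple(n int) []interface{} {
        if n > len(q.items) {
            n = len(q.items) // return fewer items than requested
        }
        items := q.items[:n]
        q.items = q.items[n:]
        return items
    }

    func (q *sliceQueue) Flush() []interface{} {
        items := q.items
        q.items = nil // everything is removed at once
        return items
    }

    func main() {
        q := &sliceQueue{items: []interface{}{1, 2, 3, 4}}
        item, underflow := q.Dequeue()
        fmt.Println(item, underflow)      // 1 false
        fmt.Println(q.DequeueMultiple(2)) // [2 3]
        fmt.Println(q.Flush())            // [4]
        _, underflow = q.Dequeue()
        fmt.Println(underflow)            // true
    }

Note how a dequeue on an empty queue returns underflow rather than blocking, which is the key behavioral difference from a channel receive.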

Peeker can be used to non-destructively read one or more items from the queue. Underflow is true if there are no items in the queue.

type Peeker interface {
    Peek() (items []interface{})
    PeekHead() (item interface{}, underflow bool)
    PeekFromHead(n int) (items []interface{})
}
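Peeking enables the "do the work first, remove the item only on success" flow from the use-case list above. A minimal stand-in (illustrative only, not the go-queue implementation) might look like:

    package main

    import "fmt"

    // peekQueue is a minimal stand-in to illustrate Peeker semantics;
    // it is not the go-queue implementation.
    type peekQueue struct{ items []interface{} }

    func (q *peekQueue) PeekHead() (interface{}, bool) {
        if len(q.items) == 0 {
            return nil, true // underflow
        }
        return q.items[0], false // the item stays in the queue
    }

    func (q *peekQueue) Dequeue() (interface{}, bool) {
        if len(q.items) == 0 {
            return nil, true
        }
        item := q.items[0]
        q.items = q.items[1:]
        return item, false
    }

    func doWork(item interface{}) error {
        fmt.Println("working on:", item)
        return nil
    }

    func main() {
        q := &peekQueue{items: []interface{}{"job-1"}}
        // peek first; only dequeue once the work succeeds, so a failed
        // attempt leaves the item at the head of the queue
        if item, underflow := q.PeekHead(); !underflow {
            if err := doWork(item); err == nil {
                q.Dequeue() // safe to remove now
            }
        }
        fmt.Println(len(q.items))
    }

With a channel, the receive itself would have removed the item, making this pattern impossible without re-sending.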

Enqueuer can be used to put one or more items in the queue; overflow is true if the queue is full.

type Enqueuer interface {
    Enqueue(item interface{}) (overflow bool)
    EnqueueMultiple(items []interface{}) (itemsRemaining []interface{}, overflow bool)
}
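The overflow and itemsRemaining semantics can be sketched with a minimal fixed-size stand-in (an assumption for illustration, not the go-queue implementation):

    package main

    import "fmt"

    // fixedQueue is a minimal fixed-size stand-in illustrating Enqueuer
    // overflow semantics; it is not the go-queue implementation.
    type fixedQueue struct {
        items []interface{}
        size  int
    }

    func (q *fixedQueue) Enqueue(item interface{}) bool {
        if len(q.items) >= q.size {
            return true // overflow: the item was not added
        }
        q.items = append(q.items, item)
        return false
    }

    func (q *fixedQueue) EnqueueMultiple(items []interface{}) ([]interface{}, bool) {
        for i, item := range items {
            if overflow := q.Enqueue(item); overflow {
                return items[i:], true // hand back what didn't fit
            }
        }
        return nil, false
    }

    func main() {
        q := &fixedQueue{size: 2}
        remaining, overflow := q.EnqueueMultiple([]interface{}{1, 2, 3})
        fmt.Println(remaining, overflow) // [3] true
    }

Returning the items that didn't fit lets the caller retry just the remainder rather than re-sending everything.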

EnqueueInFronter can be used to place a single item at the front of the queue; if the queue is full, overflow will be true. Note that this won't "add" an item to the queue if it's full.

type EnqueueInFronter interface {
    EnqueueInFront(item interface{}) (overflow bool)
}
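A short sketch of those semantics, using a minimal bounded stand-in (illustrative only, not the go-queue implementation):

    package main

    import "fmt"

    // boundedQueue is a minimal fixed-size stand-in illustrating
    // EnqueueInFront; it is not the go-queue implementation.
    type boundedQueue struct {
        items []interface{}
        size  int
    }

    func (q *boundedQueue) EnqueueInFront(item interface{}) bool {
        if len(q.items) >= q.size {
            return true // overflow: nothing is added
        }
        // prepend rather than append, so the item is next to be dequeued
        q.items = append([]interface{}{item}, q.items...)
        return false
    }

    func main() {
        q := &boundedQueue{size: 2, items: []interface{}{"b"}}
        fmt.Println(q.EnqueueInFront("a"), q.items) // false [a b]
        fmt.Println(q.EnqueueInFront("x"), q.items) // true [a b] (full, unchanged)
    }

This is the second use case from the list above: putting an item at the front of the queue when there are already items in it.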

Info can be used to return information about the queue such as how many items are in the queue, or the current "size" of the queue.

type Info interface {
    Length() (size int)
    Capacity() (capacity int)
}

Event can be used to get a read-only signal channel that will signal with an empty struct whenever data is put "in" to the queue or taken "out" of the queue. These are very useful in avoiding polling in certain patterns.

type Event interface {
    GetSignalIn() (signal <-chan struct{})
    GetSignalOut() (signal <-chan struct{})
}
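A minimal concurrent stand-in (an illustrative assumption, not the go-queue implementation) shows how the "in" signal lets a consumer block instead of polling:

    package main

    import (
        "fmt"
        "sync"
    )

    // signalQueue is a minimal stand-in showing how the Event signals can
    // replace polling; it is not the go-queue implementation.
    type signalQueue struct {
        mu       sync.Mutex
        items    []interface{}
        signalIn chan struct{}
    }

    func newSignalQueue() *signalQueue {
        // buffered so a signal isn't lost if no one is listening yet
        return &signalQueue{signalIn: make(chan struct{}, 1)}
    }

    func (q *signalQueue) Enqueue(item interface{}) bool {
        q.mu.Lock()
        q.items = append(q.items, item)
        q.mu.Unlock()
        select { // non-blocking signal send
        case q.signalIn <- struct{}{}:
        default:
        }
        return false
    }

    func (q *signalQueue) Dequeue() (interface{}, bool) {
        q.mu.Lock()
        defer q.mu.Unlock()
        if len(q.items) == 0 {
            return nil, true
        }
        item := q.items[0]
        q.items = q.items[1:]
        return item, false
    }

    func (q *signalQueue) GetSignalIn() <-chan struct{} { return q.signalIn }

    func main() {
        q := newSignalQueue()
        done := make(chan struct{})
        go func() { // consumer blocks on the signal instead of polling
            <-q.GetSignalIn()
            if item, underflow := q.Dequeue(); !underflow {
                fmt.Println("dequeued:", item)
            }
            close(done)
        }()
        q.Enqueue("hello")
        <-done
    }

The signal only says "something happened"; the data itself still travels through the queue, which is the separation of signaling and data that go-queue is built around.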

Patterns

These are a handful of patterns that can be used to get data out of and into the queue using the given interfaces. Almost all of these patterns are based on the producer/consumer design patterns and variants of it.

All of these patterns assume that the queue is of a fixed size. Some of them don't make sense for infinite queues.

This is a producer "polling" pattern; it will enqueue data at the rate of the producer ticker. The "in" is fairly straightforward; keep in mind that you don't have to perform any conversion for the data going in since it's an empty interface. Just be careful with non-scalar values; I think it's good practice to keep items in the queue 1:1.

Pros:

  • Immediate feedback if the queue is full (via overflow)

Cons:

  • There's no type safety for enqueuing data (be careful)
var queue goqueue.Enqueuer

tProduce := time.NewTicker(time.Second)
defer tProduce.Stop()
for {
    select {
    case <-tProduce.C:
        tNow := time.Now()
        if overflow := queue.Enqueue(tNow); !overflow {
            fmt.Printf("enqueued: %v\n", tNow)
        }
    }
}

This is a polling producer pattern that handles situations where the queue could be full, meaning that data is being produced faster than it can be consumed.

Pros:

  • This ensures that even if data is being consumed more slowly than it's being produced, you don't lose any data (though your effective produce rate is capped by the consume rate...)

Cons:

  • Because this uses "polling", it can only check as fast as the ticker, so you could hypothetically sacrifice CPU cycles for data integrity.
var queue goqueue.Enqueuer

tProduce := time.NewTicker(time.Second)
defer tProduce.Stop()
<-start
for {
    select {
    case <-tProduce.C:
        tNow := time.Now()
        for overflow := queue.Enqueue(tNow); overflow; {
            fmt.Println("overflow occurred")
            <-time.After(time.Millisecond)
            overflow = queue.Enqueue(tNow)
        }
    case <-stopper:
        return
    }
}

Alternatively, this is an event-based producer pattern that handles situations where the queue could be full and is a little more efficient in terms of cpu usage; just keep in mind that if there are multiple producers, there's no guarantee that once you get the signal the queue won't be full.

Cons:

  • This means that you're producing faster than you can consume; it only makes sense in a go routine, and it generally means that you should increase the size of your queue
  • Has the potential to block forever, make sure that you have some way to stop it (e.g. a stopper signal channel)
var queue interface{
    goqueue.Enqueuer
    goqueue.Event
}

signal := queue.GetSignalOut()
tProduce := time.NewTicker(time.Second)
defer tProduce.Stop()
<-start
for {
    select {
    case <-tProduce.C:
        tNow := time.Now()
        for overflow := queue.Enqueue(tNow); overflow; {
            fmt.Println("overflow occurred")
            <-signal
            overflow = queue.Enqueue(tNow)
        }
    case <-stopper:
        return
    }
}

This is a consumer polling pattern; it will dequeue data at the rate of the consumer ticker. The output is rather annoying in that it's an empty interface, and you need to know how to cast it to the appropriate data type. A type switch is the most elegant solution when you have more than one data type.

Be careful to NOT use anonymous structs that travel between package boundaries (they aren't always equivalent).

Although this works, it has the down-side that you're limited to consuming data no faster than 1Hz. If data is produced any faster, the consumer could fall significantly behind; if data is produced any slower, you waste CPU cycles on underflows. It's simply not efficient unless the producer and consumer run at approximately the same speed.

var queue goqueue.Dequeuer

tConsume := time.NewTicker(time.Second)
defer tConsume.Stop()
for {
    select {
    case <-tConsume.C:
        if item, underflow := queue.Dequeue(); !underflow {
            switch v := item.(type) {
            default:
                fmt.Printf("unsupported type: %T\n", v)
            case time.Time, *time.Time:
                fmt.Printf("dequeued: %v\n", v)
            }
        }
    }
}

This is an event-based consumer pattern: it waits until data is placed INTO the queue; each time data is placed into the queue, a signal is received, which "triggers" the logic to dequeue. This lets the loop run as fast as data is being produced.

It works really well, but it has the down-side that it could miss a signal if data is produced faster than it can be consumed.

var queue interface{
    goqueue.Dequeuer
    goqueue.Event
}

signal := queue.GetSignalIn()
tConsume := time.NewTicker(time.Second)
defer tConsume.Stop()
for {
    select {
    case <-signal:
        if item, underflow := queue.Dequeue(); !underflow {
            switch v := item.(type) {
            default:
                fmt.Printf("unsupported type: %T\n", v)
            case time.Time, *time.Time:
                fmt.Printf("dequeued: %v\n", v)
            }
        }
    }
}

This is a high throughput design pattern; it handles the "scalar" problem by being able to consume significantly more data per cycle; it also handles the "I missed a signal" problem, by ALSO using polling to consume the data. The Flush() function allows you to process all available data at once.

This pattern ensures that no data is lost and that you consume data as fast as, or faster than, you produce it.

var queue interface{
    goqueue.Dequeuer
    goqueue.Event
}

signal := queue.GetSignalIn()
consumeFx := func(items []interface{}) {
    for _, item := range items {
        switch v := item.(type) {
        default:
            fmt.Printf("unsupported type: %T\n", v)
        case time.Time, *time.Time:
            fmt.Printf("dequeued: %v\n", v)
        }
    }
}
tConsume := time.NewTicker(time.Second)
defer tConsume.Stop()
for {
    select {
    case <-tConsume.C:
        items := queue.Flush()
        consumeFx(items)
    case <-signal:
        items := queue.Flush()
        consumeFx(items)
    }
}

Testing

The existing tests are implemented as "code" and can be used within your implementation's tests to confirm that it implements the interfaces as expected by this version of the go-queue package.

Take note that there isn't an enqueue test; this is because it's fairly specific to the implementation.

These are the available unit tests:

  • New: can be used to verify the constructor
  • GarbageCollect: can be used to verify garbage collection
  • Dequeue: can be used to verify dequeue
  • DequeueMultiple: can be used to verify dequeue multiple
  • Flush: can be used to verify flush
  • Peek: can be used to verify peek
  • PeekFromHead: can be used to verify peek from head

These are the available function/integration tests:

  • Event: can be used to verify that event signals work as expected
  • Info: can be used to verify that info works as expected (finite leaning)
  • Queue: can be used to verify that queue works as expected (in general)
  • Async: can be used to verify if safe for concurrent usage

To use one of the tests, you can use the following code snippet. Keep in mind that in order to test, the queue/constructor needs to implement ALL of the interfaces expected by the test (and by association they need to implement those interfaces as expected).

import (
    "testing"

    goqueue "github.com/antonio-alexander/go-queue"
    finite "github.com/antonio-alexander/go-queue/finite"

    goqueue_tests "github.com/antonio-alexander/go-queue/tests"
)

func TestQueue(t *testing.T) {
    goqueue_tests.Queue(t, func(size int) interface {
        goqueue.Owner
        goqueue.Enqueuer
        goqueue.Dequeuer
        goqueue.Info
    } {
        return finite.New(size)
    })
}

Finite Queue

This is a fixed-size FIFO; when the queue is full, it won't allow you to place any more items inside the queue (sans EnqueueLossy). For more information, look at this README.md.

Infinite Queue

This is a queue that starts with a fixed size, but when that queue fills up, it'll grow by the initially configured grow size. For more information, look at this README.md.

Documentation

Overview

Package goqueue provides common types and functions used by all implementations of a queue

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AssertExamples added in v1.2.2

func AssertExamples(example *Example, examples []*Example) func() bool

func MustDequeue added in v1.2.2

func MustDequeue(queue Dequeuer, done <-chan struct{}, rate time.Duration) (interface{}, bool)

MustDequeue will attempt to dequeue at least one item at the rate configured until the done channel signals. KIM: It's possible to provide a nil channel and this function will block (forever) until a dequeue is successful

func MustDequeueEvent added in v1.2.2

func MustDequeueEvent(queue interface {
	Dequeuer
	Event
}, done <-chan struct{}) (interface{}, bool)

func MustDequeueMultiple added in v1.2.2

func MustDequeueMultiple(queue Dequeuer, done <-chan struct{}, n int, rate time.Duration) []interface{}

func MustDequeueMultipleEvent added in v1.2.2

func MustDequeueMultipleEvent(queue interface {
	Dequeuer
	Event
}, done <-chan struct{}, n int) []interface{}

func MustEnqueue added in v1.2.2

func MustEnqueue(queue Enqueuer, item interface{}, done <-chan struct{}, rate time.Duration) bool

MustEnqueue will attempt to use the Enqueue() function until the enqueue is successful (no overflow); this function will block until success occurs or the done channel receives a signal. An enqueue will attempt to occur at the rate configured

func MustEnqueueEvent added in v1.2.2

func MustEnqueueEvent(queue interface {
	Enqueuer
	Event
}, item interface{}, done <-chan struct{}) bool

MustEnqueueEvent will attempt to use the Enqueue() function until the enqueue is successful (no overflow); this function will block until success occurs or the done channel receives a signal. An enqueue will be attempted for every signal received

func MustEnqueueMultiple added in v1.2.2

func MustEnqueueMultiple(queue Enqueuer, items []interface{}, done <-chan struct{}, rate time.Duration) ([]interface{}, bool)

MustEnqueueMultiple will attempt to enqueue until the done channel completes, at the configured rate, or until the number of elements are successfully enqueued into the provided queue. KIM: this function doesn't preserve the unit of work and may not be consistent with concurrent usage (although it is safe)

func MustEnqueueMultipleEvent added in v1.2.2

func MustEnqueueMultipleEvent(queue interface {
	Enqueuer
	Event
}, items []interface{}, done <-chan struct{}) ([]interface{}, bool)

MustEnqueueMultipleEvent will attempt to enqueue one or more items; upon initial failure, it'll use the event channels/signals to attempt to enqueue items. KIM: this function doesn't preserve the unit of work and may not be consistent with concurrent usage (although it is safe)

func MustFlush added in v1.2.2

func MustFlush(queue Dequeuer, done <-chan struct{}, rate time.Duration) []interface{}

func MustFlushEvent added in v1.2.2

func MustFlushEvent(queue interface {
	Dequeuer
	Event
}, done <-chan struct{}) []interface{}

func MustPeek added in v1.2.2

func MustPeek(queue Peeker, done <-chan struct{}, rate time.Duration) []interface{}

func MustPeekEvent added in v1.2.2

func MustPeekEvent(queue interface {
	Peeker
	Event
}, done <-chan struct{}) []interface{}

func MustPeekFromHead added in v1.2.2

func MustPeekFromHead(queue Peeker, done <-chan struct{}, n int, rate time.Duration) []interface{}

func MustPeekFromHeadEvent added in v1.2.2

func MustPeekFromHeadEvent(queue interface {
	Peeker
	Event
}, done <-chan struct{}, n int) []interface{}

func MustPeekHead added in v1.2.2

func MustPeekHead(queue Peeker, done <-chan struct{}, rate time.Duration) (interface{}, bool)

func MustPeekHeadEvent added in v1.2.2

func MustPeekHeadEvent(queue interface {
	Peeker
	Event
}, done <-chan struct{}) (interface{}, bool)

Types

type BinaryMarshaler added in v1.2.0

type BinaryMarshaler = encoding.BinaryMarshaler

These types are specifically provided to attempt to communicate support for how queues would be able to store data in a persistent way no matter the data type (empty interface)

type BinaryUnmarshaler added in v1.2.0

type BinaryUnmarshaler = encoding.BinaryUnmarshaler

These types are specifically provided to attempt to communicate support for how queues would be able to store data in a persistent way no matter the data type (empty interface)

type Bytes added in v1.2.0

type Bytes []byte

Bytes is provided to make it easier to create jagged arrays; two-dimensional arrays are nice, but they work off the idea that each row has the same number of elements, which doesn't work for the queue use case... KIM: Bytes is a type that can be used to traverse package boundaries, unlike an anonymous struct or a Bytes defined by some other package
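The jagged-array idea can be shown with a short sketch (the local Bytes here mirrors goqueue.Bytes for illustration):

    package main

    import "fmt"

    // Bytes mirrors goqueue.Bytes ([]byte); a slice of Bytes forms a
    // jagged array where each row can have a different length.
    type Bytes []byte

    func main() {
        rows := []Bytes{
            []byte("a"),
            []byte("bcd"),
            []byte("ef"),
        }
        for _, row := range rows {
            fmt.Println(len(row), string(row))
        }
    }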

type Dequeuer

type Dequeuer interface {
	Dequeue() (item interface{}, underflow bool)
	DequeueMultiple(n int) (items []interface{})
	Flush() (items []interface{})
}

Dequeuer can be used to destructively remove one or more items from the queue: one item via Dequeue(), multiple items via DequeueMultiple(), or all items using Flush(); underflow will be true if the queue is empty

type EnqueueInFronter

type EnqueueInFronter interface {
	EnqueueInFront(item interface{}) (overflow bool)
}

EnqueueInFronter describes an operation where you enqueue a single item at the front of the queue, if the queue is full overflow will be true

type Enqueuer

type Enqueuer interface {
	Enqueue(item interface{}) (overflow bool)
	EnqueueMultiple(items []interface{}) (itemsRemaining []interface{}, overflow bool)
}

Enqueuer can be used to put one or more items into the queue: Enqueue() places one item while EnqueueMultiple() places multiple items; in the event the queue is full, the remaining items will be provided (if applicable) and overflow will be true

type Event

type Event interface {
	GetSignalIn() (signal <-chan struct{})
	GetSignalOut() (signal <-chan struct{})
}

Event can be used to get a read-only signal that would indicate whether data was removed from the queue (out) or put into the queue (in). Keep in mind that whether the channel is buffered or un-buffered depends on the underlying implementation

type Example added in v1.2.0

type Example struct {
	Int    int     `json:"int,omitempty"`
	Float  float64 `json:"float,omitempty,string"`
	String string  `json:"string,omitempty"`
}

func ExampleClose added in v1.2.0

func ExampleClose(queue Owner) []*Example

func ExampleConvertMultiple added in v1.2.0

func ExampleConvertMultiple(items []interface{}) []*Example

func ExampleConvertSingle added in v1.2.0

func ExampleConvertSingle(item interface{}) *Example

func ExampleDequeue added in v1.2.0

func ExampleDequeue(queue Dequeuer) (*Example, bool)

func ExampleDequeueMultiple added in v1.2.0

func ExampleDequeueMultiple(queue Dequeuer, n int) []*Example

func ExampleEnqueueMultiple added in v1.2.0

func ExampleEnqueueMultiple(queue Enqueuer, values []*Example) ([]*Example, bool)

func ExampleFlush added in v1.2.0

func ExampleFlush(queue Dequeuer) []*Example

func ExampleGen added in v1.2.2

func ExampleGen(sizes ...int) []*Example

func ExampleGenFloat64 added in v1.2.0

func ExampleGenFloat64(sizes ...int) []*Example

ExampleGenFloat64 will generate a random number of random float values (not to exceed the constant TestMaxExamples) if n is equal to 0; if n is provided, it will generate that many items

func ExampleGenInt added in v1.2.2

func ExampleGenInt(sizes ...int) []*Example

func ExampleGenString added in v1.2.2

func ExampleGenString(sizes ...int) []*Example

func ExamplePeek added in v1.2.0

func ExamplePeek(queue Peeker) []*Example

func ExamplePeekFromHead added in v1.2.0

func ExamplePeekFromHead(queue Peeker, n int) []*Example

func ExamplePeekHead added in v1.2.0

func ExamplePeekHead(queue Peeker) (*Example, bool)

func (*Example) MarshalBinary added in v1.2.0

func (v *Example) MarshalBinary() ([]byte, error)

func (*Example) UnmarshalBinary added in v1.2.0

func (v *Example) UnmarshalBinary(bytes []byte) error

type GarbageCollecter

type GarbageCollecter interface {
	GarbageCollect()
}

GarbageCollecter can be implemented to re-create the underlying pointers so that they can be garbage collected; you can think of this as creating an opportunity to defrag the memory

type Length added in v1.2.0

type Length interface {
	Length() (size int)
}

Length can be used to determine how many items are inside a queue at any given time

type Owner

type Owner interface {
	Close() (items []interface{})
}

Owner provides functions that directly affect the underlying pointers and data structures of a queue. The Close() function should ready the underlying pointer for garbage collection and return a slice of any items that remain in the queue

type Peeker

type Peeker interface {
	Peek() (items []interface{})
	PeekHead() (item interface{}, underflow bool)
	PeekFromHead(n int) (items []interface{})
}

Peeker can be used to non-destructively read one or more items from the queue: all items via Peek(), the item at the front of the queue via PeekHead(), or multiple items via PeekFromHead(). Underflow will be true if the queue is empty

Directories

Path Synopsis
finite: Package finite provides common types and functions used by a finite queue implementation
finite/tests: Package finite_tests provides a test suite for finite queues
infinite: Package infinite provides common types and functions used by an infinite queue implementation
infinite/tests: Package infinite_tests provides a test suite for infinite queues
tests: Package goqueue_tests provides a test suite for general queues
