bigqueue

v0.4.0 (this package is not in the latest version of its module)
Published: Jan 20, 2020. License: MIT. Imports: 9. Imported by: 5.

README

bigqueue

bigqueue provides an embedded, fast, and persistent queue written in pure Go using memory-mapped (mmap) files. bigqueue is currently not thread-safe; check out the roadmap for v0.3.0 for details on progress toward thread safety. To use bigqueue concurrently, callers must acquire a write (exclusive) lock around every call, including read APIs.
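
Because every call, reads included, must hold the same exclusive lock, one way to share a queue between goroutines is to wrap it. This is a sketch, not part of bigqueue: `queue`, `lockedQueue`, and `sliceQueue` are hypothetical names, the interface only mirrors the documented `*MmapQueue` method signatures, and an in-memory fake stands in for a real queue so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// queue lists the *bigqueue.MmapQueue methods this sketch needs,
// using the documented signatures.
type queue interface {
	Enqueue(message []byte) error
	Dequeue() ([]byte, error)
	IsEmpty() bool
}

// lockedQueue serializes every call, reads included, behind one mutex.
type lockedQueue struct {
	mu sync.Mutex
	q  queue
}

func (l *lockedQueue) Enqueue(message []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.q.Enqueue(message)
}

func (l *lockedQueue) Dequeue() ([]byte, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.q.Dequeue()
}

func (l *lockedQueue) IsEmpty() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.q.IsEmpty()
}

// sliceQueue is an in-memory fake used only to make the sketch runnable.
type sliceQueue struct{ items [][]byte }

func (s *sliceQueue) Enqueue(message []byte) error {
	s.items = append(s.items, message)
	return nil
}

func (s *sliceQueue) Dequeue() ([]byte, error) {
	if len(s.items) == 0 {
		return nil, errors.New("queue is empty")
	}
	m := s.items[0]
	s.items = s.items[1:]
	return m, nil
}

func (s *sliceQueue) IsEmpty() bool { return len(s.items) == 0 }

func main() {
	lq := &lockedQueue{q: &sliceQueue{}}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = lq.Enqueue([]byte(fmt.Sprint(i)))
		}(i)
	}
	wg.Wait()

	n := 0
	for !lq.IsEmpty() {
		if _, err := lq.Dequeue(); err != nil {
			break
		}
		n++
	}
	fmt.Println(n) // prints 100
}
```

A plain `sync.Mutex` is used rather than the read side of a `sync.RWMutex`, since even read APIs need the write lock. An `IsEmpty` check followed by `Dequeue` is still two separate lock acquisitions, so with several concurrent readers the `Dequeue` error (ErrEmptyQueue on an empty queue) must be handled anyway.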

Installation

go get github.com/grandecola/bigqueue

Requirements

  • Works only on Linux and darwin (macOS)
  • Works only on little-endian architectures

Usage

Standard API

Create or open a bigqueue:

bq, err := bigqueue.NewMmapQueue("path/to/queue")
defer bq.Close()

bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using the mmap syscall. The default Arena size is 128MB. A bigqueue can be created with a custom Arena size, which must not be smaller than the OS page size (otherwise ErrTooSmallArenaSize is returned):

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()

bigqueue also allows capping the maximum memory it can use. By default, the maximum memory is set to [3 x Arena Size].

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024),
	bigqueue.SetMaxInMemArenas(10))
defer bq.Close()

In this case, bigqueue never allocates more than 4KB*10 = 40KB of arena memory. This limit does not include the memory used by buffers for copying data.
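
The cap is just the arena size multiplied by the number of in-memory arenas. A hypothetical helper (`maxArenaMemory` is not a bigqueue function) makes the arithmetic explicit for the example above and for the stated default of three 128MB arenas:

```go
package main

import "fmt"

// maxArenaMemory returns the upper bound on arena memory: arena size
// times the maximum number of in-memory arenas. Copy buffers are extra.
func maxArenaMemory(arenaSize, maxInMemArenas int) int {
	return arenaSize * maxInMemArenas
}

func main() {
	const kb = 1024
	fmt.Println(maxArenaMemory(4*kb, 10) / kb)            // prints 40 (KB)
	fmt.Println(maxArenaMemory(128*kb*kb, 3) / (kb * kb)) // prints 384 (MB)
}
```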

bigqueue can flush periodically based on either elapsed time or the number of mutate (enqueue/dequeue) operations. A flush syncs the in-memory changes of all memory-mapped files to disk. This is a best-effort flush: elapsed time and the number of mutate operations are checked only upon an enqueue/dequeue.

This is how we can set these options:

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))

In this case, a flush is done after every two mutate operations.

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))

In this case, once a minute has elapsed, the flush happens on the next Enqueue/Dequeue call.

Write to bigqueue:

err := bq.Enqueue([]byte("elem"))

bigqueue allows writing string data directly, avoiding conversion to []byte:

err := bq.EnqueueString("elem")

Read from bigqueue:

elem, err := bq.Dequeue()

Read string data from bigqueue:

elem, err := bq.DequeueString()

Check whether bigqueue is empty:

isEmpty := bq.IsEmpty()
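
Since Dequeue returns an error on an empty queue, a simple read loop checks IsEmpty before each read. The helper below is a sketch: `stringSource` and `drainStrings` are hypothetical names, the interface only mirrors the documented `IsEmpty` and `DequeueString` signatures (which both `*MmapQueue` and `*Consumer` have), and `fakeQueue` replaces a real queue so the code runs standalone.

```go
package main

import (
	"errors"
	"fmt"
)

// stringSource matches the documented IsEmpty/DequeueString methods.
type stringSource interface {
	IsEmpty() bool
	DequeueString() (string, error)
}

// drainStrings reads every remaining element as a string.
func drainStrings(src stringSource) ([]string, error) {
	var out []string
	for !src.IsEmpty() {
		s, err := src.DequeueString()
		if err != nil {
			return out, err
		}
		out = append(out, s)
	}
	return out, nil
}

// fakeQueue is an in-memory stand-in so the sketch runs without bigqueue.
type fakeQueue struct{ items []string }

func (f *fakeQueue) IsEmpty() bool { return len(f.items) == 0 }

func (f *fakeQueue) DequeueString() (string, error) {
	if len(f.items) == 0 {
		return "", errors.New("queue is empty")
	}
	s := f.items[0]
	f.items = f.items[1:]
	return s, nil
}

func main() {
	got, err := drainStrings(&fakeQueue{items: []string{"a", "b", "c"}})
	fmt.Println(got, err) // prints: [a b c] <nil>
}
```
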

Advanced API

bigqueue allows reading data through consumers, similar to Kafka. Multiple consumers can read data at different offsets (though not yet in a thread-safe manner). Each consumer's offsets are persisted on disk and can be recovered by creating a consumer with the same name; reading then resumes from the offset where that consumer left off.

A new consumer is created as follows. Its offsets are set to the start of the queue, i.e., the first non-deleted element.

consumer, err := bq.NewConsumer("consumer")

An existing consumer can also be copied. The copy starts with the same offsets into the queue as the existing consumer.

copyConsumer, err := bq.FromConsumer("copyConsumer", consumer)

Now, read operations can be performed on the consumer:

isEmpty := consumer.IsEmpty()
elem, err := consumer.Dequeue()
str, err := consumer.DequeueString()

Benchmarks

Benchmarks were run on a Lenovo P52s laptop (Intel i7-8550U, 8 logical cores @ 1.80GHz, 15.4GB RAM) running 64-bit Ubuntu 18.10.

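Go version: 1.13

Each result line is standard `go test -bench` output: benchmark name (the trailing -8 is GOMAXPROCS), iterations, ns/op, B/op, and allocs/op.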

NewMmapQueue
BenchmarkNewMmapQueue/ArenaSize-4KB-8         	     259	   4336293 ns/op	    2578 B/op	      44 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128KB-8       	     277	   4292180 ns/op	    2577 B/op	      44 allocs/op
BenchmarkNewMmapQueue/ArenaSize-4MB-8         	     282	   4279293 ns/op	    2575 B/op	      44 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128MB-8       	     276	   4294212 ns/op	    2577 B/op	      44 allocs/op
Enqueue
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8         	 1227482	       974 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8         	 1227622	       990 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8      	 1349326	       905 ns/op	      52 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-8       	  295298	      3629 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-8      	  335749	      3684 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-8     	  371170	      3407 ns/op	      51 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-8        	   13934	     82812 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-8        	   14103	     84175 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-8     	   15004	     86985 ns/op	      52 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-8       	     450	   2908083 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-8      	     474	   3051462 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-8     	     469	   2928673 ns/op	      51 B/op	       1 allocs/op
EnqueueString
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8   	 1143330	      1067 ns/op	      34 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8   	 1118235	      1111 ns/op	      34 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8         	 1267702	     29356 ns/op	      36 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-8          	  333758	      3695 ns/op	      33 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-8         	  324952	      3810 ns/op	      33 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-8        	  361842	     90321 ns/op	      35 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-8           	   13420	     94311 ns/op	      33 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-8           	   13555	     87892 ns/op	      33 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-8        	   14716	    269216 ns/op	      36 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-8          	     393	   3820592 ns/op	      33 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-8         	     463	   4252438 ns/op	      34 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-8        	     386	   4935426 ns/op	      34 B/op	       1 allocs/op
Dequeue
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8                  	 1000000	      6303 ns/op	     176 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8                  	 1000000	      9283 ns/op	     176 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8               	 6215215	       208 ns/op	     160 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-8                	  506739	      4813 ns/op	    4143 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-8               	  517282	      6274 ns/op	    4143 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-8              	  892928	      1341 ns/op	    4128 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-8                 	   25336	     46375 ns/op	  131127 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-8                 	   25876	     46788 ns/op	  131127 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-8              	   36745	     34488 ns/op	  131104 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-8                	     734	   1740006 ns/op	 4194386 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-8               	     931	   1591828 ns/op	 4194375 B/op	       2 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-8              	     990	   1437580 ns/op	 4194336 B/op	       2 allocs/op
DequeueString
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8            	 1000000	      6760 ns/op	     184 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8            	 1000000	      9584 ns/op	     184 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8         	 5069414	       247 ns/op	     168 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-8          	  505219	      4913 ns/op	    4151 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-8         	  499880	      6123 ns/op	    4151 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-8        	  816019	      1398 ns/op	    4136 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-8           	   25624	     45954 ns/op	  131135 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-8           	   25681	     45620 ns/op	  131135 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-8        	   36438	     34198 ns/op	  131112 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-8          	     708	   1688158 ns/op	 4194398 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-8         	     966	   2062903 ns/op	 4194384 B/op	       3 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-8        	    1008	   1469626 ns/op	 4194344 B/op	       3 allocs/op

Note: before running the benchmarks, raise the open-file limit (ulimit) and vm.max_map_count using the commands below (the second one requires root):

ulimit -n 50000
echo 262144 > /proc/sys/vm/max_map_count

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrInvalidArenaSize is returned when persisted arena size
	// doesn't match with desired arena size.
	ErrInvalidArenaSize = errors.New("mismatch in arena size")
	// ErrDifferentQueues is returned when caller wants to copy
	// offsets from a consumer from a different queue.
	ErrDifferentQueues = errors.New("consumers from different queues")
)
var (
	// ErrTooSmallArenaSize is returned when arena size is smaller than OS page size.
	ErrTooSmallArenaSize = errors.New("too small arena size")
	// ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3.
	ErrTooFewInMemArenas = errors.New("too few in memory arenas")
)
var (
	// ErrEmptyQueue is returned when dequeue is performed on an empty queue.
	ErrEmptyQueue = errors.New("queue is empty")
)
var (
	// ErrIncompatibleVersion is returned when file format is older/newer.
	ErrIncompatibleVersion = errors.New("incompatible format of the code and data")
)

Functions

This section is empty.

Types

type Consumer added in v0.4.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a bigqueue consumer that allows reading data from bigqueue. A consumer is represented using just a base offset into the metadata.

func (*Consumer) Dequeue added in v0.4.0

func (c *Consumer) Dequeue() ([]byte, error)

Dequeue removes the next element from the queue for this consumer and returns it.

func (*Consumer) DequeueString added in v0.4.0

func (c *Consumer) DequeueString() (string, error)

DequeueString removes the next string element from the queue for this consumer and returns it.

func (*Consumer) IsEmpty added in v0.4.0

func (c *Consumer) IsEmpty() bool

IsEmpty returns true when queue is empty for the consumer.

type MmapQueue added in v0.3.0

type MmapQueue struct {
	// contains filtered or unexported fields
}

MmapQueue implements Queue interface.

func NewMmapQueue added in v0.3.0

func NewMmapQueue(dir string, opts ...Option) (*MmapQueue, error)

NewMmapQueue constructs a new persistent queue in dir, or opens the existing one stored there.

func (*MmapQueue) Close added in v0.3.0

func (q *MmapQueue) Close() error

Close closes the metadata and the arena manager.

func (*MmapQueue) Dequeue added in v0.3.0

func (q *MmapQueue) Dequeue() ([]byte, error)

Dequeue removes an element from the queue and returns it. This function uses the default consumer to consume from the queue.

func (*MmapQueue) DequeueString added in v0.4.0

func (q *MmapQueue) DequeueString() (string, error)

DequeueString removes a string element from the queue and returns it. This function uses the default consumer to consume from the queue.

func (*MmapQueue) Enqueue added in v0.3.0

func (q *MmapQueue) Enqueue(message []byte) error

Enqueue adds a new []byte element to the tail of the queue.

func (*MmapQueue) EnqueueString added in v0.3.0

func (q *MmapQueue) EnqueueString(message string) error

EnqueueString adds a new string element to the tail of the queue.

func (*MmapQueue) Flush added in v0.3.0

func (q *MmapQueue) Flush() error

Flush syncs the in-memory content of bigqueue to disk.
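
When a particular message must reach disk regardless of the periodic-flush settings, Flush can be called right after Enqueue. A sketch: `durableQueue`, `enqueueAndFlush`, and `countingQueue` are hypothetical, the interface mirrors the documented `Enqueue` and `Flush` signatures, and the fake only counts calls.

```go
package main

import "fmt"

// durableQueue lists the two documented *MmapQueue methods this helper uses.
type durableQueue interface {
	Enqueue(message []byte) error
	Flush() error
}

// enqueueAndFlush writes a message and immediately syncs it to disk instead
// of waiting for the best-effort periodic flush.
func enqueueAndFlush(q durableQueue, msg []byte) error {
	if err := q.Enqueue(msg); err != nil {
		return fmt.Errorf("enqueue: %w", err)
	}
	if err := q.Flush(); err != nil {
		return fmt.Errorf("flush: %w", err)
	}
	return nil
}

// countingQueue is an in-memory fake that records calls; it stands in for
// *bigqueue.MmapQueue so the sketch runs on its own.
type countingQueue struct {
	enqueued, flushes int
}

func (c *countingQueue) Enqueue(message []byte) error {
	c.enqueued++
	return nil
}

func (c *countingQueue) Flush() error {
	c.flushes++
	return nil
}

func main() {
	q := &countingQueue{}
	if err := enqueueAndFlush(q, []byte("critical")); err != nil {
		fmt.Println(err)
	}
	fmt.Println(q.enqueued, q.flushes) // prints: 1 1
}
```

Flushing after every write trades throughput for durability, the same trade-off described for the periodic flush options.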

func (*MmapQueue) FromConsumer added in v0.4.0

func (q *MmapQueue) FromConsumer(name string, from *Consumer) (*Consumer, error)

FromConsumer creates a new consumer or finds an existing one with the same name. It also copies the offsets from the given consumer to this consumer. If the given consumer belongs to a different queue, ErrDifferentQueues is returned.

func (*MmapQueue) IsEmpty added in v0.3.0

func (q *MmapQueue) IsEmpty() bool

IsEmpty returns true when queue is empty for the default consumer.

func (*MmapQueue) NewConsumer added in v0.4.0

func (q *MmapQueue) NewConsumer(name string) (*Consumer, error)

NewConsumer creates a new consumer or finds an existing one with the same name.

type Option added in v0.2.0

type Option func(*bqConfig) error

Option is a function type that takes a bqConfig object and sets various config parameters in it.

func SetArenaSize added in v0.2.0

func SetArenaSize(arenaSize int) Option

SetArenaSize returns an Option closure that sets the arena size.

func SetMaxInMemArenas added in v0.2.0

func SetMaxInMemArenas(maxInMemArenas int) Option

SetMaxInMemArenas returns an Option closure that sets the maximum number of Arenas that can reside in memory (RAM) at any time. By default, all the arenas reside in memory and the operating system manages memory by paging pages in and out from disk. Choose the maximum number of in-memory arenas such that:

maxInMemArenas > (# of consumers) * 2 + 1
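
Solving the inequality for the smallest integer gives 2 x consumers + 2, which should also respect the minimum of 3 enforced through ErrTooFewInMemArenas. The helper below is hypothetical; whether the default consumer used by `MmapQueue.Dequeue` counts toward the number of consumers is not stated here, so the caller decides what to pass.

```go
package main

import "fmt"

// minRecommendedArenas returns the smallest value satisfying
// maxInMemArenas > consumers*2 + 1, but never less than 3.
func minRecommendedArenas(consumers int) int {
	n := consumers*2 + 2
	if n < 3 {
		n = 3
	}
	return n
}

func main() {
	for _, c := range []int{0, 1, 3} {
		fmt.Println(c, minRecommendedArenas(c))
	}
}
```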

func SetPeriodicFlushDuration added in v0.3.0

func SetPeriodicFlushDuration(flushPeriod time.Duration) Option

SetPeriodicFlushDuration returns an Option that flushes the queue's in-memory changes to disk periodically, once every flushPeriod. This is a best-effort flush: elapsed time is checked only upon an enqueue/dequeue. If the value is <= 0, no periodic flush is performed.

For durability, this value should be low; for performance, it should be high.

func SetPeriodicFlushOps added in v0.3.0

func SetPeriodicFlushOps(flushMutOps int64) Option

SetPeriodicFlushOps returns an Option that sets the number of mutate operations (enqueue/dequeue) after which the queue's in-memory changes are flushed to disk. This is a best-effort flush: the number of mutate operations is checked only upon an enqueue/dequeue. If the value is <= 0, no periodic flush is performed.

For durability, this value should be low; for performance, it should be high.

Directories

Path Synopsis
examples
basic command
config command
consumer command
extsort command
