bigqueue

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2019 License: MIT Imports: 9 Imported by: 5

README

GoDoc MIT license Build Status codecov

Go Report Card golangci Codacy Badge Maintainability CodeFactor

bigqueue

bigqueue provides embedded, fast and persistent queue written in pure Go using memory mapped (mmap) files. bigqueue is currently not thread safe. Check out the roadmap for v0.3.0 for more details on progress on thread safety. To use bigqueue in parallel context, a Write lock needs to be acquired (even for Read APIs).

Installation

go get github.com/grandecola/bigqueue

Requirements

  • Only works for linux and darwin OS
  • Only works on Little Endian architecture

Usage

Create or open a bigqueue:

bq, err := bigqueue.NewMmapQueue("path/to/queue")
defer bq.Close()

bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using mmap syscall. Default size of each Arena is set to 128MB. It is possible to create a bigqueue with custom Arena size:

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()

Bigqueue also allows setting up the maximum possible memory that it can use. By default, the maximum memory is set to [3 x Arena Size].

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), bigqueue.SetMaxInMemArenas(10))
defer bq.Close()

In this case, bigqueue will never allocate more memory than 4KB*10=40KB. This memory is above and beyond the memory used in buffers for copying data.

Bigqueue allows to set periodic flush based on either elapsed time or number of mutate (enqueue/dequeue) operations. Flush syncs the in memory changes of all memory mapped files with disk. This is a best effort flush. Elapsed time and number of mutate operations are only checked upon an enqueue/dequeue.

This is how you can set these options -

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))

In this case, a flush is done after every two mutate operations.

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))

In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called.

Write to bigqueue:

err := bq.Enqueue([]byte("elem"))   // size = 1

bigqueue allows writing string data directly, avoiding conversion to []byte:

err := bq.EnqueueString("elem")   // size = 2

Read from bigqueue:

elem, err := bq.Peek()        // size = 2
err := bq.Dequeue()           // size = 1

we can also read string data from bigqueue:

elem, err := bq.PeekString()  // size = 1
err := bq.Dequeue()           // size = 0

Check whether bigqueue has non zero elements:

isEmpty := bq.IsEmpty()

Benchmarks

Benchmarks are run on a Lenovo P52s laptop (i7-8550U, 8 core @1.80GHz, 15.4GB RAM) having ubuntu 18.10, 64 bit machine.

Go version: 1.12

NewMmapQueue
BenchmarkNewMmapQueue/ArenaSize-4KB-8         	   50000	     39909 ns/op	    1381 B/op	      30 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128KB-8       	   30000	     40594 ns/op	    1381 B/op	      30 allocs/op
BenchmarkNewMmapQueue/ArenaSize-4MB-8         	   30000	     40160 ns/op	    1381 B/op	      30 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128MB-8       	   30000	     40510 ns/op	    1381 B/op	      30 allocs/op
Enqueue
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8         	 2000000	       827 ns/op	      21 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8         	 2000000	       814 ns/op	      21 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-1GB-8          	 2000000	       733 ns/op	      23 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8      	 2000000	       742 ns/op	      21 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-16KB/MaxMem-40KB-8         	   20000	     93169 ns/op	    2586 B/op	      52 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-16KB/MaxMem-1GB-8          	   20000	     84426 ns/op	    2585 B/op	      52 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-16KB/MaxMem-NoLimit-8      	   20000	     81964 ns/op	    2585 B/op	      52 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-1MB/MaxMem-1GB-8           	     300	   5227199 ns/op	  165919 B/op	    3328 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-1MB/MaxMem-NoLimit-8       	     300	   5365171 ns/op	  165918 B/op	    3328 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-128B/MaxMem-384KB-8      	10000000	       153 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-128B/MaxMem-1.25MB-8     	10000000	       147 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-128B/MaxMem-1GB-8        	10000000	       132 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-128B/MaxMem-NoLimit-8    	10000000	       130 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-16KB/MaxMem-384KB-8      	  200000	     11989 ns/op	      80 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-16KB/MaxMem-1.25MB-8     	  100000	     11561 ns/op	      80 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-16KB/MaxMem-1GB-8        	  200000	     12661 ns/op	      80 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-16KB/MaxMem-NoLimit-8    	  200000	     12289 ns/op	      80 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-1MB/MaxMem-384KB-8       	    2000	    759625 ns/op	    5133 B/op	     104 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-1MB/MaxMem-1.25MB-8      	    2000	    760162 ns/op	    5133 B/op	     104 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-1MB/MaxMem-1GB-8         	    2000	    772780 ns/op	    5133 B/op	     104 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-1MB/MaxMem-NoLimit-8     	    2000	    731294 ns/op	    5133 B/op	     104 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128B/MaxMem-12MB-8         	10000000	       113 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128B/MaxMem-40MB-8         	20000000	       116 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128B/MaxMem-1GB-8          	20000000	       132 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128B/MaxMem-NoLimit-8      	20000000	       125 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-16KB/MaxMem-12MB-8         	  200000	      8446 ns/op	       2 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-16KB/MaxMem-40MB-8         	  200000	      8695 ns/op	       2 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-16KB/MaxMem-1GB-8          	  200000	      9203 ns/op	       2 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-16KB/MaxMem-NoLimit-8      	  200000	      9807 ns/op	       2 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-1MB/MaxMem-12MB-8          	    2000	    536200 ns/op	     154 B/op	       3 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-1MB/MaxMem-40MB-8          	    3000	    540404 ns/op	     155 B/op	       3 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-1MB/MaxMem-1GB-8           	    3000	    601541 ns/op	     155 B/op	       3 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-1MB/MaxMem-NoLimit-8       	    3000	    623102 ns/op	     155 B/op	       3 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-128B/MaxMem-256MB-8      	20000000	       121 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-128B/MaxMem-1.25GB-8     	20000000	       126 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-128B/MaxMem-NoLimit-8    	20000000	       128 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-16KB/MaxMem-256MB-8      	  200000	      8344 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-16KB/MaxMem-1.25GB-8     	  200000	      9063 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-16KB/MaxMem-NoLimit-8    	  200000	      9743 ns/op	       0 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-1MB/MaxMem-256MB-8       	    3000	    550256 ns/op	       4 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-1MB/MaxMem-1.25GB-8      	    3000	    611339 ns/op	       4 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-1MB/MaxMem-NoLimit-8     	    3000	    617378 ns/op	       4 B/op	       0 allocs/op

Dequeue (-benchtime=200us)
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8         	    1000	      1455 ns/op	      96 B/op	       3 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8         	    5000	      5540 ns/op	     467 B/op	      16 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-1GB-8          	    5000	        58.7 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8      	    5000	        67.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-16KB/MaxMem-40KB-8         	     100	    372351 ns/op	   32789 B/op	    1165 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-16KB/MaxMem-1GB-8          	    2000	       184 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-16KB/MaxMem-NoLimit-8      	    3000	       188 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-1MB/MaxMem-1GB-8           	     500	       501 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-1MB/MaxMem-NoLimit-8       	     500	       507 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-128B/MaxMem-384KB-8      	    5000	        99.7 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-128B/MaxMem-1.25MB-8     	    5000	        64.4 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-128B/MaxMem-1GB-8        	    5000	        75.9 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-128B/MaxMem-NoLimit-8    	    5000	        70.2 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-16KB/MaxMem-384KB-8      	     100	      5444 ns/op	     135 B/op	       4 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-16KB/MaxMem-1.25MB-8     	     100	      4769 ns/op	      52 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-16KB/MaxMem-1GB-8        	    3000	       138 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-16KB/MaxMem-NoLimit-8    	    3000	       146 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-1MB/MaxMem-384KB-8       	     100	    795956 ns/op	   65072 B/op	    2322 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-1MB/MaxMem-1.25MB-8      	     100	    822750 ns/op	   65072 B/op	    2322 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-1MB/MaxMem-1GB-8         	    1000	       373 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-1MB/MaxMem-NoLimit-8     	    1000	       381 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128B/MaxMem-12MB-8         	    5000	        65.2 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128B/MaxMem-40MB-8         	    5000	        68.5 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128B/MaxMem-1GB-8          	    5000	        60.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128B/MaxMem-NoLimit-8      	    5000	        64.5 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-16KB/MaxMem-12MB-8         	    3000	      2669 ns/op	       3 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-16KB/MaxMem-40MB-8         	    5000	      2732 ns/op	       4 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-16KB/MaxMem-1GB-8          	    3000	       150 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-16KB/MaxMem-NoLimit-8      	    3000	       134 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-1MB/MaxMem-12MB-8          	     100	    122439 ns/op	     563 B/op	      18 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-1MB/MaxMem-40MB-8          	     100	    130529 ns/op	     480 B/op	      16 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-1MB/MaxMem-1GB-8           	    2000	    212331 ns/op	    7781 B/op	     277 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-1MB/MaxMem-NoLimit-8       	    2000	       219 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-128B/MaxMem-256MB-8      	    5000	        63.1 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-128B/MaxMem-1.25GB-8     	    5000	        62.9 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-128B/MaxMem-NoLimit-8    	    5000	        71.0 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-16KB/MaxMem-256MB-8      	    3000	       149 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-16KB/MaxMem-1.25GB-8     	    5000	       134 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-16KB/MaxMem-NoLimit-8    	    3000	       133 ns/op	       0 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-1MB/MaxMem-256MB-8       	    2000	    131369 ns/op	      10 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-1MB/MaxMem-1.25GB-8      	    2000	    115991 ns/op	       6 B/op	       0 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-1MB/MaxMem-NoLimit-8     	    2000	       205 ns/op	       0 B/op	       0 allocs/op
Peek
BenchmarkPeek/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8         	20000000	       117 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8         	20000000	       113 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-128B/MaxMem-1GB-8          	20000000	       109 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8      	20000000	       136 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-16KB/MaxMem-40KB-8         	  300000	      3862 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-16KB/MaxMem-1GB-8          	  500000	      3858 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-16KB/MaxMem-NoLimit-8      	  300000	      3878 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-1MB/MaxMem-1GB-8           	   10000	    169672 ns/op	 1048576 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4KB/MessageSize-1MB/MaxMem-NoLimit-8       	   10000	    175354 ns/op	 1048576 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-128B/MaxMem-384KB-8      	20000000	       109 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-128B/MaxMem-1.25MB-8     	10000000	       132 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-128B/MaxMem-1GB-8        	10000000	       114 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-128B/MaxMem-NoLimit-8    	10000000	       129 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-16KB/MaxMem-384KB-8      	  500000	      2848 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-16KB/MaxMem-1.25MB-8     	  500000	      2859 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-16KB/MaxMem-1GB-8        	  500000	      2841 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-16KB/MaxMem-NoLimit-8    	  500000	      2937 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-1MB/MaxMem-384KB-8       	    3000	    364824 ns/op	 1052850 B/op	      81 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-1MB/MaxMem-1.25MB-8      	   10000	    165552 ns/op	 1048577 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-1MB/MaxMem-1GB-8         	    5000	    302818 ns/op	 1048576 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128KB/MessageSize-1MB/MaxMem-NoLimit-8     	    5000	    278720 ns/op	 1048576 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-128B/MaxMem-12MB-8         	10000000	       100 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-128B/MaxMem-40MB-8         	20000000	       128 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-128B/MaxMem-1GB-8          	10000000	       142 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-128B/MaxMem-NoLimit-8      	10000000	       125 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-16KB/MaxMem-12MB-8         	  500000	      2505 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-16KB/MaxMem-40MB-8         	  500000	      2586 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-16KB/MaxMem-1GB-8          	  500000	      2771 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-16KB/MaxMem-NoLimit-8      	  500000	      2440 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-1MB/MaxMem-12MB-8          	    5000	    201685 ns/op	 1048581 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-1MB/MaxMem-40MB-8          	   10000	    202935 ns/op	 1048585 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-1MB/MaxMem-1GB-8           	   10000	    204652 ns/op	 1048585 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-4MB/MessageSize-1MB/MaxMem-NoLimit-8       	   10000	    206010 ns/op	 1048585 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-128B/MaxMem-256MB-8      	20000000	       121 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-128B/MaxMem-1.25GB-8     	10000000	       157 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-128B/MaxMem-NoLimit-8    	10000000	       117 ns/op	     128 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-16KB/MaxMem-256MB-8      	 1000000	      2694 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-16KB/MaxMem-1.25GB-8     	  500000	      2400 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-16KB/MaxMem-NoLimit-8    	  500000	      2548 ns/op	   16384 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-1MB/MaxMem-256MB-8       	   10000	    204232 ns/op	 1048583 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-1MB/MaxMem-1.25GB-8      	   10000	    205590 ns/op	 1048585 B/op	       1 allocs/op
BenchmarkPeek/ArenaSize-128MB/MessageSize-1MB/MaxMem-NoLimit-8     	   10000	    204979 ns/op	 1048585 B/op	       1 allocs/op

Note: Before running benchmarks ulimit and vm.max_map_count parameters should be adjusted using below commands:

ulimit -n 50000
echo 262144 > /proc/sys/vm/max_map_count

Documentation

Overview

Package bigqueue provides embedded, fast and persistent queue written in pure Go using memory mapped file. bigqueue is currently not thread safe. To use bigqueue in parallel context, a Write lock needs to be acquired (even for Read APIs).

Create or open a bigqueue:

bq, err := bigqueue.NewQueue("path/to/queue")
defer bq.Close()

bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using mmap syscall. Default size of each Arena is set to 128MB. It is possible to create a bigqueue with custom Arena size:

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()

bigqueue also allows setting up the maximum possible memory that it can use. By default, the maximum memory is set to [3 x Arena Size].

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), bigqueue.SetMaxInMemArenas(10))
defer bq.Close()

In this case, bigqueue will never allocate more memory than `4KB*10=40KB`. This memory is above and beyond the memory used in buffers for copying data.

Bigqueue allows to set periodic flush based on either elapsed time or number of mutate (enqueue/dequeue) operations. Flush syncs the in memory changes of all memory mapped files with disk. *This is a best effort flush*. Elapsed time and number of mutate operations are only checked upon an enqueue/dequeue.

This is how you can set these options -

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))

In this case, a flush is done after every two mutate operations.

bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))

In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called.

Write to bigqueue:

err := bq.Enqueue([]byte("elem"))   // size = 1

bigqueue allows writing string data directly, avoiding conversion to `[]byte`:

err := bq.EnqueueString("elem")   // size = 2

Read from bigqueue:

elem, err := bq.Peek()        // size = 1
err := bq.Dequeue()           // size = 0

we can also read string data from bigqueue:

elem, err := bq.PeekString()  // size = 1
err := bq.Dequeue()           // size = 0

Check whether bigqueue has non zero elements:

isEmpty := bq.IsEmpty()

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTooSmallArenaSize is returned when arena size is smaller than OS page size
	ErrTooSmallArenaSize = errors.New("too small arena size")
	// ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3
	ErrTooFewInMemArenas = errors.New("too few in memory arenas")
	// ErrMustBeGreaterThanZero is returned when a config value has non-positive value
	ErrMustBeGreaterThanZero = errors.New("must be greater than zero")
)
View Source
var (
	// ErrEmptyQueue is returned when peek/dequeue is performed on an empty queue
	ErrEmptyQueue = errors.New("queue is empty")
)
View Source
var (
	// ErrInvalidArenaSize is returned when persisted arena size
	// doesn't match with desired arena size
	ErrInvalidArenaSize = errors.New("mismatch in arena size")
)

Functions

This section is empty.

Types

type MmapQueue added in v0.3.0

type MmapQueue struct {
	// contains filtered or unexported fields
}

MmapQueue implements Queue interface

func (*MmapQueue) Close added in v0.3.0

func (q *MmapQueue) Close() error

Close will close index and arena manager

func (*MmapQueue) Dequeue added in v0.3.0

func (q *MmapQueue) Dequeue() error

Dequeue removes an element from the queue

func (*MmapQueue) Enqueue added in v0.3.0

func (q *MmapQueue) Enqueue(message []byte) error

Enqueue adds a new slice of byte element to the tail of the queue

func (*MmapQueue) EnqueueString added in v0.3.0

func (q *MmapQueue) EnqueueString(message string) error

EnqueueString adds a new string element to the tail of the queue

func (*MmapQueue) Flush added in v0.3.0

func (q *MmapQueue) Flush() error

Flush syncs the in memory content of bigqueue to disk

func (*MmapQueue) IsEmpty added in v0.3.0

func (q *MmapQueue) IsEmpty() bool

IsEmpty returns true when queue is empty

func (*MmapQueue) Peek added in v0.3.0

func (q *MmapQueue) Peek() ([]byte, error)

Peek returns the head (slice of bytes) of the queue

func (*MmapQueue) PeekString added in v0.3.0

func (q *MmapQueue) PeekString() (string, error)

PeekString returns the head (string) of the queue

type Option added in v0.2.0

type Option func(*bqConfig) error

Option is function type that takes a bqConfig object and sets various config parameters in the object

func SetArenaSize added in v0.2.0

func SetArenaSize(arenaSize int) Option

SetArenaSize returns an Option closure that sets the arena size

func SetMaxInMemArenas added in v0.2.0

func SetMaxInMemArenas(maxInMemArenas int) Option

SetMaxInMemArenas returns an Option closure that sets maximum number of Arenas that could reside in memory (RAM) at any time. By default, all the arenas reside in memory and Operating System takes care of memory by paging in and out the pages from disk. A recommended value of maximum arenas that should be in memory is chosen such that -

maxInMemArenas > 2 + (maximum message size / arena size)
maxInMemArenas < (total available system memory - 1GB) / arena size

func SetPeriodicFlushDuration added in v0.3.0

func SetPeriodicFlushDuration(flushPeriod time.Duration) Option

SetPeriodicFlushDuration returns an Option that sets a periodic flush every given duration after which the queue's in-memory changes will be flushed to disk. This is a best effort flush and elapsed time is checked upon an enqueue/dequeue only.

For durability this value should be low. For performance this value should be high.

func SetPeriodicFlushOps added in v0.3.0

func SetPeriodicFlushOps(flushMutOps int64) Option

SetPeriodicFlushOps returns an Option that sets the number of mutate operations (enqueue/dequeue) after which the queue's in-memory changes will be flushed to disk. This is a best effort flush and number of mutate operations is checked upon an enqueue/dequeue.

For durability this value should be low. For performance this value should be high.

type Queue added in v0.3.0

type Queue interface {
	IsEmpty() bool
	Dequeue() error
	Flush() error
	Close() error

	Peek() ([]byte, error)
	Enqueue([]byte) error
	PeekString() (string, error)
	EnqueueString(string) error
}

Queue provides an interface to big, fast and persistent queue

func NewMmapQueue added in v0.3.0

func NewMmapQueue(dir string, opts ...Option) (Queue, error)

NewMmapQueue constructs a new persistent queue

Directories

Path Synopsis
examples
basic command
config command
extsort command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL