package module
Version: v0.0.0-...-e2a56d9 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2017 License: MIT Imports: 8 Imported by: 23


Note: This project is no longer actively maintained. Please refer to its spiritual successor rmq.



Build Status godoc

What is this

This is a fast, persistent, atomic message queue implementation that uses redis as its storage engine written in go. It uses atomic list commands to ensure that messages are delivered only once in the right order without being lost by crashing consumers.

Details can be found in the blog post about its initial design:

A second article desribes the performance improvements of the current version:

What it's not

It's not a standalone server that you can use as a message queue, at least not for now. The implementation is done purely client side. All message queue commands are "translated" into redis commands and then executed via a redis client.

If you want to use this with any other language than go you have to translate all of the commands into your language of choice.

How to use it

All most all use cases are either covered in the examples or in the tests.

So the best idea is just to read those and figure it from there. But in any case:


To get started you need a running redis server. Since the tests run FlushDB() an otherwise unused database is highly recommended The first step is to create a new queue:

package main

import (

func main() {
	testQueue := redismq.CreateQueue("localhost", "6379", "", 9, "clicks")

To write into the queue you simply use Put():

	testQueue := redismq.CreateQueue("localhost", "6379", "", 9, "clicks")

The payload can be any kind of string, yes even a 10MB one.

To get messages out of the queue you need a consumer:

	consumer, err := testQueue.AddConsumer("testconsumer")
	if err != nil {
	package, err := consumer.Get()
	if err != nil {

Payload will hold the original string, while package will have some additional header information.

To remove a package from the queue you have to Ack() it:

	package, err := consumer.Get()
	if err != nil {
	err = package.Ack()
	if err != nil {
Buffered Queues

When input speed is of the essence BufferedQueues will scratch that itch. They pipeline multiple puts into one fast operation. The only issue is that upon crashing or restart the packages in the buffer that haven't been written yet will be lost. So it's advised to wait one second before terminating your program to flush the buffer.

The usage is as easy as it gets:

	bufferSize := 100
	testQueue := redismq.CreateBufferedQueue("localhost", "6379", "", 9, "clicks", bufferSize)

Put() and Get() stay exactly the same. I have found anything over 200 as bufferSize not to increase performance any further.

To ensure that no packages are left in the buffer when you shut down your program you need to call FlushBuffer() which will tell the queue to flush the buffer and wait till it's empty.

Multi Get

Like BufferedQueues for Get() MultiGet() speeds up the fetching of messages. The good news it comes without the buffer loss issues.

Usage is pretty straight forward with the only difference being the MultiAck():

	packages, err := consumer.MultiGet(100)
	if err != nil {
	for i := range packages {

MultiAck() can be called on any package in the array with all the prior packages being "acked". This way you can Fail() single packages.

Reject and Failed Queues

Similar to AMQP redismq supports Failed Queues meaning that packages that are rejected by a consumer will be stored in separate queue for further inspection. Alternatively a consumer can also Requeue() a package and put it back into the queue:

	package, err := consumer.Get()
	if err != nil {
	err = package.Requeue()
	if err != nil {

To push the message into the Failed Queue of this consumer simply use Fail():

	package, err := consumer.Get()
	if err != nil {
	err = package.Fail()
	if err != nil {
	package, err = suite.consumer.GetUnacked()

As you can see there is also a command to get messages from the Failed Queue.

How fast is it

Even though the original implementation wasn't aiming for high speeds the addition of BufferedQueues and MultiGet make it go something like this.

All of the following benchmarks were conducted on a MacBook Retina with a 2.4 GHz i7. The InputRate is the number of messages per second that get inserted, WorkRate the messages per second consumed.

Single Publisher, Two Consumers only atomic Get and Put

InputRate:	12183
WorkRate:	12397

Single Publisher, Two Consumers using BufferedQueues and MultiGet

InputRate:	46994
WorkRate:	25000

And yes that is a persistent message queue that can move over 70k messages per second.

If you want to find out for yourself checkout the example folder. The load.go or buffered_queue.go will start a web server that will display performance stats under http://localhost:9999/stats.

How persistent is it

As redis is the underlying storage engine you can set your desired persistence somewhere between YOLO and fsync(). With somewhat sane settings you should see no significant performance decrease.

redismq is Copyright © 2014 adjust GmbH.

It is free software, and may be redistributed under the terms specified in the LICENSE file.




This section is empty.


This section is empty.


This section is empty.


type BufferedQueue

type BufferedQueue struct {
	BufferSize int
	Buffer     chan *Package
	// contains filtered or unexported fields

BufferedQueue provides an queue with buffered writes for increased performance. Only one buffered queue (per name) can be started at a time. Before terminating the queue should be flushed using FlushBuffer() to avoid package loss

func CreateBufferedQueue

func CreateBufferedQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string, bufferSize int) *BufferedQueue

CreateBufferedQueue returns BufferedQueue. To start writing the buffer to redis use Start(). Optimal BufferSize seems to be around 200. Works like SelectBufferedQueue for existing queues

func SelectBufferedQueue

func SelectBufferedQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string, bufferSize int) (queue *BufferedQueue, err error)

SelectBufferedQueue returns a BufferedQueue if a queue with the name exists

func (*BufferedQueue) FlushBuffer

func (queue *BufferedQueue) FlushBuffer()

FlushBuffer tells the background writer to flush the buffer to redis

func (*BufferedQueue) Put

func (queue *BufferedQueue) Put(payload string) error

Put writes the payload to the buffer

func (*BufferedQueue) Start

func (queue *BufferedQueue) Start() error

Start dispatches the background writer that flushes the buffer. If there is already a BufferedQueue running it will return an error.

type Consumer

type Consumer struct {
	Name  string
	Queue *Queue
	// contains filtered or unexported fields

Consumer are used for reading from queues

func (*Consumer) Get

func (consumer *Consumer) Get() (*Package, error)

Get returns a single package from the queue (blocking)

func (*Consumer) GetFailed

func (consumer *Consumer) GetFailed() (*Package, error)

GetFailed returns a single packages from the failed queue of this consumer

func (*Consumer) GetUnacked

func (consumer *Consumer) GetUnacked() (*Package, error)

GetUnacked returns a single packages from the working queue of this consumer

func (*Consumer) GetUnackedLength

func (consumer *Consumer) GetUnackedLength() int64

GetUnackedLength returns the number of packages in the unacked queue

func (*Consumer) HasUnacked

func (consumer *Consumer) HasUnacked() bool

HasUnacked returns true if the consumers has unacked packages

func (*Consumer) MultiGet

func (consumer *Consumer) MultiGet(length int) ([]*Package, error)

MultiGet returns an array of packages from the queue

func (*Consumer) NoWaitGet

func (consumer *Consumer) NoWaitGet() (*Package, error)

NoWaitGet returns a single package from the queue (returns nil, nil if no package in queue)

func (*Consumer) Quit

func (consumer *Consumer) Quit()

func (*Consumer) RequeueWorking

func (consumer *Consumer) RequeueWorking() error

RequeueWorking requeues all packages from working to input

func (*Consumer) ResetWorking

func (consumer *Consumer) ResetWorking() error

ResetWorking deletes! all messages in the working queue of this consumer

type ConsumerStat

type ConsumerStat struct {
	WorkRateSecond int64
	WorkRateMinute int64
	WorkRateHour   int64

ConsumerStat collects data about a queues consumer

type Observer

type Observer struct {
	Stats map[string]*QueueStat
	// contains filtered or unexported fields

Observer is a very simple implementation of an statistics observer far more complex things could be implemented with the way stats are written for now it allows basic access to throughput rates and queue size averaged over seconds, minutes and hours

func NewObserver

func NewObserver(redisHost, redisPort, redisPassword string, redisDb int64) *Observer

NewObserver returns an Oberserver to monitor different statistics from redis

func (*Observer) GetAllQueues

func (observer *Observer) GetAllQueues() (queues []string, err error)

GetAllQueues returns a list of all registed queues

func (*Observer) ToJSON

func (observer *Observer) ToJSON() string

ToJSON renders the whole observer as a JSON string

func (*Observer) UpdateAllStats

func (observer *Observer) UpdateAllStats()

UpdateAllStats fetches stats for all queues and all their consumers

func (*Observer) UpdateQueueStats

func (observer *Observer) UpdateQueueStats(queue string)

UpdateQueueStats fetches stats for one specific queue and its consumers

type Package

type Package struct {
	Payload    string
	CreatedAt  time.Time
	Queue      interface{} `json:"-"`
	Consumer   *Consumer   `json:"-"`
	Collection *[]*Package `json:"-"`
	Acked      bool        `json:"-"`

Package provides headers and handling functions around payloads

func (*Package) Ack

func (pack *Package) Ack() error

Ack removes the packages from the queue

func (*Package) Fail

func (pack *Package) Fail() error

Fail moves a package to the failed queue

func (*Package) MultiAck

func (pack *Package) MultiAck() (err error)

MultiAck removes all packaes from the fetched array up to and including this package

func (*Package) Requeue

func (pack *Package) Requeue() error

Requeue moves a package back to input

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields

Queue is the central element of this library. Packages can be put into or get from the queue. To read from a queue you need a consumer.

func CreateQueue

func CreateQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string) *Queue

CreateQueue return a queue that you can Put() or AddConsumer() to Works like SelectQueue for existing queues

func SelectQueue

func SelectQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string) (queue *Queue, err error)

SelectQueue returns a Queue if a queue with the name exists

func (*Queue) AddConsumer

func (queue *Queue) AddConsumer(name string) (c *Consumer, err error)

AddConsumer returns a conumser that can write from the queue

func (*Queue) Delete

func (queue *Queue) Delete() error

Delete clears all input and failed queues as well as all consumers will not proceed as long as consumers are running

func (*Queue) GetFailedLength

func (queue *Queue) GetFailedLength() int64

GetFailedLength returns the number of packages in the failed queue

func (*Queue) GetInputLength

func (queue *Queue) GetInputLength() int64

GetInputLength returns the number of packages in the input queue

func (*Queue) Put

func (queue *Queue) Put(payload string) error

Put writes the payload into the input queue

func (*Queue) RequeueFailed

func (queue *Queue) RequeueFailed() error

RequeueFailed moves all failed packages back to the input queue

func (*Queue) ResetFailed

func (queue *Queue) ResetFailed() error

ResetFailed deletes all packages from the failed queue

func (*Queue) ResetInput

func (queue *Queue) ResetInput() error

ResetInput deletes all packages from the input queue

type QueueStat

type QueueStat struct {
	InputSizeSecond int64
	InputSizeMinute int64
	InputSizeHour   int64

	FailSizeSecond int64
	FailSizeMinute int64
	FailSizeHour   int64

	InputRateSecond int64
	InputRateMinute int64
	InputRateHour   int64

	WorkRateSecond int64
	WorkRateMinute int64
	WorkRateHour   int64

	ConsumerStats map[string]*ConsumerStat

QueueStat collects data about a queue

type Server

type Server struct {
	// contains filtered or unexported fields

Server is the web server API for monitoring via JSON

func NewServer

func NewServer(redisHost, redisPort, redisPassword string, redisDb int64, port string) *Server

NewServer returns a Server that can be started with Start()

func (*Server) Start

func (server *Server) Start()

Start enables the Server to listen on his port


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL