rmq

package module
v0.0.0-...-975a70e Latest
Warning

This package is not in the latest version of its module.

Published: Oct 19, 2019 License: MIT Imports: 12 Imported by: 0

README

Overview

rmq is short for Redis message queue. It's a message queue system written in Go and backed by Redis. It's similar to redismq, but implemented independently with a different interface in mind.

Basic Usage

Let's take a look at how to use rmq.

Import

Of course you need to import rmq wherever you want to use it.

import "github.com/adjust/rmq"

Connection

Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port.

connection := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1)

It's also possible to access a Redis instance listening on a Unix socket.

connection := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1)

Note: rmq panics on Redis connection errors. Your producers and consumers will crash if Redis goes down. Please let us know if you would like to see this handled differently.
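If a crash is not acceptable, one option is to convert a panic into an error at the call site. The following is a minimal sketch that uses only the standard library; the helper name safely is hypothetical and not part of rmq. Note that recover only catches panics raised in the calling goroutine, so it would not protect against panics raised in goroutines that a library starts internally.

```go
package main

import (
	"errors"
	"fmt"
)

// safely runs fn and converts a panic raised in the calling goroutine
// into an ordinary error. It is a hypothetical helper, not an rmq API.
func safely(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered panic: %v", r)
		}
	}()
	fn()
	return nil
}

func main() {
	// Stand-in for a call that may panic, such as opening a connection
	// while Redis is unreachable.
	err := safely(func() {
		panic(errors.New("dial tcp: connection refused"))
	})
	fmt.Println(err)
}
```

Whether to retry, back off, or exit after such an error is left to the caller.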

Queue

Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":

taskQueue := connection.OpenQueue("tasks")

Producer

An empty queue is boring, so let's add some deliveries! Internally all deliveries are saved to Redis as strings. This is how you can publish a string payload to a queue:

delivery := "task payload"
taskQueue.Publish(delivery)

In practice, however, it's more common to have instances of some struct that we want to publish to a queue. Assuming task is of some type like Task, this is how to publish the JSON representation of that task:

// create task
taskBytes, err := json.Marshal(task)
if err != nil {
    // handle error
    return
}

taskQueue.Publish(string(taskBytes))

For a full example see _example/producer.go
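The publishing step can also be wrapped in a small function that depends only on the Publish method, which makes it easy to exercise without Redis. This is a sketch under assumptions: the Task fields, the publisher interface, and memoryQueue are hypothetical stand-ins, and treating a false return from Publish as a failure is an assumption, since the meaning of the returned bool is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a hypothetical payload type used for illustration.
type Task struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// publisher is the subset of the rmq.Queue interface this sketch needs.
type publisher interface {
	Publish(payload string) bool
}

// memoryQueue is a hypothetical in-memory stand-in for a real queue.
type memoryQueue struct{ payloads []string }

func (q *memoryQueue) Publish(payload string) bool {
	q.payloads = append(q.payloads, payload)
	return true
}

// publishTask marshals the task to JSON and publishes it as a string.
func publishTask(queue publisher, task Task) error {
	taskBytes, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task: %w", err)
	}
	// Assumption: a false return value signals that publishing failed.
	if !queue.Publish(string(taskBytes)) {
		return fmt.Errorf("publish task %d: queue reported failure", task.ID)
	}
	return nil
}

func main() {
	queue := &memoryQueue{}
	if err := publishTask(queue, Task{ID: 1, Name: "resize"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(queue.payloads[0]) // prints {"id":1,"name":"resize"}
}
```

Because a queue returned by connection.OpenQueue has a Publish method with this signature, it should satisfy the same small interface.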

Consumer

Now that our queue starts filling, let's add a consumer. After opening the queue as before, we need it to start consuming before we can add consumers.

taskQueue.StartConsuming(10, time.Second)

This sets the prefetch limit to 10 and the poll duration to one second. The queue will therefore fetch up to 10 deliveries at a time before handing them to the consumers. To avoid idle consumers while the queue is full, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue becomes empty, the poll duration determines how long to wait before checking Redis for new deliveries.
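To make the interplay of the two parameters concrete, here is a toy model of a fetch loop. It is not rmq's implementation: fetchBatch and the in-memory ready slice are hypothetical, and the real library reads from Redis. The sketch only illustrates that at most prefetchLimit deliveries are taken per fetch and that the loop sleeps for pollDuration when nothing is ready.

```go
package main

import (
	"fmt"
	"time"
)

// fetchBatch takes up to limit payloads from the front of ready and
// returns them together with the payloads that remain.
func fetchBatch(ready []string, limit int) (batch, rest []string) {
	if limit < 0 {
		limit = 0
	}
	if limit > len(ready) {
		limit = len(ready)
	}
	return ready[:limit], ready[limit:]
}

func main() {
	ready := []string{"a", "b", "c"}
	prefetchLimit := 2
	pollDuration := 10 * time.Millisecond

	for polls := 0; polls < 3; polls++ {
		var batch []string
		batch, ready = fetchBatch(ready, prefetchLimit)
		if len(batch) == 0 {
			// Nothing ready: wait before checking again.
			fmt.Println("queue empty, waiting")
			time.Sleep(pollDuration)
			continue
		}
		fmt.Println("fetched", batch)
	}
}
```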

Once this is set up, we can actually add consumers to the consuming queue.

taskConsumer := &TaskConsumer{}
taskQueue.AddConsumer("task consumer", taskConsumer)

For our example this assumes that you have a struct TaskConsumer that implements the rmq.Consumer interface like this:

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
    var task Task
    if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        // handle error
        delivery.Reject()
        return
    }

    // perform task
    log.Printf("performing task %v", task)
    delivery.Ack()
}

First we unmarshal the JSON document found in the delivery payload. If this fails we reject the delivery, otherwise we perform the task and ack the delivery.

For a full example see _example/consumer.go
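The reject-or-ack flow can be tried out without Redis by coding against a local copy of the Delivery interface. This is a sketch under assumptions: the Delivery interface below mirrors the method set listed in the package index, while Task, fakeDelivery, and the handled field are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Delivery mirrors the method set of rmq.Delivery from the package index.
type Delivery interface {
	Payload() string
	Ack() bool
	Delay(time.Duration) bool
	Reject() bool
	Push() bool
}

// Task is a hypothetical payload type.
type Task struct {
	Name string `json:"name"`
}

// fakeDelivery is a hypothetical stand-in that records the last action.
type fakeDelivery struct {
	payload string
	state   string
}

func (d *fakeDelivery) Payload() string          { return d.payload }
func (d *fakeDelivery) Ack() bool                { d.state = "acked"; return true }
func (d *fakeDelivery) Delay(time.Duration) bool { d.state = "delayed"; return true }
func (d *fakeDelivery) Reject() bool             { d.state = "rejected"; return true }
func (d *fakeDelivery) Push() bool               { d.state = "pushed"; return true }

// TaskConsumer rejects payloads it cannot decode and acks the rest.
type TaskConsumer struct{ handled []Task }

func (c *TaskConsumer) Consume(delivery Delivery) {
	var task Task
	if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
		delivery.Reject()
		return
	}
	c.handled = append(c.handled, task) // "perform" the task
	delivery.Ack()
}

func main() {
	consumer := &TaskConsumer{}
	good := &fakeDelivery{payload: `{"name":"resize"}`}
	bad := &fakeDelivery{payload: "not json"}
	consumer.Consume(good)
	consumer.Consume(bad)
	fmt.Println(good.state, bad.state, len(consumer.handled)) // prints: acked rejected 1
}
```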

Testing Included

To simplify testing of queue producers and consumers we include test mocks.

Test Connection

As before, we first need a queue connection, but this time we use a rmq.TestConnection that doesn't need any connection settings.

testConn := rmq.NewTestConnection()

If you are using a testing framework that uses test suites, you can reuse that test connection by setting it up once for the suite and resetting it with testConn.Reset before each test.

Producer Test

Now let's say we want to test the function publishTask that creates a task and publishes it to a queue from that connection.

// call the function that should publish a task
publishTask(suite.testConn)

// check that the task is published
c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task payload")

The c.Check part is from gocheck, but it will look similar for other testing frameworks. Given a rmq.TestConnection, we can check the deliveries that were published to its queues (since the last Reset() call) with GetDelivery(queue, index). In this case we want to extract the first (and possibly only) delivery that was published to queue tasks and just check the payload string.

If the payload is JSON again, the unmarshalling and check might look like this:

var task Task
err := json.Unmarshal([]byte(suite.testConn.GetDelivery("tasks", 0)), &task)
c.Assert(err, IsNil)
c.Assert(task, NotNil)
c.Check(task.Property, Equals, "value")

If you expect a producer to create multiple deliveries you can use different indexes to access them all.

c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task1")
c.Check(suite.testConn.GetDelivery("tasks", 1), Equals, "task2")

For convenience there's also a function GetDeliveries that returns all deliveries published to a queue as a string slice.

c.Check(suite.testConn.GetDeliveries("tasks"), DeepEquals, []string{"task1", "task2"})

If your producer doesn't have guarantees about the order of its deliveries, you could implement a selector function like findByPrefix and then check each delivery regardless of their index.

tasks := suite.testConn.GetDeliveries("tasks")
c.Assert(tasks, HasLen, 2)
xTask := findByPrefix(tasks, "x")
yTask := findByPrefix(tasks, "y")
c.Check(xTask.Id, Equals, "3")
c.Check(yTask.Id, Equals, "4")
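The selector itself is not part of rmq, so its shape is up to you. One possible sketch is shown below; it returns the raw payload string rather than a decoded task, so it is simpler than what the snippet above implies (which accesses an Id field and would therefore also need to unmarshal the payload). The payload format "x:3" is made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// findByPrefix returns the first payload that starts with prefix and
// reports whether such a payload was found.
func findByPrefix(payloads []string, prefix string) (string, bool) {
	for _, payload := range payloads {
		if strings.HasPrefix(payload, prefix) {
			return payload, true
		}
	}
	return "", false
}

func main() {
	// Hypothetical payloads in an arbitrary order.
	tasks := []string{"y:4", "x:3"}
	xTask, ok := findByPrefix(tasks, "x")
	fmt.Println(xTask, ok) // prints: x:3 true
}
```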

These examples assume that you inject the rmq.Connection into your testable functions. If you inject instances of rmq.Queue instead, you can use rmq.TestQueue instances in tests and access their LastDeliveries (since Reset()) directly.

Consumer Test

Testing consumers is a bit easier because consumers must implement the rmq.Consumer interface. In the tests just create rmq.TestDelivery and pass it to your Consume implementation. This example creates a test delivery from a string and then checks that the delivery was acked.

consumer := &TaskConsumer{}
delivery := rmq.NewTestDeliveryString("task payload")

consumer.Consume(delivery)

c.Check(delivery.State, Equals, rmq.Acked)

The State field will always be one of these values:

  • rmq.Acked: The delivery was acked
  • rmq.Delayed: The delivery was delayed
  • rmq.Rejected: The delivery was rejected
  • rmq.Pushed: The delivery was pushed (see below)
  • rmq.Unacked: None of the above

If your payloads are JSON marshalled objects, then you can create test deliveries out of those like this:

task := Task{Property: "bad value"}
delivery := rmq.NewTestDelivery(task)

Statistics

Given a connection, you can call connection.CollectStats to receive rmq.Stats about all open queues, connections and consumers. If you run _example/handler.go you can see what's available:

In this example you see three connections consuming things, each with 10 consumers. Two of them have 8 deliveries unacked. Below the marker you see connections which are not consuming. One of the handler connections died because I stopped the handler. Running the cleaner would clean that up (see below).

TODO

There are some features and aspects not properly documented yet. I will quickly list those here; hopefully they will be expanded in the future. 😉

  • Batch Consumers: Use queue.AddBatchConsumer() to register a consumer that receives batches of deliveries to be consumed at once (for example, for a database bulk insert). See _example/batch_consumer.go
  • Push Queues: When consuming queue A you can set up its push queue to be queue B. The consumer can then call delivery.Push() to push this delivery (originally from queue A) to the associated push queue B. This is useful for retries.
  • Cleaner: Run this regularly to return unacked deliveries of stopped or crashed consumers back to ready so they can be consumed by a new consumer. See _example/cleaner.go
  • Returner: Imagine there was some error that made you reject a lot of deliveries by accident. Just call queue.ReturnAllRejected() to return all rejected deliveries of that queue back to ready, or queue.ReturnRejected(count) to return only a limited number. (This is similar to ReturnUnacked, which is used by the cleaner.) Consider using push queues if you do this regularly. See _example/returner.go
  • Purger: If deliveries failed and you don't want to retry them anymore for whatever reason, you can call queue.PurgeRejected() to dispose of them for good. There's also queue.PurgeReady if you want to get a queue clean without consuming possibly bad deliveries. See _example/purger.go

Documentation

Functions

func ActiveSign

func ActiveSign(active bool) string

func OpenConnection

func OpenConnection(tag, network, address string, db int) *redisConnection

OpenConnection opens and returns a new connection

func OpenConnectionWithRedisClient

func OpenConnectionWithRedisClient(tag string, redisClient redis.UniversalClient) *redisConnection

OpenConnectionWithRedisClient opens and returns a new connection

Types

type BatchConsumer

type BatchConsumer interface {
	Consume(batch Deliveries)
}

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

func NewCleaner

func NewCleaner(connection *redisConnection) *Cleaner

func (*Cleaner) Clean

func (cleaner *Cleaner) Clean() error

func (*Cleaner) CleanConnection

func (cleaner *Cleaner) CleanConnection(connection *redisConnection) error

func (*Cleaner) CleanQueue

func (cleaner *Cleaner) CleanQueue(queue *redisQueue)

type Connection

type Connection interface {
	OpenQueue(name string) Queue
	CollectStats(queueList []string) Stats
	GetOpenQueues() []string
}

Connection is an interface that can be used to test publishing

type ConnectionStat

type ConnectionStat struct {
	// contains filtered or unexported fields
}

func (ConnectionStat) String

func (stat ConnectionStat) String() string

type ConnectionStats

type ConnectionStats map[string]ConnectionStat

type Consumer

type Consumer interface {
	Consume(delivery Delivery)
}

type CustomTestConsumer

type CustomTestConsumer struct {
	// contains filtered or unexported fields
}

func NewCustomTestConsumer

func NewCustomTestConsumer(deliveryFunc func(Delivery)) *CustomTestConsumer

func (*CustomTestConsumer) Consume

func (consumer *CustomTestConsumer) Consume(delivery Delivery)

type Deliveries

type Deliveries []Delivery

func (Deliveries) Ack

func (deliveries Deliveries) Ack() int

func (Deliveries) Reject

func (deliveries Deliveries) Reject() int

type Delivery

type Delivery interface {
	Payload() string
	Ack() bool
	Delay(time.Duration) bool
	Reject() bool
	Push() bool
}

type Queue

type Queue interface {
	Publish(payload string) bool
	PublishToDelayedQueue(payload string, delayedTime time.Duration) bool
	SetPushQueue(pushQueue Queue)
	StartConsuming(prefetchLimit int, pollDuration time.Duration) bool
	StopConsuming() bool
	WaitForConsuming()
	AddConsumer(tag string, consumer Consumer) string
	AddBatchConsumer(tag string, batchSize int, consumer BatchConsumer) string
	AddBatchConsumerWithTimeout(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string
	PurgeReady() int
	PurgeRejected() int
	ReturnRejected(count int) int
	ReturnAllRejected() int
	Close() bool
}

type QueueStat

type QueueStat struct {
	ReadyCount    int `json:"ready"`
	RejectedCount int `json:"rejected"`
	// contains filtered or unexported fields
}

func NewQueueStat

func NewQueueStat(readyCount, rejectedCount int) QueueStat

func (QueueStat) ConnectionCount

func (stat QueueStat) ConnectionCount() int

func (QueueStat) ConsumerCount

func (stat QueueStat) ConsumerCount() int

func (QueueStat) String

func (stat QueueStat) String() string

func (QueueStat) UnackedCount

func (stat QueueStat) UnackedCount() int

type QueueStats

type QueueStats map[string]QueueStat

type State

type State int

const (
	Unacked State = iota
	Acked
	Delayed
	Rejected
	Pushed
)

func (State) String

func (i State) String() string

type Stats

type Stats struct {
	QueueStats QueueStats `json:"queues"`
	// contains filtered or unexported fields
}

func CollectStats

func CollectStats(queueList []string, mainConnection *redisConnection) Stats

func NewStats

func NewStats() Stats

func (Stats) GetHtml

func (stats Stats) GetHtml(layout, refresh string) string

func (Stats) String

func (stats Stats) String() string

type TestBatchConsumer

type TestBatchConsumer struct {
	LastBatch Deliveries
	// contains filtered or unexported fields
}

func NewTestBatchConsumer

func NewTestBatchConsumer() *TestBatchConsumer

func (*TestBatchConsumer) Consume

func (consumer *TestBatchConsumer) Consume(batch Deliveries)

func (*TestBatchConsumer) Finish

func (consumer *TestBatchConsumer) Finish()

type TestConnection

type TestConnection struct {
	// contains filtered or unexported fields
}

func NewTestConnection

func NewTestConnection() TestConnection

func (TestConnection) CollectStats

func (connection TestConnection) CollectStats(queueList []string) Stats

func (TestConnection) GetDeliveries

func (connection TestConnection) GetDeliveries(queueName string) []string

func (TestConnection) GetDelivery

func (connection TestConnection) GetDelivery(queueName string, index int) string

func (TestConnection) GetOpenQueues

func (connection TestConnection) GetOpenQueues() []string

func (TestConnection) OpenQueue

func (connection TestConnection) OpenQueue(name string) Queue

func (TestConnection) Reset

func (connection TestConnection) Reset()

type TestConsumer

type TestConsumer struct {
	AutoAck       bool
	AutoFinish    bool
	SleepDuration time.Duration

	LastDelivery   Delivery
	LastDeliveries []Delivery
	// contains filtered or unexported fields
}

func NewTestConsumer

func NewTestConsumer(name string) *TestConsumer

func (*TestConsumer) Consume

func (consumer *TestConsumer) Consume(delivery Delivery)

func (*TestConsumer) Finish

func (consumer *TestConsumer) Finish()

func (*TestConsumer) String

func (consumer *TestConsumer) String() string

type TestDelivery

type TestDelivery struct {
	State State
	// contains filtered or unexported fields
}

func NewTestDelivery

func NewTestDelivery(content interface{}) *TestDelivery

func NewTestDeliveryString

func NewTestDeliveryString(payload string) *TestDelivery

func (*TestDelivery) Ack

func (delivery *TestDelivery) Ack() bool

func (*TestDelivery) Delay

func (delivery *TestDelivery) Delay(_ time.Duration) bool

func (*TestDelivery) Payload

func (delivery *TestDelivery) Payload() string

func (*TestDelivery) Push

func (delivery *TestDelivery) Push() bool

func (*TestDelivery) Reject

func (delivery *TestDelivery) Reject() bool

type TestQueue

type TestQueue struct {
	LastDeliveries []string
	// contains filtered or unexported fields
}

func NewTestQueue

func NewTestQueue(name string) *TestQueue

func (*TestQueue) AddBatchConsumer

func (queue *TestQueue) AddBatchConsumer(tag string, batchSize int, consumer BatchConsumer) string

func (*TestQueue) AddBatchConsumerWithTimeout

func (queue *TestQueue) AddBatchConsumerWithTimeout(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string

func (*TestQueue) AddConsumer

func (queue *TestQueue) AddConsumer(tag string, consumer Consumer) string

func (*TestQueue) Close

func (queue *TestQueue) Close() bool

func (*TestQueue) Publish

func (queue *TestQueue) Publish(payload string) bool

func (*TestQueue) PublishToDelayedQueue

func (queue *TestQueue) PublishToDelayedQueue(payload string, delayedTime time.Duration) bool

func (*TestQueue) PurgeReady

func (queue *TestQueue) PurgeReady() int

func (*TestQueue) PurgeRejected

func (queue *TestQueue) PurgeRejected() int

func (*TestQueue) Reset

func (queue *TestQueue) Reset()

func (*TestQueue) ReturnAllRejected

func (queue *TestQueue) ReturnAllRejected() int

func (*TestQueue) ReturnRejected

func (queue *TestQueue) ReturnRejected(count int) int

func (*TestQueue) SetPushQueue

func (queue *TestQueue) SetPushQueue(pushQueue Queue)

func (*TestQueue) StartConsuming

func (queue *TestQueue) StartConsuming(prefetchLimit int, pollDuration time.Duration) bool

func (*TestQueue) StopConsuming

func (queue *TestQueue) StopConsuming() bool

func (*TestQueue) String

func (queue *TestQueue) String() string

func (*TestQueue) WaitForConsuming

func (queue *TestQueue) WaitForConsuming()
