delayqueue

package module
v1.1.2 Latest Latest
Published: Jul 12, 2025 License: Apache-2.0 Imports: 11 Imported by: 3

README

DelayQueue


DelayQueue is a message queue supporting delayed/scheduled delivery based on Redis. It is designed to be reliable, scalable and easy to get started with.

Core Advantages:

  • Guaranteed at-least-once consumption
  • Automatic retry of failed messages
  • Works out of the box: nothing to configure and nothing to deploy. A Redis instance is all you need.
  • Natively adapted to distributed environments: messages are processed concurrently on multiple machines, and workers can be added, removed or migrated at any time
  • Supports Redis Cluster and the clusters of most cloud service providers; see the Cluster chapter
  • Easy-to-use monitoring data exporter; see Monitoring

Install

DelayQueue requires a Go version with modules support. Run the following command in a project that has a go.mod:

go get github.com/hdt3213/delayqueue

If you are using github.com/go-redis/redis/v8, use go get github.com/hdt3213/delayqueue@redisv8 instead.

Get Started

package main

import (
	"github.com/redis/go-redis/v9"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		// The callback returns true to confirm successful consumption.
		// If it returns false or does not return within maxConsumeDuration, DelayQueue re-delivers the message.
		return true
	}).WithConcurrent(4) // set the number of concurrent consumers
	// send delay message
	for i := 0; i < 10; i++ {
		_, err := queue.SendDelayMsgV2(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// send schedule message
	for i := 0; i < 10; i++ {
		_, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Hour))
		if err != nil {
			panic(err)
		}
	}
	// start consume
	done := queue.StartConsume()
	<-done
}

SendScheduleMsgV2 (SendDelayMsgV2) is fully compatible with SendScheduleMsg (SendDelayMsg); the V2 variants additionally return a *MessageInfo.

Please note that redis/v8 is not compatible with Redis Cluster 7.x.

If you are using a Redis client other than go-redis, you can wrap it in the RedisCli interface.

If you don't want to set the callback during initialization, you can set it later with WithCallback.

Producer consumer distributed deployment

By default, delayqueue instances can be both producers and consumers.

If your program only needs to produce messages and the consumers run elsewhere, delayqueue.NewPublisher is a good option.

func consumer() {
	queue := delayqueue.NewQueue("test", redisCli, cb)
	queue.StartConsume()
}

func producer() {
	publisher := delayqueue.NewPublisher("test", redisCli)
	if err := publisher.SendDelayMsg("payload", 0); err != nil {
		panic(err)
	}
}

Intercept/Delete Messages

msg, err := queue.SendScheduleMsgV2(strconv.Itoa(i), time.Now().Add(time.Second))
if err != nil {
	panic(err)
}
result, err := queue.TryIntercept(msg)
if err != nil {
	panic(err)
}
if result.Intercepted {
	println("interception success!")
} else {
	println("interception failed, message has been consumed!")
}

SendScheduleMsgV2 and SendDelayMsgV2 return a structure that contains message tracking information. Pass it to TryIntercept to try to intercept the consumption of the message.

If the message is pending or waiting to be consumed, the interception will succeed. If the message has been consumed or is awaiting retry, the interception will fail, but TryIntercept will prevent subsequent retries.

TryIntercept returns an InterceptResult whose Intercepted field indicates whether the interception succeeded.

Options

Consume Function
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue

WithCallback sets the callback that receives and consumes messages. The callback returns true to confirm successful consumption, or false to have the message re-delivered.

If no callback is set, StartConsume will panic.

queue := NewQueue("test", redisCli)
queue.WithCallback(func(payload string) bool {
	return true
})
Logger
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue

WithLogger customizes the logger for the queue. The logger must implement the following interface:

type Logger interface {
	Printf(format string, v ...interface{})
}
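
Because the interface only asks for a Printf method, the standard library's *log.Logger should already satisfy it. The sketch below redeclares Logger locally (a stand-in copied from the definition above, so that the snippet compiles without the delayqueue module) and verifies this at compile time. formatWith is a hypothetical helper used only for the demonstration.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is a local copy of the interface shown above, redeclared here
// only so that this sketch compiles on its own.
type Logger interface {
	Printf(format string, v ...interface{})
}

// Compile-time check: *log.Logger provides a matching Printf method.
var _ Logger = (*log.Logger)(nil)

// formatWith is a hypothetical helper: it sends one line through a Logger
// backed by an in-memory buffer and returns what was written.
func formatWith(prefix, format string, v ...interface{}) string {
	var buf bytes.Buffer
	var logger Logger = log.New(&buf, prefix, 0)
	logger.Printf(format, v...)
	return buf.String()
}

func main() {
	fmt.Print(formatWith("[delayqueue] ", "fetched %d messages", 3))
}
```

With the real package, a value such as log.New(os.Stderr, "[delayqueue] ", log.LstdFlags) could presumably be passed straight to WithLogger.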
Concurrent
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue

WithConcurrent sets the number of concurrent consumers

Polling Interval
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue

WithFetchInterval customizes the interval at which consumers fetch messages from Redis.

Timeout
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue

WithMaxConsumeDuration customizes the maximum consume duration.

If no acknowledgement is received within the maximum consume duration after a message is delivered, DelayQueue will try to deliver the message again.

Max Processing Limit
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue

WithFetchLimit limits the maximum number of unacked (in-process) messages; 0 means no limit.

Hash Tag
UseHashTagKey()

UseHashTagKey adds hash tags to Redis keys to ensure that all keys of this queue are allocated to the same hash slot.

If you are using Codis, AliyunRedisCluster or TencentCloudRedisCluster, add this option to NewQueue: NewQueue("test", redisCli, cb, UseHashTagKey()). This option cannot be changed after the DelayQueue has been created.

WARNING! Changing this option (adding or removing it) will make DelayQueue unable to read existing data in Redis.

see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
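
To make the effect of hash tags concrete, here is a minimal sketch of the key-to-slot rule from the Redis Cluster specification: CRC-16/XMODEM over the key, or over the text between the first '{' and the next '}' when that text is non-empty, modulo 16384. The key names in main are made up for illustration; they are not necessarily the keys DelayQueue writes.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC-16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum the Redis Cluster specification uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashedPart returns the part of key that is hashed: the text between the
// first '{' and the next '}' if that text is non-empty, else the whole key.
func hashedPart(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 { // no closing brace, or an empty tag "{}"
		return key
	}
	return key[start+1 : start+1+end]
}

// keySlot maps a key to one of the 16384 cluster hash slots.
func keySlot(key string) int {
	return int(crc16([]byte(hashedPart(key))) % 16384)
}

func main() {
	// Hypothetical key names, with and without a hash tag.
	keys := []string{"{example}:pending", "{example}:ready", "example:pending", "example:ready"}
	for _, k := range keys {
		fmt.Printf("%-20s slot %d\n", k, keySlot(k))
	}
}
```

The two keys that share the {example} tag always land in the same slot, which is what lets a multi-key script run against a cluster; the untagged keys are hashed in full and may be placed in different slots.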

Default Retry Count
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue

WithDefaultRetryCount customizes the maximum number of retries. It applies to all messages in this queue.

Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message:

queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3))
Nack Redelivery Delay
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue

WithNackRedeliveryDelay customizes the interval between a nack (the callback returning false) and redelivery. If consumption exceeds the deadline, however, the message is redelivered immediately.

Script Preload
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue

WithScriptPreload(true) makes DelayQueue preload its scripts and call them with EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts with the Eval command. Preloading with EvalSha is the default.

Customize Prefix
queue := delayqueue.NewQueue("example", redisCli, callback, delayqueue.UseCustomPrefix("MyPrefix"))

All keys of delayqueue share the same prefix, dp by default. To change the prefix, use UseCustomPrefix.

Monitoring

We provide Monitor to monitor the running status.

monitor := delayqueue.NewMonitor("example", redisCli)

Monitor.ListenEvent can register a listener that can receive all internal events, so you can use it to implement customized data reporting and metrics.

The monitor can receive events from all workers, even if they are running on another server.

type EventListener interface {
	OnEvent(*Event)
}

// returns: close function, error
func (m *Monitor) ListenEvent(listener EventListener) (func(), error) 

The definition of Event can be found in events.go.

We also provide a demo that uses EventListener to monitor the number of messages produced and consumed per minute.

The complete demo code can be found in example/monitor.

type MyProfiler struct {
	List  []*Metrics
	Start int64
}

func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
	sinceUptime := event.Timestamp - p.Start
	upMinutes := sinceUptime / 60
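	// grow the list until it has a bucket for the current minute,
	// even if several minutes passed without any event
	for len(p.List) <= int(upMinutes) {
		p.List = append(p.List, &Metrics{})
	}
	current := p.List[upMinutes]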
	switch event.Code {
	case delayqueue.NewMessageEvent:
		current.ProduceCount += event.MsgCount
	case delayqueue.DeliveredEvent:
		current.DeliverCount += event.MsgCount
	case delayqueue.AckEvent:
		current.ConsumeCount += event.MsgCount
	case delayqueue.RetryEvent:
		current.RetryCount += event.MsgCount
	case delayqueue.FinalFailedEvent:
		current.FailCount += event.MsgCount
	}
}

func main() {
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		return true
	})
	start := time.Now()
	// IMPORTANT: EnableReport must be called so monitor can do its work
	queue.EnableReport() 

	// setup monitor
	monitor := delayqueue.NewMonitor("example", redisCli)
	listener := &MyProfiler{
		Start: start.Unix(),
	}
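	if _, err := monitor.ListenEvent(listener); err != nil {
		panic(err)
	}

	// print metrics every minute
	tick := time.Tick(time.Minute)
	go func() {
		for range tick {
			if len(listener.List) == 0 {
				continue // no events received yet
			}
			minutes := len(listener.List) - 1
			fmt.Printf("%d: %#v\n", minutes, listener.List[minutes])
		}
	}()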
}

Monitor uses Redis pub/sub to collect data, so DelayQueue.EnableReport must be called on every worker to enable event reporting for the monitor.

If you do not want to use Redis pub/sub, you can use DelayQueue.ListenEvent to collect data yourself.

Note that DelayQueue.ListenEvent can only receive events from the current instance, while the monitor can receive events from all instances of the queue.

Once DelayQueue.ListenEvent is called, the monitor's listener is overwritten; call EnableReport again to re-enable the monitor.

Get Status

You can get the pending count, ready count and processing count from the monitor:

func (m *Monitor) GetPendingCount() (int64, error)

GetPendingCount returns the number of messages whose delivery time has not arrived.

func (m *Monitor) GetReadyCount() (int64, error)

GetReadyCount returns the number of messages whose delivery time has arrived but which have not been delivered yet.

func (m *Monitor) GetProcessingCount() (int64, error)

GetProcessingCount returns the number of messages which are being processed.

Cluster

If you are using Redis Cluster, use NewQueueOnCluster:

redisCli := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{
        "127.0.0.1:7000",
        "127.0.0.1:7001",
        "127.0.0.1:7002",
    },
})
callback := func(s string) bool {
    return true
}
queue := delayqueue.NewQueueOnCluster("test", redisCli, callback)

If you are using a transparent cluster, such as Codis, twemproxy, or cluster-architecture Redis on Aliyun or Tencent Cloud, just use NewQueue and enable the hash tag option:

redisCli := redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
    return true
}
queue := delayqueue.NewQueue("example", redisCli, callback, delayqueue.UseHashTagKey())

More Details

The complete flowchart (image not reproduced here) involves the following Redis data structures:

  • pending: A sorted set of messages pending delivery. The member is the message id and the score is the delivery Unix timestamp.
  • ready: A list of messages ready for delivery. Workers fetch messages from here.
  • unack: A sorted set of messages waiting for ack (confirmation of successful consumption), which means the messages here are being processed. The member is the message id and the score is the Unix timestamp of the processing deadline.
  • retry: A list of messages whose processing exceeded the deadline and which are waiting for retry.
  • garbage: A list of messages that have reached the max retry count and are waiting to be cleaned up.
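
The movement between these structures can be sketched with a toy in-memory model. This is only an illustration of the description above, not the Lua scripts DelayQueue actually runs: the type toyQueue, its method names, and the exact way the retry budget is decremented are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// toyQueue mimics the five structures with plain Go maps and slices.
type toyQueue struct {
	pending map[string]int64 // id -> delivery unix timestamp
	ready   []string
	unack   map[string]int64 // id -> processing deadline
	retry   []string
	garbage []string
	retries map[string]int // id -> remaining retry budget (assumed bookkeeping)
}

func newToyQueue() *toyQueue {
	return &toyQueue{
		pending: map[string]int64{},
		unack:   map[string]int64{},
		retries: map[string]int{},
	}
}

// send places a message in pending with its delivery time and retry budget.
func (q *toyQueue) send(id string, deliverAt int64, retryCount int) {
	q.pending[id] = deliverAt
	q.retries[id] = retryCount
}

// due returns the ids in set whose score is <= now, in sorted order.
func due(set map[string]int64, now int64) []string {
	var ids []string
	for id, score := range set {
		if score <= now {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

// pendingToReady moves messages whose delivery time has arrived.
func (q *toyQueue) pendingToReady(now int64) {
	for _, id := range due(q.pending, now) {
		delete(q.pending, id)
		q.ready = append(q.ready, id)
	}
}

// fetch pops one ready message and records its processing deadline in unack.
func (q *toyQueue) fetch(now, maxConsume int64) (string, bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	id := q.ready[0]
	q.ready = q.ready[1:]
	q.unack[id] = now + maxConsume
	return id, true
}

// ack confirms successful consumption and forgets the message.
func (q *toyQueue) ack(id string) {
	delete(q.unack, id)
	delete(q.retries, id)
}

// expire moves timed-out messages to retry, or to garbage once the budget is spent.
func (q *toyQueue) expire(now int64) {
	for _, id := range due(q.unack, now) {
		delete(q.unack, id)
		if q.retries[id] > 0 {
			q.retries[id]--
			q.retry = append(q.retry, id)
		} else {
			q.garbage = append(q.garbage, id)
		}
	}
}

func main() {
	q := newToyQueue()
	q.send("a", 10, 1) // one retry allowed
	q.send("b", 10, 0) // no retries allowed
	q.pendingToReady(10)
	q.fetch(10, 5)
	q.fetch(10, 5)
	q.expire(15) // neither message was acked before its deadline
	fmt.Println("retry:", q.retry, "garbage:", q.garbage)
}
```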

Documentation

Index

Constants

const (
	StatePending    = "pending"
	StateReady      = "ready"
	StateReadyRetry = "ready_to_retry"
	StateConsuming  = "consuming"
	StateUnknown    = "unknown"
)
const (
	// NewMessageEvent is emitted when a message is sent
	NewMessageEvent = iota + 1
	// ReadyEvent is emitted when messages have reached their delivery time
	ReadyEvent
	// DeliveredEvent is emitted when messages have been delivered to a consumer
	DeliveredEvent
	// AckEvent is emitted when the callback reports successful consumption
	AckEvent
	// NackEvent is emitted when the callback reports failed consumption
	NackEvent
	// RetryEvent is emitted when a message is re-delivered to a consumer
	RetryEvent
	// FinalFailedEvent is emitted when a message reaches the max retry attempts
	FinalFailedEvent
)

Variables

var NilErr = errors.New("nil")

NilErr represents redis nil

Functions

func UseCustomPrefix added in v1.1.0

func UseCustomPrefix(prefix string) interface{}

UseCustomPrefix replaces the default key prefix "dp" with a custom prefix.

func UseHashTagKey added in v1.0.3

func UseHashTagKey() interface{}

UseHashTagKey adds hash tags to Redis keys to ensure that all keys of this queue are allocated to the same hash slot. If you are using Codis, AliyunRedisCluster or TencentCloudRedisCluster, add this option to NewQueue. WARNING! Changing this option (adding or removing it) will make DelayQueue unable to read existing data in Redis. See also: https://redis.io/docs/reference/cluster-spec/#hash-tags

func WithMsgTTL added in v1.0.2

func WithMsgTTL(d time.Duration) interface{}

WithMsgTTL sets the TTL for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))

func WithRetryCount

func WithRetryCount(count int) interface{}

WithRetryCount sets the retry count for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))

Types

type CallbackFunc added in v1.0.5

type CallbackFunc = func(string) bool

CallbackFunc receives and consumes messages. It returns true to confirm successful consumption, or false to have the message re-delivered.
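
Since delivery is at least once, a callback may see the same message more than once, for example after a timeout. The sketch below wraps a handler so that a payload it has already processed successfully is acknowledged without being handled again. It assumes the payload itself identifies the message, keeps its state only in process memory, and does not prevent two concurrent deliveries of the same payload from both running the handler; dedupe and errFailed are hypothetical names, and CallbackFunc is redeclared locally so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// CallbackFunc mirrors the alias documented above.
type CallbackFunc = func(string) bool

// errFailed is a placeholder error used by the demonstration.
var errFailed = errors.New("handler failed")

// dedupe wraps handle so that a payload already processed successfully is
// acknowledged without running handle again.
func dedupe(handle func(payload string) error) CallbackFunc {
	var mu sync.Mutex
	done := make(map[string]bool)
	return func(payload string) bool {
		mu.Lock()
		seen := done[payload]
		mu.Unlock()
		if seen {
			return true // duplicate delivery: confirm without reprocessing
		}
		if err := handle(payload); err != nil {
			return false // ask for re-delivery
		}
		mu.Lock()
		done[payload] = true
		mu.Unlock()
		return true
	}
}

func main() {
	attempts := 0
	cb := dedupe(func(payload string) error {
		attempts++
		if attempts == 1 {
			return errFailed // fail the first attempt only
		}
		return nil
	})
	fmt.Println(cb("order-1"), cb("order-1"), cb("order-1"), attempts)
}
```

In a multi-worker deployment the "already processed" record would need to live in shared storage rather than in a local map.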

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue is a message queue supporting delayed/scheduled delivery based on redis

func NewQueue

func NewQueue(name string, cli *redis.Client, opts ...interface{}) *DelayQueue

NewQueue creates a new queue. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish them.

	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		// The callback returns true to confirm successful consumption.
		// If it returns false or does not return within maxConsumeDuration, DelayQueue re-delivers the message.
		return true
	})

func NewQueue0 added in v1.0.5

func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue

NewQueue0 creates a new queue from a RedisCli instance. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish them. The callback returns true to confirm successful consumption. If the callback returns false or does not return within maxConsumeDuration, DelayQueue re-delivers the message.

func NewQueueOnCluster added in v1.0.5

func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue

func (*DelayQueue) DisableListener added in v1.0.5

func (q *DelayQueue) DisableListener()

DisableListener stops reporting events to the EventListener

func (*DelayQueue) DisableReport added in v1.0.5

func (q *DelayQueue) DisableReport()

DisableReport stops reporting to monitor

func (*DelayQueue) EnableReport added in v1.0.5

func (q *DelayQueue) EnableReport()

EnableReport enables reporting to monitor

func (*DelayQueue) GetPendingCount added in v1.0.5

func (q *DelayQueue) GetPendingCount() (int64, error)

GetPendingCount returns the number of pending messages

func (*DelayQueue) GetProcessingCount added in v1.0.5

func (q *DelayQueue) GetProcessingCount() (int64, error)

GetProcessingCount returns the number of messages which are being processed

func (*DelayQueue) GetReadyCount added in v1.0.5

func (q *DelayQueue) GetReadyCount() (int64, error)

GetReadyCount returns the number of messages whose delivery time has arrived but which have not been delivered

func (*DelayQueue) ListenEvent added in v1.0.5

func (q *DelayQueue) ListenEvent(listener EventListener)

ListenEvent registers a listener that is called when events occur, so it can be used to monitor the running status.

It can ONLY receive events from the CURRENT INSTANCE. If you want to listen to all events in the queue, use Monitor.ListenEvent.

There can be AT MOST ONE EventListener per DelayQueue instance. If you use a customized listener, Monitor will stop working.

func (*DelayQueue) SendDelayMsg

func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error

SendDelayMsg submits a message to be delivered after the given duration. It is compatible with SendDelayMsgV2, but does not return MessageInfo.

func (*DelayQueue) SendDelayMsgV2 added in v1.0.9

func (q *DelayQueue) SendDelayMsgV2(payload string, duration time.Duration, opts ...interface{}) (*MessageInfo, error)

SendDelayMsgV2 submits a message to be delivered after the given duration

func (*DelayQueue) SendScheduleMsg

func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error

SendScheduleMsg submits a message to be delivered at the given time. It is compatible with SendScheduleMsgV2, but does not return MessageInfo.

func (*DelayQueue) SendScheduleMsgV2 added in v1.0.9

func (q *DelayQueue) SendScheduleMsgV2(payload string, t time.Time, opts ...interface{}) (*MessageInfo, error)

SendScheduleMsgV2 submits a message to be delivered at the given time

func (*DelayQueue) StartConsume

func (q *DelayQueue) StartConsume() (done <-chan struct{})

StartConsume creates a goroutine to consume messages from DelayQueue. Use `<-done` to wait for the consumer to stop. If no callback is set, StartConsume will panic.

func (*DelayQueue) StopConsume

func (q *DelayQueue) StopConsume()

StopConsume stops consumer goroutine

func (*DelayQueue) TryIntercept added in v1.0.9

func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error)

TryIntercept tries to intercept a message

func (*DelayQueue) WithCallback added in v1.0.5

func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue

WithCallback sets the callback that receives and consumes messages. The callback returns true to confirm successful consumption, or false to have the message re-delivered.

func (*DelayQueue) WithConcurrent added in v1.0.1

func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue

WithConcurrent sets the number of concurrent consumers

func (*DelayQueue) WithDefaultRetryCount added in v1.0.1

func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue

WithDefaultRetryCount customizes the maximum number of retries. It applies to all messages in this queue. Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.

func (*DelayQueue) WithFetchInterval

func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue

WithFetchInterval customizes the interval at which consumers fetch messages from Redis

func (*DelayQueue) WithFetchLimit

func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue

WithFetchLimit limits the max number of processing messages, 0 means no limit

func (*DelayQueue) WithLogger

func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue

WithLogger customizes logger for queue

func (*DelayQueue) WithMaxConsumeDuration

func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue

WithMaxConsumeDuration customizes the maximum consume duration. If no acknowledgement is received within the maximum consume duration after a message is delivered, DelayQueue will try to deliver the message again.

func (*DelayQueue) WithNackRedeliveryDelay added in v1.0.8

func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue

WithNackRedeliveryDelay customizes the interval between a nack (the callback returning false) and redelivery. If consumption exceeds the deadline, the message is redelivered immediately.

func (*DelayQueue) WithScriptPreload added in v1.0.8

func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue

WithScriptPreload uses the SCRIPT LOAD command to preload scripts into Redis

type Event added in v1.0.5

type Event struct {
	// Code represents event type, such as NewMessageEvent, ReadyEvent
	Code int
	// Timestamp is the event time
	Timestamp int64
	// MsgCount represents the number of messages related to the event
	MsgCount int
}

Event contains internal event information during the queue operation and can be used to monitor the queue status.

type EventListener added in v1.0.5

type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}

EventListener is called when events occur. It can be used to monitor the running status.
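
As an illustration, here is a minimal listener that tallies messages per event code and can be read safely from another goroutine (for example a periodic reporter). Event and EventListener are local stand-ins mirroring the documented types so that the snippet compiles on its own; counter and its methods are hypothetical names. It is not stated whether OnEvent may be called from several goroutines at once, so the mutex is a precaution.

```go
package main

import (
	"fmt"
	"sync"
)

// Event mirrors the documented struct (local stand-in).
type Event struct {
	Code      int
	Timestamp int64
	MsgCount  int
}

// EventListener mirrors the documented interface (local stand-in).
type EventListener interface {
	OnEvent(*Event)
}

// counter tallies the number of messages seen per event code.
type counter struct {
	mu     sync.Mutex
	totals map[int]int
}

func newCounter() *counter {
	return &counter{totals: make(map[int]int)}
}

// OnEvent adds the event's message count to the total for its code.
func (c *counter) OnEvent(e *Event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.totals[e.Code] += e.MsgCount
}

// Total returns the number of messages recorded for the given code.
func (c *counter) Total(code int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.totals[code]
}

// Compile-time check that counter satisfies the listener interface.
var _ EventListener = (*counter)(nil)

func main() {
	c := newCounter()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.OnEvent(&Event{Code: 1, MsgCount: 2})
		}()
	}
	wg.Wait()
	fmt.Println(c.Total(1))
}
```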

type InterceptResult added in v1.0.9

type InterceptResult struct {
	Intercepted bool
	State       string
}

type Logger added in v1.0.8

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger is an abstraction of logging system

type MessageInfo added in v1.0.9

type MessageInfo struct {
	// contains filtered or unexported fields
}

MessageInfo stores information to trace a message

func (*MessageInfo) ID added in v1.0.9

func (msg *MessageInfo) ID() string

type Monitor added in v1.0.5

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor can get running status and events of DelayQueue

func NewMonitor added in v1.0.5

func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor

NewMonitor creates a new Monitor by a *redis.Client

func NewMonitor0 added in v1.0.5

func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor

NewMonitor0 creates a new Monitor by a RedisCli instance

func NewMonitorOnCluster added in v1.1.0

func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor

NewMonitorOnCluster creates a new Monitor from a *redis.ClusterClient

func (*Monitor) GetPendingCount added in v1.0.5

func (m *Monitor) GetPendingCount() (int64, error)

GetPendingCount returns the number of messages whose delivery time has not arrived

func (*Monitor) GetProcessingCount added in v1.0.5

func (m *Monitor) GetProcessingCount() (int64, error)

GetProcessingCount returns the number of messages which are being processed

func (*Monitor) GetReadyCount added in v1.0.5

func (m *Monitor) GetReadyCount() (int64, error)

GetReadyCount returns the number of messages whose delivery time has arrived but which have not been delivered yet

func (*Monitor) ListenEvent added in v1.0.5

func (m *Monitor) ListenEvent(listener EventListener) (func(), error)

ListenEvent registers a listener that is called when events occur in this queue, so it can be used to monitor the running status. Returns: close function, error.

func (*Monitor) WithLogger added in v1.0.5

func (m *Monitor) WithLogger(logger *log.Logger) *Monitor

WithLogger customizes the logger for the monitor

type Publisher added in v1.0.5

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher only publishes messages to a delayqueue; it is an encapsulation of DelayQueue

func NewPublisher added in v1.0.5

func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher

NewPublisher creates a new Publisher by a *redis.Client

func NewPublisher0 added in v1.0.5

func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher

NewPublisher0 creates a new Publisher by a RedisCli instance

func (*Publisher) SendDelayMsg added in v1.0.5

func (p *Publisher) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error

SendDelayMsg submits a message delivered after given duration

func (*Publisher) SendScheduleMsg added in v1.0.5

func (p *Publisher) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error

SendScheduleMsg submits a message delivered at given time

func (*Publisher) WithLogger added in v1.0.5

func (p *Publisher) WithLogger(logger *log.Logger) *Publisher

WithLogger customizes the logger for the publisher

type RedisCli added in v1.0.5

type RedisCli interface {
	// Eval sends lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents redis command GET
	// please return NilErr when there is no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) (int64, error)
	ZCard(key string) (int64, error)
	ZScore(key string, member string) (float64, error)
	LLen(key string) (int64, error)
	LRem(key string, count int64, value string) (int64, error)

	// Publish used for monitor only
	Publish(channel string, payload string) error
	// Subscribe used for monitor only
	// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)

	// ScriptLoad call `script load` command
	ScriptLoad(script string) (string, error)
	// EvalSha runs a preloaded script
	// If the script has not been preloaded, please return an error with message "NOSCRIPT"
	EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
}

RedisCli is an abstraction of the Redis client. It covers only the commands DelayQueue requires, not all commands.
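
To show the shape of such a wrapper, here is a deliberately partial sketch backed by an in-memory map. It covers only Set, Get and Del, so it does not satisfy the full interface (a real adapter must also implement Eval, the sorted-set and list commands, and the rest). Its main point is the Get contract: return NilErr when the key is missing. memStore is a hypothetical type, and NilErr is redeclared locally as a stand-in for delayqueue.NilErr.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// NilErr is a local stand-in for delayqueue.NilErr.
var NilErr = errors.New("nil")

// memStore is a hypothetical in-memory store covering three of the methods.
type memStore struct {
	mu   sync.Mutex
	data map[string]string
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string]string)}
}

// Set stores value under key. The expiration is ignored in this sketch.
func (m *memStore) Set(key string, value string, expiration time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
	return nil
}

// Get returns NilErr when the key does not exist, as the interface comment asks.
func (m *memStore) Get(key string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	value, ok := m.data[key]
	if !ok {
		return "", NilErr
	}
	return value, nil
}

// Del removes the given keys; missing keys are ignored.
func (m *memStore) Del(keys []string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, key := range keys {
		delete(m.data, key)
	}
	return nil
}

func main() {
	store := newMemStore()
	_, err := store.Get("missing")
	fmt.Println("missing key yields NilErr:", errors.Is(err, NilErr))
	_ = store.Set("k", "v", 0)
	value, _ := store.Get("k")
	fmt.Println("stored value:", value)
}
```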

Directories

Path Synopsis
example
getstarted command
monitor command
