delayqueue

Version: v1.1.1
Published: Jan 11, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

DelayQueue

DelayQueue is a message queue built on Redis that supports delayed and scheduled delivery. It is designed to be reliable, scalable, and easy to get started with.

Core Advantages:

  • Guaranteed at-least-once consumption
  • Automatic retry of failed messages
  • Works out of the box: nothing to configure and nothing to deploy. A Redis instance is all you need.
  • Natively adapted to distributed environments: messages are processed concurrently on multiple machines, and workers can be added, removed, or migrated at any time
  • Supports Redis Cluster for high availability

Install

DelayQueue requires a Go version with modules support. Run the following command in a project that has a go.mod file:

go get github.com/AnnonaOrg/delayqueue

Get Started

package main

import (
	"strconv"
	"time"

	"github.com/AnnonaOrg/delayqueue"
	"github.com/redis/go-redis/v9"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		// The callback returns true to confirm successful consumption.
		// If it returns false, or does not return within maxConsumeDuration,
		// DelayQueue re-delivers the message.
		return true
	}).WithConcurrent(4) // set the number of concurrent consumers
	// send delayed messages
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// send scheduled messages
	for i := 0; i < 10; i++ {
		err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
		if err != nil {
			panic(err)
		}
	}
	// start consuming and block until the consumer stops
	done := queue.StartConsume()
	<-done
}

If you are using github.com/go-redis/redis/v8, run go get github.com/hdt3213/delayqueue@v8 instead.

If you are using a Redis client other than go-redis, you can wrap it in the RedisCli interface and pass it to NewQueue0.

Options

WithLogger(logger *log.Logger)

WithLogger sets a custom logger for the queue.

WithConcurrent(c uint)

WithConcurrent sets the number of concurrent consumers.

WithFetchInterval(d time.Duration)

WithFetchInterval sets the interval at which consumers fetch messages from Redis.

WithMaxConsumeDuration(d time.Duration)

WithMaxConsumeDuration sets the maximum time a consumer may take to process a message.

If no acknowledgement is received within this duration after a message is delivered, DelayQueue tries to deliver the message again.

WithFetchLimit(limit uint)

WithFetchLimit limits the maximum number of unacknowledged (in-process) messages.

UseHashTagKey()

UseHashTagKey adds hash tags to the Redis keys so that all keys of this queue are allocated to the same hash slot.

If you are using Codis, Aliyun Redis Cluster, or Tencent Cloud Redis Cluster, pass this option to NewQueue: NewQueue("test", redisCli, cb, UseHashTagKey()). This option cannot be changed after the DelayQueue has been created.

WARNING! Changing this option (adding or removing it) will leave DelayQueue unable to read data that already exists in Redis.

See more: https://redis.io/docs/reference/cluster-spec/#hash-tags
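To see why hash tags keep keys together, it helps to compute slots by hand. The Redis Cluster specification defines the slot as CRC16 (XMODEM variant) of the key modulo 16384. When the key contains a non-empty substring between the first { and the next }, only that substring is hashed. The sketch below implements that rule. The key names are hypothetical and may not match the names DelayQueue actually uses.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the variant named in the Redis Cluster specification.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot for key, honoring hash tags:
// if a non-empty {...} section exists, only its content is hashed.
func hashSlot(key string) uint16 {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	// Hypothetical key names; DelayQueue's real key layout may differ.
	keys := []string{"{example}:pending", "{example}:ready", "example:pending", "example:ready"}
	for _, key := range keys {
		fmt.Printf("%-20s slot %d\n", key, hashSlot(key))
	}
}
```

The two tagged keys always land in the same slot because only "example" is hashed. The untagged keys are hashed in full and will generally land in different slots. This also suggests why toggling the option makes existing data unreachable: the key names themselves change.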

WithDefaultRetryCount(count uint)

WithDefaultRetryCount sets the maximum number of retries. It applies to all messages in this queue.

Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.
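The relationship between the queue default and the per-message value can be sketched as follows. This is an assumption about how options passed as interface{} values might be resolved, not the library's actual code. The names retryCountOpt, withRetryCount, and effectiveRetryCount are hypothetical.

```go
package main

import "fmt"

// retryCountOpt is a hypothetical per-message option value.
type retryCountOpt int

// withRetryCount mimics the shape of delayqueue.WithRetryCount,
// which also returns an interface{}.
func withRetryCount(count int) interface{} { return retryCountOpt(count) }

// effectiveRetryCount returns the per-message retry count when one is
// supplied, and the queue-wide default otherwise.
func effectiveRetryCount(queueDefault uint, opts ...interface{}) uint {
	count := queueDefault
	for _, opt := range opts {
		// Ignore options of other types and negative counts.
		if o, ok := opt.(retryCountOpt); ok && o >= 0 {
			count = uint(o)
		}
	}
	return count
}

func main() {
	fmt.Println(effectiveRetryCount(3))                    // no per-message option
	fmt.Println(effectiveRetryCount(3, withRetryCount(5))) // per-message override
}
```

Under these assumptions the first call yields the queue default and the second yields the per-message value.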

Cluster

If you are using Redis Cluster, use NewQueueOnCluster:

redisCli := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{
        "127.0.0.1:7000",
        "127.0.0.1:7001",
        "127.0.0.1:7002",
    },
})
callback := func(s string) bool {
    return true
}
queue := delayqueue.NewQueueOnCluster("test", redisCli, callback)

If you are using a transparent cluster, such as Codis, twemproxy, or the cluster-architecture Redis offered by Aliyun or Tencent Cloud, use NewQueue and enable hash tags:

redisCli := redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
    return true
}
queue := delayqueue.NewQueue("example", redisCli, callback, delayqueue.UseHashTagKey())

More Details

The complete message flow uses the following Redis structures:

  • pending: A sorted set of messages waiting for delivery. The member is the message ID and the score is the delivery Unix timestamp.
  • ready: A list of messages ready to be delivered. Workers fetch messages from here.
  • unack: A sorted set of messages waiting for an ack (confirmation of successful consumption), which means the messages here are being processed. The member is the message ID and the score is the Unix timestamp of the processing deadline.
  • retry: A list of messages whose processing exceeded the deadline and that are waiting for a retry.
  • garbage: A list of messages that have reached the maximum retry count and are waiting to be cleaned up.
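The movement between these five structures can be sketched as a toy in-memory simulation. This is a simplified illustration under stated assumptions, not the library's Redis scripts. The type queueSim, its methods, and the retries-left bookkeeping are hypothetical, and time is a plain integer.

```go
package main

import "fmt"

// queueSim is a toy, in-memory stand-in for the five Redis structures.
type queueSim struct {
	pending map[string]int64 // message id -> delivery timestamp
	ready   []string         // ids ready for workers to fetch
	unack   map[string]int64 // message id -> processing deadline
	retry   []string         // ids whose deadline passed, awaiting retry
	garbage []string         // ids that ran out of retries
	left    map[string]int   // message id -> retries left (assumed bookkeeping)
}

func newQueueSim() *queueSim {
	return &queueSim{pending: map[string]int64{}, unack: map[string]int64{}, left: map[string]int{}}
}

// send schedules a message for delivery at deliverAt.
func (q *queueSim) send(id string, deliverAt int64, retries int) {
	q.pending[id] = deliverAt
	q.left[id] = retries
}

// tick moves messages between structures as of time now.
func (q *queueSim) tick(now int64) {
	for id, at := range q.pending { // pending -> ready once due
		if at <= now {
			delete(q.pending, id)
			q.ready = append(q.ready, id)
		}
	}
	for id, deadline := range q.unack { // unack -> retry or garbage once overdue
		if deadline <= now {
			delete(q.unack, id)
			if q.left[id] > 0 {
				q.left[id]--
				q.retry = append(q.retry, id)
			} else {
				q.garbage = append(q.garbage, id)
			}
		}
	}
	q.ready = append(q.ready, q.retry...) // retry -> ready
	q.retry = nil
}

// fetch hands one ready message to a worker and starts its deadline.
func (q *queueSim) fetch(now, maxConsume int64) (string, bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	id := q.ready[0]
	q.ready = q.ready[1:]
	q.unack[id] = now + maxConsume
	return id, true
}

// ack confirms successful consumption and forgets the message.
func (q *queueSim) ack(id string) {
	delete(q.unack, id)
	delete(q.left, id)
}

func main() {
	q := newQueueSim()
	q.send("m1", 10, 1)     // deliver at t=10, one retry allowed
	q.tick(10)              // m1 becomes ready
	id, _ := q.fetch(10, 5) // deadline t=15; the worker never acks
	q.tick(15)              // deadline passed: m1 returns to ready via retry
	id, _ = q.fetch(15, 5)  // second delivery
	q.ack(id)               // second attempt succeeds
	fmt.Println("unack:", len(q.unack), "garbage:", len(q.garbage))
}
```

If the second attempt also missed its deadline, the message would have no retries left and the next tick would move it to garbage instead.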

Documentation

Constants

This section is empty.

Variables

var NilErr = errors.New("nil")

NilErr represents a Redis nil reply.

Functions

func UseHashTagKey

func UseHashTagKey() interface{}

UseHashTagKey adds hash tags to the Redis keys so that all keys of this queue are allocated to the same hash slot. If you are using Codis, Aliyun Redis Cluster, or Tencent Cloud Redis Cluster, add this option to NewQueue.

WARNING! Changing this option (adding or removing it) will leave DelayQueue unable to read data that already exists in Redis.

See more: https://redis.io/docs/reference/cluster-spec/#hash-tags

func WithMsgTTL

func WithMsgTTL(d time.Duration) interface{}

WithMsgTTL sets the TTL for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))

func WithRetryCount

func WithRetryCount(count int) interface{}

WithRetryCount sets the retry count for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))

Types

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue is a message queue built on Redis that supports delayed and scheduled delivery.

func NewQueue

func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue

func NewQueue0

func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue

NewQueue0 creates a new queue. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish them. The callback returns true to confirm successful consumption. If the callback returns false, or does not return within maxConsumeDuration, DelayQueue re-delivers the message.

func NewQueueOnCluster

func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, opts ...interface{}) *DelayQueue

func (*DelayQueue) SendDelayMsg

func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error

SendDelayMsg submits a message to be delivered after the given duration.

func (*DelayQueue) SendScheduleMsg

func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error

SendScheduleMsg submits a message to be delivered at the given time.

func (*DelayQueue) StartConsume

func (q *DelayQueue) StartConsume() (done <-chan struct{})

StartConsume creates a goroutine that consumes messages from the DelayQueue. Use `<-done` to wait for the consumer to stop.

func (*DelayQueue) StopConsume

func (q *DelayQueue) StopConsume()

StopConsume stops the consumer goroutine.

func (*DelayQueue) WithConcurrent

func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue

WithConcurrent sets the number of concurrent consumers

func (*DelayQueue) WithDefaultRetryCount

func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue

WithDefaultRetryCount sets the maximum number of retries. It applies to all messages in this queue. Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.

func (*DelayQueue) WithFetchInterval

func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue

WithFetchInterval sets the interval at which consumers fetch messages from Redis.

func (*DelayQueue) WithFetchLimit

func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue

WithFetchLimit limits the maximum number of unacknowledged (in-process) messages.

func (*DelayQueue) WithLogger

func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue

WithLogger sets a custom logger for the queue.

func (*DelayQueue) WithMaxConsumeDuration

func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue

WithMaxConsumeDuration sets the maximum time a consumer may take to process a message. If no acknowledgement is received within this duration after a message is delivered, DelayQueue tries to deliver the message again.

type RedisCli

type RedisCli interface {
	Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float
	Set(key string, value string, expiration time.Duration) error
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
}

RedisCli is an abstraction of a Redis client. It declares only the commands DelayQueue requires, not all Redis commands.
