Documentation
¶
Index ¶
- Variables
- func UseHashTagKey() interface{}
- func WithMsgTTL(d time.Duration) interface{}
- func WithRetryCount(count int) interface{}
- type DelayQueue
- func NewQueue(name string, cli *redis.Client, callback func(string) bool, ...) *DelayQueue
- func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue
- func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, ...) *DelayQueue
- func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
- func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
- func (q *DelayQueue) StartConsume() (done <-chan struct{})
- func (q *DelayQueue) StopConsume()
- func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
- func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
- func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
- func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
- func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue
- func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
- type RedisCli
Constants ¶
This section is empty.
Variables ¶
var NilErr = errors.New("nil")
NilErr represents redis nil
Functions ¶
func UseHashTagKey ¶
func UseHashTagKey() interface{}
UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot. If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue WARNING! Changing (add or remove) this option will cause DelayQueue failing to read existed data in redis see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func WithMsgTTL ¶
WithMsgTTL set ttl for a msg example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour))
func WithRetryCount ¶
func WithRetryCount(count int) interface{}
WithRetryCount set retry count for a msg example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
Types ¶
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
func NewQueue ¶
func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue
func NewQueue0 ¶
func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue
NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
func NewQueueOnCluster ¶
func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, opts ...interface{}) *DelayQueue
func (*DelayQueue) SendDelayMsg ¶
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
SendDelayMsg submits a message delivered after given duration
func (*DelayQueue) SendScheduleMsg ¶
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
SendScheduleMsg submits a message delivered at given time
func (*DelayQueue) StartConsume ¶
func (q *DelayQueue) StartConsume() (done <-chan struct{})
StartConsume creates a goroutine to consume message from DelayQueue use `<-done` to wait consumer stopping
func (*DelayQueue) StopConsume ¶
func (q *DelayQueue) StopConsume()
StopConsume stops consumer goroutine
func (*DelayQueue) WithConcurrent ¶
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
WithConcurrent sets the number of concurrent consumers
func (*DelayQueue) WithDefaultRetryCount ¶
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
func (*DelayQueue) WithFetchInterval ¶
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
WithFetchInterval customizes the interval at which consumer fetch message from redis
func (*DelayQueue) WithFetchLimit ¶
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
WithFetchLimit limits the max number of unack (processing) messages
func (*DelayQueue) WithLogger ¶
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue
WithLogger customizes logger for queue
func (*DelayQueue) WithMaxConsumeDuration ¶
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
WithMaxConsumeDuration customizes max consume duration If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
type RedisCli ¶
type RedisCli interface { Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float Set(key string, value string, expiration time.Duration) error Get(key string) (string, error) Del(keys []string) error HSet(key string, field string, value string) error HDel(key string, fields []string) error SMembers(key string) ([]string, error) SRem(key string, members []string) error ZAdd(key string, values map[string]float64) error ZRem(key string, fields []string) error }
RedisCli is abstraction for redis client, required commands only not all commands