Documentation
Index
- func UseHashTagKey() interface{}
- func WithMsgTTL(d time.Duration) interface{}
- func WithRetryCount(count int) interface{}
- type DelayQueue
- func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue
- func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
- func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
- func (q *DelayQueue) StartConsume() (done <-chan struct{})
- func (q *DelayQueue) StopConsume()
- func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
- func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
- func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
- func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
- func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue
- func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
Constants
This section is empty.
Variables
This section is empty.
Functions
func UseHashTagKey (added in v1.0.3)
func UseHashTagKey() interface{}
UseHashTagKey adds hash tags to the Redis keys so that all keys of this queue are allocated to the same hash slot. If you are using Codis, AliyunRedisCluster, or TencentCloudRedisCluster, pass this option to NewQueue. WARNING: adding or removing this option will cause DelayQueue to fail to read existing data in Redis. See https://redis.io/docs/reference/cluster-spec/#hash-tags for details.
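To make the effect of hash tags concrete, the sketch below computes Redis Cluster hash slots the way the cluster specification linked above describes: CRC16/XMODEM of the key, or of the substring inside the first non-empty `{...}`, modulo 16384. The key names are hypothetical and are not necessarily the ones DelayQueue uses internally.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum the Redis Cluster specification uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot for key. If the key contains a
// non-empty hash tag such as "{orders}", only the tag content is hashed.
func hashSlot(key string) uint16 {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	// Hypothetical key names for a queue called "orders".
	// With a shared hash tag, both keys land in the same slot.
	fmt.Println(hashSlot("{orders}:pending") == hashSlot("{orders}:ready")) // true
	// Without a tag, the whole key is hashed, so the slots may differ.
	fmt.Println(hashSlot("orders:pending"), hashSlot("orders:ready"))
}
```

Because the tag changes which string is hashed, keys written without the tag cannot be found by a queue configured with it, which is consistent with the warning about changing this option on an existing queue.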
func WithMsgTTL (added in v1.0.2)
func WithMsgTTL(d time.Duration) interface{}
WithMsgTTL sets the TTL for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithRetryCount
func WithRetryCount(count int) interface{}
WithRetryCount sets the retry count for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
Types
type DelayQueue
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue is a Redis-backed message queue that supports delayed and scheduled delivery.
func NewQueue
func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue
NewQueue creates a new queue. Use DelayQueue.StartConsume to consume messages, or DelayQueue.SendScheduleMsg to publish them. The callback returns true to confirm successful consumption. If the callback returns false or does not return within maxConsumeDuration, DelayQueue re-delivers the message.
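The acknowledgement contract of the callback can be illustrated without Redis. The sketch below uses a hypothetical `deliver` helper that invokes a callback until it returns true or the retry budget is used up. How the library actually counts retries is an assumption here.

```go
package main

import "fmt"

// deliver calls callback with payload until it returns true (ack) or
// until the first attempt plus `retries` re-deliveries have all failed.
// It returns the number of attempts made and whether the message was acked.
func deliver(payload string, retries int, callback func(string) bool) (attempts int, acked bool) {
	for attempts < retries+1 {
		attempts++
		if callback(payload) {
			return attempts, true
		}
	}
	return attempts, false
}

func main() {
	calls := 0
	// This callback fails twice, then confirms the message.
	flaky := func(msg string) bool {
		calls++
		return calls >= 3
	}
	attempts, acked := deliver("order-42", 3, flaky)
	fmt.Println(attempts, acked) // 3 true
}
```

Since a message can be handed to the callback more than once, callbacks written against this kind of contract are usually made idempotent.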
func (*DelayQueue) SendDelayMsg
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
SendDelayMsg submits a message to be delivered after the given duration.
func (*DelayQueue) SendScheduleMsg
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
SendScheduleMsg submits a message to be delivered at the given time.
func (*DelayQueue) StartConsume
func (q *DelayQueue) StartConsume() (done <-chan struct{})
StartConsume creates a goroutine that consumes messages from the DelayQueue. Use `<-done` to wait for the consumer to stop.
func (*DelayQueue) StopConsume
func (q *DelayQueue) StopConsume()
StopConsume stops the consumer goroutine.
func (*DelayQueue) WithConcurrent (added in v1.0.1)
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
WithConcurrent sets the number of concurrent consumers.
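As an illustration of what a concurrency setting typically controls, the sketch below processes messages with a fixed number of worker goroutines. `consumeAll` is a hypothetical helper, not part of the package, and treating 0 as 1 is an arbitrary choice for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll runs callback over msgs using c worker goroutines and
// returns how many callbacks returned true.
func consumeAll(msgs []string, c uint, callback func(string) bool) int {
	if c == 0 {
		c = 1 // arbitrary choice for this sketch: always run one worker
	}
	jobs := make(chan string)
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		acked int
	)
	for i := uint(0); i < c; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range jobs {
				if callback(m) {
					mu.Lock()
					acked++
					mu.Unlock()
				}
			}
		}()
	}
	for _, m := range msgs {
		jobs <- m
	}
	close(jobs) // lets the workers' range loops finish
	wg.Wait()
	return acked
}

func main() {
	msgs := []string{"a", "", "b", "c"}
	// Empty payloads are rejected; the other three are acknowledged.
	n := consumeAll(msgs, 2, func(m string) bool { return m != "" })
	fmt.Println(n) // 3
}
```

With more than one worker, the callback may run on several goroutines at once, so any state it shares has to be protected, as `acked` is here.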
func (*DelayQueue) WithDefaultRetryCount (added in v1.0.1)
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
WithDefaultRetryCount customizes the maximum number of retries. It applies to all messages in this queue. Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to set the retry count for a particular message.
func (*DelayQueue) WithFetchInterval
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
WithFetchInterval customizes the interval at which the consumer fetches messages from Redis.
func (*DelayQueue) WithFetchLimit
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
WithFetchLimit limits the maximum number of unacknowledged (in-process) messages.
func (*DelayQueue) WithLogger
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue
WithLogger customizes the logger for the queue.
func (*DelayQueue) WithMaxConsumeDuration
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
WithMaxConsumeDuration customizes the maximum consume duration. If no acknowledgement is received within this duration after a message is delivered, DelayQueue will try to deliver the message again.
