dq

package module
v0.0.0-...-f0f9394 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

dq

用Redis实现延迟队列

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDuplicateMessage = errors.New("duplicate message")

ErrDuplicateMessage 重复消息

Functions

This section is empty.

Types

type DelayServiceConsumer

type DelayServiceConsumer struct {
	// contains filtered or unexported fields
}

DelayServiceConsumer 延迟服务消费者

func NewDelayServiceConsumer

func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
	realTopic string) *DelayServiceConsumer

func (*DelayServiceConsumer) Cleanup

func (*DelayServiceConsumer) ConsumeClaim

func (*DelayServiceConsumer) Setup

type KafkaDelayQueueProducer

type KafkaDelayQueueProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaDelayQueueProducer

func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
	delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer

NewKafkaDelayQueueProducer 延迟队列,包含了生产者和延迟服务。参数:producer 生产者;delayServiceConsumerGroup 延迟服务消费者;delayTime 延迟时间;delayTopic 延迟服务主题;realTopic 真实队列主题。

func (*KafkaDelayQueueProducer) SendMessage

func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

SendMessage 发送消息

type Msg

type Msg struct {
	Topic     string        // 消息的主题
	Key       string        // 消息的Key
	Body      []byte        // 消息的Body
	Partition int           // 分区号
	Delay     time.Duration // 延迟时间(秒)
	ReadyTime time.Time     // 消息准备好执行的时间(now + delay)
}

Msg 消息

type PartitionRedisDelayQueue

type PartitionRedisDelayQueue struct {
	// contains filtered or unexported fields
}

func NewPartitionRedisDelayQueue

func NewPartitionRedisDelayQueue(client *redis.Client) *PartitionRedisDelayQueue

func (*PartitionRedisDelayQueue) Consume

func (q *PartitionRedisDelayQueue) Consume(topic string, batchSize, partition int, fn func(msg *Msg) error)

func (*PartitionRedisDelayQueue) Push

func (q *PartitionRedisDelayQueue) Push(ctx context.Context, msg *Msg) error

type SimpleRedisDelayQueue

type SimpleRedisDelayQueue struct {
	// contains filtered or unexported fields
}

func NewSimpleRedisDelayQueue

func NewSimpleRedisDelayQueue(client *redis.Client) *SimpleRedisDelayQueue

func (*SimpleRedisDelayQueue) Consume

func (q *SimpleRedisDelayQueue) Consume(topic string, batchSize int, fn func(msg *Msg) error)

func (*SimpleRedisDelayQueue) Push

func (q *SimpleRedisDelayQueue) Push(ctx context.Context, msg *Msg) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL