msg_group

package
v0.0.0-...-6420be8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeFunc

type ConsumeFunc[T any] func(dataList []T) error

消费者函数

type MsgGroup

type MsgGroup[T any] struct {
	// contains filtered or unexported fields
}

func NewMsgGroup

func NewMsgGroup[T any](MsgGroupObjName string, options Options, ConsumeFunc ConsumeFunc[T]) *MsgGroup[T]

NewMsgGroup ObjName:对象名每执行一次NewMsgGroup()就要有一个本项目唯一的名字

func (*MsgGroup[T]) Init

func (s *MsgGroup[T]) Init() error

Init 初始化,主要用于消费之前剩余的数据

func (*MsgGroup[T]) Publish

func (s *MsgGroup[T]) Publish(QueueName string, data T) (err error)

Publish 添加消息 o=参数选项 QueueName 队列名,redis key,不要跟其他redis key重复,只要队列名不同就会自动创建一个队列 data=要写入的数据 ConsumeFunc=消费者函数,不同的队列名可以有不通的消费者函数,如果返回err则本次消息不处理,稍后继续处理。如果返回成功则会执行ack并删除消息

type Options

type Options struct {
	Rdb       *redis.Client      //redis对象
	Graceful  *graceful.Graceful //优雅重启 退出时等待所有消费者退出
	Namespace string             //命名空间,redisKey前缀,每个项目不通

	//队列属性
	//QueueName   string        //队列名,redis key,不要跟其他redis key重复
	Expire      time.Duration //队列超时时间,超时队列自动删除,删除后消费者自动退出,创建大量队列不建议设置时间过长
	QueueMaxLen int64         //队列最大元素个数,超出自动删除旧的数据

	//消费相关属性
	GroupStartID  redis_queue.GroupStartID //消费起始位置 只消费新数据,还是消费所有数据
	ClaimMinIdle  time.Duration            //认领 待处理数据 的最小空闲时长,业务不同值也不同,主要看对消息的处理速度
	ClaimCount    int64                    //每次认领的消息个数,主要是为了提高性能
	ConsumerCount int64                    //每次消费的数据个数,消费数据多可减少与redis的交互,节省流量
	ConsumerBlock time.Duration            //BLOCK为关键字,表示设置XREAD为阻塞模式,默认是非阻塞模式,milliseconds表示具体阻塞的时间。同XREAD命令。

	//函数
	ErrLogFunc  func(err error)  //错误日志函数
	InfoLogFunc func(msg string) //Info日志
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL