Documentation ¶
Overview ¶
Package stream 流及相关对象的包
Package stream 流及相关对象的包
Index ¶
- Variables
- type Consumer
- type Producer
- type Stream
- func (s *Stream) Ack(ctx context.Context, groupname string, ids ...string) error
- func (s *Stream) AsProducer(opts ...broker.Option) *Producer
- func (s *Stream) CreateGroup(ctx context.Context, groupname, start string, autocreate bool) (string, error)
- func (s *Stream) Delete(ctx context.Context, ids ...string) error
- func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)
- func (s *Stream) DeleteGroup(ctx context.Context, groupname string) (int64, error)
- func (s *Stream) GroupInfos(ctx context.Context) ([]redis.XInfoGroup, error)
- func (s *Stream) HasGroup(ctx context.Context, groupname string) (bool, error)
- func (s *Stream) HasGroups(ctx context.Context, groupnames ...string) (bool, error)
- func (s *Stream) Len(ctx context.Context) (int64, error)
- func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]redis.XMessage, error)
- func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]string, error)
- func (s *Stream) Pending(ctx context.Context, groupname string) (*redis.XPending, error)
- func (s *Stream) Range(ctx context.Context, start, stop string) ([]redis.XMessage, error)
- func (s *Stream) SetGroupStartAt(ctx context.Context, groupname, start string) (string, error)
- func (s *Stream) Trim(ctx context.Context, count int64, strict bool) (int64, error)
Constants ¶
This section is empty.
Variables ¶
var ErrStreamConsumerAlreadyListened = errors.New("流已经被监听了")
ErrStreamConsumerAlreadyListened 流已经被监听了
var ErrStreamConsumerNotListeningYet = errors.New("流未被监听")
ErrStreamConsumerNotListeningYet 流未被监听
var ErrStreamNeedToPointOutGroups = errors.New("stream操作需要指定消费者组")
ErrStreamNeedToPointOutGroups 流操作需要指定消费者组
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { TopicInfos map[string]string *clientkeybatch.ClientKeyBatch *consumerabc.ConsumerABC // contains filtered or unexported fields }
Consumer 流消费者对象
func NewConsumer ¶
func NewConsumer(kb *clientkeybatch.ClientKeyBatch, opts ...broker.Option) *Consumer
NewConsumer 创建一个指向多个流的消费者 默认一批获取一个消息,可以通过`WithStreamComsumerRecvBatchSize`设置批的大小 如果使用`WithStreamComsumerGroupName`设定了group,则按组消费(默认总组监听的最新位置开始监听,收到消息后确认,消息确认策略可以通过`WithStreamComsumerAckMode`配置), 否则按按单独客户端消费(默认从开始监听的时刻开始消费) 需要注意单独客户端消费不会记录消费的偏移量,因此很容易丢失下次请求时的结果.解决方法是第一次使用`$`,后面则需要记录下id号从它开始 @params k *clientkeybatch.ClientKeyBatch redis客户端的批键对象 @params start string 监听的起始位置,可以是一个unix毫秒时间戳的字符串 @params opts ...broker.Option 消费者的配置
func (*Consumer) Get ¶
Get 从多个流中取出数据 @params ctx context.Context 请求的上下文 @params timeout time.Duration 等待超时时间,为0则表示一直阻塞直到有数据
type Producer ¶
type Producer struct { *Stream // contains filtered or unexported fields }
Producer 流的生产者对象
func NewProducer ¶
NewProducer 创建一个新的流生产者 @params k *clientkey.ClientKey redis客户端的键对象 @params opts ...broker.Option 生产者的配置
type Stream ¶
Stream 流对象
func New ¶
New 创建一个新的流对象 @params k *clientkey.ClientKey redis客户端的键对象 @params maxlen int64 流对象的最大长度 @params strict bool 流是否严格控制长度
func (*Stream) Ack ¶
Ack 手工确认组已经消耗了消息 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params ids ...string 确认被消耗的id列表
func (*Stream) CreateGroup ¶
func (s *Stream) CreateGroup(ctx context.Context, groupname, start string, autocreate bool) (string, error)
CreateGroup 为指定消费者在指定的Stream上创建消费者组 如果stream不存在则创建之 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置,"$"表示最近,"0"表示最初.也可以填入正常的id
func (*Stream) Delete ¶
Delete 设置标志位标识删除流中指定id的数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params ids ...string 要删除的消息id列表
func (*Stream) DeleteConsumerFromGroup ¶
func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)
DeleteConsumerFromGroup 在指定的Stream上删除指定消费者组中的指定消费者 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params groupname string 消费者组名列表
func (*Stream) DeleteGroup ¶
DeleteGroup 为指定消费者在指定的Stream上删除消费者组 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表
func (*Stream) GroupInfos ¶
GroupInfos 获取主题流中注册的消费者组信息 @params ctx context.Context 上下文信息,用于控制请求的结束
func (*Stream) HasGroup ¶
HasGroup 判断消费者组是否在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名
func (*Stream) HasGroups ¶
HasGroups 判断消费者组是否都在在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupnames ...string 消费者组名列表
func (*Stream) Move ¶
func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)
Move 转移消息的所有权给用户组中的某个用户 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id
func (*Stream) MoveJustID ¶
func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]string, error)
MoveJustID 转移消息的所有权给用户组中的某个用户 这个命令和Move区别在于返回值只有消息id @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id
func (*Stream) Pending ¶
Pending 查看消费组中等待确认的消息列表 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表
func (*Stream) Range ¶
Range 获取消息列表,会自动过滤已经删除的消息 @params ctx context.Context 请求的上下文 @params start string 开始位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id @params stop string 结束位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id
func (*Stream) SetGroupStartAt ¶
SetGroupStartAt 设置指定消费者组在主题流中的读取起始位置 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置