stream

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package stream 流及相关对象的包

Package stream 流及相关对象的包

Index

Constants

This section is empty.

Variables

View Source
var ErrStreamConsumerAlreadyListened = errors.New("流已经被监听了")

ErrStreamConsumerAlreadyListened 流已经被监听了

View Source
var ErrStreamConsumerNotListeningYet = errors.New("流未被监听")

ErrStreamConsumerNotListeningYet 流未被监听

View Source
var ErrStreamNeedToPointOutGroups = errors.New("stream操作需要指定消费者组")

ErrStreamNeedToPointOutGroups 流操作需要指定消费者组

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	TopicInfos map[string]string
	*clientkeybatch.ClientKeyBatch
	*consumerabc.ConsumerABC
	// contains filtered or unexported fields
}

Consumer 流消费者对象

func NewConsumer

func NewConsumer(kb *clientkeybatch.ClientKeyBatch, opts ...broker.Option) *Consumer

NewConsumer 创建一个指向多个流的消费者 默认一批获取一个消息,可以通过`WithStreamComsumerRecvBatchSize`设置批的大小 如果使用`WithStreamComsumerGroupName`设定了group,则按组消费(默认总组监听的最新位置开始监听,收到消息后确认,消息确认策略可以通过`WithStreamComsumerAckMode`配置), 否则按按单独客户端消费(默认从开始监听的时刻开始消费) 需要注意单独客户端消费不会记录消费的偏移量,因此很容易丢失下次请求时的结果.解决方法是第一次使用`$`,后面则需要记录下id号从它开始 @params k *clientkeybatch.ClientKeyBatch redis客户端的批键对象 @params start string 监听的起始位置,可以是一个unix毫秒时间戳的字符串 @params opts ...broker.Option 消费者的配置

func (*Consumer) Get

func (s *Consumer) Get(ctx context.Context, timeout time.Duration) ([]redis.XStream, error)

Get 从多个流中取出数据 @params ctx context.Context 请求的上下文 @params timeout time.Duration 等待超时时间,为0则表示一直阻塞直到有数据

func (*Consumer) Listen

func (s *Consumer) Listen(asyncHanddler bool, p ...event.Parser) error

Listen 监听流 @params asyncHanddler bool 是否并行执行回调 @params p ...Parser 解析输入消息为事件对象的函数

func (*Consumer) StopListening

func (s *Consumer) StopListening() error

StopListening 停止监听

type Producer

type Producer struct {
	*Stream
	// contains filtered or unexported fields
}

Producer 流的生产者对象

func NewProducer

func NewProducer(k *Stream, opts ...broker.Option) *Producer

NewProducer 创建一个新的流生产者 @params k *clientkey.ClientKey redis客户端的键对象 @params opts ...broker.Option 生产者的配置

func (*Producer) PubEvent

func (p *Producer) PubEvent(ctx context.Context, payload interface{}) (*event.Event, error)

PubEvent 向流中放入事件数据 @params ctx context.Context 请求的上下文 @params payload []byte 发送的消息负载 @returns *event.Event 发送出去的消息对象

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, payload interface{}) error

Publish 向流中放入数据 @params ctx context.Context 请求的上下文 @params payload interface{} 发送的消息负载,负载如果不是map[string]interface{}形式或者可以被json/msgpack序列化的对象则统一以[value 值]的形式传出

type Stream

type Stream struct {
	MaxLen int64
	Strict bool
	*clientkey.ClientKey
}

Stream 流对象

func New

func New(k *clientkey.ClientKey, maxlen int64, strict bool) *Stream

New 创建一个新的流对象 @params k *clientkey.ClientKey redis客户端的键对象 @params maxlen int64 流对象的最大长度 @params strict bool 流是否严格控制长度

func (*Stream) Ack

func (s *Stream) Ack(ctx context.Context, groupname string, ids ...string) error

Ack 手工确认组已经消耗了消息 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params ids ...string 确认被消耗的id列表

func (*Stream) AsProducer

func (s *Stream) AsProducer(opts ...broker.Option) *Producer

func (*Stream) CreateGroup

func (s *Stream) CreateGroup(ctx context.Context, groupname, start string, autocreate bool) (string, error)

CreateGroup 为指定消费者在指定的Stream上创建消费者组 如果stream不存在则创建之 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置,"$"表示最近,"0"表示最初.也可以填入正常的id

func (*Stream) Delete

func (s *Stream) Delete(ctx context.Context, ids ...string) error

Delete 设置标志位标识删除流中指定id的数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params ids ...string 要删除的消息id列表

func (*Stream) DeleteConsumerFromGroup

func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)

DeleteConsumerFromGroup 在指定的Stream上删除指定消费者组中的指定消费者 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params groupname string 消费者组名列表

func (*Stream) DeleteGroup

func (s *Stream) DeleteGroup(ctx context.Context, groupname string) (int64, error)

DeleteGroup 为指定消费者在指定的Stream上删除消费者组 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表

func (*Stream) GroupInfos

func (s *Stream) GroupInfos(ctx context.Context) ([]redis.XInfoGroup, error)

GroupInfos 获取主题流中注册的消费者组信息 @params ctx context.Context 上下文信息,用于控制请求的结束

func (*Stream) HasGroup

func (s *Stream) HasGroup(ctx context.Context, groupname string) (bool, error)

HasGroup 判断消费者组是否在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名

func (*Stream) HasGroups

func (s *Stream) HasGroups(ctx context.Context, groupnames ...string) (bool, error)

HasGroups 判断消费者组是否都在在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupnames ...string 消费者组名列表

func (*Stream) Len

func (s *Stream) Len(ctx context.Context) (int64, error)

Len 查看流的当前长度 @params ctx context.Context 上下文信息,用于控制请求的结束

func (*Stream) Move

func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)

Move 转移消息的所有权给用户组中的某个用户 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id

func (*Stream) MoveJustID

func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]string, error)

MoveJustID 转移消息的所有权给用户组中的某个用户 这个命令和Move区别在于返回值只有消息id @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id

func (*Stream) Pending

func (s *Stream) Pending(ctx context.Context, groupname string) (*redis.XPending, error)

Pending 查看消费组中等待确认的消息列表 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表

func (*Stream) Range

func (s *Stream) Range(ctx context.Context, start, stop string) ([]redis.XMessage, error)

Range 获取消息列表,会自动过滤已经删除的消息 @params ctx context.Context 请求的上下文 @params start string 开始位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id @params stop string 结束位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id

func (*Stream) SetGroupStartAt

func (s *Stream) SetGroupStartAt(ctx context.Context, groupname, start string) (string, error)

SetGroupStartAt 设置指定消费者组在主题流中的读取起始位置 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置

func (*Stream) Trim

func (s *Stream) Trim(ctx context.Context, count int64, strict bool) (int64, error)

Trim 为流扩容 @params ctx context.Context 上下文信息,用于控制请求的结束 @params count int64 扩容到多长 @params strict bool 是否严格控制长度

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL