streamhelper

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package stream对象

streamhelper 满足pchelper规定的生产者和消费者接口,同时提供stream管理对象

Index

Constants

This section is empty.

Variables

View Source
var ErrStreamConsumerAlreadyListened = errors.New("stream already listened")

ErrStreamConsumerAlreadyListened 流已经被监听了

View Source
var ErrStreamConsumerNotListeningYet = errors.New("stream not listening yet")

ErrStreamConsumerNotListeningYet 流未被监听

View Source
var ErrStreamNeedToPointOutGroups = errors.New("stream need to point out groups")

ErrStreamNeedToPointOutGroups 流操作需要指定消费者组

Functions

func SerializeWithJSON

func SerializeWithJSON() optparams.Option[Options]

SerializeWithJSON 使用JSON作为序列化反序列化的协议

func SerializeWithMsgpack

func SerializeWithMsgpack() optparams.Option[Options]

SerializeWithMsgpack 使用JSON作为序列化反序列化的协议

func WithAutocreate

func WithAutocreate() optparams.Option[createGroupOpt]

WithAutocreate CreateGroup方法的参数,用于设置是否自动创建流

func WithBlockTime

func WithBlockTime(d time.Duration) optparams.Option[Options]

WithBlockTime 设置客户端阻塞等待消息的时长

func WithClientID

func WithClientID(clientID string) optparams.Option[Options]

WithClientID 中间件通用设置,设置客户端id

func WithConsumerAckMode

func WithConsumerAckMode(ack AckModeType) optparams.Option[Options]

WithConsumerAckMode stream消费者专用,用于设定同步校验规则

func WithConsumerDefaultStartAt

func WithConsumerDefaultStartAt(t time.Time) optparams.Option[Options]

WithConsumerDefaultStartAt stream消费者专用,用于设定默认消费起始时间

func WithConsumerDefaultStartEarliest

func WithConsumerDefaultStartEarliest() optparams.Option[Options]

WithConsumerDefaultStartEarliest stream消费者专用,用于设定默认消费从最早的消息开始

func WithConsumerDefaultStartLatest

func WithConsumerDefaultStartLatest() optparams.Option[Options]

WithConsumerDefaultStartLatest stream消费者专用,用于设定默认消费从最新的消息开始

func WithConsumerDefaultStartPosition

func WithConsumerDefaultStartPosition(flag string) optparams.Option[Options]

WithConsumerDefaultStartPosition stream消费者专用,用于设定默认消费起始位置,不设置则group设置为`$`,否则设置为`>`

func WithConsumerGroupName

func WithConsumerGroupName(groupname string) optparams.Option[Options]

WithConsumerGroupName stream消费者专用,用于设定客户端组

func WithConsumerRecvBatchSize

func WithConsumerRecvBatchSize(size int64) optparams.Option[Options]

WithConsumerRecvBatchSize stream消费者专用,用于设定一次获取的消息批长度

func WithProducerDefaultMaxLen

func WithProducerDefaultMaxLen(n int64) optparams.Option[Options]

WithProducerDefaultMaxLen stream生产者专用,用于设置流的默认最长长度

func WithProducerDefaultStrict

func WithProducerDefaultStrict() optparams.Option[Options]

WithProducerStrict stream生产者专用,用于设置流是否为严格模式

func WithStartAtID

func WithStartAtID(id string) optparams.Option[createGroupOpt]

WithStartLatest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为指定id

func WithStartEarliest

func WithStartEarliest() optparams.Option[createGroupOpt]

WithStartEarliest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为最早数据

func WithStartLatest

func WithStartLatest() optparams.Option[createGroupOpt]

WithStartLatest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为最新数据

func WithStrict

func WithStrict() optparams.Option[trimOpt]

WithStrict Trim方法的参数,用于设置严格控制长度

func WithUUIDSnowflake

func WithUUIDSnowflake() optparams.Option[Options]

WithUUIDSnowflake 使用snowflake作为uuid的生成器

func WithUUIDSonyflake

func WithUUIDSonyflake() optparams.Option[Options]

WithUUIDSonyflake 使用sonyflake作为uuid的生成器

func WithUUIDv4

func WithUUIDv4() optparams.Option[Options]

WithUUIDv4 使用uuid4作为uuid的生成器

Types

type AckModeType

type AckModeType uint8

AckModeType stream的Ack模式

const (

	//AckModeAckWhenGet 获取到后确认
	AckModeAckWhenGet AckModeType = iota
	//AckModeAckWhenDone 处理完后确认
	AckModeAckWhenDone
	//AckModeNoAck 不做确认,消费者需要自己实现ack操作,最好别这么用
	AckModeNoAck
)

type Consumer

type Consumer struct {
	*clientIdhelper.ClientIDAbc
	*pchelper.ConsumerABC

	TopicInfos map[string]string
	// contains filtered or unexported fields
}

Consumer 流消费者对象

func NewConsumer

func NewConsumer(cli redis.UniversalClient, opts ...optparams.Option[Options]) (*Consumer, error)

NewConsumer 创建一个指向多个流的消费者 默认一批获取一个消息,可以通过`WithStreamComsumerRecvBatchSize`设置批的大小 如果使用`WithStreamComsumerGroupName`设定了group,则按组消费(默认总组监听的最新位置开始监听,收到消息后确认,消息确认策略可以通过`WithStreamComsumerAckMode`配置), 否则按按单独客户端消费(默认从开始监听的时刻开始消费) 需要注意单独客户端消费不会记录消费的偏移量,因此很容易丢失下次请求时的结果.解决方法是第一次使用`$`,后面则需要记录下id号从它开始 @params cli redis.UniversalClient redis客户端对象 @params opts ...optparams.Option[pchelper.Options] 消费者的配置

func (*Consumer) Client

func (s *Consumer) Client() redis.UniversalClient

Client 获取连接的redis客户端

func (*Consumer) Get

func (s *Consumer) Get(ctx context.Context, timeout time.Duration) ([]redis.XStream, error)

Get 从多个流中取出数据 @params ctx context.Context 请求的上下文 @params timeout time.Duration 等待超时时间,为0则表示一直阻塞直到有数据

func (*Consumer) Listen

func (s *Consumer) Listen(topics string, opts ...optparams.Option[pchelper.ListenOptions]) error

Listen 监听流,默认情况下从所有topic的起始位置开始 @params topics string 监听的topic,复数topic用`,`隔开 @params opts ...optparams.Option[pchelper.ListenOptions] 监听时的一些配置,具体看listenoption.go说明

func (*Consumer) StopListening

func (s *Consumer) StopListening() error

StopListening 停止监听

type Options

type Options struct {
	BlockTime            time.Duration                              //queue结构使用的参数,用于设置每次拉取的阻塞时长
	RecvBatchSize        int64                                      //stream消费者专用,用于设定一次获取的消息批长度
	Group                string                                     //stream消费者专用,用于设定客户端组
	AckMode              AckModeType                                //stream消费者专用,用于设定同步校验规则
	DefaultStart         string                                     //stream消费者专用,用于设定默认的监听的起始位置
	DefaultMaxLen        int64                                      //stream生产者专用,用于设置流的默认最长长度
	DefaultStrict        bool                                       //stream生产者专用,用于设置流是否为严格模式
	ProducerConsumerOpts []optparams.Option[pchelper.Options]       //初始化pchelper的配置
	ClientIDOpts         []optparams.Option[clientIdhelper.Options] //初始化ClientID的配置
}

type Producer

type Producer struct {
	*pchelper.ProducerConsumerABC
	*clientIdhelper.ClientIDAbc
	// contains filtered or unexported fields
}

Producer 流的生产者对象

func NewProducer

func NewProducer(cli redis.UniversalClient, opts ...optparams.Option[Options]) (*Producer, error)

NewProducer 创建一个新的流生产者 @params k *clientkey.ClientKey redis客户端的键对象 @params opts ...broker.Option 生产者的配置

func (*Producer) Client

func (p *Producer) Client() redis.UniversalClient

Client 获取连接的redis客户端

func (*Producer) PubEvent

func (p *Producer) PubEvent(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) (*pchelper.Event, error)

PubEvent 向流中放入事件数据 @params ctx context.Context 请求的上下文 @params payload []byte 发送的消息负载 @returns *event.Event 发送出去的消息对象

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) error

Publish 向流中放入数据 @params ctx context.Context 请求的上下文 @params payload interface{} 发送的消息负载,负载如果不是map[string]interface{}形式或者可以被json/msgpack序列化的对象则统一以[value 值]的形式传出

type Stream

type Stream struct {
	Name string
	// contains filtered or unexported fields
}

Stream 流对象

func NewStream

func NewStream(cli redis.UniversalClient, name string) *Stream

NewStream 创建一个新的流对象 @params cli redis.UniversalClient redis客户端的对象 @params name string 流名

func (*Stream) Ack

func (s *Stream) Ack(ctx context.Context, groupname string, ids ...string) error

Ack 手工确认组已经消耗了消息 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params ids ...string 确认被消耗的id列表

func (*Stream) CreateGroup

func (s *Stream) CreateGroup(ctx context.Context, groupname string, opts ...optparams.Option[createGroupOpt]) (string, error)

CreateGroup 为指定消费者在指定的Stream上创建消费者组,创建流后流的默认读取位置为最新数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表

func (*Stream) Delete

func (s *Stream) Delete(ctx context.Context, ids ...string) error

Delete 设置标志位标识删除流中指定id的数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params ids ...string 要删除的消息id列表

func (*Stream) DeleteConsumerFromGroup

func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)

DeleteConsumerFromGroup 在指定的Stream上删除指定消费者组中的指定消费者 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params groupname string 消费者组名列表

func (*Stream) DeleteGroup

func (s *Stream) DeleteGroup(ctx context.Context, groupname string) (int64, error)

DeleteGroup 为指定消费者在指定的Stream上删除消费者组 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表

func (*Stream) GroupInfos

func (s *Stream) GroupInfos(ctx context.Context) ([]redis.XInfoGroup, error)

GroupInfos 获取主题流中注册的消费者组信息 @params ctx context.Context 上下文信息,用于控制请求的结束

func (*Stream) HasGroup

func (s *Stream) HasGroup(ctx context.Context, groupname string) (bool, error)

HasGroup 判断消费者组是否在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名

func (*Stream) HasGroups

func (s *Stream) HasGroups(ctx context.Context, groupnames ...string) (bool, error)

HasGroups 判断消费者组是否都在在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupnames ...string 消费者组名列表

func (*Stream) Len

func (s *Stream) Len(ctx context.Context) (int64, error)

Len 查看流的当前长度 @params ctx context.Context 上下文信息,用于控制请求的结束

func (*Stream) Move

func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)

Move 转移消息的所有权给用户组中的某个用户 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id

func (*Stream) MoveJustID

func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]string, error)

MoveJustID 转移消息的所有权给用户组中的某个用户 这个命令和Move区别在于返回值只有消息id @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id

func (*Stream) Pending

func (s *Stream) Pending(ctx context.Context, groupname string) (*redis.XPending, error)

Pending 查看消费组中等待确认的消息列表 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表

func (*Stream) Range

func (s *Stream) Range(ctx context.Context, start, stop string) ([]redis.XMessage, error)

Range 获取消息列表,会自动过滤已经删除的消息 @params ctx context.Context 请求的上下文 @params start string 开始位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id @params stop string 结束位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id

func (*Stream) SetGroupStartAt

func (s *Stream) SetGroupStartAt(ctx context.Context, groupname, start string) (string, error)

SetGroupStartAt 设置指定消费者组在主题流中的读取起始位置 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置

func (*Stream) Trim

func (s *Stream) Trim(ctx context.Context, count int64, opts ...optparams.Option[trimOpt]) (int64, error)

Trim 为流扩容 @params ctx context.Context 上下文信息,用于控制请求的结束 @params count int64 扩容到多长 @params strict bool 是否严格控制长度

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL