Documentation
¶
Index ¶
- Variables
- func New() *redisProxy
- type NameSpcaeKey
- type PubSubTopic
- type Queue
- type QueueMessage
- type StreamTopic
- func (topic *StreamTopic) CreateGroup(groupname, start string) (string, error)
- func (topic *StreamTopic) Delete(ids ...string) error
- func (topic *StreamTopic) DeleteConsumer(groupname string, consumername string) (int64, error)
- func (topic *StreamTopic) DeleteGroup(groupname string) (int64, error)
- func (topic *StreamTopic) GroupInfos() ([]redis.XInfoGroups, error)
- func (topic *StreamTopic) HasGroup(groupname string) (bool, error)
- func (topic *StreamTopic) HasGroups(groupnames []string) (bool, error)
- func (topic *StreamTopic) Len() (int64, error)
- func (topic *StreamTopic) Move(groupname string, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)
- func (topic *StreamTopic) Pending(groupname string) (*redis.XPending, error)
- func (topic *StreamTopic) Publish(value map[string]interface{}) (string, error)
- func (topic *StreamTopic) Range(start, stop string) ([]redis.XMessage, error)
- func (topic *StreamTopic) SetGroupID(groupname string, start string) (string, error)
- func (topic *StreamTopic) Trim(count int64) (int64, error)
Constants ¶
This section is empty.
Variables ¶
var Done = errors.New("done")
Done 锁的结束信号
var ErrCountMustBePositive = errors.New("Count Must Be Positive")
ErrCountMustBePositive 个数参数必须为正数
var ErrElementNotExist = errors.New("Element Not Exist")
ErrElementNotExist 元素不存在
var ErrGroupNotInTopic = errors.New("Group Not In Topic")
ErrGroupNotInTopic 消费组不在topic上
var ErrIsAlreadySubscribed = errors.New("Is Already Subscribed")
ErrIsAlreadySubscribed 消费者已经订阅了数据
var ErrIsAlreadyUnSubscribed = errors.New("Is Already UnSubscribed")
ErrIsAlreadyUnSubscribed 消费者已经取消订阅了数据
var ErrKeyNotExist = errors.New("key not exist")
ErrKeyNotExist 键不存在
var ErrLockAlreadySet = errors.New("Lock is already setted")
ErrLockAlreadySet 分布式锁已经设置
var ErrLockWaitTimeout = errors.New("wait lock timeout")
ErrLockWaitTimeout 等待解锁超时
var ErrProxyAlreadyInited = errors.New("proxy already inited yet")
ErrProxyAlreadyInited 代理已经初始化错误
var ErrProxyNotInited = errors.New("proxy not inited yet")
ErrProxyNotInited 代理未初始化错误
var ErrPubSubNotSubscribe = errors.New("PubSub Not Subscribe")
ErrPubSubNotSubscribe Pubsub模式订阅者未订阅
var ErrQueueGetNotMatch = errors.New("Queue Get NotMatch")
ErrQueueGetNotMatch 从队列中取出的数据不匹配
var ErrRankError = errors.New("Rank Error")
ErrRankError 排名信息有问题
var ErrStreamConsumerNotBlocked = errors.New("Stream Consumer Must Be Blocked")
ErrStreamConsumerNotBlocked 消费者不是用阻塞模式监听
var Proxy = New()
Proxy 默认的redis代理对象
Functions ¶
Types ¶
type NameSpcaeKey ¶
type NameSpcaeKey struct {
// contains filtered or unexported fields
}
NameSpcaeKey 带命名空间的键
func NewNameSpaceKey ¶
func NewNameSpaceKey(namespace string) *NameSpcaeKey
NewNameSpaceKey 创建一个带命名空间的key
func (*NameSpcaeKey) AddSubNamespace ¶
func (nsk *NameSpcaeKey) AddSubNamespace(namespace string) *NameSpcaeKey
AddSubNamespace 在原来的命名空间基础上加一级子命名空间创建一个新的命名空间
type PubSubTopic ¶
type PubSubTopic struct { Name string // contains filtered or unexported fields }
PubSubTopic 发布订阅主题
func NewPubSubTopic ¶
func NewPubSubTopic(proxy *redisProxy, name string) *PubSubTopic
NewPubSubTopic 新建一个PubSub主题
func (*PubSubTopic) Publish ¶
func (topic *PubSubTopic) Publish(value interface{}) (int64, error)
Publish 向发布订阅主题发送消息
type Queue ¶
type Queue struct { Name string // contains filtered or unexported fields }
Queue 消息队列
func (*Queue) Get ¶
func (q *Queue) Get(timeout time.Duration) (*QueueMessage, error)
Get 从队列中取出数据,timeout为0则表示一直阻塞直到有数据
func (*Queue) GetNoWait ¶
func (q *Queue) GetNoWait() (*QueueMessage, error)
GetNoWait 从队列中取出数据,不会阻塞等待
type StreamTopic ¶
type StreamTopic struct { Name string MaxLen int64 Strict bool //maxlen with ~ // contains filtered or unexported fields }
StreamTopic 流主题
func NewStreamTopic ¶
func NewStreamTopic(proxy *redisProxy, name string, maxlen int64, strict bool) *StreamTopic
NewStreamTopic 新建一个流主题
func (*StreamTopic) CreateGroup ¶
func (topic *StreamTopic) CreateGroup(groupname, start string) (string, error)
CreateGroup 在指定的topic上创建指定的消费者组
func (*StreamTopic) Delete ¶
func (topic *StreamTopic) Delete(ids ...string) error
Delete 设置标志位标识删除主题流中指定id的数据
func (*StreamTopic) DeleteConsumer ¶
func (topic *StreamTopic) DeleteConsumer(groupname string, consumername string) (int64, error)
DeleteConsumer 在指定的topic上删除指定消费者组中的指定消费者
func (*StreamTopic) DeleteGroup ¶
func (topic *StreamTopic) DeleteGroup(groupname string) (int64, error)
DeleteGroup 在指定的topic上删除指定的消费者组
func (*StreamTopic) GroupInfos ¶
func (topic *StreamTopic) GroupInfos() ([]redis.XInfoGroups, error)
GroupInfos 获取主题流中注册的消费者组信息
func (*StreamTopic) HasGroup ¶
func (topic *StreamTopic) HasGroup(groupname string) (bool, error)
HasGroup 判断消费者组是否在topic上
func (*StreamTopic) HasGroups ¶
func (topic *StreamTopic) HasGroups(groupnames []string) (bool, error)
HasGroups 判断消费者组是否都在topic上
func (*StreamTopic) Move ¶
func (topic *StreamTopic) Move(groupname string, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)
Move 将消费组中空闲时间超过minIdle的等待确认消息转移给指定的消费者
func (*StreamTopic) Pending ¶
func (topic *StreamTopic) Pending(groupname string) (*redis.XPending, error)
Pending 查看消费组中等待确认的消息列表
func (*StreamTopic) Publish ¶
func (topic *StreamTopic) Publish(value map[string]interface{}) (string, error)
Publish 向主题流发送消息
func (*StreamTopic) Range ¶
func (topic *StreamTopic) Range(start, stop string) ([]redis.XMessage, error)
Range 获取消息列表,会自动过滤已经删除的消息,注意-表示最小值, +表示最大值
func (*StreamTopic) SetGroupID ¶
func (topic *StreamTopic) SetGroupID(groupname string, start string) (string, error)
SetGroupID 设置指定消费者组在主题流中的读取起始位置