Documentation
¶
Overview ¶
Package redis 提供基于Redis的消息总线实现
Index ¶
- type Config
- type RedisBus
- func (r *RedisBus) Close() error
- func (r *RedisBus) GetReconnectCount() uint64
- func (r *RedisBus) IncPublishErrors()
- func (r *RedisBus) IncReconnects()
- func (r *RedisBus) IncSubscribeErrors()
- func (r *RedisBus) ObserveSubscribeLatency(d time.Duration)
- func (r *RedisBus) Publish(ctx context.Context, topic string, data []byte) error
- func (r *RedisBus) Subscribe(ctx context.Context, topic string) (<-chan []byte, error)
- func (r *RedisBus) SubscribeWithTimestamp(ctx context.Context, topic string) (<-chan *bus.Message, error)
- func (r *RedisBus) Unsubscribe(topic string) error
- type RetryHook
- func (rh *RetryHook) DialHook(hook redis.DialHook) redis.DialHook
- func (rh *RetryHook) OnRetry(ctx context.Context, cmd redis.Cmder, attempt int, err error)
- func (rh *RetryHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook
- func (rh *RetryHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // 连接地址 (单机模式、集群模式或哨兵模式) Addrs []string `json:"addrs" mapstructure:"addrs"` // 密码,如果需要的话 Password string `json:"password" mapstructure:"password"` // 数据库编号 (仅单机模式和哨兵模式有效) DB int `json:"db" mapstructure:"db"` // 哨兵模式的主节点名称 MasterName string `json:"master_name" mapstructure:"master_name"` // 连接池大小 PoolSize int `json:"pool_size" mapstructure:"pool_size"` // 最小空闲连接数 MinConn int `json:"min_conn" mapstructure:"min_conn"` // 连接超时时间 DialTimeout time.Duration `json:"dial_timeout" mapstructure:"dial_timeout"` // 读取超时时间 ReadTimeout time.Duration `json:"read_timeout" mapstructure:"read_timeout"` // 写入超时时间 WriteTimeout time.Duration `json:"write_timeout" mapstructure:"write_timeout"` // 重试间隔 RetryInterval time.Duration `json:"retry_interval" mapstructure:"retry_interval"` // 最大重试次数 MaxRetries int `json:"max_retries" mapstructure:"max_retries"` // 消息总线操作超时(发布超时) OpTimeout time.Duration `json:"op_timeout" mapstructure:"op_timeout"` // 键前缀 KeyPrefix string `json:"key_prefix" mapstructure:"key_prefix"` // 模式: single(单机), sentinel(哨兵), cluster(集群) Mode string `json:"mode" mapstructure:"mode"` }
Config Redis连接配置选项
func DefaultConfig ¶
func DefaultConfig() Config
type RedisBus ¶
type RedisBus struct {
// contains filtered or unexported fields
}
func (*RedisBus) GetReconnectCount ¶
GetReconnectCount 获取重连次数
func (*RedisBus) IncSubscribeErrors ¶
func (r *RedisBus) IncSubscribeErrors()
IncSubscribeErrors 增加订阅错误计数
func (*RedisBus) ObserveSubscribeLatency ¶
ObserveSubscribeLatency 观察订阅延迟
func (*RedisBus) SubscribeWithTimestamp ¶
func (r *RedisBus) SubscribeWithTimestamp(ctx context.Context, topic string) (<-chan *bus.Message, error)
SubscribeWithTimestamp 实现MessageBus.SubscribeWithTimestamp,订阅Redis频道并返回带时间戳的消息
func (*RedisBus) Unsubscribe ¶
Unsubscribe 实现MessageBus.Unsubscribe,取消Redis订阅
type RetryHook ¶
type RetryHook struct {
// contains filtered or unexported fields
}
RetryHook 用于统计重试次数的钩子
func (*RetryHook) ProcessHook ¶
func (rh *RetryHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook
func (*RetryHook) ProcessPipelineHook ¶
func (rh *RetryHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook
Click to show internal directories.
Click to hide internal directories.