redis

package
v0.0.0-...-2ce5273 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package redis 提供基于Redis的消息总线实现

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// 连接地址 (单机模式、集群模式或哨兵模式)
	Addrs []string `json:"addrs" mapstructure:"addrs"`

	// 密码,如果需要的话
	Password string `json:"password" mapstructure:"password"`

	// 数据库编号 (仅单机模式和哨兵模式有效)
	DB int `json:"db" mapstructure:"db"`

	// 哨兵模式的主节点名称
	MasterName string `json:"master_name" mapstructure:"master_name"`

	// 连接池大小
	PoolSize int `json:"pool_size" mapstructure:"pool_size"`

	// 最小空闲连接数
	MinConn int `json:"min_conn" mapstructure:"min_conn"`

	// 连接超时时间
	DialTimeout time.Duration `json:"dial_timeout" mapstructure:"dial_timeout"`

	// 读取超时时间
	ReadTimeout time.Duration `json:"read_timeout" mapstructure:"read_timeout"`

	// 写入超时时间
	WriteTimeout time.Duration `json:"write_timeout" mapstructure:"write_timeout"`

	// 重试间隔
	RetryInterval time.Duration `json:"retry_interval" mapstructure:"retry_interval"`

	// 最大重试次数
	MaxRetries int `json:"max_retries" mapstructure:"max_retries"`

	// 消息总线操作超时(发布超时)
	OpTimeout time.Duration `json:"op_timeout" mapstructure:"op_timeout"`

	// 键前缀
	KeyPrefix string `json:"key_prefix" mapstructure:"key_prefix"`

	// 模式: single(单机), sentinel(哨兵), cluster(集群)
	Mode string `json:"mode" mapstructure:"mode"`
}

Config Redis连接配置选项

func DefaultConfig

func DefaultConfig() Config

type RedisBus

type RedisBus struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) (*RedisBus, error)

func (*RedisBus) Close

func (r *RedisBus) Close() error

Close 实现MessageBus.Close,关闭Redis连接

func (*RedisBus) GetReconnectCount

func (r *RedisBus) GetReconnectCount() uint64

GetReconnectCount 获取重连次数

func (*RedisBus) IncPublishErrors

func (r *RedisBus) IncPublishErrors()

IncPublishErrors 增加发布错误计数

func (*RedisBus) IncReconnects

func (r *RedisBus) IncReconnects()

IncReconnects 增加重连计数

func (*RedisBus) IncSubscribeErrors

func (r *RedisBus) IncSubscribeErrors()

IncSubscribeErrors 增加订阅错误计数

func (*RedisBus) ObserveSubscribeLatency

func (r *RedisBus) ObserveSubscribeLatency(d time.Duration)

ObserveSubscribeLatency 观察订阅延迟

func (*RedisBus) Publish

func (r *RedisBus) Publish(ctx context.Context, topic string, data []byte) error

func (*RedisBus) Subscribe

func (r *RedisBus) Subscribe(ctx context.Context, topic string) (<-chan []byte, error)

Subscribe 实现MessageBus.Subscribe,订阅Redis频道

func (*RedisBus) SubscribeWithTimestamp

func (r *RedisBus) SubscribeWithTimestamp(ctx context.Context, topic string) (<-chan *bus.Message, error)

SubscribeWithTimestamp 实现MessageBus.SubscribeWithTimestamp,订阅Redis频道并返回带时间戳的消息

func (*RedisBus) Unsubscribe

func (r *RedisBus) Unsubscribe(topic string) error

Unsubscribe 实现MessageBus.Unsubscribe,取消Redis订阅

type RetryHook

type RetryHook struct {
	// contains filtered or unexported fields
}

RetryHook 用于统计重试次数的钩子

func (*RetryHook) DialHook

func (rh *RetryHook) DialHook(hook redis.DialHook) redis.DialHook

func (*RetryHook) OnRetry

func (rh *RetryHook) OnRetry(ctx context.Context, cmd redis.Cmder, attempt int, err error)

OnRetry 记录每次重连事件

func (*RetryHook) ProcessHook

func (rh *RetryHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook

func (*RetryHook) ProcessPipelineHook

func (rh *RetryHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL