rmq

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2025 License: MIT Imports: 16 Imported by: 0

README

RMQ缓存库使用文档

介绍

RMQ(redis memory queue)是一个包含内存和Redis的缓存解决方案,它提供了高级功能如分布式延迟队列、事件监听以及消息重试机制。

核心组件概览

  • Memory: 基于bigcache实现的内存缓存,增加了TTL能力。
  • Redis: Redis客户端封装,支持批量删除等额外功能。
  • Cache: 提供了L1(内存)和L2(Redis)两级缓存。
  • Dict: 基于Cache实现的字典功能,扩展了回源能力。
  • DelayQueue: 提供基于Redis+Lua实现的延迟队列。
  • Publisher & Monitor: 分别用于发送消息和监控队列状态。

安装

go get -u github.com/zohu/rmq

使用方法

  1. 使用内存缓存
// 实例化方法
// mem := rmq.NewMemory(opts ...MemoryOption) *Memory

// 如果仅使用全局单例,可以先全局初始化,然后直接使用
rmq.InitMemory()
rmq.Mem().Set("test", "hello world")
rmq.Mem().Get("test")
rmq.Mem().Delete("test")
/* 更多方法参考memory.go */

  2. 使用Redis缓存
// 实例化
// rds := rmq.NewRedis(opts ...RedisOption) *Redis

// 如果仅使用全局单例,可以先全局初始化,然后直接使用
rmq.InitRedis(
    rmq.WithRedisPrefix("rmq"), // 全局前缀,对业务透明, 默认为rmq
    rmq.WithRedisOptions(&redis.UniversalOptions{
        Addrs:      []string{"localhost:8011"},
        Password:   "123456",
        ClientName: "rmq",
    }),
)

rmq.Rds().Set(ctx, "key", "value")
/* 更多方法参考go-redis */
  3. 使用二级缓存
// 实例化
// rmq.NewCache(m *Memory, r *Redis) *Cache

// 如果仅使用全局单例,可以先全局初始化,然后直接使用
rmq.InitCache(rmq.Mem(), rmq.Rds())

rmq.CacheL2().Set(ctx, "key", "value", time.Minute)
rmq.CacheL2().Get(ctx, "key")
rmq.CacheL2().Del(ctx, "key")
  4. 使用字典
// 使用字典之前,需要先初始化全局Cache
rmq.InitCache(rmq.Mem(), rmq.Rds())

// name=字典名,比如"store_name"
// query=查询函数,返回字典值 func(ctx context.Context, key string) string
// ttl=回源结果的默认有效期
rmq.NewDict(name DictName, query DictQuery, ttl ...time.Duration)

// 使用方法
rmq.Dict(ctx, name DictName, key string) (string, error)
  5. 使用延迟队列
// 定义一个主题
const (
	orderTopic rmq.Topic = "order"
)
dq := rmq.NewQueue(orderTopic, opts ...QueueOption) *DelayQueue
// 注册消费者
done := dq.Subscribe(cb func(payload string) bool) (done <-chan struct{})

// 发送消息
dq.SendScheduleMsg(payload string, t time.Time) (string, error)
dq.SendDelayMsg(payload string, duration time.Duration) (string, error)
// 拦截消息
dq.TryIntercept(id string) (*InterceptResult, error)

// 其他方法
dq.Stop()
dq.GetPendingCount() (int64, error)
dq.GetReadyCount() (int64, error)
dq.GetProcessingCount() (int64, error)
dq.ListenEvent(listener EventListener)
dq.DisableListener()
dq.EnableReport()
dq.DisableReport()
// 消费者和生产者不在一起
// 同服务内(内部有缓存topic的配置,可以无需再传):
dq := rmq.NewPublisher(topic) *Publisher

// 跨服务的(必须传入和消费者相同的配置,特别是prefix)
dq := rmq.NewPublisher(topic Topic, opts ...QueueOption) *Publisher

// 监控同理
mnt := rmq.NewMonitor(topic Topic, opts ...QueueOption) *Monitor

依赖库

Documentation

Index

Constants

View Source
const (
	StatePending    = "pending"
	StateReady      = "ready"
	StateReadyRetry = "ready_to_retry"
	StateConsuming  = "consuming"
	StateUnknown    = "unknown"
)

Variables

This section is empty.

Functions

func Dict

func Dict(ctx context.Context, name DictName, key string) (string, error)

func InitCache

func InitCache(m *Memory, r *Redis)

func InitMemory

func InitMemory(opts ...MemoryOption)

func InitRedis

func InitRedis(opts ...RedisOption)

func NewDict

func NewDict(name DictName, query DictQuery, ttl ...time.Duration)

func Ptr

func Ptr[T any](v T) *T

func Val

func Val[T any](v *T) T

func WithSkipPrefix added in v0.0.2

func WithSkipPrefix(ctx context.Context) context.Context

Types

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

func CacheL2

func CacheL2() *Cache

func NewCache

func NewCache(m *Memory, r *Redis) *Cache

func (*Cache) Del

func (c *Cache) Del(ctx context.Context, k string) error

func (*Cache) Get

func (c *Cache) Get(ctx context.Context, k string) (string, error)

func (*Cache) Set

func (c *Cache) Set(ctx context.Context, key string, value string, ttl time.Duration)

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(topic Topic, opts ...QueueOption) *DelayQueue

func (*DelayQueue) DisableListener

func (q *DelayQueue) DisableListener()

func (*DelayQueue) DisableReport

func (q *DelayQueue) DisableReport()

func (*DelayQueue) EnableReport

func (q *DelayQueue) EnableReport()

func (*DelayQueue) GetPendingCount

func (q *DelayQueue) GetPendingCount() (int64, error)

func (*DelayQueue) GetProcessingCount

func (q *DelayQueue) GetProcessingCount() (int64, error)

func (*DelayQueue) GetReadyCount

func (q *DelayQueue) GetReadyCount() (int64, error)

func (*DelayQueue) ListenEvent

func (q *DelayQueue) ListenEvent(listener EventListener)

func (*DelayQueue) SendDelayMsg

func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration) (string, error)

func (*DelayQueue) SendScheduleMsg

func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time) (string, error)

func (*DelayQueue) Stop

func (q *DelayQueue) Stop()

func (*DelayQueue) Subscribe

func (q *DelayQueue) Subscribe(cb func(payload string) bool) (done <-chan struct{})

func (*DelayQueue) TryIntercept

func (q *DelayQueue) TryIntercept(id string) (*InterceptResult, error)

type DictName

type DictName Prefix

func (DictName) String

func (t DictName) String(args ...string) string

type DictQuery

type DictQuery func(ctx context.Context, key string) string

type Event

type Event struct {
	Code      EventType
	Timestamp int64
	MsgCount  int
}

type EventListener

type EventListener interface {
	OnEvent(*Event)
}

type EventType

type EventType int
const (
	// 发送消息时
	NewMessageEvent EventType = iota + 1
	// 消息到达传递时间时发出
	ReadyEvent
	// 消息已发送给消费者时
	DeliveredEvent
	// 消费成功
	AckEvent
	// 消费失败
	NackEvent
	// 重新发送消息
	RetryEvent
	// 已达最大重试次数
	FinalFailedEvent
)

func (EventType) String

func (t EventType) String() string

type InterceptResult

type InterceptResult struct {
	Intercepted bool
	State       string
}

type MemEntry

type MemEntry struct {
	Expire int64  `json:"e"`
	Value  string `json:"v"`
}

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

func Mem

func Mem() *Memory

func NewMemory

func NewMemory(opts ...MemoryOption) *Memory

func (*Memory) Capacity

func (m *Memory) Capacity() int

func (*Memory) Close

func (m *Memory) Close() error

func (*Memory) Delete

func (m *Memory) Delete(key string)

func (*Memory) Get

func (m *Memory) Get(key string) (string, bool)

func (*Memory) KeyMetadata

func (m *Memory) KeyMetadata(key string) bigcache.Metadata

func (*Memory) Len

func (m *Memory) Len() int

func (*Memory) Range

func (m *Memory) Range(fn func(key string, value string))

func (*Memory) Set

func (m *Memory) Set(key string, value string, ttl ...time.Duration)

func (*Memory) Stats

func (m *Memory) Stats() bigcache.Stats

type MemoryOption

type MemoryOption func(*memoryOptions)

func WithMemoryOptions

func WithMemoryOptions(options *bigcache.Config) MemoryOption

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

func NewMonitor

func NewMonitor(topic Topic, opts ...QueueOption) *Monitor

func (*Monitor) GetPendingCount

func (m *Monitor) GetPendingCount() (int64, error)

func (*Monitor) GetProcessingCount

func (m *Monitor) GetProcessingCount() (int64, error)

func (*Monitor) GetReadyCount

func (m *Monitor) GetReadyCount() (int64, error)

func (*Monitor) ListenEvent

func (m *Monitor) ListenEvent(listener EventListener) (func(), error)

type Prefix

type Prefix string

func (Prefix) String

func (p Prefix) String(args ...string) string

type PrefixHook

type PrefixHook struct {
	// contains filtered or unexported fields
}

func NewPrefixHook

func NewPrefixHook(prefix Prefix) *PrefixHook

func (PrefixHook) DialHook

func (h PrefixHook) DialHook(next redis.DialHook) redis.DialHook

func (PrefixHook) ProcessHook

func (h PrefixHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook

func (PrefixHook) ProcessPipelineHook

func (h PrefixHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(topic Topic, opts ...QueueOption) *Publisher

func (*Publisher) SendDelayMsg

func (p *Publisher) SendDelayMsg(payload string, duration time.Duration) (string, error)

func (*Publisher) SendScheduleMsg

func (p *Publisher) SendScheduleMsg(payload string, t time.Time) (string, error)

type QueueOption

type QueueOption func(*queueOptions)

func WithQueueConcurrent

func WithQueueConcurrent(concurrent uint) QueueOption

func WithQueueDefaultTTL

func WithQueueDefaultTTL(ttl time.Duration) QueueOption

func WithQueueFetchInterval

func WithQueueFetchInterval(interval time.Duration) QueueOption

func WithQueueFetchLimit

func WithQueueFetchLimit(limit uint) QueueOption

func WithQueueHashTag

func WithQueueHashTag(hashTag bool) QueueOption

func WithQueueMaxConsumeDuration

func WithQueueMaxConsumeDuration(duration time.Duration) QueueOption

func WithQueueNackRedeliveryDelay

func WithQueueNackRedeliveryDelay(delay time.Duration) QueueOption

func WithQueueRetryCount

func WithQueueRetryCount(count uint) QueueOption

type Redis

type Redis struct {
	redis.UniversalClient
	// contains filtered or unexported fields
}

func NewRedis

func NewRedis(opts ...RedisOption) *Redis

func Rds

func Rds() *Redis

func (*Redis) BatchDelete

func (r *Redis) BatchDelete(ctx context.Context, pattern string, batchSize ...int64) (int64, error)

BatchDelete 批量删除与 pattern 匹配的 key。

  • pattern: 匹配模式,如 foo* 表示匹配 foo 开头的所有 key
  • batchSize: 单次批量删除的数量(可选参数)
  • 返回值: int64, error

func (*Redis) FloatToZ

func (r *Redis) FloatToZ(values map[string]float64) []redis.Z

type RedisOption

type RedisOption func(*redisOptions)

func WithRedisOptions

func WithRedisOptions(redis *redis.UniversalOptions) RedisOption

func WithRedisPrefix

func WithRedisPrefix(prefix string) RedisOption

type Topic

type Topic Prefix

func (Topic) String

func (t Topic) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL