tablecache

package module
v0.0.0-...-3363874 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

tablecache

pkg/tablecache 是一个框架无关的表数据缓存管理扩展包,把“缓存目标声明、数据回源、Redis 写入、刷新锁、TTL 抖动、空值占位”收口到统一组件中。

设计要点

  • Target 只描述缓存元信息和加载函数,不绑定 go-zero、Gin、Kratos、GORM 或具体数据表。
  • Store 是缓存存储抽象,当前提供 NewRedisStore 适配 go-redis,其它框架可按接口自行适配。
  • 业务缓存 key 默认写入 tablecache:data: 命名空间,避免前缀删除或全量刷新误伤非 tablecache 管理的 Redis key;可通过 WithKeyPrefix("project:cache:") 设置项目前缀。刷新、删除和写回只处理已带指定前缀的实际 Redis key。
  • RefreshByKey 支持固定 key 与前缀型 key,例如 project:cache:role_statusproject:cache:config_uuid:{uuid}
  • RefreshByKeys 支持批量刷新多个 key,并自动去重。
  • RefreshAll 只刷新显式标记 RefreshAll 的目标,避免批量误刷参数化大 key,并支持有限并发控制。
  • SetNX 重建锁避免热点 key 缓存击穿。
  • 长耗时 Loader 运行期间会自动续期重建锁,降低锁过期后被其它实例重复重建的概率。
  • 锁值使用随机持有者标识,并通过 Lua 脚本只释放自己的锁,避免超时后误删其它实例新锁。
  • 同一进程内相同 key 的刷新请求会通过 singleflight 合并,减少突发流量下的 Redis 锁请求。
  • 重建成功后会写入短 TTL 结果元信息,避免“空结果但无业务 key”时等待方误判超时。
  • 多条 Entry 通过 WriteBatch 走 Redis Pipeline 批量写入,降低全量重建的网络 RTT。
  • TTL + Jitter 降低同类 key 集中失效导致的缓存雪崩。
  • AllowEmptyMarker + EmptyTTL 对不存在的数据写入短 TTL 空值占位,减少缓存穿透;默认写入独立元信息 key,避免与真实业务值碰撞。
  • Get 提供统一读取和反序列化入口;GetState 可明确区分 hit/miss/emptyGetOrRefreshGetOrRefreshWithLoaderGetOrRefreshWithOptionsLoadThroughLoadThroughWithOptions 可封装“缓存读取 + 回源 + 回填 + 返回”的读穿闭环。
  • LookupMetrics 可补充 lookup_state_hit/miss/emptylookup_refresh_triggered 等细分读取指标;ExtendedMetrics 还可补充 prefix_waitprefix_retry 等前缀刷新排障指标。
  • DeleteByKeyDeleteByPrefix 只允许操作缓存管理器中已注册且已带指定前缀的目标 key 或前缀,并同步清理隐藏空值元信息和短 TTL 重建结果元信息,避免误删任意 Redis key。
  • DeleteByPrefix 默认优先使用前缀 key 索引删除,避免 Redis key 特别多时扫描全库;索引未建立或 Store 不支持索引时会自动降级到 Redis SCAN + UNLINK
  • Redis Cluster 会按 master 并发扫描降级路径;单节点大 keyspace 可通过 WithScanCountWithPrefixDeleteConcurrencyWithScanUnlinkConcurrencyWithUnlinkChunkSize 控制扫描吞吐与 Redis 压力。
  • RedisStore 实现 MutationStore 合并变更快路径,刷新写回时会把旧 key 删除、新 key 写入、前缀索引 SADD/SREM 尽量合并到有界 Pipeline,降低连接池占用和网络 RTT。
  • 刷新写回、隐藏空值标记、前缀清理和完成标记前都会复核分布式锁 owner;锁已过期或被其它实例接管时直接返回 ErrRefreshLockLost,避免旧 owner 写入脏缓存。
  • Loader 返回的 Entry.Key 必须是已带指定前缀的实际 Redis key,并会在写回前校验作用域:固定目标只能写固定 key,前缀目标只能写注册前缀内 key,避免加载器误写非托管 Redis key。
  • 启动时会拒绝固定 key 与前缀目标互相重叠,例如同时注册 user:user:profile,避免 DeleteByPrefix("user:") 误删固定 key。
  • Hash/List/Set/ZSet 返回真实空集合时会写入隐藏空集合元信息,使后续 GetState 明确返回 hit,避免把“空集合”误判成 miss 并反复回源。
  • Entry.Overwrite 可控制写入前是否先删除旧 key,默认覆盖写入;显式 tablecache.Bool(false) 时按 Redis 结构增量更新。
  • LoaderTimeoutWithRebuildContextPolicy(...) 可限制单次回源时长,并控制是否继承调用方取消信号。
  • 前缀刷新采用分层互斥:前缀全量刷新独占前缀锁,同前缀单 key 刷新使用 key 级锁;若遇到全量刷新,会先等待,必要时重试,兼顾一致性和并发度。
  • 内置事件日志统一采用 component=tablecache event=... index=... key=... 风格,便于 gozero-admin 按字段检索 lock_lostprefix_waitprefix_retry 等场景。
  • 启动时会校验重复 Index、重复 Key、重叠前缀目标以及固定 key 落入前缀目标范围等配置问题,避免线上匹配歧义和批量删除误伤。

接入方式

业务项目只需要:

  1. 实现或复用 Store,gozero-admin 使用 NewRedisStore(redis.UniversalClient)
  2. 声明 []Target,在 Loader 中读取数据库并返回标准 []Entry
  3. 在缓存管理接口或业务 miss 回源位置调用 RefreshByKey / RefreshAll / RefreshByKeys
  4. 在业务读取路径调用 Get / GetState / GetOrRefresh / GetOrRefreshWithLoader / GetOrRefreshWithOptions / LoadThrough / LoadThroughWithOptions / LoadThroughBatch,按是否需要显式传入 Loader、透传 fields 或批量读取选择接入方式。

启用默认或自定义 key 前缀后,Target.Key 仍可声明为逻辑前缀,Manager 会在注册目标时收敛到实际 Redis key;Get / GetState 这类纯读取入口也可传逻辑 key 进行查询。但 RefreshByKeyRefreshByKeysDeleteByKeyDeleteByPrefix 以及会在 miss 后回源写入的 LoadThrough / GetOrRefresh,都必须传已带指定前缀的实际 Redis key;未带前缀的 key 只做查询,miss 时不会回源写入。LoadParams.Key 会传入实际 Redis key,LoadParams.KeyParts 仍按目标前缀后的业务片段解析;Loader 返回的 Entry.Key 必须使用 params.Key 或其它已带前缀且在目标范围内的实际 Redis key。

Redis key 特别多时,前缀删除的主要瓶颈通常是“全库游标扫描”和“扫描后删除”两段。RedisStore 会在前缀目标写入时维护 Set 索引,当前缀完成过一次全量刷新后写入可信标记;之后 DeleteByPrefix 会优先遍历索引集合中的成员并 UNLINK ,不再扫描全库。索引未可信、索引过期、或自定义 Store 未实现 PrefixIndexStore 时,会自动降级到 SCAN + UNLINK

  • WithPrefixKeyIndex(true):默认启用前缀 key 索引;想完全关闭索引元信息时可显式传 false
  • WithPrefixKeyIndexTTL(30*24*time.Hour):控制索引集合和可信标记保留时间;建议不短于业务缓存最长 TTL。
  • WithScanCount(1000~5000):索引未命中降级时减少 SCAN 网络往返,数值越大单轮 Redis 主线程工作越重。
  • NewRedisStore(..., WithScanUnlinkConcurrency(2~4)):降级扫描时让扫描到的 key 批次并发 UNLINK,重叠扫描与删除耗时。
  • WithUnlinkChunkSize(500~1000):控制每个 Pipeline 的 UNLINK 数量,也会影响索引成员写入/删除的分批大小。
  • WithPrefixDeleteConcurrency(2~4):降级扫描时让业务 key 与内部元信息 pattern 并发扫描;高峰期建议保持 1 或 2,后台清理窗口可适当提高。

刷新写回性能方面,RedisStore 会自动走 MutationStore 快路径:同一次刷新里的旧空值元信息清理、业务 key 写入、空集合元信息写入、前缀索引成员维护会按阶段合并为有界 Pipeline。含 Entry.Overwrite=false 的增量写不会自动重试,避免 List/Set/ZSet 增量写在网络抖动后被重复放大。

这样同一套缓存目标可以被管理页、业务读取 miss 重建、定时预热任务复用。

gozero-admin 接入示例

推荐做法是把 go-utils 的日志初始化放到项目入口,只配置一次,然后缓存管理器直接复用:

package main

import (
	"log/slog"
	"os"
)

func initLogger() {
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})
	slog.SetDefault(slog.New(handler))
}
package cacheexample

import (
	"context"
	"strconv"
	"time"

	utils "github.com/Is999/go-utils"
	"cron_admin/pkg/tablecache"

	"github.com/redis/go-redis/v9"
)

// User 表示业务用户缓存示例结构。
type User struct {
	ID   int    `json:"id"`   // ID 是用户 ID
	Name string `json:"name"` // Name 是用户名称
}

// UserModel 表示示例数据库访问对象。
type UserModel interface {
	FindOne(ctx context.Context, id int64) (*User, error)
}

// UserCache 定义 gozero-admin 中可复用的用户缓存组件。
type UserCache struct {
	manager *tablecache.Manager // manager 是用户资料缓存管理器
	model   UserModel           // model 是数据库查询适配器
}

// NewUserCache 创建用户缓存管理器。
func NewUserCache(client redis.UniversalClient, model UserModel) (*UserCache, error) {
	store := tablecache.NewRedisStore(
		client,
		tablecache.WithUnlinkChunkSize(1000),
		tablecache.WithScanUnlinkConcurrency(2),
	)
	metrics, err := tablecache.NewPrometheusMetrics(
		tablecache.WithPrometheusNamespace("gozero_admin"),
		tablecache.WithPrometheusSubsystem("tablecache"),
	)
	if err != nil {
		return nil, err
	}
	manager, err := tablecache.NewManager(
		store,
		[]tablecache.Target{
			{
				Index:            "user_profile",
				Title:            "用户资料",
				Key:              "user_profile:",
				KeyTitle:         "user_profile:{userID}",
				Type:             tablecache.TypeString,
				TTL:              time.Hour,
				Jitter:           10 * time.Minute,
				AllowEmptyMarker: true,
				Loader: func(ctx context.Context, params tablecache.LoadParams) ([]tablecache.Entry, error) {
					if len(params.KeyParts) == 0 || model == nil {
						return nil, tablecache.ErrNotFound
					}
					userID, err := strconv.ParseInt(params.KeyParts[0], 10, 64)
					if err != nil {
						return nil, err
					}
					user, err := model.FindOne(ctx, userID)
					if err != nil {
						return nil, err
					}
					if user == nil {
						return nil, tablecache.ErrNotFound
					}
					return []tablecache.Entry{{
						Key:   params.Key,
						Type:  tablecache.TypeString,
						Value: user,
					}}, nil
				},
			},
		},
		tablecache.WithKeyPrefix("gozero_admin:cache:"),
		tablecache.WithPrefixKeyIndex(true),
		tablecache.WithPrefixKeyIndexTTL(30*24*time.Hour),
		tablecache.WithScanCount(2000),
		tablecache.WithPrefixDeleteConcurrency(2),
		tablecache.WithMetrics(metrics),
		tablecache.WithGoUtilsLogger(utils.Log()),
	)
	if err != nil {
		return nil, err
	}
	return &UserCache{
		manager: manager,
		model:   model,
	}, nil
}

// key 根据用户 ID 生成已注册的缓存 key。
func (c *UserCache) key(userID int64) string {
	return "gozero_admin:cache:user_profile:" + strconv.FormatInt(userID, 10)
}

// Manager 返回底层缓存管理器,供管理页和任务调度复用。
func (c *UserCache) Manager() *tablecache.Manager {
	return c.manager
}

// GetForBiz 在业务读取链路中直接执行读穿缓存。
func (c *UserCache) GetForBiz(ctx context.Context, userID int64) (*User, error) {
	var user User
	result, err := c.manager.LoadThrough(ctx, c.key(userID), &user, nil)
	if err != nil {
		return nil, err
	}
	if result.State == tablecache.LookupStateHit {
		return &user, nil
	}
	if result.State == tablecache.LookupStateEmpty {
		return nil, nil
	}
	return nil, nil
}

// GetForBizWithOptions 演示完整读穿参数的接入方式,便于业务在局部场景透传 fields 并自定义回源逻辑。
func (c *UserCache) GetForBizWithOptions(ctx context.Context, userID int64, fields []string) (*User, error) {
	var user User
	result, err := c.manager.LoadThroughWithOptions(ctx, c.key(userID), &user, tablecache.LoadThroughOptions{
		Fields: fields,
		Loader: func(ctx context.Context, params tablecache.LoadParams) ([]tablecache.Entry, error) {
			if c.model == nil {
				return nil, tablecache.ErrNotFound
			}
			dbUser, err := c.model.FindOne(ctx, userID)
			if err != nil {
				return nil, err
			}
			if dbUser == nil {
				return nil, tablecache.ErrNotFound
			}
			return []tablecache.Entry{{
				Key:   params.Key,
				Type:  tablecache.TypeString,
				Value: dbUser,
			}}, nil
		},
	})
	if err != nil {
		return nil, err
	}
	if result.State == tablecache.LookupStateHit {
		return &user, nil
	}
	return nil, nil
}

// RefreshFromAdmin 表示 gozero-admin 管理页“手动刷新单个缓存”的链路。
func (c *UserCache) RefreshFromAdmin(ctx context.Context, userID int64) error {
	return c.manager.RefreshByKey(ctx, c.key(userID))
}

// WarmupByTask 表示定时预热任务链路,支持批量预热多个注册 key。
func (c *UserCache) WarmupByTask(ctx context.Context, userIDs []int64) error {
	keys := make([]string, 0, len(userIDs))
	for _, userID := range userIDs {
		keys = append(keys, c.key(userID))
	}
	return c.manager.RefreshByKeys(ctx, keys)
}

// WarmupByTaskWithSummary 表示预热任务链路可直接返回标准批量刷新响应。
func (c *UserCache) WarmupByTaskWithSummary(ctx context.Context, userIDs []int64) (tablecache.RefreshBatchAdminResponse, error) {
	keys := make([]string, 0, len(userIDs))
	for _, userID := range userIDs {
		keys = append(keys, c.key(userID))
	}
	results, summary, err := c.manager.RefreshByKeysWithSummary(ctx, keys)
	return tablecache.BuildRefreshBatchAdminResponseWithSummary(results, summary, err), nil
}

// RefreshAllFromAdmin 表示管理页“全量刷新所有允许全量刷新的目标”时可直接返回标准响应。
func (c *UserCache) RefreshAllFromAdmin(ctx context.Context) (tablecache.RefreshBatchAdminResponse, error) {
	results, summary, err := c.manager.RefreshAllWithSummary(ctx)
	return tablecache.BuildRefreshBatchAdminResponseWithSummary(results, summary, err), nil
}

// LoadThroughBatchForAdmin 表示管理页批量读取缓存状态时可直接返回的标准 JSON 结构。
func (c *UserCache) LoadThroughBatchForAdmin(ctx context.Context, userIDs []int64) (tablecache.LoadThroughBatchAdminResponse, error) {
	items := make([]tablecache.LoadThroughItem, 0, len(userIDs))
	users := make([]User, len(userIDs))
	for index, userID := range userIDs {
		items = append(items, tablecache.LoadThroughItem{
			Key:  c.key(userID),
			Dest: &users[index],
		})
	}
	results, summary, err := c.manager.LoadThroughBatchWithSummaryOptions(ctx, items, tablecache.LoadThroughBatchOptions{
		Concurrency: 4,
		DefaultOptions: tablecache.LoadThroughOptions{
			LoaderTimeout:    300 * time.Millisecond,
			AllowEmptyMarker: tablecache.Bool(true),
		},
	})
	return tablecache.BuildLoadThroughBatchAdminResponseWithSummary(results, summary, err), nil
}

gozero-admin 可复制模板

如果你希望直接按 gozero-admin 常见分层落地,可以按下面这套方式组织:

  1. main 或启动入口只做一次 slog.SetDefault(...);如果项目已自行实现 utils.Logger,也可以在这里调用 utils.Configure(utils.WithLogger(customLogger))
  2. svc.ServiceContext 统一创建 redis.UniversalClient 和业务缓存组件,并把 manager.Items() 缓存成管理页列表。
  3. logicservice 层封装业务读穿、管理页刷新、批量预热。
  4. handler 只负责参数解析并直接返回 BuildRefreshBatchAdminResponse... / BuildLoadThroughBatchAdminResponse... 生成的标准 JSON。

svc/servicecontext.go 示例:

package svc

import (
	"your_project/internal/cache"
	"your_project/internal/config"

	"cron_admin/pkg/tablecache"
	"github.com/redis/go-redis/v9"
)

// ServiceContext 汇总项目运行时依赖。
type ServiceContext struct {
	Config    config.Config         // Config 是项目配置
	Redis     redis.UniversalClient // Redis 是全局 Redis 客户端
	Cache     *cache.UserCache      // Cache 是业务缓存组件
	TableList []tablecache.Item     // TableList 是缓存管理页展示列表
}

// NewServiceContext 创建服务上下文并集中初始化缓存组件。
func NewServiceContext(c config.Config) (*ServiceContext, error) {
	client := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs:    c.Redis.Addrs,
		Password: c.Redis.Password,
		DB:       c.Redis.DB,
	})
	userCache, err := cache.NewUserCache(client, nil)
	if err != nil {
		return nil, err
	}
	return &ServiceContext{
		Config:    c,
		Redis:     client,
		Cache:     userCache,
		TableList: userCache.Manager().Items(),
	}, nil
}

logicservice 层示例:

package logic

import (
	"context"

	"cron_admin/pkg/tablecache"
	"your_project/internal/svc"
)

// CacheLogic 封装缓存管理页和业务侧共用逻辑。
type CacheLogic struct {
	svcCtx *svc.ServiceContext // svcCtx 是服务上下文
}

// NewCacheLogic 创建缓存逻辑对象。
func NewCacheLogic(svcCtx *svc.ServiceContext) *CacheLogic {
	return &CacheLogic{svcCtx: svcCtx}
}

// ListItems 返回管理页需要展示的缓存目标列表。
func (l *CacheLogic) ListItems(ctx context.Context) []tablecache.Item {
	_ = ctx
	return l.svcCtx.TableList
}

// RefreshUserFromAdmin 处理管理页“刷新单个用户缓存”动作。
func (l *CacheLogic) RefreshUserFromAdmin(ctx context.Context, userID int64) error {
	return l.svcCtx.Cache.RefreshFromAdmin(ctx, userID)
}

// WarmupUsers 处理定时任务或后台批量预热,并直接返回标准 JSON 结构。
func (l *CacheLogic) WarmupUsers(ctx context.Context, userIDs []int64) tablecache.RefreshBatchAdminResponse {
	response, err := l.svcCtx.Cache.WarmupByTaskWithSummary(ctx, userIDs)
	if err != nil {
		return tablecache.RefreshBatchAdminResponse{
			Success: false,
			Message: err.Error(),
		}
	}
	return response
}

// ReadUsersForAdmin 处理管理页批量读穿演示,便于直接查看 hit/miss/empty/refreshed。
func (l *CacheLogic) ReadUsersForAdmin(ctx context.Context, userIDs []int64) tablecache.LoadThroughBatchAdminResponse {
	response, err := l.svcCtx.Cache.LoadThroughBatchForAdmin(ctx, userIDs)
	if err != nil {
		return tablecache.LoadThroughBatchAdminResponse{
			Success: false,
			Message: err.Error(),
		}
	}
	return response
}

handler 示例:

package handler

import (
	"net/http"

	"github.com/gin-gonic/gin"
	"your_project/internal/logic"
	"your_project/internal/svc"
)

// RegisterCacheRoutes 注册缓存管理相关接口。
func RegisterCacheRoutes(router *gin.RouterGroup, svcCtx *svc.ServiceContext) {
	cacheLogic := logic.NewCacheLogic(svcCtx)
	router.GET("/tablecache/items", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"success": true,
			"items":   cacheLogic.ListItems(c.Request.Context()),
		})
	})
	router.POST("/tablecache/user/refresh", func(c *gin.Context) {
		var request struct {
			UserID int64 `json:"userId"` // UserID 是待刷新的用户 ID
		}
		if err := c.ShouldBindJSON(&request); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"success": false, "message": err.Error()})
			return
		}
		if err := cacheLogic.RefreshUserFromAdmin(c.Request.Context(), request.UserID); err != nil {
			c.JSON(http.StatusOK, gin.H{"success": false, "message": err.Error()})
			return
		}
		c.JSON(http.StatusOK, gin.H{"success": true, "message": "刷新成功"})
	})
	router.POST("/tablecache/user/warmup", func(c *gin.Context) {
		var request struct {
			UserIDs []int64 `json:"userIds"` // UserIDs 是待预热的用户 ID 列表
		}
		if err := c.ShouldBindJSON(&request); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"success": false, "message": err.Error()})
			return
		}
		c.JSON(http.StatusOK, cacheLogic.WarmupUsers(c.Request.Context(), request.UserIDs))
	})
	router.POST("/tablecache/user/read", func(c *gin.Context) {
		var request struct {
			UserIDs []int64 `json:"userIds"` // UserIDs 是待读取的用户 ID 列表
		}
		if err := c.ShouldBindJSON(&request); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"success": false, "message": err.Error()})
			return
		}
		c.JSON(http.StatusOK, cacheLogic.ReadUsersForAdmin(c.Request.Context(), request.UserIDs))
	})
}

推荐理解方式:

  • manager.Items() 负责给管理页渲染缓存目标清单。
  • RefreshByKey / RefreshByKeysWithSummary / RefreshAllWithSummary 负责刷新、预热和后台批处理。
  • LoadThrough / LoadThroughWithOptions / LoadThroughBatchWithSummaryOptions 负责业务读穿和管理页批量读缓存状态。
  • BuildRefreshBatchAdminResponseWithSummary(...)BuildLoadThroughBatchAdminResponseWithSummary(...) 负责把执行结果直接转成后台接口标准 JSON。

三条链路怎么串

  1. 管理页刷新链路: gozero-admin 的缓存管理页拿到 manager.Items() 渲染列表,点击“刷新用户资料缓存”时调用 RefreshFromAdmin(ctx, userID),最终进入 RefreshByKey,只允许刷新已注册 key。
  2. 业务读取链路: 业务接口直接调用 GetForBiz,内部走 LoadThrough,先读缓存,只有真实 miss 才回源、回填并返回;如果命中空值占位则直接返回空对象,避免反复穿透数据库。
  3. 预热任务链路: 定时任务、启动预热或后台批处理调用 WarmupByTask,通过 RefreshByKeys 对一组注册 key 做有限并发预热。

指标建议

项目已内置 PrometheusMetrics,直接实现了 MetricsExtendedMetricsLookupMetrics

metrics, err := tablecache.NewPrometheusMetrics(
	tablecache.WithPrometheusNamespace("gozero_admin"),
	tablecache.WithPrometheusSubsystem("tablecache"),
)
if err != nil {
	return err
}

manager, err := tablecache.NewManager(store, targets, tablecache.WithKeyPrefix("gozero_admin:cache:"), tablecache.WithMetrics(metrics))
if err != nil {
	return err
}
_ = manager
  • refresh_totalrefresh_duration_seconds 记录刷新总次数和耗时。
  • cache_hit_totalcache_miss_totallock_failed_total 记录基础命中与锁竞争情况。
  • loader_error_totalempty_marker_write_totalwait_timeout_totalprefix_delete_totalprefix_delete_keys_totalrefresh_entry_count 记录扩展运行指标。
  • prefix_wait_totalprefix_retry_total 可用于观察是否有大前缀全量刷新拖慢单 key 刷新。
  • refresh_batch_totalrefresh_batch_sizerefresh_batch_success_items_totalrefresh_batch_failed_items_total 可用于观察批量刷新和全量刷新任务的执行次数、任务规模与成功失败条目数。
  • lookup_state_total{state="hit|miss|empty"}lookup_refresh_triggered_total 记录读穿读取链路指标。
  • refresh_total{result="lock_lost"} 可直接观察锁续期失败导致的刷新中止次数。
  • WithPrometheusRegisterer(...) 支持注入独立注册器,适合测试、多实例组件或避免全局注册器重复注册冲突。

日志建议

接入 WithLogger(...) 后,tablecache 会输出统一字段风格日志,便于 gozero-admin 在 Loki、ELK、Datadog 中直接检索:

  • event="lock_lost":锁续期失败后刷新被中止。
  • event="prefix_wait":单 key 刷新被同前缀全量刷新阻塞。
  • event="prefix_retry":单 key 在写回前发现前缀全量刷新已开始,因此主动重试。
  • event="lock_renew_failed":底层锁续期失败,通常会紧接着出现 lock_lost
  • event="refresh_batch_done"event="refresh_all_done":批量刷新与全量刷新任务完成时输出汇总结果。
  • 推荐直接使用 WithGoUtilsLogger(utils.Log());如果更喜欢统一入口,也可以继续使用 WithLogger(utils.Log()),两种写法都兼容。
  • 如果业务入口已经通过 slog.SetDefault(...) 完成统一日志初始化,utils.Log() 默认会复用这套 slog 能力;如果项目已自行实现 utils.Logger,也可以继续走 utils.Configure(utils.WithLogger(customLogger)),同样无需额外 logxadapter

LoadThrough 参数建议

  • LoadThrough(ctx, key, dest, nil) 适合绝大多数标准读穿场景,直接复用目标上注册的 Loader
  • GetOrRefreshWithLoader 适合快速补一个局部临时回源逻辑,但参数能力相对精简。
  • LoadThroughWithOptions 适合生产场景做统一封装,可同时传入 Fields、临时 LoaderLoaderTimeoutAllowEmptyMarkerContextPolicy,便于 Hash 字段级刷新、定制回源策略、按调用收紧超时、临时关闭空值占位或忽略上游取消。
  • LoadThroughBatch 适合列表页、后台批处理或批量业务读场景;它会逐项返回结果,单条失败不会中断整批处理。
  • LoadThroughBatchWithBatchOptions 适合整批共享默认参数的场景,可统一设置批次并发度和默认 LoadThroughOptions,条目级参数仍可覆盖。
  • LoadThroughBatchWithSummary 适合任务调度、预热任务或管理页批处理;它会额外返回汇总信息和聚合错误,便于直接判断整批状态。
  • RefreshByKeysWithSummaryRefreshAllWithSummary 适合预热任务和管理页刷新接口;它们会返回逐项结果、汇总信息和聚合错误,便于直接构造标准 JSON 响应。

示例:

var user User
result, err := manager.LoadThroughWithOptions(ctx, "gozero_admin:cache:user_profile:1", &user, tablecache.LoadThroughOptions{
	Fields:           []string{"name", "status"},
	LoaderTimeout:    200 * time.Millisecond,
	AllowEmptyMarker: tablecache.Bool(false),
	ContextPolicy:    tablecache.RebuildPolicy(tablecache.RebuildContextIgnoreCancel),
})
if err != nil {
	return err
}
_ = result

批量示例:

var user1 User
var user2 User
results := manager.LoadThroughBatch(ctx, []tablecache.LoadThroughItem{
{Key: "gozero_admin:cache:user_profile:1", Dest: &user1},
{Key: "gozero_admin:cache:user_profile:2", Dest: &user2},
})
for _, result := range results {
	if result.Error != nil {
		continue
	}
	_ = result.LookupResult
}

批次级默认参数示例:

results := manager.LoadThroughBatchWithBatchOptions(ctx, []tablecache.LoadThroughItem{
{Key: "gozero_admin:cache:user_profile:1", Dest: &user1},
{Key: "gozero_admin:cache:user_profile:2", Dest: &user2},
}, tablecache.LoadThroughBatchOptions{
	Concurrency: 4,
	DefaultOptions: tablecache.LoadThroughOptions{
		LoaderTimeout:    200 * time.Millisecond,
		AllowEmptyMarker: tablecache.Bool(true),
		ContextPolicy:    tablecache.RebuildPolicy(tablecache.RebuildContextIgnoreCancel),
	},
})
_ = results

汇总示例:

results, summary, err := manager.LoadThroughBatchWithSummary(ctx, []tablecache.LoadThroughItem{
{Key: "gozero_admin:cache:user_profile:1", Dest: &user1},
{Key: "gozero_admin:cache:user_profile:2", Dest: &user2},
})
if err != nil {
	return err
}
_ = results
_ = summary

管理页标准 JSON 示例:

response, err := userCache.LoadThroughBatchForAdmin(ctx, []int64{1, 2, 3})
if err != nil {
	return err
}
return response

批量刷新标准 JSON 示例:

response, err := userCache.WarmupByTaskWithSummary(ctx, []int64{1, 2, 3})
if err != nil {
	return err
}
return response

全量刷新标准 JSON 示例:

response, err := userCache.RefreshAllFromAdmin(ctx)
if err != nil {
	return err
}
return response

Documentation

Index

Constants

View Source
const (
	// DefaultEmptyMarker 表示空值缓存占位符,用于缓存穿透保护。
	DefaultEmptyMarker = "__empty__"
)

Variables

View Source
var (
	// ErrTargetNotFound 表示没有找到匹配的缓存目标配置。
	ErrTargetNotFound = errors.New("tablecache target not found")
	// ErrLoaderRequired 表示缓存目标缺少数据加载函数。
	ErrLoaderRequired = errors.New("tablecache loader required")
	// ErrWaitRebuildTimeout 表示等待其它实例重建缓存超时。
	ErrWaitRebuildTimeout = errors.New("tablecache wait rebuild timeout")
	// ErrRefreshLockLost 表示当前实例在刷新过程中失去重建锁所有权。
	ErrRefreshLockLost = errors.New("tablecache refresh lock lost")
	// ErrCacheMiss 表示缓存未命中。
	ErrCacheMiss = errors.New("tablecache cache miss")
	// ErrNotFound 表示加载器确认源数据不存在,可触发空值占位。
	ErrNotFound = errors.New("tablecache data not found")
	// ErrKeyPrefixRequired 表示刷新、删除或写回操作必须使用已带 Manager 指定前缀的实际 Redis key。
	ErrKeyPrefixRequired = errors.New("tablecache key prefix required")
	// ErrInvalidClusterHashTag 表示 key 中的花括号无法安全生成 Redis Cluster 同槽元信息 key。
	ErrInvalidClusterHashTag = errors.New("tablecache invalid redis cluster hash tag key")
	// ErrEntryKeyOutOfScope 表示加载器返回了不属于当前缓存目标管理范围的写入 key。
	ErrEntryKeyOutOfScope = errors.New("tablecache entry key out of target scope")
)

Functions

func Bool

func Bool(value bool) *bool

Bool 返回布尔指针,便于 Entry.Overwrite 显式设置 false。

Types

type CacheType

type CacheType string

CacheType 表示 Redis 目标数据结构类型。

const (
	// TypeString 表示 Redis String 缓存类型。
	TypeString CacheType = "string"
	// TypeHash 表示 Redis Hash 缓存类型。
	TypeHash CacheType = "hash"
	// TypeList 表示 Redis List 缓存类型。
	TypeList CacheType = "list"
	// TypeSet 表示 Redis Set 缓存类型。
	TypeSet CacheType = "set"
	// TypeZSet 表示 Redis ZSet 缓存类型。
	TypeZSet CacheType = "zset"
)

type Decoder

type Decoder func(data []byte, dest any) error

Decoder 定义缓存读取后的反序列化函数,默认使用 encoding/json。

type Encoder

type Encoder func(value any) (string, error)

Encoder 定义复杂缓存值的序列化函数,默认使用 encoding/json。

type Entry

type Entry struct {
	Key       string        // Key 是最终写入的 Redis key
	Type      CacheType     // Type 是最终写入的 Redis 数据结构类型
	Value     any           // Value 是写入值,类型由 Type 决定
	TTL       time.Duration // TTL 是当前 key 的基础过期时间
	Jitter    time.Duration // Jitter 是当前 key 的过期时间抖动范围
	Overwrite *bool         // Overwrite 控制写入前是否先删除旧 key;nil 表示默认覆盖写入
}

Entry 表示一次写入 Redis 的标准缓存数据。

type ExistsMultiStore

type ExistsMultiStore interface {
	// ExistsMulti 批量判断 key 是否存在,返回 map[key]exists。
	ExistsMulti(ctx context.Context, keys ...string) (map[string]bool, error)
}

ExistsMultiStore 表示可选的批量 Exists 能力。 Manager 在等待其它实例重建时会优先使用该能力,以减少多次 Exists 带来的网络往返。

type ExtendedMetrics

type ExtendedMetrics interface {
	Metrics
	// RecordLoaderError 记录加载器回源错误次数与错误信息。
	RecordLoaderError(ctx context.Context, index string, err error)
	// RecordEmptyMarkerWrite 记录空值占位写入次数。
	RecordEmptyMarkerWrite(ctx context.Context, index string)
	// RecordWaitTimeout 记录等待其它实例重建超时次数。
	RecordWaitTimeout(ctx context.Context, index string)
	// RecordPrefixWait 记录前缀全量刷新阻塞单 key 刷新的次数。
	RecordPrefixWait(ctx context.Context, index string)
	// RecordPrefixRetry 记录单 key 刷新因前缀全量刷新而重试的次数。
	RecordPrefixRetry(ctx context.Context, index string)
	// RecordPrefixDelete 记录前缀删除次数与删除 key 数量。
	RecordPrefixDelete(ctx context.Context, index string, prefix string, count int64)
	// RecordRefreshEntryCount 记录单次刷新写入条数。
	RecordRefreshEntryCount(ctx context.Context, index string, count int)
}

ExtendedMetrics 定义可选的细分指标接口;实现后 Manager 会自动记录更丰富的生产排障指标。

type Item

type Item struct {
	Index    string `json:"index"`    // Index 是缓存目标索引
	Key      string `json:"key"`      // Key 是 Redis key 或 key 模板
	KeyTitle string `json:"keyTitle"` // KeyTitle 是 Redis key 展示标题
	Type     string `json:"type"`     // Type 是 Redis 数据类型
	Remark   string `json:"remark"`   // Remark 是缓存说明
}

Item 表示缓存管理页可展示的缓存目标。

type LoadParams

type LoadParams struct {
	Target   Target   // Target 是当前命中的缓存目标配置
	Key      string   // Key 是本次请求刷新或读取的 Redis key
	Index    string   // Index 是缓存目标索引
	KeyParts []string // KeyParts 是去掉目标前缀后的 key 片段
	Fields   []string // Fields 是 Hash 等二级索引字段,用于局部刷新
}

LoadParams 表示缓存目标加载时的上下文参数。

type LoadThroughBatchAdminItem

type LoadThroughBatchAdminItem struct {
	Key       string      `json:"key"`             // Key 是当前批量项的缓存 key
	Success   bool        `json:"success"`         // Success 表示当前批量项是否执行成功
	State     LookupState `json:"state"`           // State 是当前批量项最终命中状态
	Refreshed bool        `json:"refreshed"`       // Refreshed 表示当前批量项是否执行了回源刷新
	Error     string      `json:"error,omitempty"` // Error 是当前批量项的错误描述
}

LoadThroughBatchAdminItem 表示管理页可直接返回的单条批量读穿结果。

type LoadThroughBatchAdminResponse

type LoadThroughBatchAdminResponse struct {
	Success bool                        `json:"success"` // Success 表示整批是否全部成功
	Message string                      `json:"message"` // Message 是面向管理页展示的批量执行说明
	Summary LoadThroughBatchSummary     `json:"summary"` // Summary 是当前批量结果的汇总统计
	Items   []LoadThroughBatchAdminItem `json:"items"`   // Items 是当前批量每一项的明细结果
}

LoadThroughBatchAdminResponse 表示管理页批量读穿接口可直接返回的标准 JSON 结构。

func BuildLoadThroughBatchAdminResponse

func BuildLoadThroughBatchAdminResponse(results []LoadThroughBatchResult) LoadThroughBatchAdminResponse

BuildLoadThroughBatchAdminResponse 根据批量结果构建管理页标准响应结构。

func BuildLoadThroughBatchAdminResponseWithSummary

func BuildLoadThroughBatchAdminResponseWithSummary(results []LoadThroughBatchResult, summary LoadThroughBatchSummary, err error) LoadThroughBatchAdminResponse

BuildLoadThroughBatchAdminResponseWithSummary 根据批量结果、汇总信息和聚合错误构建管理页标准响应结构。

type LoadThroughBatchError

type LoadThroughBatchError struct {
	Summary LoadThroughBatchSummary // Summary 是当前批量结果的汇总信息
}

LoadThroughBatchError 表示批量读穿存在失败项的聚合错误。

func (*LoadThroughBatchError) Error

func (e *LoadThroughBatchError) Error() string

Error 返回批量读穿聚合错误描述。

type LoadThroughBatchOptions

type LoadThroughBatchOptions struct {
	Concurrency    int                // Concurrency 是本批次读穿的并发度,0 表示沿用 Manager 默认配置
	DefaultOptions LoadThroughOptions // DefaultOptions 是本批次所有条目的默认读穿参数,条目级 Options 优先级更高
}

LoadThroughBatchOptions 表示一批读穿请求的批次级配置。

type LoadThroughBatchResult

type LoadThroughBatchResult struct {
	Key          string       `json:"key"`          // Key 是当前批量项的缓存 key
	LookupResult LookupResult `json:"lookupResult"` // LookupResult 是当前批量项的读取结果
	Error        error        `json:"-"`            // Error 是当前批量项的执行错误
}

LoadThroughBatchResult 表示单条批量读穿请求的执行结果。

type LoadThroughBatchSummary

type LoadThroughBatchSummary struct {
	Total      int      `json:"total"`      // Total 是批量请求总数
	Success    int      `json:"success"`    // Success 是执行成功的条数
	Failed     int      `json:"failed"`     // Failed 是执行失败的条数
	Hit        int      `json:"hit"`        // Hit 是最终命中缓存数据的条数
	Miss       int      `json:"miss"`       // Miss 是最终仍未命中的条数
	Empty      int      `json:"empty"`      // Empty 是命中空值占位的条数
	Refreshed  int      `json:"refreshed"`  // Refreshed 是执行过程中触发回源刷新的条数
	FailedKeys []string `json:"failedKeys"` // FailedKeys 是执行失败的 key 列表
}

LoadThroughBatchSummary 表示一批读穿请求的汇总结果。

func SummarizeLoadThroughBatchResults

func SummarizeLoadThroughBatchResults(results []LoadThroughBatchResult) LoadThroughBatchSummary

SummarizeLoadThroughBatchResults 汇总批量读穿结果,便于任务调度和批处理统一判断。

func (LoadThroughBatchSummary) HasError

func (s LoadThroughBatchSummary) HasError() bool

HasError 返回当前批量汇总是否包含失败项。

type LoadThroughItem

type LoadThroughItem struct {
	Key     string             // Key 是本次批量读穿的目标缓存 key
	Dest    any                // Dest 是当前 key 的反序列化目标对象
	Options LoadThroughOptions // Options 是当前 key 的读穿可选参数
}

LoadThroughItem 表示一条批量读穿请求。

type LoadThroughOptions

type LoadThroughOptions struct {
	Fields           []string              // Fields 是本次读穿时透传给 Loader 的字段列表,适用于 Hash 等局部刷新场景
	Loader           Loader                // Loader 是本次调用临时覆盖的回源加载器,nil 表示继续使用 Target.Loader
	LoaderTimeout    time.Duration         // LoaderTimeout 是本次读穿回源超时时间,优先级高于 Target.LoaderTimeout
	AllowEmptyMarker *bool                 // AllowEmptyMarker 控制本次 miss 回源是否允许写入空值占位,nil 表示沿用目标配置
	ContextPolicy    *RebuildContextPolicy // ContextPolicy 控制本次读穿是否忽略调用方取消信号,nil 表示沿用管理器配置
}

LoadThroughOptions 表示单次读穿缓存调用的可选参数。

type Loader

type Loader func(ctx context.Context, params LoadParams) ([]Entry, error)

Loader 表示缓存目标的数据加载函数,由业务项目自行适配数据库、RPC 或其它数据源。

type Logger

type Logger interface {
	// Debugf 输出调试日志。
	Debugf(format string, args ...any)
	// Infof 输出信息日志。
	Infof(format string, args ...any)
	// Warnf 输出警告日志。
	Warnf(format string, args ...any)
	// Errorf 输出错误日志。
	Errorf(format string, args ...any)
}

Logger 定义可选日志接口,兼容传统 printf 风格日志实现。 若业务已统一使用 go-utils.Logger,也可直接通过 WithLogger 传入。

type LookupMetrics

type LookupMetrics interface {
	Metrics
	// RecordLookupState 记录读取最终状态(hit/miss/empty)。
	RecordLookupState(ctx context.Context, index string, state LookupState)
	// RecordLookupRefreshTriggered 记录 GetOrRefresh/LoadThrough 在 miss 后触发回源刷新的次数。
	RecordLookupRefreshTriggered(ctx context.Context, index string)
}

LookupMetrics 定义读取链路细分指标接口,用于区分 hit/miss/empty 与读穿刷新触发次数。

type LookupResult

type LookupResult struct {
	State     LookupState `json:"state"`     // State 是本次读取最终状态
	Refreshed bool        `json:"refreshed"` // Refreshed 表示本次流程中是否执行了回源刷新
}

LookupResult 表示一次缓存读取或“读取后按需刷新”的结果。

type LookupState

type LookupState string

LookupState 表示缓存读取状态。

const (
	// LookupStateHit 表示缓存命中且已成功写入目标对象。
	LookupStateHit LookupState = "hit"
	// LookupStateMiss 表示缓存未命中,且当前没有空值占位。
	LookupStateMiss LookupState = "miss"
	// LookupStateEmpty 表示缓存已确认源数据为空,命中了空值占位。
	LookupStateEmpty LookupState = "empty"
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 管理一组表数据缓存目标,负责刷新、锁保护、TTL 抖动与空值占位。

func NewManager

func NewManager(store Store, targets []Target, opts ...Option) (*Manager, error)

NewManager 创建通用表缓存管理器。

func (*Manager) DeleteByKey

func (m *Manager) DeleteByKey(ctx context.Context, key string) error

DeleteByKey 删除指定 Redis key;前缀删除请使用 DeleteByPrefix。

func (*Manager) DeleteByPrefix

func (m *Manager) DeleteByPrefix(ctx context.Context, prefix string) error

DeleteByPrefix 删除指定 Redis key 前缀下的所有缓存。

func (*Manager) Get

func (m *Manager) Get(ctx context.Context, key string, dest any) (bool, error)

Get 从缓存读取指定 key,并把缓存值反序列化到 dest。

func (*Manager) GetOrRefresh

func (m *Manager) GetOrRefresh(ctx context.Context, key string, dest any) (LookupResult, error)

GetOrRefresh 先读取缓存;仅当真实未命中时触发刷新,命中空值占位时直接返回 empty。

func (*Manager) GetOrRefreshWithLoader

func (m *Manager) GetOrRefreshWithLoader(ctx context.Context, key string, dest any, loader Loader) (LookupResult, error)

GetOrRefreshWithLoader 先读取缓存;miss 时使用本次传入的 Loader 回源、回填并返回最新结果。

func (*Manager) GetOrRefreshWithOptions

func (m *Manager) GetOrRefreshWithOptions(ctx context.Context, key string, dest any, options LoadThroughOptions) (LookupResult, error)

GetOrRefreshWithOptions 先读取缓存;miss 时按本次 options 配置执行回源、回填并返回最新结果。

func (*Manager) GetState

func (m *Manager) GetState(ctx context.Context, key string, dest any) (LookupResult, error)

GetState 从缓存读取指定 key,并返回命中、空值占位或未命中的明确状态。

func (*Manager) IsFixedKey

func (m *Manager) IsFixedKey(key string) bool

IsFixedKey 判断 key 是否为固定内置缓存 key,模板和前缀型 key 不纳入固定 key。

func (*Manager) Items

func (m *Manager) Items() []Item

Items 返回缓存管理页展示的目标列表。

func (*Manager) LoadThrough

func (m *Manager) LoadThrough(ctx context.Context, key string, dest any, loader Loader) (LookupResult, error)

LoadThrough 提供更贴近业务侧语义的读穿缓存入口,内部复用 GetOrRefreshWithLoader。

func (*Manager) LoadThroughBatch

func (m *Manager) LoadThroughBatch(ctx context.Context, items []LoadThroughItem) []LoadThroughBatchResult

LoadThroughBatch 批量执行读穿缓存;每个条目独立返回结果,单条失败不会中断整批处理。

func (*Manager) LoadThroughBatchWithBatchOptions

func (m *Manager) LoadThroughBatchWithBatchOptions(ctx context.Context, items []LoadThroughItem, options LoadThroughBatchOptions) []LoadThroughBatchResult

LoadThroughBatchWithBatchOptions 批量执行读穿缓存,并支持批次级默认参数和并发度覆盖。

func (*Manager) LoadThroughBatchWithSummary

func (m *Manager) LoadThroughBatchWithSummary(ctx context.Context, items []LoadThroughItem) ([]LoadThroughBatchResult, LoadThroughBatchSummary, error)

LoadThroughBatchWithSummary 批量执行读穿缓存,并返回汇总信息与聚合错误,便于任务调度快速判定整批状态。

func (*Manager) LoadThroughBatchWithSummaryOptions

func (m *Manager) LoadThroughBatchWithSummaryOptions(ctx context.Context, items []LoadThroughItem, options LoadThroughBatchOptions) ([]LoadThroughBatchResult, LoadThroughBatchSummary, error)

LoadThroughBatchWithSummaryOptions 批量执行读穿缓存,并支持批次级默认参数、并发覆盖和汇总结果返回。

func (*Manager) LoadThroughWithOptions

func (m *Manager) LoadThroughWithOptions(ctx context.Context, key string, dest any, options LoadThroughOptions) (LookupResult, error)

LoadThroughWithOptions 提供带可选参数的完整读穿缓存入口,适用于字段级局部刷新和临时覆盖 Loader。

func (*Manager) RefreshAll

func (m *Manager) RefreshAll(ctx context.Context) error

RefreshAll 刷新全部允许批量重建的缓存目标。

func (*Manager) RefreshAllDetailed

func (m *Manager) RefreshAllDetailed(ctx context.Context) []RefreshBatchResult

RefreshAllDetailed 刷新全部允许批量重建的缓存目标,并逐项返回执行结果。

func (*Manager) RefreshAllWithSummary

func (m *Manager) RefreshAllWithSummary(ctx context.Context) ([]RefreshBatchResult, RefreshBatchSummary, error)

RefreshAllWithSummary 刷新全部允许批量重建的缓存目标,并返回汇总信息与聚合错误。

func (*Manager) RefreshByKey

func (m *Manager) RefreshByKey(ctx context.Context, key string, fields ...string) error

RefreshByKey 根据 Redis key 刷新匹配的缓存目标。

func (*Manager) RefreshByKeys

func (m *Manager) RefreshByKeys(ctx context.Context, keys []string) error

RefreshByKeys 批量刷新多个缓存 key;同 key 会自动去重。

func (*Manager) RefreshByKeysDetailed

func (m *Manager) RefreshByKeysDetailed(ctx context.Context, keys []string) []RefreshBatchResult

RefreshByKeysDetailed 批量刷新多个缓存 key,并逐项返回执行结果。

func (*Manager) RefreshByKeysWithSummary

func (m *Manager) RefreshByKeysWithSummary(ctx context.Context, keys []string) ([]RefreshBatchResult, RefreshBatchSummary, error)

RefreshByKeysWithSummary 批量刷新多个缓存 key,并返回汇总信息与聚合错误。

type Metrics

type Metrics interface {
	// RecordRefresh 记录一次刷新执行的结果与耗时。
	RecordRefresh(ctx context.Context, index string, result string, duration time.Duration)
	// RecordCacheHit 记录一次缓存命中次数。
	RecordCacheHit(ctx context.Context, index string)
	// RecordCacheMiss 记录一次缓存未命中次数。
	RecordCacheMiss(ctx context.Context, index string)
	// RecordLockFailed 记录一次分布式锁竞争失败次数。
	RecordLockFailed(ctx context.Context, index string)
}

Metrics 定义 tablecache 运行指标记录接口,可由 Prometheus、StatsD 或自定义埋点实现。

type MutationStore

type MutationStore interface {
	// ApplyMutation 按顺序提交一次缓存变更描述;实现方应先删除旧 key,再写入新 key,最后维护索引。
	ApplyMutation(ctx context.Context, mutation StoreMutation) error
}

MutationStore 定义合并缓存变更能力,用于把删除、写入和索引维护压缩进更少的 Redis Pipeline。 RedisStore 默认实现该接口;自定义 Store 未实现时 Manager 会拆回基础 Store 与 PrefixIndexStore 调用。

type Option

type Option func(*Manager)

Option 表示 Manager 的可选配置。

func WithDecoder

func WithDecoder(decoder Decoder) Option

WithDecoder 设置缓存读取反序列化函数。

func WithEmptyMarker

func WithEmptyMarker(marker string, ttl time.Duration) Option

WithEmptyMarker 设置空值缓存占位内容和默认过期时间。

func WithGoUtilsLogger

func WithGoUtilsLogger(logger utils.Logger) Option

WithGoUtilsLogger 设置 go-utils.Logger 日志适配器。 推荐在项目已统一使用 utils.Configure(utils.WithLogger(...)) 时直接传入 utils.Log()。

func WithKeyPrefix

func WithKeyPrefix(prefix string) Option

WithKeyPrefix 设置业务缓存 key 命名空间前缀,用于把 tablecache 管理的业务数据与其它 Redis key 隔离。 默认前缀为 "tablecache:data:";刷新、删除和写回只接受已带该前缀的实际 Redis key。

func WithLoaderTimeout

func WithLoaderTimeout(timeout time.Duration) Option

WithLoaderTimeout 设置默认 Loader 超时时间。

func WithLockRenewInterval

func WithLockRenewInterval(interval time.Duration) Option

WithLockRenewInterval 设置缓存重建锁续期间隔。

func WithLockTTL

func WithLockTTL(ttl time.Duration) Option

WithLockTTL 设置缓存重建锁持有时间。

func WithLogger

func WithLogger(logger any) Option

WithLogger 设置缓存管理器日志适配器。 该方法同时兼容 tablecache 自定义 Logger 与 go-utils.Logger。

func WithMetrics

func WithMetrics(metrics Metrics) Option

WithMetrics 设置缓存运行指标记录器。

func WithPrefixDeleteConcurrency

func WithPrefixDeleteConcurrency(concurrency int) Option

WithPrefixDeleteConcurrency 设置 DeleteByPrefix 同时扫描删除的 pattern 数。 默认 1 表示保守串行;大 keyspace 且可接受更高 Redis 瞬时压力时可提高到 2~4 来缩短清理耗时。

func WithPrefixEpochTTL

func WithPrefixEpochTTL(ttl time.Duration) Option

WithPrefixEpochTTL 设置前缀全量刷新代际元信息保留时间。

func WithPrefixKeyIndex

func WithPrefixKeyIndex(enabled bool) Option

WithPrefixKeyIndex 设置是否启用前缀 key 索引。 启用后 Manager 会在前缀目标写入时维护索引,并在 DeleteByPrefix 时优先按索引删除;Store 不支持索引时自动降级。

func WithPrefixKeyIndexTTL

func WithPrefixKeyIndexTTL(ttl time.Duration) Option

WithPrefixKeyIndexTTL 设置前缀 key 索引与可信标记的保留时间。 TTL 过短会让删除更容易降级到 SCAN;TTL 过长会保留更多可能已过期的成员,建议按业务缓存最长 TTL 配置。

func WithRebuildContextPolicy

func WithRebuildContextPolicy(policy RebuildContextPolicy) Option

WithRebuildContextPolicy 设置缓存重建上下文取消策略。

func WithRebuildResultTTL

func WithRebuildResultTTL(ttl time.Duration) Option

WithRebuildResultTTL 设置重建完成元信息保留时间。

func WithRefreshConcurrency

func WithRefreshConcurrency(concurrency int) Option

WithRefreshConcurrency 设置 RefreshAll 和 RefreshByKeys 的有限并发度。

func WithScanCount

func WithScanCount(count int64) Option

WithScanCount 设置前缀缓存清理时的 Redis SCAN count。

func WithWait

func WithWait(step time.Duration, times int) Option

WithWait 设置未抢到锁时等待其它实例重建缓存的轮询策略。

type PipelineExecError

type PipelineExecError struct {
	Operation  string   // Operation 表示本次 Pipeline 执行的操作名称(用于日志与排障归类)
	FailedKeys []string // FailedKeys 表示本次 Pipeline 中检测到失败的 key 列表(已排序去重)
	Cause      error    // Cause 是底层错误原因
}

PipelineExecError 表示 Pipeline 执行失败的增强错误信息。 在部分命令失败(例如 cluster 路由抖动、网络短暂错误)时可携带失败 key 列表,便于线上快速定位与重试。

func (*PipelineExecError) Error

func (e *PipelineExecError) Error() string

func (*PipelineExecError) Unwrap

func (e *PipelineExecError) Unwrap() error

type PrefixIndexMutation

type PrefixIndexMutation struct {
	IndexKey string        // IndexKey 是前缀目标对应的索引集合 Redis key
	TTL      time.Duration // TTL 是索引集合需要刷新的保留时间;移除成员时可为 0
	Keys     []string      // Keys 是需要加入或移除索引集合的真实 Redis key 成员列表
}

PrefixIndexMutation 描述一次前缀索引集合的成员变更。

type PrefixIndexStore

type PrefixIndexStore interface {
	// AddPrefixIndexKeys 把一批业务或元信息 key 写入指定前缀索引,并按 ttl 刷新索引过期时间。
	AddPrefixIndexKeys(ctx context.Context, indexKey string, ttl time.Duration, keys ...string) error
	// RemovePrefixIndexKeys 从指定前缀索引移除一批 key,用于精确删除或状态切换后清理过期成员。
	RemovePrefixIndexKeys(ctx context.Context, indexKey string, keys ...string) error
	// DeletePrefixIndexKeys 分批遍历索引集合中的成员 key,删除真实 Redis key 并清空索引集合。
	DeletePrefixIndexKeys(ctx context.Context, indexKey string, count int64) (int64, error)
}

PrefixIndexStore 定义前缀目标的 key 索引能力,用于大 keyspace 下绕开全库 SCAN。 RedisStore 默认实现该接口;自定义 Store 未实现时 Manager 会自动降级为 DeletePattern 扫描删除。

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics 提供 Metrics、ExtendedMetrics、LookupMetrics 的 Prometheus 实现。

func NewPrometheusMetrics

func NewPrometheusMetrics(opts ...PrometheusMetricsOption) (*PrometheusMetrics, error)

NewPrometheusMetrics 创建 Prometheus 指标适配器,默认兼容重复注册场景。

func (*PrometheusMetrics) RecordCacheHit

func (m *PrometheusMetrics) RecordCacheHit(ctx context.Context, index string)

RecordCacheHit 记录基础缓存命中次数。

func (*PrometheusMetrics) RecordCacheMiss

func (m *PrometheusMetrics) RecordCacheMiss(ctx context.Context, index string)

RecordCacheMiss 记录基础缓存未命中次数。

func (*PrometheusMetrics) RecordEmptyMarkerWrite

func (m *PrometheusMetrics) RecordEmptyMarkerWrite(ctx context.Context, index string)

RecordEmptyMarkerWrite 记录空值占位写入次数。

func (*PrometheusMetrics) RecordLoaderError

func (m *PrometheusMetrics) RecordLoaderError(ctx context.Context, index string, err error)

RecordLoaderError 记录 Loader 错误次数。

func (*PrometheusMetrics) RecordLockFailed

func (m *PrometheusMetrics) RecordLockFailed(ctx context.Context, index string)

RecordLockFailed 记录锁竞争失败次数。

func (*PrometheusMetrics) RecordLookupRefreshTriggered

func (m *PrometheusMetrics) RecordLookupRefreshTriggered(ctx context.Context, index string)

RecordLookupRefreshTriggered 记录读穿触发刷新次数。

func (*PrometheusMetrics) RecordLookupState

func (m *PrometheusMetrics) RecordLookupState(ctx context.Context, index string, state LookupState)

RecordLookupState 记录读取状态细分指标。

func (*PrometheusMetrics) RecordPrefixDelete

func (m *PrometheusMetrics) RecordPrefixDelete(ctx context.Context, index string, prefix string, count int64)

RecordPrefixDelete 记录前缀删除次数和删除 key 总数。

func (*PrometheusMetrics) RecordPrefixRetry

func (m *PrometheusMetrics) RecordPrefixRetry(ctx context.Context, index string)

RecordPrefixRetry 记录单 key 因前缀全量刷新重试的次数。

func (*PrometheusMetrics) RecordPrefixWait

func (m *PrometheusMetrics) RecordPrefixWait(ctx context.Context, index string)

RecordPrefixWait 记录前缀全量刷新阻塞单 key 的次数。

func (*PrometheusMetrics) RecordRefresh

func (m *PrometheusMetrics) RecordRefresh(ctx context.Context, index string, result string, duration time.Duration)

RecordRefresh 记录刷新次数和刷新耗时。

func (*PrometheusMetrics) RecordRefreshBatch

func (m *PrometheusMetrics) RecordRefreshBatch(ctx context.Context, mode string, result string, total int, success int, failed int)

RecordRefreshBatch 记录批量刷新与全量刷新任务的次数、规模和成功失败条目数。

func (*PrometheusMetrics) RecordRefreshEntryCount

func (m *PrometheusMetrics) RecordRefreshEntryCount(ctx context.Context, index string, count int)

RecordRefreshEntryCount 记录单次刷新写入条数。

func (*PrometheusMetrics) RecordWaitTimeout

func (m *PrometheusMetrics) RecordWaitTimeout(ctx context.Context, index string)

RecordWaitTimeout 记录等待重建超时次数。

type PrometheusMetricsConfig

type PrometheusMetricsConfig struct {
	Namespace           string                // Namespace 是 Prometheus 指标命名空间
	Subsystem           string                // Subsystem 是 Prometheus 指标子系统
	Registerer          prometheus.Registerer // Registerer 是指标注册器,nil 时默认使用全局注册器
	RefreshBuckets      []float64             // RefreshBuckets 是刷新耗时直方图桶
	RefreshEntryBuckets []float64             // RefreshEntryBuckets 是单次刷新写入条数直方图桶
}

PrometheusMetricsConfig 表示 Prometheus 指标适配器的构造配置。

type PrometheusMetricsOption

type PrometheusMetricsOption func(*PrometheusMetricsConfig)

PrometheusMetricsOption 表示 Prometheus 指标适配器的可选配置。

func WithPrometheusNamespace

func WithPrometheusNamespace(namespace string) PrometheusMetricsOption

WithPrometheusNamespace 设置 Prometheus 指标命名空间。

func WithPrometheusRefreshBuckets

func WithPrometheusRefreshBuckets(buckets []float64) PrometheusMetricsOption

WithPrometheusRefreshBuckets 设置刷新耗时直方图桶。

func WithPrometheusRefreshEntryBuckets

func WithPrometheusRefreshEntryBuckets(buckets []float64) PrometheusMetricsOption

WithPrometheusRefreshEntryBuckets 设置单次刷新写入条数直方图桶。

func WithPrometheusRegisterer

func WithPrometheusRegisterer(registerer prometheus.Registerer) PrometheusMetricsOption

WithPrometheusRegisterer 设置 Prometheus 指标注册器。

func WithPrometheusSubsystem

func WithPrometheusSubsystem(subsystem string) PrometheusMetricsOption

WithPrometheusSubsystem 设置 Prometheus 指标子系统。

type RebuildContextPolicy

type RebuildContextPolicy int

RebuildContextPolicy 表示缓存重建上下文的取消信号策略。

const (
	// RebuildContextInheritCancel 表示重建上下文继承调用方取消信号。
	RebuildContextInheritCancel RebuildContextPolicy = iota
	// RebuildContextIgnoreCancel 表示重建上下文保留调用方上下文值,但忽略调用方取消信号。
	RebuildContextIgnoreCancel
)

func RebuildPolicy

func RebuildPolicy(value RebuildContextPolicy) *RebuildContextPolicy

RebuildPolicy 返回重建上下文策略指针,便于 LoadThroughOptions 显式设置单次调用策略。

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore 是基于 go-redis 的 Store 适配器,可直接服务 go-zero、Gin、Kratos 等 Go 框架。

func NewRedisStore

func NewRedisStore(client redis.UniversalClient, opts ...RedisStoreOption) *RedisStore

NewRedisStore 创建 go-redis 存储适配器。

func (*RedisStore) AddPrefixIndexKeys

func (s *RedisStore) AddPrefixIndexKeys(ctx context.Context, indexKey string, ttl time.Duration, keys ...string) error

AddPrefixIndexKeys 把一批真实 Redis key 写入前缀索引集合,供 DeleteByPrefix 优先走索引删除。

func (*RedisStore) ApplyMutation

func (s *RedisStore) ApplyMutation(ctx context.Context, mutation StoreMutation) error

ApplyMutation 使用有界 Pipeline 合并提交删除、写入与前缀索引维护,减少刷新链路 Redis 往返。

func (*RedisStore) Delete

func (s *RedisStore) Delete(ctx context.Context, keys ...string) error

Delete 删除一个或多个 Redis key。

func (*RedisStore) DeletePattern

func (s *RedisStore) DeletePattern(ctx context.Context, pattern string, count int64) (int64, error)

DeletePattern 使用 SCAN 增量删除匹配 key,避免 KEYS 阻塞线上 Redis。

func (*RedisStore) DeletePrefixIndexKeys

func (s *RedisStore) DeletePrefixIndexKeys(ctx context.Context, indexKey string, count int64) (int64, error)

DeletePrefixIndexKeys 分批遍历前缀索引集合,并删除索引内记录的真实 Redis key。 该方法只读取索引集合本身,不扫描 Redis 全局 keyspace,适合 key 数特别多的线上实例。

func (*RedisStore) Exists

func (s *RedisStore) Exists(ctx context.Context, key string) (bool, error)

Exists 判断 Redis key 是否存在。

func (*RedisStore) ExistsMulti

func (s *RedisStore) ExistsMulti(ctx context.Context, keys ...string) (map[string]bool, error)

func (*RedisStore) Read

func (s *RedisStore) Read(ctx context.Context, key string, typ CacheType) (any, error)

Read 按缓存类型读取 Redis 原始值。

func (*RedisStore) RefreshLock

func (s *RedisStore) RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error)

RefreshLock 仅当锁值与持有者标识一致时续期锁。

func (*RedisStore) ReleaseLock

func (s *RedisStore) ReleaseLock(ctx context.Context, key string, value string) (bool, error)

ReleaseLock 仅当锁值与持有者标识一致时释放锁,避免误删其它实例刚抢到的锁。

func (*RedisStore) RemovePrefixIndexKeys

func (s *RedisStore) RemovePrefixIndexKeys(ctx context.Context, indexKey string, keys ...string) error

RemovePrefixIndexKeys 从前缀索引集合移除一批 key,避免 DeleteByKey 或状态切换后留下大量过期成员。

func (*RedisStore) SetNX

func (s *RedisStore) SetNX(ctx context.Context, key string, value any, ttl time.Duration) (bool, error)

SetNX 设置 Redis 分布式轻量锁。

func (*RedisStore) Write

func (s *RedisStore) Write(ctx context.Context, entry Entry) error

Write 按 Entry 类型写入 Redis,并统一处理 TTL 抖动。

func (*RedisStore) WriteBatch

func (s *RedisStore) WriteBatch(ctx context.Context, entries []Entry) error

WriteBatch 使用 Redis Pipeline 批量写入缓存,降低大量 Entry 重建时的网络往返。

type RedisStoreOption

type RedisStoreOption func(*RedisStore)

RedisStoreOption 表示 RedisStore 可选配置。

func WithPipelineRetries

func WithPipelineRetries(retries int) RedisStoreOption

WithPipelineRetries 设置批量删除与覆盖写的瞬时失败重试次数。

func WithRedisEncoder

func WithRedisEncoder(encoder Encoder) RedisStoreOption

WithRedisEncoder 设置 Redis 复杂值序列化函数。

func WithScanUnlinkConcurrency

func WithScanUnlinkConcurrency(concurrency int) RedisStoreOption

WithScanUnlinkConcurrency 设置 DeletePattern 在单个 Redis 节点上的并发删除 worker 数。 SCAN 游标本身必须顺序推进;该配置只把扫描到的 key 分发给多个 UNLINK worker,用于大 keyspace 下重叠扫描和删除耗时。

func WithUnlinkChunkSize

func WithUnlinkChunkSize(size int) RedisStoreOption

WithUnlinkChunkSize 设置批量删除时单个 Pipeline 的 UNLINK 命令数量。 较大的 chunk 可减少网络往返,较小的 chunk 可降低单次请求体积和 Redis 侧瞬时压力。

type RefreshBatchAdminItem

type RefreshBatchAdminItem struct {
	Key     string `json:"key"`             // Key 是当前批量项的缓存 key
	Success bool   `json:"success"`         // Success 表示当前批量项是否刷新成功
	Error   string `json:"error,omitempty"` // Error 是当前批量项的错误描述
}

RefreshBatchAdminItem 表示管理页可直接返回的单条批量刷新结果。

type RefreshBatchAdminResponse

type RefreshBatchAdminResponse struct {
	Success bool                    `json:"success"` // Success 表示整批是否全部成功
	Message string                  `json:"message"` // Message 是面向管理页展示的批量执行说明
	Summary RefreshBatchSummary     `json:"summary"` // Summary 是当前批量结果的汇总统计
	Items   []RefreshBatchAdminItem `json:"items"`   // Items 是当前批量每一项的明细结果
}

RefreshBatchAdminResponse 表示管理页批量刷新接口可直接返回的标准 JSON 结构。

func BuildRefreshBatchAdminResponse

func BuildRefreshBatchAdminResponse(results []RefreshBatchResult) RefreshBatchAdminResponse

BuildRefreshBatchAdminResponse 根据批量刷新结果构建管理页标准响应结构。

func BuildRefreshBatchAdminResponseWithSummary

func BuildRefreshBatchAdminResponseWithSummary(results []RefreshBatchResult, summary RefreshBatchSummary, err error) RefreshBatchAdminResponse

BuildRefreshBatchAdminResponseWithSummary 根据批量刷新结果、汇总信息和聚合错误构建管理页标准响应结构。

type RefreshBatchError

type RefreshBatchError struct {
	Summary RefreshBatchSummary // Summary 是当前批量刷新结果的汇总信息
}

RefreshBatchError 表示批量刷新存在失败项的聚合错误。

func (*RefreshBatchError) Error

func (e *RefreshBatchError) Error() string

Error 返回批量刷新聚合错误描述。

type RefreshBatchMetrics

type RefreshBatchMetrics interface {
	Metrics
	// RecordRefreshBatch 记录批量刷新/全量刷新任务的汇总信息。
	RecordRefreshBatch(ctx context.Context, mode string, result string, total int, success int, failed int)
}

RefreshBatchMetrics 定义批量刷新与全量刷新任务的汇总指标接口。

type RefreshBatchResult

type RefreshBatchResult struct {
	Key   string `json:"key"` // Key 是当前批量项的缓存 key
	Error error  `json:"-"`   // Error 是当前批量项的执行错误
}

RefreshBatchResult 表示单条批量刷新请求的执行结果。

type RefreshBatchSummary

type RefreshBatchSummary struct {
	Total      int      `json:"total"`      // Total 是批量请求总数
	Success    int      `json:"success"`    // Success 是执行成功的条数
	Failed     int      `json:"failed"`     // Failed 是执行失败的条数
	FailedKeys []string `json:"failedKeys"` // FailedKeys 是执行失败的 key 列表
}

RefreshBatchSummary 表示一批刷新请求的汇总结果。

func SummarizeRefreshBatchResults

func SummarizeRefreshBatchResults(results []RefreshBatchResult) RefreshBatchSummary

SummarizeRefreshBatchResults 汇总批量刷新结果,便于管理页和预热任务统一判断。

func (RefreshBatchSummary) HasError

func (s RefreshBatchSummary) HasError() bool

HasError 返回当前批量刷新汇总是否包含失败项。

type Store

type Store interface {
	// Delete 删除一个或多个缓存 key。
	Delete(ctx context.Context, keys ...string) error
	// DeletePattern 按 pattern 增量删除匹配 key,避免 KEYS 阻塞线上 Redis。
	DeletePattern(ctx context.Context, pattern string, count int64) (int64, error)
	// Exists 判断指定 key 是否存在。
	Exists(ctx context.Context, key string) (bool, error)
	// Read 按缓存类型读取 Redis 原始值。
	Read(ctx context.Context, key string, typ CacheType) (any, error)
	// RefreshLock 仅当锁值与持有者标识一致时续期锁。
	RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error)
	// ReleaseLock 仅当锁值与持有者标识一致时释放锁,避免误删其它实例刚抢到的锁。
	ReleaseLock(ctx context.Context, key string, value string) (bool, error)
	// SetNX 设置 Redis 分布式轻量锁。
	SetNX(ctx context.Context, key string, value any, ttl time.Duration) (bool, error)
	// Write 写入单条缓存数据。
	Write(ctx context.Context, entry Entry) error
	// WriteBatch 批量写入缓存数据。
	WriteBatch(ctx context.Context, entries []Entry) error
}

Store 定义缓存写入与分布式锁能力,业务框架只要实现该接口即可复用 Manager。

type StoreMutation

type StoreMutation struct {
	DeleteKeys   []string              // DeleteKeys 是需要删除的真实 Redis key 列表,来源于旧业务 key、空值元信息或重建结果元信息
	WriteEntries []Entry               // WriteEntries 是需要写入的缓存条目,来源于 Loader 结果或内部元信息
	AddIndex     []PrefixIndexMutation // AddIndex 表示需要把哪些 key 加入前缀索引集合,通常来源于写入成功的业务 key 或元信息 key
	RemoveIndex  []PrefixIndexMutation // RemoveIndex 表示需要从前缀索引集合移除哪些 key,通常来源于被删除或状态切换的旧 key
}

StoreMutation 描述一次缓存刷新或清理需要提交到底层 Store 的变更集合。

type Target

type Target struct {
	Index            string        // Index 是缓存目标索引,通常等于 Redis key 第一个冒号前的前缀
	Title            string        // Title 是缓存目标中文名称
	Key              string        // Key 是 Redis key 或 key 前缀,前缀型目标以冒号结尾
	KeyTitle         string        // KeyTitle 是缓存管理页展示用 key 模板
	Type             CacheType     // Type 是 Redis 数据结构类型
	Remark           string        // Remark 是缓存用途说明
	TTL              time.Duration // TTL 是业务缓存基础过期时间,0 表示不过期
	Jitter           time.Duration // Jitter 是 TTL 最大抖动范围,用于降低缓存雪崩风险
	EmptyTTL         time.Duration // EmptyTTL 是空值占位缓存过期时间,用于缓存穿透保护
	LoaderTimeout    time.Duration // LoaderTimeout 是当前目标回源加载超时时间,0 表示使用管理器默认值
	RefreshAll       bool          // RefreshAll 表示刷新全部缓存时是否包含该目标
	AllowEmptyMarker bool          // AllowEmptyMarker 表示加载空数据时是否写入空值占位
	VisibleEmptyMark bool          // VisibleEmptyMark 表示把空值占位直接写入业务 key;默认只写独立元信息
	Loader           Loader        // Loader 是缓存回源加载器
}

Target 描述一个可刷新、可展示、可复用的表数据缓存目标。

type ZMember

type ZMember struct {
	Member any     // Member 是有序集合成员
	Score  float64 // Score 是有序集合分数
}

ZMember 表示 Redis ZSet 成员与分数。

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL