Documentation
¶
Index ¶
- func SetupCFTestData(ctx context.Context, adapter *StoreCFAdapter, interactions []struct{ ... }) error
- type ANN
- type BERTRecall
- type BERTStore
- type CFStore
- type ChainMergeStrategy
- type ContentRecall
- type ContentStore
- type CosineSimilarity
- type DSSMRecall
- type DistributeRequest
- type DistributeStrategy
- type EmbRecall
- type ErrorHandler
- type ExposureCountStrategy
- type ExposureCounter
- type FallbackErrorHandler
- type Fanout
- type FirstMergeStrategy
- type GraphRecall
- type HybridRatioMergeStrategy
- type I2IRecall
- type IgnoreErrorHandler
- type ItemBasedCF
- type MFRecall
- type MFStore
- type MergeStrategy
- type PearsonCorrelation
- type PoolItem
- type PoolReader
- type PoolRecall
- type PriorityMergeStrategy
- type QuotaMergeStrategy
- type RPCRecall
- func (r *RPCRecall) Name() string
- func (r *RPCRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
- func (r *RPCRecall) WithClient(client *http.Client) *RPCRecall
- func (r *RPCRecall) WithRequestBuilder(...) *RPCRecall
- func (r *RPCRecall) WithResponseParser(parser func(resp *http.Response) ([]*core.Item, error)) *RPCRecall
- func (r *RPCRecall) WithTopK(topK int) *RPCRecall
- type RatioMergeStrategy
- type RetryErrorHandler
- type RoundRobinMergeStrategy
- type ScoredHistoryItem
- type SimilarItemStore
- type SimilarityCalculator
- type SortOrder
- type SortedSetRecall
- func NewEditorPickRecall(store core.Store, key string, topK int) *SortedSetRecall
- func NewHotRecall(store core.Store, key string, topK int) *SortedSetRecall
- func NewLatestRecall(store core.Store, key string, topK int) *SortedSetRecall
- func NewTopRatedRecall(store core.Store, key string, topK int) *SortedSetRecall
- func NewTrendingRecall(store core.Store, key string, topK int) *SortedSetRecall
- func (r *SortedSetRecall) Kind() pipeline.Kind
- func (r *SortedSetRecall) Name() string
- func (r *SortedSetRecall) Process(ctx context.Context, rctx *core.RecommendContext, _ []*core.Item) ([]*core.Item, error)
- func (r *SortedSetRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
- type Source
- type StoreCFAdapter
- func (a *StoreCFAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)
- func (a *StoreCFAdapter) GetAllItems(ctx context.Context) ([]string, error)
- func (a *StoreCFAdapter) GetAllUsers(ctx context.Context) ([]string, error)
- func (a *StoreCFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreCFAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreCFAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)
- func (a *StoreCFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)
- func (a *StoreCFAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreCFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreCFAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)
- func (a *StoreCFAdapter) Name() string
- type StoreContentAdapter
- func (a *StoreContentAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)
- func (a *StoreContentAdapter) GetAllItems(ctx context.Context) ([]string, error)
- func (a *StoreContentAdapter) GetAllUsers(ctx context.Context) ([]string, error)
- func (a *StoreContentAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreContentAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreContentAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)
- func (a *StoreContentAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)
- func (a *StoreContentAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreContentAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreContentAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)
- func (a *StoreContentAdapter) Name() string
- type StoreMFAdapter
- func (a *StoreMFAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)
- func (a *StoreMFAdapter) GetAllItems(ctx context.Context) ([]string, error)
- func (a *StoreMFAdapter) GetAllUsers(ctx context.Context) ([]string, error)
- func (a *StoreMFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreMFAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)
- func (a *StoreMFAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)
- func (a *StoreMFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)
- func (a *StoreMFAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreMFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)
- func (a *StoreMFAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)
- func (a *StoreMFAdapter) Name() string
- type TwoTowerRecall
- type TwoTowerRecallOption
- type U2IRecall
- type UnionMergeStrategy
- type UserBasedCF
- type UserBucketStrategy
- type UserHistory
- type UserHistoryStore
- type UserShuffleStrategy
- type WaterfallMergeStrategy
- type WeightedScoreMergeStrategy
- type Word2VecRecall
- type Word2VecStore
- type YouTubeDNNRecall
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetupCFTestData ¶
func SetupCFTestData(ctx context.Context, adapter *StoreCFAdapter, interactions []struct { UserID string ItemID string Score float64 }) error
SetupCFTestData 辅助函数:为测试准备协同过滤数据到 Store 中。 使用 StoreCFAdapter + MemoryStore 时,可以用这个函数方便地添加测试数据。
Types ¶
type ANN ¶
type ANN struct {
// VectorService 向量检索服务(领域层接口)
// 可以是 Milvus、Faiss 等向量数据库的实现
VectorService core.VectorService
// Collection 集合名称(用于向量搜索)
Collection string
// UserEmbedding 用户向量(如果提供,优先使用;否则从 rctx 获取)
UserEmbedding []float64
// TopK 返回 TopK 相似物品
TopK int
// Metric 距离度量:cosine / euclidean / inner_product
Metric string
// UserEmbeddingExtractor 从 RecommendContext 提取用户向量(可选)
UserEmbeddingExtractor func(rctx *core.RecommendContext) []float64
}
ANN 是 Embedding 向量检索召回源(Approximate Nearest Neighbor)。 支持余弦相似度、欧氏距离等计算方式。
设计原则:
- 直接使用 core.VectorService(领域层接口),符合 DDD 原则
- 消除过度抽象:不再需要 VectorStore 适配层
- 专注于高性能向量检索场景
type BERTRecall ¶
type BERTRecall struct {
// Model BERT 模型
Model *model.BERTModel
// Store 存储接口
Store BERTStore
// TopK 返回 TopK 个物品
TopK int
// Mode 召回模式:text(基于文本)或 query(基于查询)
Mode string
// TextField 文本字段:title / description / tags
TextField string
// BatchSize 批量编码大小(提高效率)
BatchSize int
// HistoryFunc 可选,返回用户最近点击的物品 ID 列表。
// 未设置时从 rctx.Attributes["recent_clicks"] 获取。
HistoryFunc func(rctx *core.RecommendContext) []string
}
BERTRecall 是基于 BERT 的召回源。
核心思想:
- 使用 BERT 将文本编码为语义向量
- 通过向量相似度找到语义相似的物品
使用场景:
- 文本语义召回:基于物品标题、描述的语义相似度
- 搜索推荐:用户查询与物品文本的语义匹配
- I2I 召回:基于物品文本语义相似度
func (*BERTRecall) Name ¶
func (r *BERTRecall) Name() string
func (*BERTRecall) Recall ¶
func (r *BERTRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type BERTStore ¶
type BERTStore interface {
// GetItemText 获取物品的文本特征(标题、描述、标签等)
GetItemText(ctx context.Context, itemID string) (string, error)
// GetItemTags 获取物品的标签列表
GetItemTags(ctx context.Context, itemID string) ([]string, error)
// GetUserQuery 获取用户查询文本(可选,用于搜索场景)
GetUserQuery(ctx context.Context, userID string) (string, error)
// GetAllItems 获取所有物品 ID 列表
GetAllItems(ctx context.Context) ([]string, error)
}
BERTStore 是 BERT 召回所需的存储接口。
type CFStore ¶
type CFStore = core.RecallDataStore
CFStore 是协同过滤的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。
type ChainMergeStrategy ¶
type ChainMergeStrategy struct {
Strategies []MergeStrategy
}
ChainMergeStrategy 将多个 MergeStrategy 串联执行,前一个策略的输出作为下一个的输入。 适用于需要组合多种策略的场景,例如"先加权调分,再按配额截取"。
dedup 仅传递给第一个策略,后续策略收到 dedup=false(避免重复去重)。
type ContentRecall ¶
type ContentRecall struct {
Store ContentStore
// TopK 返回 TopK 个物品
TopK int
// Metric 距离度量方式:cosine / jaccard / tfidf
Metric string
// UserPreferencesKey 从 RecommendContext 获取用户偏好的 key
// 如果为空,则从 Store 获取
UserPreferencesKey string
// UserPreferencesExtractor 从 RecommendContext 提取用户偏好(可选)
UserPreferencesExtractor func(rctx *core.RecommendContext) map[string]float64
}
ContentRecall 是基于内容的召回源(Content-Based Recommendation)。
核心思想:"用户喜欢具有某些特征的物品,推荐具有相似特征的其他物品"
func (*ContentRecall) Name ¶
func (r *ContentRecall) Name() string
func (*ContentRecall) Recall ¶
func (r *ContentRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type ContentStore ¶
type ContentStore = core.RecallDataStore
ContentStore 是基于内容的推荐的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。
type CosineSimilarity ¶
type CosineSimilarity struct{}
CosineSimilarity 是余弦相似度计算器。
func (*CosineSimilarity) Calculate ¶
func (c *CosineSimilarity) Calculate(x, y []float64) float64
type DSSMRecall ¶
type DSSMRecall struct {
Endpoint string // HTTP 服务端点,例如 "http://localhost:8083/query_embedding"
Timeout time.Duration
Client *http.Client
VectorService core.VectorService
TopK int
Collection string
Metric string
QueryFeatureExtractor feature.FeatureExtractor
}
DSSMRecall 是基于 DSSM 的 Query-Doc 语义召回源。
流程:从 rctx 获取 query_features -> /query_embedding -> 向量检索 Doc。 使用前需训练并启动 DSSM 服务,且 Doc Embeddings 已导入 VectorService。
python train/train_dssm.py uvicorn service.dssm_server:app --host 0.0.0.0 --port 8083
func (*DSSMRecall) Name ¶
func (r *DSSMRecall) Name() string
func (*DSSMRecall) Recall ¶
func (r *DSSMRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
type DistributeRequest ¶
DistributeRequest 一次池内分发的请求参数。
func (DistributeRequest) NormalizeTopK ¶
func (r DistributeRequest) NormalizeTopK() int
NormalizeTopK 将 TopK 限制在 [0, len(Items)]。
type DistributeStrategy ¶
type DistributeStrategy interface {
// Pick 从 items 中为 userID 选取 topK 条。
Pick(ctx context.Context, req DistributeRequest) ([]PoolItem, error)
}
DistributeStrategy 定义池内 item 的分发策略接口。 从全量池成员中选取 TopK 分发给当前用户。
type ErrorHandler ¶
type ErrorHandler interface {
// HandleError 处理召回源错误。
// 返回 error != nil 时将中断整个 Fanout。
HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)
}
ErrorHandler 是错误处理策略接口,用于自定义召回源失败时的处理逻辑。
type ExposureCountStrategy ¶
type ExposureCountStrategy struct {
Counter ExposureCounter
// KeyPrefix 曝光计数 key 前缀;最终 key = KeyPrefix:{yyyyMMdd}。
// 为空时需由调用方通过 PoolKey 推导。
KeyPrefix string
// RecordOnPick 为 true 时在 Pick 成功后自动累加曝光计数。
RecordOnPick bool
// TZOffset 日期后缀的时区偏移(相对 UTC),默认 8h (UTC+8)。
TZOffset time.Duration
}
ExposureCountStrategy 从计数器读取曝光次数,优先分发曝光最少的 item。 若 Counter 不可用或出错,自动回退 UserShuffle。
func NewExposureCountStrategy ¶
func NewExposureCountStrategy(counter ExposureCounter, keyPrefix string) *ExposureCountStrategy
func (*ExposureCountStrategy) Pick ¶
func (s *ExposureCountStrategy) Pick(ctx context.Context, req DistributeRequest) ([]PoolItem, error)
type ExposureCounter ¶
type ExposureCounter interface {
// GetCounts 批量获取 items 的曝光计数。key 为计数存储 key(含日期后缀),itemIDs 为待查询 ID。
GetCounts(ctx context.Context, key string, itemIDs []string) (map[string]int64, error)
// IncrCounts 批量累加曝光计数。
IncrCounts(ctx context.Context, key string, itemIDs []string, delta int64) error
}
ExposureCounter 曝光计数读写接口,用于 ExposureCountStrategy。
type FallbackErrorHandler ¶
type FallbackErrorHandler struct {
FallbackSource Source
// OnFallback 可选回调,降级时触发。
OnFallback func(source Source, err error)
}
FallbackErrorHandler 是降级策略:使用备用召回源。
func (*FallbackErrorHandler) HandleError ¶
func (h *FallbackErrorHandler) HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)
type Fanout ¶
type Fanout struct {
// NodeName 自定义名称(可选),用于嵌套时区分不同 Fanout 实例。
// 为空时默认 "recall.fanout"。
NodeName string
Sources []Source
Dedup bool
Timeout time.Duration // 每个召回源的超时时间
MaxConcurrent int // 最大并发数(0 表示无限制)
// MergeStrategy 合并策略(必需)
// 使用内置策略:FirstMergeStrategy、UnionMergeStrategy、PriorityMergeStrategy
// 或实现自定义策略
MergeStrategy MergeStrategy
// ErrorHandler 错误处理策略(可选)
// 如果为 nil,则使用默认策略(IgnoreErrorHandler)
ErrorHandler ErrorHandler
// SourcePriorities 自定义优先级权重(可选)
// key: Source 名称,value: 优先级(值越小优先级越高)
// 如果未设置,则使用 Source 在数组中的索引作为优先级
SourcePriorities map[string]int
}
Fanout 是一个 Recall Node:并发执行多个召回源,并合并结果。 同时实现 Source 接口,支持嵌套在另一个 Fanout 中作为子召回源。
type FirstMergeStrategy ¶
type FirstMergeStrategy struct{}
FirstMergeStrategy 是默认的合并策略:按 ID 去重,保留第一个出现的。
type GraphRecall ¶
GraphRecall 是基于图嵌入(Node2Vec/GraphSAGE)的召回源。
调用图召回服务 /recall,传入 user_id、top_k,返回相似用户 ID 列表(如「关注页」召回)。 使用前需训练 Node2Vec 并启动图召回服务:
python train/train_node2vec.py --edges data/graph_edges.csv uvicorn service.graph_recall_server:app --host 0.0.0.0 --port 8084
func (*GraphRecall) Name ¶
func (r *GraphRecall) Name() string
func (*GraphRecall) Recall ¶
func (r *GraphRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
type HybridRatioMergeStrategy ¶
type HybridRatioMergeStrategy struct {
// SourceRatios 显式参与比例分配的源(source -> ratio)
SourceRatios map[string]float64
// TotalLimit 合并总上限,<=0 时视为不截断(使用去重后总长度)
TotalLimit int
// DropUnconfiguredSources 是否丢弃未配置 ratio 的源,默认 false(即保留)
DropUnconfiguredSources bool
// SortByPriorityBeforeDedup 去重前是否按 recall_priority 排序,默认 false
SortByPriorityBeforeDedup bool
}
HybridRatioMergeStrategy 混合比例合并: 1) 可选按 recall_priority 预排序并去重(保留优先级高者) 2) 未配置在 SourceRatios 的源整路保留(按组内分数降序) 3) SourceRatios 中配置的源在剩余槽位内按比例分配
适用于“核心召回路固定保留 + 实验召回按比例占坑”的在线场景。
type I2IRecall ¶
type I2IRecall = ItemBasedCF
I2IRecall 是 ItemBasedCF 的类型别名,提供更符合工业习惯的命名。 i2i (Item-to-Item) 是工业级召回的"常青树",电商、内容流、短视频都在用。
type IgnoreErrorHandler ¶
IgnoreErrorHandler 忽略错误,返回空结果,不中断其他召回源。 可选 OnError 回调,用于 metrics/alerting 上报。
func (*IgnoreErrorHandler) HandleError ¶
func (h *IgnoreErrorHandler) HandleError(_ context.Context, source Source, err error, _ *core.RecommendContext) ([]*core.Item, error)
type ItemBasedCF ¶
type ItemBasedCF struct {
Store CFStore
// TopKSimilarItems 计算相似度时考虑的 TopK 个相似物品
// 如果 <= 0,则使用 Config 中的默认值
TopKSimilarItems int
// TopKItems 最终返回的 TopK 个物品
// 如果 <= 0,则使用 Config 中的默认值
TopKItems int
// SimilarityCalculator 相似度计算器(必需)
// 使用内置计算器:CosineSimilarity、PearsonCorrelation
// 或实现自定义计算器
SimilarityCalculator SimilarityCalculator
// MinCommonUsers 两个物品至少需要有多少个共同交互用户才计算相似度
// 如果 <= 0,则使用 Config 中的默认值
MinCommonUsers int
// UserHistoryKey 从 RecommendContext 获取用户历史物品的 key
// 如果为空,则从 Store 获取用户的所有交互物品
UserHistoryKey string
// Config 协同过滤配置(必需)
// 提供默认值,不能为 nil
Config core.CFConfig
}
ItemBasedCF 是基于物品的协同过滤召回源(Item-based Collaborative Filtering, Item-CF)。
核心思想:"被同一批用户喜欢的物品,相互相似"
算法流程:
- 构建物品 → 用户倒排表
- 计算物品相似度
- 对用户历史行为物品,取相似物品集合
工程特征:
- 实时性:好
- 计算复杂度:可控
- 可解释性:强
- 稳定性:高
工业地位:
- 工业级召回的"常青树"
- 电商、内容流、短视频都在用
- 可直接线上使用
在 Reckit 中的位置:
- 核心 Recall Node(i2iRecall)
- Label:recall.i2i
使用场景:
- 输入:用户最近点击 items
- 输出:相似 items
- "我看了这个,还可能看什么"
func (*ItemBasedCF) Name ¶
func (r *ItemBasedCF) Name() string
func (*ItemBasedCF) Recall ¶
func (r *ItemBasedCF) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type MFRecall ¶
type MFRecall struct {
Store MFStore
// TopK 返回 TopK 个物品
TopK int
// UserEmbeddingKey 从 RecommendContext 获取用户隐向量的 key
// 如果为空,则从 Store 获取
UserEmbeddingKey string
// UserEmbeddingExtractor 从 RecommendContext 提取用户隐向量(可选)
UserEmbeddingExtractor func(rctx *core.RecommendContext) []float64
}
MFRecall 是基于矩阵分解(Matrix Factorization)的召回源。
核心思想:将用户-物品交互矩阵分解为用户隐向量和物品隐向量 预测分数 = 用户隐向量 · 物品隐向量
算法类型:
- MF (Matrix Factorization): 基础矩阵分解
- ALS (Alternating Least Squares): 交替最小二乘法
- SVD (Singular Value Decomposition): 奇异值分解
工程特征:
- 实时性:好(离线训练,在线查表)
- 计算复杂度:低(向量点积)
- 可解释性:中等
- 冷启动:中等
在 Reckit 中的位置:
- 核心 Recall Node(MFRecall)
- Label:recall.mf
使用场景:
- 输入:用户隐向量(离线训练得到)
- 输出:TopK 物品(通过向量点积计算)
type MFStore ¶
type MFStore = core.RecallDataStore
MFStore 是矩阵分解的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。
type MergeStrategy ¶
MergeStrategy 是合并策略接口,用于自定义多路召回结果的合并逻辑。
type PearsonCorrelation ¶
type PearsonCorrelation struct{}
PearsonCorrelation 是皮尔逊相关系数计算器。
func (*PearsonCorrelation) Calculate ¶
func (p *PearsonCorrelation) Calculate(x, y []float64) float64
type PoolReader ¶
type PoolReader interface {
// GetPoolItems 返回指定 key 的池内成员。
// start/stop 为分数或排名范围(实现自行约定语义);rev=true 表示降序。
GetPoolItems(ctx context.Context, key string, start, stop int64, rev bool) ([]PoolItem, error)
}
PoolReader 从外部存储读取池内全量成员。 典型实现:Redis ZSET ZRANGEBYSCORE / ZREVRANGEBYSCORE。
type PoolRecall ¶
type PoolRecall struct {
// Reader 池数据读取器(必需)
Reader PoolReader
// PoolKey Redis key 或其他存储标识
PoolKey string
// TopK 每次请求返回的最大条数,默认 20
TopK int
// Strategy 分发策略(必需)
Strategy DistributeStrategy
// SourceName 召回源标识,用于 recall_source label
SourceName string
// StrategyResolver 如果非 nil,每次 Recall 前调用,可动态切换分发策略。
// 返回 nil 表示使用默认 Strategy。
StrategyResolver func(rctx *core.RecommendContext) DistributeStrategy
}
PoolRecall 从全局池读取全量成员,再经分发策略选取 TopK 给当前用户。
适用于任何"运营池子"召回场景——编辑/算法手动维护内容池,按策略均匀分发给用户。 是比 i2i/u2i 更基础的召回方式。
支持场景配置覆盖:设置 StrategyResolver 可在运行时动态切换分发策略。
示例:
pool := &recall.PoolRecall{
Reader: redisPoolReader,
PoolKey: "recall:editor_pick",
TopK: 20,
Strategy: recall.NewUserShuffleStrategy(),
SourceName: "editor_pick",
}
func (*PoolRecall) Kind ¶
func (r *PoolRecall) Kind() pipeline.Kind
func (*PoolRecall) Name ¶
func (r *PoolRecall) Name() string
func (*PoolRecall) Process ¶
func (r *PoolRecall) Process( ctx context.Context, rctx *core.RecommendContext, _ []*core.Item, ) ([]*core.Item, error)
func (*PoolRecall) Recall ¶
func (r *PoolRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
type PriorityMergeStrategy ¶
PriorityMergeStrategy 是按优先级合并的策略。 优先级由 Source 的索引决定(索引越小优先级越高),或通过 PriorityWeights 自定义。
type QuotaMergeStrategy ¶
type QuotaMergeStrategy struct {
// SourceQuotas 各源配额(source name -> 数量)。
SourceQuotas map[string]int
// DefaultQuota 未在 SourceQuotas 中配置的源的默认配额,0 表示不取。
DefaultQuota int
}
QuotaMergeStrategy 每个召回源取固定数量的 item。 适用于明确知道各源应出多少条的场景(如运营配置)。
type RPCRecall ¶
type RPCRecall struct {
// Endpoint 召回服务端点,例如 "http://localhost:8080/recall"
Endpoint string
// Timeout 请求超时时间
Timeout time.Duration
// TopK 返回 TopK 个物品(可选,服务端也可以返回)
TopK int
// Client HTTP 客户端(可选,如果不设置则使用默认客户端)
Client *http.Client
// RequestBuilder 自定义请求构建器(可选)
// 如果为 nil,则使用默认请求格式
RequestBuilder func(ctx context.Context, rctx *core.RecommendContext, topK int) (map[string]interface{}, error)
// ResponseParser 自定义响应解析器(可选)
// 如果为 nil,则使用默认响应格式
ResponseParser func(resp *http.Response) ([]*core.Item, error)
}
RPCRecall 是通过 RPC/HTTP 调用外部召回服务的召回源。
支持场景:
- 调用远程召回服务(Python/Java 等实现)
- 调用微服务架构中的召回服务
- 调用第三方召回 API
使用示例:
rpcRecall := &recall.RPCRecall{
Endpoint: "http://localhost:8080/recall",
Timeout: 2 * time.Second,
TopK: 20,
}
// 在 Fanout 中使用
fanout := &recall.Fanout{
Sources: []recall.Source{
rpcRecall,
&recall.SortedSetRecall{IDs: []string{"1", "2", "3"}, NodeName: "recall.hot"},
},
}
func NewRPCRecall ¶
NewRPCRecall 创建一个新的 RPC 召回源
func (*RPCRecall) Recall ¶
func (r *RPCRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
Recall 实现 Source 接口,调用远程召回服务
func (*RPCRecall) WithClient ¶
WithClient 设置自定义 HTTP 客户端
func (*RPCRecall) WithRequestBuilder ¶
func (r *RPCRecall) WithRequestBuilder(builder func(ctx context.Context, rctx *core.RecommendContext, topK int) (map[string]interface{}, error)) *RPCRecall
WithRequestBuilder 设置自定义请求构建器
type RatioMergeStrategy ¶
type RatioMergeStrategy struct {
// SourceRatios 各源比例(source name -> 比例 0.0~1.0)。
// 比例之和不必为 1.0,内部会自动归一化。
SourceRatios map[string]float64
// TotalLimit 总数量限制,必须 > 0。
TotalLimit int
}
RatioMergeStrategy 按比例从各召回源取 item,总量由 TotalLimit 控制。 适用于"hot 占 20%、cf 占 30%、ann 占 50%"的场景。
type RetryErrorHandler ¶
type RetryErrorHandler struct {
MaxRetries int
RetryDelay time.Duration
// OnRetry 可选回调,每次重试前触发(用于 metrics/日志)。
OnRetry func(source Source, attempt int, err error)
// OnGiveUp 可选回调,全部重试失败后触发。
OnGiveUp func(source Source, err error)
}
RetryErrorHandler 是重试策略:失败后重试若干次,每次间隔 RetryDelay。 全部重试仍失败时返回空结果(降级),不中断 Pipeline。
func (*RetryErrorHandler) HandleError ¶
func (h *RetryErrorHandler) HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)
type RoundRobinMergeStrategy ¶
type RoundRobinMergeStrategy struct {
// SourceOrder 轮询顺序(可选),未指定时按源首次出现的顺序。
SourceOrder []string
// TopN 合并后取前 N 个,0 表示不限制。
TopN int
}
RoundRobinMergeStrategy 从各召回源轮流取 item,实现均匀交叉排列。 适用于信息流等需要内容多样性的场景。
type ScoredHistoryItem ¶
ScoredHistoryItem 带时间戳/分数的历史条目。 Score 通常为毫秒时间戳(与 Redis ZSET score 语义一致), 供多行为类型合并时按时间降序归并。
type SimilarItemStore ¶
type SimilarItemStore interface {
// GetSimilarItems 获取给定物品列表的相似物品
GetSimilarItems(ctx context.Context, itemIDs []string, topK int) ([]string, error)
}
SimilarItemStore 获取相似物品的存储接口
type SimilarityCalculator ¶
type SimilarityCalculator interface {
// Calculate 计算两个向量的相似度
// x, y: 两个向量(必须长度相同)
// 返回: 相似度值(通常在 -1 到 1 之间)
Calculate(x, y []float64) float64
}
SimilarityCalculator 是相似度计算接口,用于自定义相似度计算方法。
type SortedSetRecall ¶
type SortedSetRecall struct {
Store core.Store
// Key 完整的存储 key(优先使用)。
Key string
// KeyPrefix key 前缀。当 Key 为空时:
// - 若 rctx.Scene 非空,实际 key = KeyPrefix:Scene
// - 否则 key = KeyPrefix
KeyPrefix string
// TopK 返回数量上限,默认 100。
TopK int
// Order 排序方向,默认 OrderDesc。
Order SortOrder
// IDs fallback 静态 ID 列表(当 Store 不可用或为空时使用)。
IDs []string
// NodeName 自定义节点名称,影响 Name() 返回值和 recall_source 标签。
// 默认 "recall.sorted_set"。
NodeName string
}
SortedSetRecall 是基于外部有序集合的通用召回源。
它是 Hot / Trending / Latest / TopRated 等具体业务召回的底层抽象: 从 Store 的有序集合中按指定方向拉取 TopK 个 item ID 及分数。
数据读取策略(按优先级):
- SortedSetRangeStore → ZRevRangeWithScores / ZRangeWithScores(带分数 + 双向)
- KeyValueStore → ZRange(仅降序、不含分数)
- Store.Get → JSON 数组(纯 ID 列表)
- IDs 字段 → fallback 静态列表
Key 解析优先级:Key > KeyPrefix+Scene > KeyPrefix。
常见业务场景与对应构造器:
NewHotRecall → 热门召回(按热度降序) NewTrendingRecall → 趋势召回(按趋势分降序) NewLatestRecall → 最新召回(按发布时间降序) NewTopRatedRecall → 高分召回(按评分降序)
func NewEditorPickRecall ¶
func NewEditorPickRecall(store core.Store, key string, topK int) *SortedSetRecall
NewEditorPickRecall 创建编辑推荐召回(按运营权重降序)。
func NewHotRecall ¶
func NewHotRecall(store core.Store, key string, topK int) *SortedSetRecall
NewHotRecall 创建热门召回(按热度降序)。
recall := NewHotRecall(store, "hot:feed", 100)
// 或使用 key_prefix,key 按 scene 自动拼接:
recall := NewHotRecall(store, "", 100)
recall.KeyPrefix = "hot" // key = hot:{scene}
func NewLatestRecall ¶
func NewLatestRecall(store core.Store, key string, topK int) *SortedSetRecall
NewLatestRecall 创建最新召回(按发布时间降序,最新优先)。
func NewTopRatedRecall ¶
func NewTopRatedRecall(store core.Store, key string, topK int) *SortedSetRecall
NewTopRatedRecall 创建高分召回(按评分降序)。
func NewTrendingRecall ¶
func NewTrendingRecall(store core.Store, key string, topK int) *SortedSetRecall
NewTrendingRecall 创建趋势召回(按趋势分降序)。
func (*SortedSetRecall) Kind ¶
func (r *SortedSetRecall) Kind() pipeline.Kind
func (*SortedSetRecall) Name ¶
func (r *SortedSetRecall) Name() string
func (*SortedSetRecall) Process ¶
func (r *SortedSetRecall) Process( ctx context.Context, rctx *core.RecommendContext, _ []*core.Item, ) ([]*core.Item, error)
func (*SortedSetRecall) Recall ¶
func (r *SortedSetRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type Source ¶
type Source interface {
Name() string
Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
}
Source 表示一个可复用的召回源(热门/CF/内容/ANN/...)。 你可以把它理解为“可并发 fan-out 的策略单元”。
type StoreCFAdapter ¶
type StoreCFAdapter struct {
// KeyPrefix 是存储 key 的前缀
// 用户物品交互:{KeyPrefix}:user:{userID}
// 物品用户交互:{KeyPrefix}:item:{itemID}
// 所有用户列表:{KeyPrefix}:users
// 所有物品列表:{KeyPrefix}:items
KeyPrefix string
// contains filtered or unexported fields
}
StoreCFAdapter 是基于 core.Store 接口的召回数据存储适配器。 实现 core.RecallDataStore 接口,支持协同过滤、内容推荐、矩阵分解等召回算法。 从 Redis/MySQL 等存储中读取召回所需的数据。
func NewStoreCFAdapter ¶
func NewStoreCFAdapter(s core.Store, keyPrefix string) *StoreCFAdapter
NewStoreCFAdapter 创建一个基于 core.Store 的协同过滤适配器。
func (*StoreCFAdapter) GetAllItemVectors ¶
GetAllItemVectors 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreCFAdapter) GetAllItems ¶
func (a *StoreCFAdapter) GetAllItems(ctx context.Context) ([]string, error)
func (*StoreCFAdapter) GetAllUsers ¶
func (a *StoreCFAdapter) GetAllUsers(ctx context.Context) ([]string, error)
func (*StoreCFAdapter) GetItemFeatures ¶
func (a *StoreCFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
GetItemFeatures 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreCFAdapter) GetItemUsers ¶
func (*StoreCFAdapter) GetItemVector ¶
GetItemVector 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreCFAdapter) GetSimilarItems ¶
func (a *StoreCFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)
GetSimilarItems 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreCFAdapter) GetUserItems ¶
func (*StoreCFAdapter) GetUserPreferences ¶
func (a *StoreCFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)
GetUserPreferences 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreCFAdapter) GetUserVector ¶
GetUserVector 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreCFAdapter) Name ¶
func (a *StoreCFAdapter) Name() string
Name 实现 core.RecallDataStore 接口
type StoreContentAdapter ¶
type StoreContentAdapter struct {
// KeyPrefix 是存储 key 的前缀
// 物品特征:{KeyPrefix}:item:{itemID}
// 用户偏好:{KeyPrefix}:user:{userID}
// 所有物品列表:{KeyPrefix}:items
KeyPrefix string
// contains filtered or unexported fields
}
StoreContentAdapter 是基于 core.Store 接口的内容推荐存储适配器。 从 Redis/MySQL 等存储中读取物品特征和用户偏好。
func NewStoreContentAdapter ¶
func NewStoreContentAdapter(s core.Store, keyPrefix string) *StoreContentAdapter
NewStoreContentAdapter 创建一个基于 core.Store 的内容推荐适配器。
func (*StoreContentAdapter) GetAllItemVectors ¶
GetAllItemVectors 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreContentAdapter) GetAllItems ¶
func (a *StoreContentAdapter) GetAllItems(ctx context.Context) ([]string, error)
func (*StoreContentAdapter) GetAllUsers ¶
func (a *StoreContentAdapter) GetAllUsers(ctx context.Context) ([]string, error)
GetAllUsers 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreContentAdapter) GetItemFeatures ¶
func (*StoreContentAdapter) GetItemUsers ¶
func (a *StoreContentAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)
GetItemUsers 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreContentAdapter) GetItemVector ¶
GetItemVector 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreContentAdapter) GetSimilarItems ¶
func (*StoreContentAdapter) GetUserItems ¶
func (a *StoreContentAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)
GetUserItems 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreContentAdapter) GetUserPreferences ¶
func (*StoreContentAdapter) GetUserVector ¶
GetUserVector 实现 core.RecallDataStore 接口(矩阵分解)
func (*StoreContentAdapter) Name ¶
func (a *StoreContentAdapter) Name() string
Name 实现 core.RecallDataStore 接口
type StoreMFAdapter ¶
type StoreMFAdapter struct {
// KeyPrefix 是存储 key 的前缀
// 用户隐向量:{KeyPrefix}:user:{userID}
// 物品隐向量:{KeyPrefix}:item:{itemID}
// 所有物品列表:{KeyPrefix}:items
KeyPrefix string
// contains filtered or unexported fields
}
StoreMFAdapter 是基于 core.Store 接口的矩阵分解存储适配器。 从 Redis/MySQL 等存储中读取用户和物品的隐向量。
func NewStoreMFAdapter ¶
func NewStoreMFAdapter(s core.Store, keyPrefix string) *StoreMFAdapter
NewStoreMFAdapter 创建一个基于 core.Store 的矩阵分解适配器。
func (*StoreMFAdapter) GetAllItemVectors ¶
func (*StoreMFAdapter) GetAllItems ¶
func (a *StoreMFAdapter) GetAllItems(ctx context.Context) ([]string, error)
GetAllItems 实现 core.RecallDataStore 接口(通用方法)
func (*StoreMFAdapter) GetAllUsers ¶
func (a *StoreMFAdapter) GetAllUsers(ctx context.Context) ([]string, error)
GetAllUsers 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreMFAdapter) GetItemFeatures ¶
func (a *StoreMFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)
GetItemFeatures 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreMFAdapter) GetItemUsers ¶
func (a *StoreMFAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)
GetItemUsers 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreMFAdapter) GetItemVector ¶
func (*StoreMFAdapter) GetSimilarItems ¶
func (a *StoreMFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)
GetSimilarItems 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreMFAdapter) GetUserItems ¶
func (a *StoreMFAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)
GetUserItems 实现 core.RecallDataStore 接口(协同过滤)
func (*StoreMFAdapter) GetUserPreferences ¶
func (a *StoreMFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)
GetUserPreferences 实现 core.RecallDataStore 接口(内容推荐)
func (*StoreMFAdapter) GetUserVector ¶
func (*StoreMFAdapter) Name ¶
func (a *StoreMFAdapter) Name() string
Name 实现 core.RecallDataStore 接口
type TwoTowerRecall ¶
type TwoTowerRecall struct {
// FeatureService 特征服务,用于获取用户特征
FeatureService core.FeatureService
// UserTowerService 用户塔推理服务(core.MLService 接口)
// 支持 ONNX Runtime、TorchServe、TensorFlow Serving 等
UserTowerService core.MLService
// VectorService 向量检索服务(core.VectorService 接口)
// 支持 Milvus、Faiss 等向量数据库
// 注意:使用领域接口(core.VectorService),由基础设施层(vector)实现
VectorService core.VectorService
// TopK 返回 TopK 个物品
TopK int
// Collection 向量数据库集合名称(用于存储 Item Embeddings)
Collection string
// Metric 距离度量方式:cosine / euclidean / inner_product
// 默认:inner_product(内积,适合双塔模型)
Metric string
// UserFeatureExtractor 自定义用户特征提取器(可选)
// 如果为 nil,则使用 FeatureService.GetUserFeatures 或默认抽取逻辑
// 支持传入 feature.FeatureExtractor 接口或函数类型(通过适配器)
UserFeatureExtractor feature.FeatureExtractor
}
TwoTowerRecall 是基于双塔模型的召回源实现。
核心流程:
- 获取用户特征(通过 FeatureService)
- 运行用户塔推理(通过 MLService,如 ONNX Runtime、TorchServe)
- 向量检索(通过 ANNService,如 Milvus、Faiss)
设计原则:
- 高内聚:TwoTowerRecall 只负责协调流程,具体逻辑在各服务中
- 低耦合:通过接口依赖,可替换实现(FeatureService、MLService、ANNService)
- DDD 模式:每个服务都是独立的领域服务
使用示例:
// 1. 创建特征服务
featureService := feature.NewFeatureService(...)
// 2. 创建用户塔推理服务(KServe V2 / Triton / TorchServe)
userTowerService := service.NewKServeClient("http://localhost:8080", "user_tower")
// 3. 创建向量检索服务(Milvus)
vectorService := vector.NewMilvusService("localhost:19530")
// 4. 创建双塔召回源
twoTowerRecall := recall.NewTwoTowerRecall(
featureService,
userTowerService,
vectorService,
recall.WithTwoTowerTopK(100),
recall.WithTwoTowerCollection("item_embeddings"),
)
// 5. 在 Fanout 中使用
fanout := &recall.Fanout{
Sources: []recall.Source{
twoTowerRecall,
&recall.SortedSetRecall{IDs: []string{"1", "2", "3"}, NodeName: "recall.hot"},
},
}
func NewTwoTowerRecall ¶
func NewTwoTowerRecall( featureService core.FeatureService, userTowerService core.MLService, vectorService core.VectorService, opts ...TwoTowerRecallOption, ) *TwoTowerRecall
NewTwoTowerRecall 创建一个新的双塔召回源。
func (*TwoTowerRecall) Name ¶
func (r *TwoTowerRecall) Name() string
func (*TwoTowerRecall) Recall ¶
func (r *TwoTowerRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
Recall 实现 Source 接口,执行双塔召回流程
type TwoTowerRecallOption ¶
type TwoTowerRecallOption func(*TwoTowerRecall)
TwoTowerRecallOption 双塔召回配置选项
func WithTwoTowerCollection ¶
func WithTwoTowerCollection(collection string) TwoTowerRecallOption
WithTwoTowerCollection 设置向量数据库集合名称
func WithTwoTowerMetric ¶
func WithTwoTowerMetric(metric string) TwoTowerRecallOption
WithTwoTowerMetric 设置距离度量方式
func WithTwoTowerTopK ¶
func WithTwoTowerTopK(topK int) TwoTowerRecallOption
WithTwoTowerTopK 设置 TopK
func WithTwoTowerUserFeatureExtractor ¶
func WithTwoTowerUserFeatureExtractor(extractor interface{}) TwoTowerRecallOption
WithTwoTowerUserFeatureExtractor 设置自定义用户特征提取器 支持传入 feature.FeatureExtractor 接口或函数类型(自动包装为 CustomFeatureExtractor)
type U2IRecall ¶
type U2IRecall = UserBasedCF
U2IRecall 是 UserBasedCF 的类型别名,提供更符合工业习惯的命名。 u2i (User-to-Item) 表示"直接给用户算候选物品集合"的召回方向。
type UserBasedCF ¶
type UserBasedCF struct {
Store CFStore
// TopKSimilarUsers 计算相似度时考虑的 TopK 个相似用户
// 如果 <= 0,则使用 Config 中的默认值
TopKSimilarUsers int
// TopKItems 最终返回的 TopK 个物品
// 如果 <= 0,则使用 Config 中的默认值
TopKItems int
// SimilarityCalculator 相似度计算器(必需)
// 使用内置计算器:CosineSimilarity、PearsonCorrelation
// 或实现自定义计算器
SimilarityCalculator SimilarityCalculator
// MinCommonItems 两个用户至少需要有多少个共同交互物品才计算相似度
// 如果 <= 0,则使用 Config 中的默认值
MinCommonItems int
// Config 协同过滤配置(必需)
// 提供默认值,不能为 nil
Config core.CFConfig
}
UserBasedCF 是基于用户的协同过滤召回源(User-based Collaborative Filtering, User-CF)。
核心思想:"兴趣相似的用户,喜欢相似的物品"
算法流程:
- 用户 → 行为向量(点击/收藏/购买)
- 计算用户相似度(Cosine / Pearson)
- 找 TopK 相似用户
- 推荐这些用户喜欢但目标用户未见过的物品
工程特征:
- 实时性:较差(用户变化快)
- 计算复杂度:高(用户数大)
- 可解释性:强
- 冷启动:差
工程使用现状:
- ❌ 几乎不直接在线用
- ✅ 离线分析 / 冷启动补充
在 Reckit 中的位置:
- 离线产出 u2u / u2i 结果
- 作为 Recall Node(u2u → u2i 工程拆分)
func (*UserBasedCF) Name ¶
func (r *UserBasedCF) Name() string
func (*UserBasedCF) Recall ¶
func (r *UserBasedCF) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type UserBucketStrategy ¶
type UserBucketStrategy struct{}
UserBucketStrategy 将用户均分到池内不同位置,每个 item 获得近似 1/N 的用户流量。 适合公平分发场景。
func NewUserBucketStrategy ¶
func NewUserBucketStrategy() *UserBucketStrategy
func (*UserBucketStrategy) Pick ¶
func (s *UserBucketStrategy) Pick(_ context.Context, req DistributeRequest) ([]PoolItem, error)
type UserHistory ¶
type UserHistory struct {
Store UserHistoryStore
// KeyPrefix 是 Store 中的 key 前缀,实际 key 为 {KeyPrefix}:{UserID}
KeyPrefix string
// BehaviorType 行为类型:view / click / purchase / favorite 等
BehaviorType string
// TopK 返回 TopK 个物品
TopK int
// TimeWindow 时间窗口(秒),只考虑该时间窗口内的历史
// 0 表示考虑所有历史
TimeWindow int64
// EnableSimilarExtend 是否启用相似物品扩展(I2I 召回)。
//
// 开启后,会通过 SimilarItemStore.GetSimilarItems() 将用户历史物品扩展为相似物品,
// 实现 I2I (Item-to-Item) 召回,推荐用户未交互过但可能感兴趣的物品。
//
// 与 I2IRecall 的对比:
// - UserHistory + EnableSimilarExtend:
// * 本质:I2I 接口模式,依赖外部实现 SimilarItemStore
// * 输入:用户历史 item 列表
// * 输出:相似 item 列表
// * 实现方式:外部提供 I2I 数据源(预计算索引、向量检索等)
// * 性能:依赖外部实现,可使用预计算索引(性能更好)
// * 灵活性:高,可切换不同 I2I 数据源
// * 适用场景:生产环境推荐,使用预计算的 I2I 索引
//
// - I2IRecall (ItemBasedCF):
// * 本质:I2I 完整实现,内部通过协同过滤计算相似度
// * 输入:用户历史 item
// * 输出:相似 item
// * 实现方式:内部实时计算物品相似度(Cosine/Pearson)
// * 性能:实时计算,可能较慢
// * 灵活性:低,固定协同过滤算法
// * 适用场景:小规模数据或需要实时计算的场景
//
// 使用示例:
// // 方式 1:使用预计算的 I2I 索引(推荐)
// userHistoryRecall := &recall.UserHistory{
// Store: &PrecomputedI2IStore{store: redisStore},
// EnableSimilarExtend: true, // 开启 I2I
// }
//
// // 方式 2:直接使用 I2IRecall(实时计算)
// i2iRecall := &recall.I2IRecall{
// Store: cfStore,
// SimilarityCalculator: &recall.CosineSimilarity{},
// }
EnableSimilarExtend bool
}
UserHistory 是基于用户 historical 行为的个性化召回源。 支持从 Store 读取用户的浏览、点击、购买等历史,推荐相似物品。
func (*UserHistory) Kind ¶
func (r *UserHistory) Kind() pipeline.Kind
func (*UserHistory) Name ¶
func (r *UserHistory) Name() string
func (*UserHistory) Process ¶
func (r *UserHistory) Process( ctx context.Context, rctx *core.RecommendContext, _ []*core.Item, ) ([]*core.Item, error)
func (*UserHistory) Recall ¶
func (r *UserHistory) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type UserHistoryStore ¶
type UserHistoryStore interface {
// GetUserHistory 获取用户的历史行为物品列表(带时间戳/分数)。
// 返回按 Score 降序排列的历史条目,Score 通常为毫秒时间戳。
GetUserHistory(ctx context.Context, userID string, keyPrefix, behaviorType string, timeWindow int64) ([]ScoredHistoryItem, error)
}
UserHistoryStore 是用户历史存储接口。
type UserShuffleStrategy ¶
type UserShuffleStrategy struct{}
UserShuffleStrategy 按用户确定性 shuffle 后取 TopK。 同用户同天稳定,不同用户看到不同子集。适合通用打散。
func NewUserShuffleStrategy ¶
func NewUserShuffleStrategy() *UserShuffleStrategy
func (*UserShuffleStrategy) Pick ¶
func (s *UserShuffleStrategy) Pick(_ context.Context, req DistributeRequest) ([]PoolItem, error)
type WaterfallMergeStrategy ¶
type WaterfallMergeStrategy struct {
// SourcePriority 源优先级顺序(高 -> 低),不在列表中的源排在最后。
SourcePriority []string
// TotalLimit 总数量限制,必须 > 0。
TotalLimit int
// SourceLimits 每源最大数量(可选),防止单源占满全部配额。
// 未配置的源无单独限制,受 TotalLimit 约束。
SourceLimits map[string]int
}
WaterfallMergeStrategy 高优先级源优先填满,不足时由低优先级源补充。 适用于"优先用个性化结果,不够再用热门兜底"的场景。
type WeightedScoreMergeStrategy ¶
type WeightedScoreMergeStrategy struct {
// SourceWeights 各召回源的权重乘数(source name -> weight multiplier)。
// 未配置的源使用 DefaultWeight。
SourceWeights map[string]float64
// DefaultWeight 未在 SourceWeights 中配置的源的默认权重,默认 1.0。
DefaultWeight float64
// TopN 合并后取前 N 个,0 表示不限制。
TopN int
}
WeightedScoreMergeStrategy 按召回源权重调整 item 分数,然后按分数降序排列。 适用于通过权重控制各源贡献度的场景。
type Word2VecRecall ¶
type Word2VecRecall struct {
// Model Word2Vec 模型
Model *model.Word2VecModel
// Store 存储接口
Store Word2VecStore
// TopK 返回 TopK 个物品
TopK int
// Mode 召回模式:text(基于文本)或 sequence(基于序列)
Mode string
// TextField 文本字段:title / description / tags
TextField string
// HistoryFunc 可选,返回用户最近点击的物品 ID 列表。
// 未设置时从 rctx.Attributes["recent_clicks"] 获取。
HistoryFunc func(rctx *core.RecommendContext) []string
}
Word2VecRecall 是基于 Word2Vec 的召回源。
核心思想:
- 将用户行为序列或物品文本特征转换为向量
- 通过向量相似度找到相似物品
使用场景:
- I2I 召回:基于物品文本相似度
- 序列召回:基于用户行为序列
func (*Word2VecRecall) Name ¶
func (r *Word2VecRecall) Name() string
func (*Word2VecRecall) Recall ¶
func (r *Word2VecRecall) Recall( ctx context.Context, rctx *core.RecommendContext, ) ([]*core.Item, error)
type Word2VecStore ¶
type Word2VecStore interface {
// GetItemText 获取物品的文本特征(标题、描述、标签等)
GetItemText(ctx context.Context, itemID string) (string, error)
// GetItemTags 获取物品的标签列表
GetItemTags(ctx context.Context, itemID string) ([]string, error)
// GetUserSequence 获取用户行为序列(点击的物品ID列表)
GetUserSequence(ctx context.Context, userID string, maxLen int) ([]string, error)
// GetAllItems 获取所有物品 ID 列表
GetAllItems(ctx context.Context) ([]string, error)
}
Word2VecStore 是 Word2Vec 召回所需的存储接口。
type YouTubeDNNRecall ¶
type YouTubeDNNRecall struct {
FeatureService core.FeatureService
Endpoint string // HTTP 服务端点,例如 "http://localhost:8082/user_embedding"
Timeout time.Duration
Client *http.Client
VectorService core.VectorService
TopK int
Collection string
Metric string
UserFeatureExtractor feature.FeatureExtractor
HistoryExtractor *feature.HistoryExtractor
}
YouTubeDNNRecall 是基于 YouTube DNN 的召回源。
流程:用户特征 + 用户历史行为 -> /user_embedding -> 用户向量 -> 向量检索 Item。 使用前需训练并启动 YouTube DNN 服务,且 Item Embeddings 已导入 VectorService。
python train/train_youtube_dnn.py uvicorn service.youtube_dnn_server:app --host 0.0.0.0 --port 8082
func (*YouTubeDNNRecall) Name ¶
func (r *YouTubeDNNRecall) Name() string
func (*YouTubeDNNRecall) Recall ¶
func (r *YouTubeDNNRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
Source Files
¶
- ann.go
- bert_recall.go
- cf_store_adapter.go
- collaborative_filtering.go
- content.go
- content_store_adapter.go
- dssm_recall.go
- error_handler.go
- fanout.go
- graph_recall.go
- matrix_factorization.go
- merge_strategy.go
- mf_store_adapter.go
- pool.go
- pool_strategy.go
- rpc_recall.go
- sorted_set.go
- source.go
- two_tower_recall.go
- user_history.go
- word2vec_recall.go
- youtube_dnn_recall.go