recall

package
v0.0.0-...-02f0996 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetupCFTestData

func SetupCFTestData(ctx context.Context, adapter *StoreCFAdapter, interactions []struct {
	UserID string
	ItemID string
	Score  float64
}) error

SetupCFTestData 辅助函数:为测试准备协同过滤数据到 Store 中。 使用 StoreCFAdapter + MemoryStore 时,可以用这个函数方便地添加测试数据。

Types

type ANN

type ANN struct {
	// VectorService 向量检索服务(领域层接口)
	// 可以是 Milvus、Faiss 等向量数据库的实现
	VectorService core.VectorService

	// Collection 集合名称(用于向量搜索)
	Collection string

	// UserEmbedding 用户向量(如果提供,优先使用;否则从 rctx 获取)
	UserEmbedding []float64

	// TopK 返回 TopK 相似物品
	TopK int

	// Metric 距离度量:cosine / euclidean / inner_product
	Metric string

	// UserEmbeddingExtractor 从 RecommendContext 提取用户向量(可选)
	UserEmbeddingExtractor func(rctx *core.RecommendContext) []float64
}

ANN 是 Embedding 向量检索召回源(Approximate Nearest Neighbor)。 支持余弦相似度、欧氏距离等计算方式。

设计原则:

  • 直接使用 core.VectorService(领域层接口),符合 DDD 原则
  • 消除过度抽象:不再需要 VectorStore 适配层
  • 专注于高性能向量检索场景

func (*ANN) Kind

func (r *ANN) Kind() pipeline.Kind

func (*ANN) Name

func (r *ANN) Name() string

func (*ANN) Process

func (r *ANN) Process(ctx context.Context, rctx *core.RecommendContext, _ []*core.Item) ([]*core.Item, error)

Process 实现 pipeline.Node 接口,使 ANN 可直接放入 Pipeline.Nodes。

func (*ANN) Recall

func (r *ANN) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type BERTRecall

type BERTRecall struct {
	// Model BERT 模型
	Model *model.BERTModel

	// Store 存储接口
	Store BERTStore

	// TopK 返回 TopK 个物品
	TopK int

	// Mode 召回模式:text(基于文本)或 query(基于查询)
	Mode string

	// TextField 文本字段:title / description / tags
	TextField string

	// BatchSize 批量编码大小(提高效率)
	BatchSize int

	// HistoryFunc 可选,返回用户最近点击的物品 ID 列表。
	// 未设置时从 rctx.Attributes["recent_clicks"] 获取。
	HistoryFunc func(rctx *core.RecommendContext) []string
}

BERTRecall 是基于 BERT 的召回源。

核心思想:

  • 使用 BERT 将文本编码为语义向量
  • 通过向量相似度找到语义相似的物品

使用场景:

  • 文本语义召回:基于物品标题、描述的语义相似度
  • 搜索推荐:用户查询与物品文本的语义匹配
  • I2I 召回:基于物品文本语义相似度

func (*BERTRecall) Name

func (r *BERTRecall) Name() string

func (*BERTRecall) Recall

func (r *BERTRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type BERTStore

type BERTStore interface {
	// GetItemText 获取物品的文本特征(标题、描述、标签等)
	GetItemText(ctx context.Context, itemID string) (string, error)

	// GetItemTags 获取物品的标签列表
	GetItemTags(ctx context.Context, itemID string) ([]string, error)

	// GetUserQuery 获取用户查询文本(可选,用于搜索场景)
	GetUserQuery(ctx context.Context, userID string) (string, error)

	// GetAllItems 获取所有物品 ID 列表
	GetAllItems(ctx context.Context) ([]string, error)
}

BERTStore 是 BERT 召回所需的存储接口。

type CFStore

type CFStore = core.RecallDataStore

CFStore 是协同过滤的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。

type ChainMergeStrategy

type ChainMergeStrategy struct {
	Strategies []MergeStrategy
}

ChainMergeStrategy 将多个 MergeStrategy 串联执行,前一个策略的输出作为下一个的输入。 适用于需要组合多种策略的场景,例如"先加权调分,再按配额截取"。

dedup 仅传递给第一个策略,后续策略收到 dedup=false(避免重复去重)。

func (*ChainMergeStrategy) Merge

func (s *ChainMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type ContentRecall

type ContentRecall struct {
	Store ContentStore

	// TopK 返回 TopK 个物品
	TopK int

	// Metric 距离度量方式:cosine / jaccard / tfidf
	Metric string

	// UserPreferencesKey 从 RecommendContext 获取用户偏好的 key
	// 如果为空,则从 Store 获取
	UserPreferencesKey string

	// UserPreferencesExtractor 从 RecommendContext 提取用户偏好(可选)
	UserPreferencesExtractor func(rctx *core.RecommendContext) map[string]float64
}

ContentRecall 是基于内容的召回源(Content-Based Recommendation)。

核心思想:"用户喜欢具有某些特征的物品,推荐具有相似特征的其他物品"

func (*ContentRecall) Name

func (r *ContentRecall) Name() string

func (*ContentRecall) Recall

func (r *ContentRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type ContentStore

type ContentStore = core.RecallDataStore

ContentStore 是基于内容的推荐的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。

type CosineSimilarity

type CosineSimilarity struct{}

CosineSimilarity 是余弦相似度计算器。

func (*CosineSimilarity) Calculate

func (c *CosineSimilarity) Calculate(x, y []float64) float64

type DSSMRecall

type DSSMRecall struct {
	Endpoint string // HTTP 服务端点,例如 "http://localhost:8083/query_embedding"
	Timeout  time.Duration
	Client   *http.Client

	VectorService core.VectorService
	TopK          int
	Collection    string
	Metric        string

	QueryFeatureExtractor feature.FeatureExtractor
}

DSSMRecall 是基于 DSSM 的 Query-Doc 语义召回源。

流程:从 rctx 获取 query_features -> /query_embedding -> 向量检索 Doc。 使用前需训练并启动 DSSM 服务,且 Doc Embeddings 已导入 VectorService。

python train/train_dssm.py
uvicorn service.dssm_server:app --host 0.0.0.0 --port 8083

func (*DSSMRecall) Name

func (r *DSSMRecall) Name() string

func (*DSSMRecall) Recall

func (r *DSSMRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)

type DistributeRequest

type DistributeRequest struct {
	UserID string
	Items  []PoolItem
	TopK   int
}

DistributeRequest 一次池内分发的请求参数。

func (DistributeRequest) NormalizeTopK

func (r DistributeRequest) NormalizeTopK() int

NormalizeTopK 将 TopK 限制在 [0, len(Items)]。

type DistributeStrategy

type DistributeStrategy interface {
	// Pick 从 items 中为 userID 选取 topK 条。
	Pick(ctx context.Context, req DistributeRequest) ([]PoolItem, error)
}

DistributeStrategy 定义池内 item 的分发策略接口。 从全量池成员中选取 TopK 分发给当前用户。

type EmbRecall

type EmbRecall = ANN

EmbRecall 是 ANN 的类型别名,提供更符合工业习惯的命名。

type ErrorHandler

type ErrorHandler interface {
	// HandleError 处理召回源错误。
	// 返回 error != nil 时将中断整个 Fanout。
	HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)
}

ErrorHandler 是错误处理策略接口,用于自定义召回源失败时的处理逻辑。

type ExposureCountStrategy

type ExposureCountStrategy struct {
	Counter ExposureCounter

	// KeyPrefix 曝光计数 key 前缀;最终 key = KeyPrefix:{yyyyMMdd}。
	// 为空时需由调用方通过 PoolKey 推导。
	KeyPrefix string

	// RecordOnPick 为 true 时在 Pick 成功后自动累加曝光计数。
	RecordOnPick bool

	// TZOffset 日期后缀的时区偏移(相对 UTC),默认 8h (UTC+8)。
	TZOffset time.Duration
}

ExposureCountStrategy 从计数器读取曝光次数,优先分发曝光最少的 item。 若 Counter 不可用或出错,自动回退 UserShuffle。

func NewExposureCountStrategy

func NewExposureCountStrategy(counter ExposureCounter, keyPrefix string) *ExposureCountStrategy

func (*ExposureCountStrategy) Pick

type ExposureCounter

type ExposureCounter interface {
	// GetCounts 批量获取 items 的曝光计数。key 为计数存储 key(含日期后缀),itemIDs 为待查询 ID。
	GetCounts(ctx context.Context, key string, itemIDs []string) (map[string]int64, error)
	// IncrCounts 批量累加曝光计数。
	IncrCounts(ctx context.Context, key string, itemIDs []string, delta int64) error
}

ExposureCounter 曝光计数读写接口,用于 ExposureCountStrategy。

type FallbackErrorHandler

type FallbackErrorHandler struct {
	FallbackSource Source
	// OnFallback 可选回调,降级时触发。
	OnFallback func(source Source, err error)
}

FallbackErrorHandler 是降级策略:使用备用召回源。

func (*FallbackErrorHandler) HandleError

func (h *FallbackErrorHandler) HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)

type Fanout

type Fanout struct {
	// NodeName 自定义名称(可选),用于嵌套时区分不同 Fanout 实例。
	// 为空时默认 "recall.fanout"。
	NodeName string

	Sources       []Source
	Dedup         bool
	Timeout       time.Duration // 每个召回源的超时时间
	MaxConcurrent int           // 最大并发数(0 表示无限制)

	// MergeStrategy 合并策略(必需)
	// 使用内置策略:FirstMergeStrategy、UnionMergeStrategy、PriorityMergeStrategy
	// 或实现自定义策略
	MergeStrategy MergeStrategy

	// ErrorHandler 错误处理策略(可选)
	// 如果为 nil,则使用默认策略(IgnoreErrorHandler)
	ErrorHandler ErrorHandler

	// SourcePriorities 自定义优先级权重(可选)
	// key: Source 名称,value: 优先级(值越小优先级越高)
	// 如果未设置,则使用 Source 在数组中的索引作为优先级
	SourcePriorities map[string]int
}

Fanout 是一个 Recall Node:并发执行多个召回源,并合并结果。 同时实现 Source 接口,支持嵌套在另一个 Fanout 中作为子召回源。

func (*Fanout) Kind

func (n *Fanout) Kind() pipeline.Kind

func (*Fanout) Name

func (n *Fanout) Name() string

func (*Fanout) Process

func (n *Fanout) Process(
	ctx context.Context,
	rctx *core.RecommendContext,
	_ []*core.Item,
) ([]*core.Item, error)

func (*Fanout) Recall

func (n *Fanout) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)

Recall 使 Fanout 同时实现 Source 接口,支持嵌套在另一个 Fanout 中作为子召回源。

type FirstMergeStrategy

type FirstMergeStrategy struct{}

FirstMergeStrategy 是默认的合并策略:按 ID 去重,保留第一个出现的。

func (*FirstMergeStrategy) Merge

func (s *FirstMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type GraphRecall

type GraphRecall struct {
	Endpoint string
	Timeout  time.Duration
	Client   *http.Client
	TopK     int
}

GraphRecall 是基于图嵌入(Node2Vec/GraphSAGE)的召回源。

调用图召回服务 /recall,传入 user_id、top_k,返回相似用户 ID 列表(如「关注页」召回)。 使用前需训练 Node2Vec 并启动图召回服务:

python train/train_node2vec.py --edges data/graph_edges.csv
uvicorn service.graph_recall_server:app --host 0.0.0.0 --port 8084

func (*GraphRecall) Name

func (r *GraphRecall) Name() string

func (*GraphRecall) Recall

func (r *GraphRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)

type HybridRatioMergeStrategy

type HybridRatioMergeStrategy struct {
	// SourceRatios 显式参与比例分配的源(source -> ratio)
	SourceRatios map[string]float64
	// TotalLimit 合并总上限,<=0 时视为不截断(使用去重后总长度)
	TotalLimit int
	// DropUnconfiguredSources 是否丢弃未配置 ratio 的源,默认 false(即保留)
	DropUnconfiguredSources bool
	// SortByPriorityBeforeDedup 去重前是否按 recall_priority 排序,默认 false
	SortByPriorityBeforeDedup bool
}

HybridRatioMergeStrategy 混合比例合并: 1) 可选按 recall_priority 预排序并去重(保留优先级高者) 2) 未配置在 SourceRatios 的源整路保留(按组内分数降序) 3) SourceRatios 中配置的源在剩余槽位内按比例分配

适用于“核心召回路固定保留 + 实验召回按比例占坑”的在线场景。

func (*HybridRatioMergeStrategy) Merge

func (s *HybridRatioMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type I2IRecall

type I2IRecall = ItemBasedCF

I2IRecall 是 ItemBasedCF 的类型别名,提供更符合工业习惯的命名。 i2i (Item-to-Item) 是工业级召回的"常青树",电商、内容流、短视频都在用。

type IgnoreErrorHandler

type IgnoreErrorHandler struct {
	OnError func(source Source, err error)
}

IgnoreErrorHandler 忽略错误,返回空结果,不中断其他召回源。 可选 OnError 回调,用于 metrics/alerting 上报。

func (*IgnoreErrorHandler) HandleError

func (h *IgnoreErrorHandler) HandleError(_ context.Context, source Source, err error, _ *core.RecommendContext) ([]*core.Item, error)

type ItemBasedCF

type ItemBasedCF struct {
	Store CFStore

	// TopKSimilarItems 计算相似度时考虑的 TopK 个相似物品
	// 如果 <= 0,则使用 Config 中的默认值
	TopKSimilarItems int

	// TopKItems 最终返回的 TopK 个物品
	// 如果 <= 0,则使用 Config 中的默认值
	TopKItems int

	// SimilarityCalculator 相似度计算器(必需)
	// 使用内置计算器:CosineSimilarity、PearsonCorrelation
	// 或实现自定义计算器
	SimilarityCalculator SimilarityCalculator

	// MinCommonUsers 两个物品至少需要有多少个共同交互用户才计算相似度
	// 如果 <= 0,则使用 Config 中的默认值
	MinCommonUsers int

	// UserHistoryKey 从 RecommendContext 获取用户历史物品的 key
	// 如果为空,则从 Store 获取用户的所有交互物品
	UserHistoryKey string

	// Config 协同过滤配置(必需)
	// 提供默认值,不能为 nil
	Config core.CFConfig
}

ItemBasedCF 是基于物品的协同过滤召回源(Item-based Collaborative Filtering, Item-CF)。

核心思想:"被同一批用户喜欢的物品,相互相似"

算法流程:

  1. 构建物品 → 用户倒排表
  2. 计算物品相似度
  3. 对用户历史行为物品,取相似物品集合

工程特征:

  • 实时性:好
  • 计算复杂度:可控
  • 可解释性:强
  • 稳定性:高

工业地位:

  • 工业级召回的"常青树"
  • 电商、内容流、短视频都在用
  • 可直接线上使用

在 Reckit 中的位置:

  • 核心 Recall Node(i2iRecall)
  • Label:recall.i2i

使用场景:

  • 输入:用户最近点击 items
  • 输出:相似 items
  • "我看了这个,还可能看什么"

func (*ItemBasedCF) Name

func (r *ItemBasedCF) Name() string

func (*ItemBasedCF) Recall

func (r *ItemBasedCF) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type MFRecall

type MFRecall struct {
	Store MFStore

	// TopK 返回 TopK 个物品
	TopK int

	// UserEmbeddingKey 从 RecommendContext 获取用户隐向量的 key
	// 如果为空,则从 Store 获取
	UserEmbeddingKey string

	// UserEmbeddingExtractor 从 RecommendContext 提取用户隐向量(可选)
	UserEmbeddingExtractor func(rctx *core.RecommendContext) []float64
}

MFRecall 是基于矩阵分解(Matrix Factorization)的召回源。

核心思想:将用户-物品交互矩阵分解为用户隐向量和物品隐向量 预测分数 = 用户隐向量 · 物品隐向量

算法类型:

  • MF (Matrix Factorization): 基础矩阵分解
  • ALS (Alternating Least Squares): 交替最小二乘法
  • SVD (Singular Value Decomposition): 奇异值分解

工程特征:

  • 实时性:好(离线训练,在线查表)
  • 计算复杂度:低(向量点积)
  • 可解释性:中等
  • 冷启动:中等

在 Reckit 中的位置:

  • 核心 Recall Node(MFRecall)
  • Label:recall.mf

使用场景:

  • 输入:用户隐向量(离线训练得到)
  • 输出:TopK 物品(通过向量点积计算)

func (*MFRecall) Name

func (r *MFRecall) Name() string

func (*MFRecall) Recall

func (r *MFRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type MFStore

type MFStore = core.RecallDataStore

MFStore 是矩阵分解的存储接口(已废弃,使用 core.RecallDataStore)。 为了向后兼容,保留为类型别名。

type MergeStrategy

type MergeStrategy interface {
	Merge(items []*core.Item, dedup bool) []*core.Item
}

MergeStrategy 是合并策略接口,用于自定义多路召回结果的合并逻辑。

type PearsonCorrelation

type PearsonCorrelation struct{}

PearsonCorrelation 是皮尔逊相关系数计算器。

func (*PearsonCorrelation) Calculate

func (p *PearsonCorrelation) Calculate(x, y []float64) float64

type PoolItem

type PoolItem struct {
	ID    string
	Score float64
}

PoolItem 表示池内单个成员,通常对应 Redis ZSET 的 member + score。

type PoolReader

type PoolReader interface {
	// GetPoolItems 返回指定 key 的池内成员。
	// start/stop 为分数或排名范围(实现自行约定语义);rev=true 表示降序。
	GetPoolItems(ctx context.Context, key string, start, stop int64, rev bool) ([]PoolItem, error)
}

PoolReader 从外部存储读取池内全量成员。 典型实现:Redis ZSET ZRANGEBYSCORE / ZREVRANGEBYSCORE。

type PoolRecall

type PoolRecall struct {
	// Reader 池数据读取器(必需)
	Reader PoolReader

	// PoolKey Redis key 或其他存储标识
	PoolKey string

	// TopK 每次请求返回的最大条数,默认 20
	TopK int

	// Strategy 分发策略(必需)
	Strategy DistributeStrategy

	// SourceName 召回源标识,用于 recall_source label
	SourceName string

	// StrategyResolver 如果非 nil,每次 Recall 前调用,可动态切换分发策略。
	// 返回 nil 表示使用默认 Strategy。
	StrategyResolver func(rctx *core.RecommendContext) DistributeStrategy
}

PoolRecall 从全局池读取全量成员,再经分发策略选取 TopK 给当前用户。

适用于任何"运营池子"召回场景——编辑/算法手动维护内容池,按策略均匀分发给用户。 是比 i2i/u2i 更基础的召回方式。

支持场景配置覆盖:设置 StrategyResolver 可在运行时动态切换分发策略。

示例:

pool := &recall.PoolRecall{
    Reader:     redisPoolReader,
    PoolKey:    "recall:editor_pick",
    TopK:       20,
    Strategy:   recall.NewUserShuffleStrategy(),
    SourceName: "editor_pick",
}

func (*PoolRecall) Kind

func (r *PoolRecall) Kind() pipeline.Kind

func (*PoolRecall) Name

func (r *PoolRecall) Name() string

func (*PoolRecall) Process

func (r *PoolRecall) Process(
	ctx context.Context,
	rctx *core.RecommendContext,
	_ []*core.Item,
) ([]*core.Item, error)

func (*PoolRecall) Recall

func (r *PoolRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)

type PriorityMergeStrategy

type PriorityMergeStrategy struct {
	PriorityWeights map[string]int
}

PriorityMergeStrategy 是按优先级合并的策略。 优先级由 Source 的索引决定(索引越小优先级越高),或通过 PriorityWeights 自定义。

func (*PriorityMergeStrategy) Merge

func (s *PriorityMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type QuotaMergeStrategy

type QuotaMergeStrategy struct {
	// SourceQuotas 各源配额(source name -> 数量)。
	SourceQuotas map[string]int

	// DefaultQuota 未在 SourceQuotas 中配置的源的默认配额,0 表示不取。
	DefaultQuota int
}

QuotaMergeStrategy 每个召回源取固定数量的 item。 适用于明确知道各源应出多少条的场景(如运营配置)。

func (*QuotaMergeStrategy) Merge

func (s *QuotaMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type RPCRecall

type RPCRecall struct {
	// Endpoint 召回服务端点,例如 "http://localhost:8080/recall"
	Endpoint string

	// Timeout 请求超时时间
	Timeout time.Duration

	// TopK 返回 TopK 个物品(可选,服务端也可以返回)
	TopK int

	// Client HTTP 客户端(可选,如果不设置则使用默认客户端)
	Client *http.Client

	// RequestBuilder 自定义请求构建器(可选)
	// 如果为 nil,则使用默认请求格式
	RequestBuilder func(ctx context.Context, rctx *core.RecommendContext, topK int) (map[string]interface{}, error)

	// ResponseParser 自定义响应解析器(可选)
	// 如果为 nil,则使用默认响应格式
	ResponseParser func(resp *http.Response) ([]*core.Item, error)
}

RPCRecall 是通过 RPC/HTTP 调用外部召回服务的召回源。

支持场景:

  • 调用远程召回服务(Python/Java 等实现)
  • 调用微服务架构中的召回服务
  • 调用第三方召回 API

使用示例:

rpcRecall := &recall.RPCRecall{
	Endpoint: "http://localhost:8080/recall",
	Timeout:  2 * time.Second,
	TopK:    20,
}

// 在 Fanout 中使用
fanout := &recall.Fanout{
	Sources: []recall.Source{
		rpcRecall,
		&recall.SortedSetRecall{IDs: []string{"1", "2", "3"}, NodeName: "recall.hot"},
	},
}

func NewRPCRecall

func NewRPCRecall(endpoint string, timeout time.Duration) *RPCRecall

NewRPCRecall 创建一个新的 RPC 召回源

func (*RPCRecall) Name

func (r *RPCRecall) Name() string

func (*RPCRecall) Recall

func (r *RPCRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

Recall 实现 Source 接口,调用远程召回服务

func (*RPCRecall) WithClient

func (r *RPCRecall) WithClient(client *http.Client) *RPCRecall

WithClient 设置自定义 HTTP 客户端

func (*RPCRecall) WithRequestBuilder

func (r *RPCRecall) WithRequestBuilder(builder func(ctx context.Context, rctx *core.RecommendContext, topK int) (map[string]interface{}, error)) *RPCRecall

WithRequestBuilder 设置自定义请求构建器

func (*RPCRecall) WithResponseParser

func (r *RPCRecall) WithResponseParser(parser func(resp *http.Response) ([]*core.Item, error)) *RPCRecall

WithResponseParser 设置自定义响应解析器

func (*RPCRecall) WithTopK

func (r *RPCRecall) WithTopK(topK int) *RPCRecall

WithTopK 设置 TopK

type RatioMergeStrategy

type RatioMergeStrategy struct {
	// SourceRatios 各源比例(source name -> 比例 0.0~1.0)。
	// 比例之和不必为 1.0,内部会自动归一化。
	SourceRatios map[string]float64

	// TotalLimit 总数量限制,必须 > 0。
	TotalLimit int
}

RatioMergeStrategy 按比例从各召回源取 item,总量由 TotalLimit 控制。 适用于"hot 占 20%、cf 占 30%、ann 占 50%"的场景。

func (*RatioMergeStrategy) Merge

func (s *RatioMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type RetryErrorHandler

type RetryErrorHandler struct {
	MaxRetries int
	RetryDelay time.Duration
	// OnRetry 可选回调,每次重试前触发(用于 metrics/日志)。
	OnRetry func(source Source, attempt int, err error)
	// OnGiveUp 可选回调,全部重试失败后触发。
	OnGiveUp func(source Source, err error)
}

RetryErrorHandler 是重试策略:失败后重试若干次,每次间隔 RetryDelay。 全部重试仍失败时返回空结果(降级),不中断 Pipeline。

func (*RetryErrorHandler) HandleError

func (h *RetryErrorHandler) HandleError(ctx context.Context, source Source, err error, rctx *core.RecommendContext) ([]*core.Item, error)

type RoundRobinMergeStrategy

type RoundRobinMergeStrategy struct {
	// SourceOrder 轮询顺序(可选),未指定时按源首次出现的顺序。
	SourceOrder []string

	// TopN 合并后取前 N 个,0 表示不限制。
	TopN int
}

RoundRobinMergeStrategy 从各召回源轮流取 item,实现均匀交叉排列。 适用于信息流等需要内容多样性的场景。

func (*RoundRobinMergeStrategy) Merge

func (s *RoundRobinMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type ScoredHistoryItem

type ScoredHistoryItem struct {
	ItemID string
	Score  float64
}

ScoredHistoryItem 带时间戳/分数的历史条目。 Score 通常为毫秒时间戳(与 Redis ZSET score 语义一致), 供多行为类型合并时按时间降序归并。

type SimilarItemStore

type SimilarItemStore interface {
	// GetSimilarItems 获取给定物品列表的相似物品
	GetSimilarItems(ctx context.Context, itemIDs []string, topK int) ([]string, error)
}

SimilarItemStore 获取相似物品的存储接口

type SimilarityCalculator

type SimilarityCalculator interface {
	// Calculate 计算两个向量的相似度
	// x, y: 两个向量(必须长度相同)
	// 返回: 相似度值(通常在 -1 到 1 之间)
	Calculate(x, y []float64) float64
}

SimilarityCalculator 是相似度计算接口,用于自定义相似度计算方法。

type SortOrder

type SortOrder string

SortOrder 定义有序集合的排序方向。

const (
	// OrderDesc 降序(默认):适用于热门(按热度)、评分最高(按评分)、最新(按时间戳)等。
	OrderDesc SortOrder = "desc"
	// OrderAsc 升序:适用于价格最低、距离最近等场景。
	OrderAsc SortOrder = "asc"
)

type SortedSetRecall

type SortedSetRecall struct {
	Store core.Store

	// Key 完整的存储 key(优先使用)。
	Key string

	// KeyPrefix key 前缀。当 Key 为空时:
	//   - 若 rctx.Scene 非空,实际 key = KeyPrefix:Scene
	//   - 否则 key = KeyPrefix
	KeyPrefix string

	// TopK 返回数量上限,默认 100。
	TopK int

	// Order 排序方向,默认 OrderDesc。
	Order SortOrder

	// IDs fallback 静态 ID 列表(当 Store 不可用或为空时使用)。
	IDs []string

	// NodeName 自定义节点名称,影响 Name() 返回值和 recall_source 标签。
	// 默认 "recall.sorted_set"。
	NodeName string
}

SortedSetRecall 是基于外部有序集合的通用召回源。

它是 Hot / Trending / Latest / TopRated 等具体业务召回的底层抽象: 从 Store 的有序集合中按指定方向拉取 TopK 个 item ID 及分数。

数据读取策略(按优先级):

  1. SortedSetRangeStore → ZRevRangeWithScores / ZRangeWithScores(带分数 + 双向)
  2. KeyValueStore → ZRange(仅降序、不含分数)
  3. Store.Get → JSON 数组(纯 ID 列表)
  4. IDs 字段 → fallback 静态列表

Key 解析优先级:Key > KeyPrefix+Scene > KeyPrefix。

常见业务场景与对应构造器:

NewHotRecall      → 热门召回(按热度降序)
NewTrendingRecall → 趋势召回(按趋势分降序)
NewLatestRecall   → 最新召回(按发布时间降序)
NewTopRatedRecall → 高分召回(按评分降序)

func NewEditorPickRecall

func NewEditorPickRecall(store core.Store, key string, topK int) *SortedSetRecall

NewEditorPickRecall 创建编辑推荐召回(按运营权重降序)。

func NewHotRecall

func NewHotRecall(store core.Store, key string, topK int) *SortedSetRecall

NewHotRecall 创建热门召回(按热度降序)。

recall := NewHotRecall(store, "hot:feed", 100)
// 或使用 key_prefix,key 按 scene 自动拼接:
recall := NewHotRecall(store, "", 100)
recall.KeyPrefix = "hot"  // key = hot:{scene}

func NewLatestRecall

func NewLatestRecall(store core.Store, key string, topK int) *SortedSetRecall

NewLatestRecall 创建最新召回(按发布时间降序,最新优先)。

func NewTopRatedRecall

func NewTopRatedRecall(store core.Store, key string, topK int) *SortedSetRecall

NewTopRatedRecall 创建高分召回(按评分降序)。

func NewTrendingRecall

func NewTrendingRecall(store core.Store, key string, topK int) *SortedSetRecall

NewTrendingRecall 创建趋势召回(按趋势分降序)。

func (*SortedSetRecall) Kind

func (r *SortedSetRecall) Kind() pipeline.Kind

func (*SortedSetRecall) Name

func (r *SortedSetRecall) Name() string

func (*SortedSetRecall) Process

func (r *SortedSetRecall) Process(
	ctx context.Context,
	rctx *core.RecommendContext,
	_ []*core.Item,
) ([]*core.Item, error)

func (*SortedSetRecall) Recall

func (r *SortedSetRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type Source

type Source interface {
	Name() string
	Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)
}

Source 表示一个可复用的召回源(热门/CF/内容/ANN/...)。 你可以把它理解为“可并发 fan-out 的策略单元”。

type StoreCFAdapter

type StoreCFAdapter struct {

	// KeyPrefix 是存储 key 的前缀
	// 用户物品交互:{KeyPrefix}:user:{userID}
	// 物品用户交互:{KeyPrefix}:item:{itemID}
	// 所有用户列表:{KeyPrefix}:users
	// 所有物品列表:{KeyPrefix}:items
	KeyPrefix string
	// contains filtered or unexported fields
}

StoreCFAdapter 是基于 core.Store 接口的召回数据存储适配器。 实现 core.RecallDataStore 接口,支持协同过滤、内容推荐、矩阵分解等召回算法。 从 Redis/MySQL 等存储中读取召回所需的数据。

func NewStoreCFAdapter

func NewStoreCFAdapter(s core.Store, keyPrefix string) *StoreCFAdapter

NewStoreCFAdapter 创建一个基于 core.Store 的协同过滤适配器。

func (*StoreCFAdapter) GetAllItemVectors

func (a *StoreCFAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)

GetAllItemVectors 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreCFAdapter) GetAllItems

func (a *StoreCFAdapter) GetAllItems(ctx context.Context) ([]string, error)

func (*StoreCFAdapter) GetAllUsers

func (a *StoreCFAdapter) GetAllUsers(ctx context.Context) ([]string, error)

func (*StoreCFAdapter) GetItemFeatures

func (a *StoreCFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)

GetItemFeatures 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreCFAdapter) GetItemUsers

func (a *StoreCFAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)

func (*StoreCFAdapter) GetItemVector

func (a *StoreCFAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)

GetItemVector 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreCFAdapter) GetSimilarItems

func (a *StoreCFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)

GetSimilarItems 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreCFAdapter) GetUserItems

func (a *StoreCFAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)

func (*StoreCFAdapter) GetUserPreferences

func (a *StoreCFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)

GetUserPreferences 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreCFAdapter) GetUserVector

func (a *StoreCFAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)

GetUserVector 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreCFAdapter) Name

func (a *StoreCFAdapter) Name() string

Name 实现 core.RecallDataStore 接口

type StoreContentAdapter

type StoreContentAdapter struct {

	// KeyPrefix 是存储 key 的前缀
	// 物品特征:{KeyPrefix}:item:{itemID}
	// 用户偏好:{KeyPrefix}:user:{userID}
	// 所有物品列表:{KeyPrefix}:items
	KeyPrefix string
	// contains filtered or unexported fields
}

StoreContentAdapter 是基于 core.Store 接口的内容推荐存储适配器。 从 Redis/MySQL 等存储中读取物品特征和用户偏好。

func NewStoreContentAdapter

func NewStoreContentAdapter(s core.Store, keyPrefix string) *StoreContentAdapter

NewStoreContentAdapter 创建一个基于 core.Store 的内容推荐适配器。

func (*StoreContentAdapter) GetAllItemVectors

func (a *StoreContentAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)

GetAllItemVectors 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreContentAdapter) GetAllItems

func (a *StoreContentAdapter) GetAllItems(ctx context.Context) ([]string, error)

func (*StoreContentAdapter) GetAllUsers

func (a *StoreContentAdapter) GetAllUsers(ctx context.Context) ([]string, error)

GetAllUsers 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreContentAdapter) GetItemFeatures

func (a *StoreContentAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)

func (*StoreContentAdapter) GetItemUsers

func (a *StoreContentAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)

GetItemUsers 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreContentAdapter) GetItemVector

func (a *StoreContentAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)

GetItemVector 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreContentAdapter) GetSimilarItems

func (a *StoreContentAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)

func (*StoreContentAdapter) GetUserItems

func (a *StoreContentAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)

GetUserItems 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreContentAdapter) GetUserPreferences

func (a *StoreContentAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)

func (*StoreContentAdapter) GetUserVector

func (a *StoreContentAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)

GetUserVector 实现 core.RecallDataStore 接口(矩阵分解)

func (*StoreContentAdapter) Name

func (a *StoreContentAdapter) Name() string

Name 实现 core.RecallDataStore 接口

type StoreMFAdapter

type StoreMFAdapter struct {

	// KeyPrefix 是存储 key 的前缀
	// 用户隐向量:{KeyPrefix}:user:{userID}
	// 物品隐向量:{KeyPrefix}:item:{itemID}
	// 所有物品列表:{KeyPrefix}:items
	KeyPrefix string
	// contains filtered or unexported fields
}

StoreMFAdapter 是基于 core.Store 接口的矩阵分解存储适配器。 从 Redis/MySQL 等存储中读取用户和物品的隐向量。

func NewStoreMFAdapter

func NewStoreMFAdapter(s core.Store, keyPrefix string) *StoreMFAdapter

NewStoreMFAdapter 创建一个基于 core.Store 的矩阵分解适配器。

func (*StoreMFAdapter) GetAllItemVectors

func (a *StoreMFAdapter) GetAllItemVectors(ctx context.Context) (map[string][]float64, error)

func (*StoreMFAdapter) GetAllItems

func (a *StoreMFAdapter) GetAllItems(ctx context.Context) ([]string, error)

GetAllItems 实现 core.RecallDataStore 接口(通用方法)

func (*StoreMFAdapter) GetAllUsers

func (a *StoreMFAdapter) GetAllUsers(ctx context.Context) ([]string, error)

GetAllUsers 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreMFAdapter) GetItemFeatures

func (a *StoreMFAdapter) GetItemFeatures(ctx context.Context, itemID string) (map[string]float64, error)

GetItemFeatures 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreMFAdapter) GetItemUsers

func (a *StoreMFAdapter) GetItemUsers(ctx context.Context, itemID string) (map[string]float64, error)

GetItemUsers 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreMFAdapter) GetItemVector

func (a *StoreMFAdapter) GetItemVector(ctx context.Context, itemID string) ([]float64, error)

func (*StoreMFAdapter) GetSimilarItems

func (a *StoreMFAdapter) GetSimilarItems(ctx context.Context, itemFeatures map[string]float64, topK int) ([]string, error)

GetSimilarItems 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreMFAdapter) GetUserItems

func (a *StoreMFAdapter) GetUserItems(ctx context.Context, userID string) (map[string]float64, error)

GetUserItems 实现 core.RecallDataStore 接口(协同过滤)

func (*StoreMFAdapter) GetUserPreferences

func (a *StoreMFAdapter) GetUserPreferences(ctx context.Context, userID string) (map[string]float64, error)

GetUserPreferences 实现 core.RecallDataStore 接口(内容推荐)

func (*StoreMFAdapter) GetUserVector

func (a *StoreMFAdapter) GetUserVector(ctx context.Context, userID string) ([]float64, error)

func (*StoreMFAdapter) Name

func (a *StoreMFAdapter) Name() string

Name 实现 core.RecallDataStore 接口

type TwoTowerRecall

type TwoTowerRecall struct {
	// FeatureService 特征服务,用于获取用户特征
	FeatureService core.FeatureService

	// UserTowerService 用户塔推理服务(core.MLService 接口)
	// 支持 ONNX Runtime、TorchServe、TensorFlow Serving 等
	UserTowerService core.MLService

	// VectorService 向量检索服务(core.VectorService 接口)
	// 支持 Milvus、Faiss 等向量数据库
	// 注意:使用领域接口(core.VectorService),由基础设施层(vector)实现
	VectorService core.VectorService

	// TopK 返回 TopK 个物品
	TopK int

	// Collection 向量数据库集合名称(用于存储 Item Embeddings)
	Collection string

	// Metric 距离度量方式:cosine / euclidean / inner_product
	// 默认:inner_product(内积,适合双塔模型)
	Metric string

	// UserFeatureExtractor 自定义用户特征提取器(可选)
	// 如果为 nil,则使用 FeatureService.GetUserFeatures 或默认抽取逻辑
	// 支持传入 feature.FeatureExtractor 接口或函数类型(通过适配器)
	UserFeatureExtractor feature.FeatureExtractor
}

TwoTowerRecall 是基于双塔模型的召回源实现。

核心流程:

  1. 获取用户特征(通过 FeatureService)
  2. 运行用户塔推理(通过 MLService,如 ONNX Runtime、TorchServe)
  3. 向量检索(通过 ANNService,如 Milvus、Faiss)

设计原则:

  • 高内聚:TwoTowerRecall 只负责协调流程,具体逻辑在各服务中
  • 低耦合:通过接口依赖,可替换实现(FeatureService、MLService、ANNService)
  • DDD 模式:每个服务都是独立的领域服务

使用示例:

// 1. 创建特征服务
featureService := feature.NewFeatureService(...)

// 2. 创建用户塔推理服务(KServe V2 / Triton / TorchServe)
userTowerService := service.NewKServeClient("http://localhost:8080", "user_tower")

// 3. 创建向量检索服务(Milvus)
vectorService := vector.NewMilvusService("localhost:19530")

// 4. 创建双塔召回源
twoTowerRecall := recall.NewTwoTowerRecall(
	featureService,
	userTowerService,
	vectorService,
	recall.WithTwoTowerTopK(100),
	recall.WithTwoTowerCollection("item_embeddings"),
)

// 5. 在 Fanout 中使用
fanout := &recall.Fanout{
	Sources: []recall.Source{
		twoTowerRecall,
		&recall.SortedSetRecall{IDs: []string{"1", "2", "3"}, NodeName: "recall.hot"},
	},
}

func NewTwoTowerRecall

func NewTwoTowerRecall(
	featureService core.FeatureService,
	userTowerService core.MLService,
	vectorService core.VectorService,
	opts ...TwoTowerRecallOption,
) *TwoTowerRecall

NewTwoTowerRecall 创建一个新的双塔召回源。

func (*TwoTowerRecall) Name

func (r *TwoTowerRecall) Name() string

func (*TwoTowerRecall) Recall

func (r *TwoTowerRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

Recall 实现 Source 接口,执行双塔召回流程

type TwoTowerRecallOption

type TwoTowerRecallOption func(*TwoTowerRecall)

TwoTowerRecallOption 双塔召回配置选项

func WithTwoTowerCollection

func WithTwoTowerCollection(collection string) TwoTowerRecallOption

WithTwoTowerCollection 设置向量数据库集合名称

func WithTwoTowerMetric

func WithTwoTowerMetric(metric string) TwoTowerRecallOption

WithTwoTowerMetric 设置距离度量方式

func WithTwoTowerTopK

func WithTwoTowerTopK(topK int) TwoTowerRecallOption

WithTwoTowerTopK 设置 TopK

func WithTwoTowerUserFeatureExtractor

func WithTwoTowerUserFeatureExtractor(extractor interface{}) TwoTowerRecallOption

WithTwoTowerUserFeatureExtractor 设置自定义用户特征提取器 支持传入 feature.FeatureExtractor 接口或函数类型(自动包装为 CustomFeatureExtractor)

type U2IRecall

type U2IRecall = UserBasedCF

U2IRecall 是 UserBasedCF 的类型别名,提供更符合工业习惯的命名。 u2i (User-to-Item) 表示"直接给用户算候选物品集合"的召回方向。

type UnionMergeStrategy

type UnionMergeStrategy struct{}

UnionMergeStrategy 是并集策略:不去重,保留所有结果。

func (*UnionMergeStrategy) Merge

func (s *UnionMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type UserBasedCF

type UserBasedCF struct {
	Store CFStore

	// TopKSimilarUsers 计算相似度时考虑的 TopK 个相似用户
	// 如果 <= 0,则使用 Config 中的默认值
	TopKSimilarUsers int

	// TopKItems 最终返回的 TopK 个物品
	// 如果 <= 0,则使用 Config 中的默认值
	TopKItems int

	// SimilarityCalculator 相似度计算器(必需)
	// 使用内置计算器:CosineSimilarity、PearsonCorrelation
	// 或实现自定义计算器
	SimilarityCalculator SimilarityCalculator

	// MinCommonItems 两个用户至少需要有多少个共同交互物品才计算相似度
	// 如果 <= 0,则使用 Config 中的默认值
	MinCommonItems int

	// Config 协同过滤配置(必需)
	// 提供默认值,不能为 nil
	Config core.CFConfig
}

UserBasedCF 是基于用户的协同过滤召回源(User-based Collaborative Filtering, User-CF)。

核心思想:"兴趣相似的用户,喜欢相似的物品"

算法流程:

  1. 用户 → 行为向量(点击/收藏/购买)
  2. 计算用户相似度(Cosine / Pearson)
  3. 找 TopK 相似用户
  4. 推荐这些用户喜欢但目标用户未见过的物品

工程特征:

  • 实时性:较差(用户变化快)
  • 计算复杂度:高(用户数大)
  • 可解释性:强
  • 冷启动:差

工程使用现状:

  • ❌ 几乎不直接在线用
  • ✅ 离线分析 / 冷启动补充

在 Reckit 中的位置:

  • 离线产出 u2u / u2i 结果
  • 作为 Recall Node(u2u → u2i 工程拆分)

func (*UserBasedCF) Name

func (r *UserBasedCF) Name() string

func (*UserBasedCF) Recall

func (r *UserBasedCF) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type UserBucketStrategy

type UserBucketStrategy struct{}

UserBucketStrategy 将用户均分到池内不同位置,每个 item 获得近似 1/N 的用户流量。 适合公平分发场景。

func NewUserBucketStrategy

func NewUserBucketStrategy() *UserBucketStrategy

func (*UserBucketStrategy) Pick

type UserHistory

type UserHistory struct {
	Store UserHistoryStore

	// KeyPrefix 是 Store 中的 key 前缀,实际 key 为 {KeyPrefix}:{UserID}
	KeyPrefix string

	// BehaviorType 行为类型:view / click / purchase / favorite 等
	BehaviorType string

	// TopK 返回 TopK 个物品
	TopK int

	// TimeWindow 时间窗口(秒),只考虑该时间窗口内的历史
	// 0 表示考虑所有历史
	TimeWindow int64

	// EnableSimilarExtend 是否启用相似物品扩展(I2I 召回)。
	//
	// 开启后,会通过 SimilarItemStore.GetSimilarItems() 将用户历史物品扩展为相似物品,
	// 实现 I2I (Item-to-Item) 召回,推荐用户未交互过但可能感兴趣的物品。
	//
	// 与 I2IRecall 的对比:
	//   - UserHistory + EnableSimilarExtend:
	//     * 本质:I2I 接口模式,依赖外部实现 SimilarItemStore
	//     * 输入:用户历史 item 列表
	//     * 输出:相似 item 列表
	//     * 实现方式:外部提供 I2I 数据源(预计算索引、向量检索等)
	//     * 性能:依赖外部实现,可使用预计算索引(性能更好)
	//     * 灵活性:高,可切换不同 I2I 数据源
	//     * 适用场景:生产环境推荐,使用预计算的 I2I 索引
	//
	//   - I2IRecall (ItemBasedCF):
	//     * 本质:I2I 完整实现,内部通过协同过滤计算相似度
	//     * 输入:用户历史 item
	//     * 输出:相似 item
	//     * 实现方式:内部实时计算物品相似度(Cosine/Pearson)
	//     * 性能:实时计算,可能较慢
	//     * 灵活性:低,固定协同过滤算法
	//     * 适用场景:小规模数据或需要实时计算的场景
	//
	// 使用示例:
	//   // 方式 1:使用预计算的 I2I 索引(推荐)
	//   userHistoryRecall := &recall.UserHistory{
	//       Store:               &PrecomputedI2IStore{store: redisStore},
	//       EnableSimilarExtend: true,  // 开启 I2I
	//   }
	//
	//   // 方式 2:直接使用 I2IRecall(实时计算)
	//   i2iRecall := &recall.I2IRecall{
	//       Store:                cfStore,
	//       SimilarityCalculator: &recall.CosineSimilarity{},
	//   }
	EnableSimilarExtend bool
}

UserHistory 是基于用户 historical 行为的个性化召回源。 支持从 Store 读取用户的浏览、点击、购买等历史,推荐相似物品。

func (*UserHistory) Kind

func (r *UserHistory) Kind() pipeline.Kind

func (*UserHistory) Name

func (r *UserHistory) Name() string

func (*UserHistory) Process

func (r *UserHistory) Process(
	ctx context.Context,
	rctx *core.RecommendContext,
	_ []*core.Item,
) ([]*core.Item, error)

func (*UserHistory) Recall

func (r *UserHistory) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type UserHistoryStore

type UserHistoryStore interface {
	// GetUserHistory 获取用户的历史行为物品列表(带时间戳/分数)。
	// 返回按 Score 降序排列的历史条目,Score 通常为毫秒时间戳。
	GetUserHistory(ctx context.Context, userID string, keyPrefix, behaviorType string, timeWindow int64) ([]ScoredHistoryItem, error)
}

UserHistoryStore 是用户历史存储接口。

type UserShuffleStrategy

type UserShuffleStrategy struct{}

UserShuffleStrategy 按用户确定性 shuffle 后取 TopK。 同用户同天稳定,不同用户看到不同子集。适合通用打散。

func NewUserShuffleStrategy

func NewUserShuffleStrategy() *UserShuffleStrategy

func (*UserShuffleStrategy) Pick

type WaterfallMergeStrategy

type WaterfallMergeStrategy struct {
	// SourcePriority 源优先级顺序(高 -> 低),不在列表中的源排在最后。
	SourcePriority []string

	// TotalLimit 总数量限制,必须 > 0。
	TotalLimit int

	// SourceLimits 每源最大数量(可选),防止单源占满全部配额。
	// 未配置的源无单独限制,受 TotalLimit 约束。
	SourceLimits map[string]int
}

WaterfallMergeStrategy 高优先级源优先填满,不足时由低优先级源补充。 适用于"优先用个性化结果,不够再用热门兜底"的场景。

func (*WaterfallMergeStrategy) Merge

func (s *WaterfallMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type WeightedScoreMergeStrategy

type WeightedScoreMergeStrategy struct {
	// SourceWeights 各召回源的权重乘数(source name -> weight multiplier)。
	// 未配置的源使用 DefaultWeight。
	SourceWeights map[string]float64

	// DefaultWeight 未在 SourceWeights 中配置的源的默认权重,默认 1.0。
	DefaultWeight float64

	// TopN 合并后取前 N 个,0 表示不限制。
	TopN int
}

WeightedScoreMergeStrategy 按召回源权重调整 item 分数,然后按分数降序排列。 适用于通过权重控制各源贡献度的场景。

func (*WeightedScoreMergeStrategy) Merge

func (s *WeightedScoreMergeStrategy) Merge(items []*core.Item, dedup bool) []*core.Item

type Word2VecRecall

type Word2VecRecall struct {
	// Model Word2Vec 模型
	Model *model.Word2VecModel

	// Store 存储接口
	Store Word2VecStore

	// TopK 返回 TopK 个物品
	TopK int

	// Mode 召回模式:text(基于文本)或 sequence(基于序列)
	Mode string

	// TextField 文本字段:title / description / tags
	TextField string

	// HistoryFunc 可选,返回用户最近点击的物品 ID 列表。
	// 未设置时从 rctx.Attributes["recent_clicks"] 获取。
	HistoryFunc func(rctx *core.RecommendContext) []string
}

Word2VecRecall 是基于 Word2Vec 的召回源。

核心思想:

  • 将用户行为序列或物品文本特征转换为向量
  • 通过向量相似度找到相似物品

使用场景:

  • I2I 召回:基于物品文本相似度
  • 序列召回:基于用户行为序列

func (*Word2VecRecall) Name

func (r *Word2VecRecall) Name() string

func (*Word2VecRecall) Recall

func (r *Word2VecRecall) Recall(
	ctx context.Context,
	rctx *core.RecommendContext,
) ([]*core.Item, error)

type Word2VecStore

type Word2VecStore interface {
	// GetItemText 获取物品的文本特征(标题、描述、标签等)
	GetItemText(ctx context.Context, itemID string) (string, error)

	// GetItemTags 获取物品的标签列表
	GetItemTags(ctx context.Context, itemID string) ([]string, error)

	// GetUserSequence 获取用户行为序列(点击的物品ID列表)
	GetUserSequence(ctx context.Context, userID string, maxLen int) ([]string, error)

	// GetAllItems 获取所有物品 ID 列表
	GetAllItems(ctx context.Context) ([]string, error)
}

Word2VecStore 是 Word2Vec 召回所需的存储接口。

type YouTubeDNNRecall

type YouTubeDNNRecall struct {
	FeatureService core.FeatureService
	Endpoint       string // HTTP 服务端点,例如 "http://localhost:8082/user_embedding"
	Timeout        time.Duration
	Client         *http.Client

	VectorService core.VectorService
	TopK          int
	Collection    string
	Metric        string

	UserFeatureExtractor feature.FeatureExtractor
	HistoryExtractor     *feature.HistoryExtractor
}

YouTubeDNNRecall 是基于 YouTube DNN 的召回源。

流程:用户特征 + 用户历史行为 -> /user_embedding -> 用户向量 -> 向量检索 Item。 使用前需训练并启动 YouTube DNN 服务,且 Item Embeddings 已导入 VectorService。

python train/train_youtube_dnn.py
uvicorn service.youtube_dnn_server:app --host 0.0.0.0 --port 8082

func (*YouTubeDNNRecall) Name

func (r *YouTubeDNNRecall) Name() string

func (*YouTubeDNNRecall) Recall

func (r *YouTubeDNNRecall) Recall(ctx context.Context, rctx *core.RecommendContext) ([]*core.Item, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL