Documentation
¶
Index ¶
Constants ¶
const ( ParamPipelineInputCount = "__pipeline_input_count__" ParamPipelineOutputCount = "__pipeline_output_count__" ParamNodeInputCountMap = "__pipeline_node_input_count__" ParamNodeOutputCountMap = "__pipeline_node_output_count__" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompositeErrorHook ¶
type CompositeErrorHook struct {
Hooks []ErrorHook
}
CompositeErrorHook 组合多个 ErrorHook,依次调用。 任一子 hook 返回 recovered=true 即视为整体 recovered。
func (*CompositeErrorHook) OnNodeError ¶
func (h *CompositeErrorHook) OnNodeError(ctx context.Context, rctx *core.RecommendContext, node Node, err error) bool
type Config ¶
type Config struct {
Pipeline struct {
Name string `yaml:"name" json:"name"`
Nodes []NodeConfig `yaml:"nodes" json:"nodes"`
} `yaml:"pipeline" json:"pipeline"`
}
Config 是 Pipeline 的配置结构(支持 YAML/JSON)。
func LoadFromJSON ¶
LoadFromJSON 从 JSON 文件加载 Pipeline 配置。
func LoadFromYAML ¶
LoadFromYAML 从 YAML 文件加载 Pipeline 配置。
func (*Config) BuildPipeline ¶
func (c *Config) BuildPipeline(factory *NodeFactory) (*Pipeline, error)
BuildPipeline 根据配置构建 Pipeline(需要 NodeFactory 注册 Node 构建器)。 注意:factory 应该在独立的 config 包中,避免循环依赖。
type Configurable ¶
type Configurable interface {
// ApplyConfig 在 Process 前被 Pipeline runner 调用。
// Node 可从 rctx 中读取场景配置并调整自身参数。
// 返回 error 时按 ErrorHook 策略处理(同 Process 错误)。
ApplyConfig(ctx context.Context, rctx *core.RecommendContext) error
}
Configurable 是 Node 的可选接口,用于支持运行时动态配置注入。
当 Node 实现此接口时,Pipeline runner 在调用 Process 前会自动调用 ApplyConfig, 使 Node 有机会从 RecommendContext 中读取场景级覆盖参数并调整自身行为。
典型使用场景:
- 多样性 Node 根据场景配置动态调整 DiversityKeys / MaxConsecutive
- TopN Node 根据场景配置调整截断数量
- 召回 Node 根据场景配置选择不同的分发策略
示例:
type MyNode struct { TopK int }
func (n *MyNode) ApplyConfig(ctx context.Context, rctx *core.RecommendContext) error {
if cfg, ok := core.ExtensionAs[*SceneConfig](rctx, "scene_config"); ok {
if override := cfg.GetTopK(n.Name()); override > 0 {
n.TopK = override
}
}
return nil
}
type ErrorCallbackHook ¶
ErrorCallbackHook 调用回调函数上报错误,自身不做降级(recovered 始终为 false)。 适合只接入 metrics/alerting、不改变执行流的场景。
func (*ErrorCallbackHook) OnNodeError ¶
func (h *ErrorCallbackHook) OnNodeError(ctx context.Context, _ *core.RecommendContext, node Node, err error) bool
type ErrorHook ¶
type ErrorHook interface {
OnNodeError(ctx context.Context, rctx *core.RecommendContext, node Node, err error) (recovered bool)
}
ErrorHook 是 Pipeline 全局错误钩子,用于错误上报和降级控制。
当 Node 执行出错时,Pipeline 依次调用所有 ErrorHook(不论前一个返回什么):
- 若任一 ErrorHook 返回 recovered=true,Pipeline 跳过该 Node 继续执行(使用上一步 items)
- 若所有 ErrorHook 返回 recovered=false,Pipeline 终止并返回错误
所有 ErrorHook 都会被调用,便于 metrics/alerting 采集完整数据。
type Kind ¶
type Kind string
Kind 用于标记 Node 类型,方便观测/治理/编排(例如按阶段打点)。
const ( KindRecall Kind = "recall" // 召回阶段:生成候选集 KindFilter Kind = "filter" // 过滤阶段:剔除不符合约束的候选 KindFeature Kind = "feature" // 特征注入阶段:为候选补充特征(pre-rank) KindRank Kind = "rank" // 排序阶段:对候选打分并排序 KindReRank Kind = "rerank" // 重排阶段:在排序结果上做多样性/业务调优 KindPostProcess Kind = "postprocess" // 后处理阶段:补充特征或最终结果修饰 )
type KindRecoveryHook ¶
type KindRecoveryHook struct {
RecoverKinds map[Kind]bool
// OnError 可选回调,在判断前触发(用于 metrics/alerting)。
OnError func(node Node, err error)
}
KindRecoveryHook 按 Node Kind 决定是否降级。 仅当失败 Node 的 Kind 在 RecoverKinds 中时跳过,其余错误仍终止 Pipeline。
func (*KindRecoveryHook) OnNodeError ¶
func (h *KindRecoveryHook) OnNodeError(_ context.Context, _ *core.RecommendContext, node Node, err error) bool
type Node ¶
type Node interface {
Name() string
Kind() Kind
Process(
ctx context.Context,
rctx *core.RecommendContext,
items []*core.Item,
) ([]*core.Item, error)
}
Node 是 Pipeline 的最小可扩展单元。 统一采用“输入 items -> 输出 items”的形态,方便 Recall 生成、Filter 截断、ReRank 重排等操作。
type NodeBuilder ¶
NodeBuilder 是 Node 构建器函数类型。
type NodeConfig ¶
type NodeConfig struct {
Type string `yaml:"type" json:"type"` // recall.fanout / rank.lr / rerank.diversity 等
Config map[string]interface{} `yaml:"config" json:"config"` // Node 特定配置
}
NodeConfig 是单个 Node 的配置。
type NodeFactory ¶
type NodeFactory struct {
// contains filtered or unexported fields
}
NodeFactory 用于根据配置构建 Node 实例。 支持线程安全的动态注册,用户可以在运行时注册自定义 Node 类型。
func NewNodeFactory ¶
func NewNodeFactory() *NodeFactory
func (*NodeFactory) Build ¶
func (f *NodeFactory) Build(nodeType string, config map[string]interface{}) (Node, error)
Build 根据类型和配置构建 Node(线程安全)。 若 nodeType 未注册,返回错误并附带已支持类型列表,便于排查配置错误。
func (*NodeFactory) ListRegisteredTypes ¶
func (f *NodeFactory) ListRegisteredTypes() []string
ListRegisteredTypes 返回所有已注册的 Node 类型(线程安全)。
func (*NodeFactory) Register ¶
func (f *NodeFactory) Register(nodeType string, builder NodeBuilder)
Register 注册 Node 构建器(线程安全)。 用户可以在运行时注册自定义 Node 类型,无需修改库代码。
示例:
factory := pipeline.NewNodeFactory()
factory.Register("my.custom.node", func(config map[string]interface{}) (pipeline.Node, error) {
// 构建自定义 Node
return &MyCustomNode{}, nil
})
func (*NodeFactory) Unregister ¶
func (f *NodeFactory) Unregister(nodeType string)
Unregister 取消注册 Node 构建器(线程安全)。
type Pipeline ¶
type Pipeline struct {
Nodes []Node
Hooks []PipelineHook // Hook 列表,按顺序执行
// ErrorHooks 全局错误钩子列表。
// 当 Node(或 PipelineHook)执行出错时,Pipeline 依次调用所有 ErrorHook:
// - 若任一 ErrorHook 返回 recovered=true,跳过该 Node 继续执行(使用上一步 items)
// - 若全部返回 false,Pipeline 终止并返回错误
// 未配置 ErrorHooks 时行为与之前完全一致(fail-fast)。
ErrorHooks []ErrorHook
}
Pipeline 是 Reckit 的核心抽象:把推荐逻辑拆成可组合的 Node 链。 支持 Hook 机制,允许用户插入中间件功能。
type PipelineHook ¶
type PipelineHook interface {
// BeforeNode 在 Node 执行前调用
// 可以修改 items 或 context,返回修改后的 items
// 如果返回 error,Pipeline 将中断执行
BeforeNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item) ([]*core.Item, error)
// AfterNode 在 Node 执行后调用
// 可以修改 items 或记录执行结果,返回修改后的 items
// 如果返回 error,Pipeline 将中断执行
AfterNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item, err error) ([]*core.Item, error)
}
PipelineHook 是 Pipeline 执行过程中的 Hook 接口,用于实现中间件功能。 用户可以实现此接口来添加日志、监控、缓存、性能分析等功能。
type StatsHook ¶
type StatsHook struct{}
StatsHook 在 pipeline 执行阶段记录节点输入/输出条数到 rctx.Params。 适合作为通用观测模板,业务可基于这些统计打日志或指标。
type WarnAndSkipHook ¶
WarnAndSkipHook 记录日志并跳过失败的 Node(始终降级)。 适用于非关键 Node,如 feature.enrich、rerank.diversity 等。
func (*WarnAndSkipHook) OnNodeError ¶
func (h *WarnAndSkipHook) OnNodeError(_ context.Context, _ *core.RecommendContext, node Node, err error) bool