pipeline

package
v0.0.0-...-02f0996 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ParamPipelineInputCount  = "__pipeline_input_count__"
	ParamPipelineOutputCount = "__pipeline_output_count__"
	ParamNodeInputCountMap   = "__pipeline_node_input_count__"
	ParamNodeOutputCountMap  = "__pipeline_node_output_count__"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CompositeErrorHook

type CompositeErrorHook struct {
	Hooks []ErrorHook
}

CompositeErrorHook 组合多个 ErrorHook,依次调用。 任一子 hook 返回 recovered=true 即视为整体 recovered。

func (*CompositeErrorHook) OnNodeError

func (h *CompositeErrorHook) OnNodeError(ctx context.Context, rctx *core.RecommendContext, node Node, err error) bool

type Config

type Config struct {
	Pipeline struct {
		Name  string       `yaml:"name" json:"name"`
		Nodes []NodeConfig `yaml:"nodes" json:"nodes"`
	} `yaml:"pipeline" json:"pipeline"`
}

Config 是 Pipeline 的配置结构(支持 YAML/JSON)。

func LoadFromJSON

func LoadFromJSON(path string) (*Config, error)

LoadFromJSON 从 JSON 文件加载 Pipeline 配置。

func LoadFromYAML

func LoadFromYAML(path string) (*Config, error)

LoadFromYAML 从 YAML 文件加载 Pipeline 配置。

func (*Config) BuildPipeline

func (c *Config) BuildPipeline(factory *NodeFactory) (*Pipeline, error)

BuildPipeline 根据配置构建 Pipeline(需要 NodeFactory 注册 Node 构建器)。 注意:factory 应该在独立的 config 包中,避免循环依赖。

type Configurable

type Configurable interface {
	// ApplyConfig 在 Process 前被 Pipeline runner 调用。
	// Node 可从 rctx 中读取场景配置并调整自身参数。
	// 返回 error 时按 ErrorHook 策略处理(同 Process 错误)。
	ApplyConfig(ctx context.Context, rctx *core.RecommendContext) error
}

Configurable 是 Node 的可选接口,用于支持运行时动态配置注入。

当 Node 实现此接口时,Pipeline runner 在调用 Process 前会自动调用 ApplyConfig, 使 Node 有机会从 RecommendContext 中读取场景级覆盖参数并调整自身行为。

典型使用场景:

  • 多样性 Node 根据场景配置动态调整 DiversityKeys / MaxConsecutive
  • TopN Node 根据场景配置调整截断数量
  • 召回 Node 根据场景配置选择不同的分发策略

示例:

type MyNode struct { TopK int }

func (n *MyNode) ApplyConfig(ctx context.Context, rctx *core.RecommendContext) error {
    if cfg, ok := core.ExtensionAs[*SceneConfig](rctx, "scene_config"); ok {
        if override := cfg.GetTopK(n.Name()); override > 0 {
            n.TopK = override
        }
    }
    return nil
}

type ErrorCallbackHook

type ErrorCallbackHook struct {
	Callback func(ctx context.Context, node Node, err error)
}

ErrorCallbackHook 调用回调函数上报错误,自身不做降级(recovered 始终为 false)。 适合只接入 metrics/alerting、不改变执行流的场景。

func (*ErrorCallbackHook) OnNodeError

func (h *ErrorCallbackHook) OnNodeError(ctx context.Context, _ *core.RecommendContext, node Node, err error) bool

type ErrorHook

type ErrorHook interface {
	OnNodeError(ctx context.Context, rctx *core.RecommendContext, node Node, err error) (recovered bool)
}

ErrorHook 是 Pipeline 全局错误钩子,用于错误上报和降级控制。

当 Node 执行出错时,Pipeline 依次调用所有 ErrorHook(不论前一个返回什么):

  • 若任一 ErrorHook 返回 recovered=true,Pipeline 跳过该 Node 继续执行(使用上一步 items)
  • 若所有 ErrorHook 返回 recovered=false,Pipeline 终止并返回错误

所有 ErrorHook 都会被调用,便于 metrics/alerting 采集完整数据。

type Kind

type Kind string

Kind 用于标记 Node 类型,方便观测/治理/编排(例如按阶段打点)。

const (
	KindRecall      Kind = "recall"      // 召回阶段:生成候选集
	KindFilter      Kind = "filter"      // 过滤阶段:剔除不符合约束的候选
	KindFeature     Kind = "feature"     // 特征注入阶段:为候选补充特征(pre-rank)
	KindRank        Kind = "rank"        // 排序阶段:对候选打分并排序
	KindReRank      Kind = "rerank"      // 重排阶段:在排序结果上做多样性/业务调优
	KindPostProcess Kind = "postprocess" // 后处理阶段:补充特征或最终结果修饰
)

type KindRecoveryHook

type KindRecoveryHook struct {
	RecoverKinds map[Kind]bool
	// OnError 可选回调,在判断前触发(用于 metrics/alerting)。
	OnError func(node Node, err error)
}

KindRecoveryHook 按 Node Kind 决定是否降级。 仅当失败 Node 的 Kind 在 RecoverKinds 中时跳过,其余错误仍终止 Pipeline。

func (*KindRecoveryHook) OnNodeError

func (h *KindRecoveryHook) OnNodeError(_ context.Context, _ *core.RecommendContext, node Node, err error) bool

type Node

type Node interface {
	Name() string
	Kind() Kind

	Process(
		ctx context.Context,
		rctx *core.RecommendContext,
		items []*core.Item,
	) ([]*core.Item, error)
}

Node 是 Pipeline 的最小可扩展单元。 统一采用“输入 items -> 输出 items”的形态,方便 Recall 生成、Filter 截断、ReRank 重排等操作。

type NodeBuilder

type NodeBuilder func(map[string]interface{}) (Node, error)

NodeBuilder 是 Node 构建器函数类型。

type NodeConfig

type NodeConfig struct {
	Type   string                 `yaml:"type" json:"type"`     // recall.fanout / rank.lr / rerank.diversity 等
	Config map[string]interface{} `yaml:"config" json:"config"` // Node 特定配置
}

NodeConfig 是单个 Node 的配置。

type NodeFactory

type NodeFactory struct {
	// contains filtered or unexported fields
}

NodeFactory 用于根据配置构建 Node 实例。 支持线程安全的动态注册,用户可以在运行时注册自定义 Node 类型。

func NewNodeFactory

func NewNodeFactory() *NodeFactory

func (*NodeFactory) Build

func (f *NodeFactory) Build(nodeType string, config map[string]interface{}) (Node, error)

Build 根据类型和配置构建 Node(线程安全)。 若 nodeType 未注册,返回错误并附带已支持类型列表,便于排查配置错误。

func (*NodeFactory) ListRegisteredTypes

func (f *NodeFactory) ListRegisteredTypes() []string

ListRegisteredTypes 返回所有已注册的 Node 类型(线程安全)。

func (*NodeFactory) Register

func (f *NodeFactory) Register(nodeType string, builder NodeBuilder)

Register 注册 Node 构建器(线程安全)。 用户可以在运行时注册自定义 Node 类型,无需修改库代码。

示例:

factory := pipeline.NewNodeFactory()
factory.Register("my.custom.node", func(config map[string]interface{}) (pipeline.Node, error) {
    // 构建自定义 Node
    return &MyCustomNode{}, nil
})

func (*NodeFactory) Unregister

func (f *NodeFactory) Unregister(nodeType string)

Unregister 取消注册 Node 构建器(线程安全)。

type Pipeline

type Pipeline struct {
	Nodes []Node
	Hooks []PipelineHook // Hook 列表,按顺序执行

	// ErrorHooks 全局错误钩子列表。
	// 当 Node(或 PipelineHook)执行出错时,Pipeline 依次调用所有 ErrorHook:
	//   - 若任一 ErrorHook 返回 recovered=true,跳过该 Node 继续执行(使用上一步 items)
	//   - 若全部返回 false,Pipeline 终止并返回错误
	// 未配置 ErrorHooks 时行为与之前完全一致(fail-fast)。
	ErrorHooks []ErrorHook
}

Pipeline 是 Reckit 的核心抽象:把推荐逻辑拆成可组合的 Node 链。 支持 Hook 机制,允许用户插入中间件功能。

func (*Pipeline) Run

func (p *Pipeline) Run(
	ctx context.Context,
	rctx *core.RecommendContext,
	items []*core.Item,
) ([]*core.Item, error)

Run 执行 Pipeline,依次处理每个 Node。 如果设置了 Hooks,会在每个 Node 执行前后调用相应的 Hook。 如果设置了 ErrorHooks,Node 出错时会尝试降级(跳过该 Node)。

type PipelineHook

type PipelineHook interface {
	// BeforeNode 在 Node 执行前调用
	// 可以修改 items 或 context,返回修改后的 items
	// 如果返回 error,Pipeline 将中断执行
	BeforeNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item) ([]*core.Item, error)

	// AfterNode 在 Node 执行后调用
	// 可以修改 items 或记录执行结果,返回修改后的 items
	// 如果返回 error,Pipeline 将中断执行
	AfterNode(ctx context.Context, rctx *core.RecommendContext, node Node, items []*core.Item, err error) ([]*core.Item, error)
}

PipelineHook 是 Pipeline 执行过程中的 Hook 接口,用于实现中间件功能。 用户可以实现此接口来添加日志、监控、缓存、性能分析等功能。

type StatsHook

type StatsHook struct{}

StatsHook 在 pipeline 执行阶段记录节点输入/输出条数到 rctx.Params。 适合作为通用观测模板,业务可基于这些统计打日志或指标。

func (*StatsHook) AfterNode

func (h *StatsHook) AfterNode(_ context.Context, rctx *core.RecommendContext, node Node, items []*core.Item, err error) ([]*core.Item, error)

func (*StatsHook) BeforeNode

func (h *StatsHook) BeforeNode(_ context.Context, rctx *core.RecommendContext, node Node, items []*core.Item) ([]*core.Item, error)

type WarnAndSkipHook

type WarnAndSkipHook struct {
	// Writer 日志输出目标,nil 时使用 os.Stderr。
	Writer io.Writer
}

WarnAndSkipHook 记录日志并跳过失败的 Node(始终降级)。 适用于非关键 Node,如 feature.enrich、rerank.diversity 等。

func (*WarnAndSkipHook) OnNodeError

func (h *WarnAndSkipHook) OnNodeError(_ context.Context, _ *core.RecommendContext, node Node, err error) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL