engine

package
v0.0.0-...-a264c11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const PluginsSymbol = "Plugins"

PluginsSymbol is the symbol used to identify plugins in a Go plugin file.

Variables

这些切面在初始化期间通过 initBuiltinsAspects() 方法自动添加到规则引擎中。 如果提供了自定义切面,除非自定义列表中已存在相同类型的切面,否则仍会包含 内置切面。这确保基本功能始终可用,无需显式配置。

Registry is the default registry for rule engine components.

Functions

func NewChainAggregationEngine

func NewChainAggregationEngine(def []byte, opts ...types.EngineOption) (types.Engine, error)

func NewChainEngine

func NewChainEngine(def []byte, opts ...types.EngineOption) (types.Engine, error)

func NewConfig

func NewConfig(opts ...types.Option) types.Config

NewConfig creates a new Config and applies the options. It initializes all necessary components with sensible defaults.

NewConfig 创建新的配置并应用选项。 它使用合理的默认值初始化所有必要的组件。

Parameters: 参数:

  • opts: Optional configuration functions 可选的配置函数

Returns: 返回:

  • types.Config: Initialized configuration 已初始化的配置

Default components include: 默认组件包括:

  • JSON parser for rule chain definitions 规则链定义的 JSON 解析器
  • Default component registry with built-in components 包含内置组件的默认组件注册表
  • User-defined functions registry 用户定义函数注册表
  • Default cache implementation 默认缓存实现

func WithAspects

func WithAspects(aspects ...types.Aspect) types.EngineOption

WithAspects creates a RuleEngineOption to set the aspects of a RuleEngine. Aspects provide AOP (Aspect-Oriented Programming) capabilities for cross-cutting concerns like logging, metrics, validation, and debugging.

WithAspects 创建一个 RuleEngineOption 来设置 RuleEngine 的切面。 切面为日志记录、指标、验证和调试等横切关注点提供 AOP(面向切面编程)功能。

func WithConfig

func WithConfig(config types.Config) types.EngineOption

WithConfig is an option that sets the Config of the RuleEngine. WithConfig 是设置 RuleEngine 配置的选项。

Types

type ChainAggregationCtx

type ChainAggregationCtx struct {
	Aggregation types.ChainCtx
	// contains filtered or unexported fields
}

func InitChainAggregationCtx

func InitChainAggregationCtx(config types.Config, aspects types.AspectList, chainAggregationDef *types.ChainAggregation) (*ChainAggregationCtx, error)

func (*ChainAggregationCtx) Config

func (rc *ChainAggregationCtx) Config() types.Config

Config returns the configuration of the rule chain context

func (*ChainAggregationCtx) DSL

func (rc *ChainAggregationCtx) DSL() []byte

DSL returns the rule chain definition as a byte slice

func (*ChainAggregationCtx) Destroy

func (rc *ChainAggregationCtx) Destroy()

Destroy cleans up resources and executes destroy aspects

func (*ChainAggregationCtx) GetChainById

func (rc *ChainAggregationCtx) GetChainById(id string) (types.ChainCtx, bool)

GetNodeById retrieves a node context by its ID

func (*ChainAggregationCtx) Id

func (rc *ChainAggregationCtx) Id() string

GetNodeById retrieves a node context by its ID

func (*ChainAggregationCtx) Init

func (rc *ChainAggregationCtx) Init(_ types.Config, configuration types.Configuration) error

Init initializes the rule chain context

func (*ChainAggregationCtx) Name

func (rc *ChainAggregationCtx) Name() string

GetNodeById retrieves a node context by its ID

func (*ChainAggregationCtx) New

func (rc *ChainAggregationCtx) New() types.Node

New creates a new instance (not supported for RuleChainCtx)

func (*ChainAggregationCtx) OnMsg

func (rc *ChainAggregationCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)

OnMsg processes incoming messages

func (*ChainAggregationCtx) TerminalOnErr

func (rc *ChainAggregationCtx) TerminalOnErr() bool

GetNodeById retrieves a node context by its ID

func (*ChainAggregationCtx) Type

func (rc *ChainAggregationCtx) Type() types.NodeType

Type returns the component type

type ChainAggregationEngine

type ChainAggregationEngine struct {
	// contains filtered or unexported fields
}

func (*ChainAggregationEngine) DSL

func (e *ChainAggregationEngine) DSL() []byte

DSL returns the current rule chain configuration in its original format. DSL 返回原始格式的当前规则链配置。

func (*ChainAggregationEngine) GetAspects

func (e *ChainAggregationEngine) GetAspects() types.AspectList

GetAspects returns a copy of the current aspects list to avoid data races. GetAspects 返回当前切面列表的副本以避免数据竞争。

func (*ChainAggregationEngine) GetMetrics

func (e *ChainAggregationEngine) GetMetrics() *metrics.EngineMetrics

GetMetrics returns engine metrics if the metrics aspect is enabled. GetMetrics 如果启用了指标切面,则返回引擎指标。

func (*ChainAggregationEngine) Id

Id 返回规则引擎实例的唯一标识符。

func (*ChainAggregationEngine) Name

func (e *ChainAggregationEngine) Name() string

Name 返回规则引擎实例的唯一标识符。

func (*ChainAggregationEngine) OnMsg

OnMsg asynchronously processes a message using the rule engine. It accepts optional RuleContextOption parameters to customize the execution context.

OnMsg 使用规则引擎异步处理消息。 它接受可选的 RuleContextOption 参数来自定义执行上下文。

func (*ChainAggregationEngine) ReloadSelf

func (e *ChainAggregationEngine) ReloadSelf(dsl []byte) error

ReloadSelf reloads the rule chain with new definition and options. This method supports hot reloading of rule configurations without stopping the engine. It implements a two-phase graceful reload process:

Phase 1: Preparation (设置阶段) - Apply configuration options 应用配置选项 - Wait for any ongoing reload to complete 等待任何正在进行的重载完成 - Set reloading state to block new messages 设置重载状态以阻塞新消息 - Wait for active messages to complete 等待活跃消息完成

Phase 2: Reload (重载阶段) - Parse new rule chain definition 解析新的规则链定义 - Update or create rule chain context 更新或创建规则链上下文 - Update atomic aspect pointers 更新原子切面指针 - Resume normal operation 恢复正常运行

ReloadSelf 使用新定义和选项重新加载规则链。 此方法支持在不停止引擎的情况下热重载规则配置。 它实现了两阶段优雅重载过程:

Parameters: 参数:

  • dsl: Rule chain definition in byte format 字节格式的规则链定义
  • opts: Optional configuration functions 可选的配置函数

Returns: 返回:

  • error: Reload error if any 如果有的话,重载错误

func (*ChainAggregationEngine) SetAspects

func (e *ChainAggregationEngine) SetAspects(aspects ...types.Aspect)

SetAspects 更新规则引擎使用的切面列表。 切面提供如日志和验证等横切功能。

func (*ChainAggregationEngine) SetConfig

func (e *ChainAggregationEngine) SetConfig(config types.Config)

SetConfig 更新规则引擎的配置。 为了获得最佳效果,应在初始化前调用。

func (*ChainAggregationEngine) Stop

func (e *ChainAggregationEngine) Stop()

Stop 关闭规则引擎并释放所有资源。 实现两阶段优雅停机策略:

func (*ChainAggregationEngine) TerminalOnErr

func (rc *ChainAggregationEngine) TerminalOnErr() bool

TerminalOnErr 是否出错推出

type ChainCtx

type ChainCtx struct {
	// contains filtered or unexported fields
}

func InitChainCtx

func InitChainCtx(config types.Config, aspects types.AspectList, chainDef *types.Chain) (*ChainCtx, error)

func (*ChainCtx) Config

func (rc *ChainCtx) Config() types.Config

Config returns the configuration of the rule chain context

func (*ChainCtx) DSL

func (rc *ChainCtx) DSL() []byte

DSL returns the rule chain definition as a byte slice

func (*ChainCtx) Destroy

func (rc *ChainCtx) Destroy()

Destroy cleans up resources and executes destroy aspects

func (*ChainCtx) GetNodeById

func (rc *ChainCtx) GetNodeById(id string) (types.NodeCtx, bool)

GetNodeById retrieves a node context by its ID

func (*ChainCtx) GetNodeRoutes

func (rc *ChainCtx) GetNodeRoutes(id string) ([]types.RuleNodeRelation, bool)

GetNodeRoutes retrieves the routes for a given node ID

func (*ChainCtx) Id

func (rc *ChainCtx) Id() string

GetNodeById retrieves a node context by its ID

func (*ChainCtx) Init

func (rc *ChainCtx) Init(_ types.Config, configuration types.Configuration) error

Init initializes the rule chain context

func (*ChainCtx) Name

func (rc *ChainCtx) Name() string

GetNodeById retrieves a node context by its ID

func (*ChainCtx) New

func (rc *ChainCtx) New() types.Node

New creates a new instance (not supported for RuleChainCtx)

func (*ChainCtx) OnMsg

func (rc *ChainCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)

OnMsg processes incoming messages

func (*ChainCtx) TerminalOnErr

func (rc *ChainCtx) TerminalOnErr() bool

GetNodeById retrieves a node context by its ID

func (*ChainCtx) Type

func (rc *ChainCtx) Type() types.NodeType

Type returns the component type

type ChainEngine

type ChainEngine struct {
	// contains filtered or unexported fields
}

func (*ChainEngine) DSL

func (e *ChainEngine) DSL() []byte

DSL returns the current rule chain configuration in its original format. DSL 返回原始格式的当前规则链配置。

func (*ChainEngine) GetAspects

func (e *ChainEngine) GetAspects() types.AspectList

GetAspects returns a copy of the current aspects list to avoid data races. GetAspects 返回当前切面列表的副本以避免数据竞争。

func (*ChainEngine) Id

func (e *ChainEngine) Id() string

Id returns the unique identifier of the rule engine instance. Id 返回规则引擎实例的唯一标识符。

func (*ChainEngine) Name

func (rc *ChainEngine) Name() string

GetNodeById retrieves a node context by its ID

func (*ChainEngine) OnMsg

func (e *ChainEngine) OnMsg(ctx context.Context, msg types.RuleMsg) error

OnMsg asynchronously processes a message using the rule engine. It accepts optional RuleContextOption parameters to customize the execution context.

OnMsg 使用规则引擎异步处理消息。 它接受可选的 RuleContextOption 参数来自定义执行上下文。

func (*ChainEngine) ReloadSelf

func (e *ChainEngine) ReloadSelf(dsl []byte) error

ReloadSelf reloads the rule chain with new definition and options. This method supports hot reloading of rule configurations without stopping the engine. It implements a two-phase graceful reload process:

Phase 1: Preparation (设置阶段) - Apply configuration options 应用配置选项 - Wait for any ongoing reload to complete 等待任何正在进行的重载完成 - Set reloading state to block new messages 设置重载状态以阻塞新消息 - Wait for active messages to complete 等待活跃消息完成

Phase 2: Reload (重载阶段) - Parse new rule chain definition 解析新的规则链定义 - Update or create rule chain context 更新或创建规则链上下文 - Update atomic aspect pointers 更新原子切面指针 - Resume normal operation 恢复正常运行

ReloadSelf 使用新定义和选项重新加载规则链。 此方法支持在不停止引擎的情况下热重载规则配置。 它实现了两阶段优雅重载过程:

Parameters: 参数:

  • dsl: Rule chain definition in byte format 字节格式的规则链定义
  • opts: Optional configuration functions 可选的配置函数

Returns: 返回:

  • error: Reload error if any 如果有的话,重载错误

func (*ChainEngine) SetAspects

func (e *ChainEngine) SetAspects(aspects ...types.Aspect)

SetAspects updates the list of aspects used by the rule engine. Aspects provide cross-cutting functionality like logging and validation. SetAspects 更新规则引擎使用的切面列表。 切面提供如日志和验证等横切功能。

func (*ChainEngine) SetConfig

func (e *ChainEngine) SetConfig(config types.Config)

SetConfig updates the configuration of the rule engine. This should be called before initialization for best results. SetConfig 更新规则引擎的配置。 为了获得最佳效果,应在初始化前调用。

func (*ChainEngine) Stop

func (e *ChainEngine) Stop()

Stop 关闭规则引擎并释放所有资源。 实现两阶段优雅停机策略:

func (*ChainEngine) TerminalOnErr

func (rc *ChainEngine) TerminalOnErr() bool

GetNodeById retrieves a node context by its ID

type DefaultChainContext

type DefaultChainContext struct {
	// contains filtered or unexported fields
}

DefaultRuleContext is the default context for message processing in the rule engine.

func NewChainContext

func NewChainContext(self types.ChainCtx) *DefaultChainContext

NewRuleContext creates a new instance of the default rule engine message processing context.

func (*DefaultChainContext) From

func (ctx *DefaultChainContext) From() types.NodeCtx

func (*DefaultChainContext) GetSelfId

func (rCtx *DefaultChainContext) GetSelfId() string

func (*DefaultChainContext) Self

func (ctx *DefaultChainContext) Self() types.NodeCtx

func (*DefaultChainContext) Tell

func (rCtx *DefaultChainContext) Tell(ctx context.Context, msg types.RuleMsg, relationType string) error

func (*DefaultChainContext) TellNext

func (rCtx *DefaultChainContext) TellNext(ctx context.Context, msg types.RuleMsg, relationType string) error

type JsonParser

type JsonParser struct {
}

JsonParser Json

func (*JsonParser) DecodeChain

func (p *JsonParser) DecodeChain(chainDef []byte) (types.Chain, error)

DecodeRuleChain 通过json解析规则链结构体

func (*JsonParser) DecodeChainAggregation

func (p *JsonParser) DecodeChainAggregation(chainAggregationDef []byte) (types.ChainAggregation, error)

DecodeRuleChain 通过json解析规则链结构体

func (*JsonParser) DecodeRule

func (p *JsonParser) DecodeRule(ruleDef []byte) (types.BaseInfo, error)

DecodeRuleNode 通过json解析节点结构体

func (*JsonParser) EncodeChain

func (p *JsonParser) EncodeChain(def interface{}) ([]byte, error)

func (*JsonParser) EncodeChainAggregation

func (p *JsonParser) EncodeChainAggregation(def interface{}) ([]byte, error)

func (*JsonParser) EncodeRule

func (p *JsonParser) EncodeRule(def interface{}) ([]byte, error)

type RuleComponentRegistry

type RuleComponentRegistry struct {

	// RWMutex is a read/write mutex lock.
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleComponentRegistry is a registry for rule engine components.

func (*RuleComponentRegistry) GetComponents

func (r *RuleComponentRegistry) GetComponents() map[types.NodeType]types.Node

GetComponents returns a map of all registered components.

func (*RuleComponentRegistry) NewNode

func (r *RuleComponentRegistry) NewNode(componentType types.NodeType) (types.Node, error)

NewNode creates a new instance of a rule engine node component by its type.

func (*RuleComponentRegistry) Register

func (r *RuleComponentRegistry) Register(node types.Node) error

Register adds a rule engine node component to the registry.

func (*RuleComponentRegistry) Unregister

func (r *RuleComponentRegistry) Unregister(componentType types.NodeType) error

Unregister removes a component from the registry by its type or plugin name.

type RuleNodeCtx

type RuleNodeCtx struct {
	// types.Node is the embedded node implementation providing the core functionality.
	// This embedding allows RuleNodeCtx to act as a node while adding wrapper capabilities.
	// types.Node 是嵌入的节点实现,提供核心功能。
	// 这种嵌入允许 RuleNodeCtx 在添加包装器功能的同时充当节点。
	types.Node
	// contains filtered or unexported fields
}

RuleNodeCtx represents an instance of a node component within the rule engine. It acts as a wrapper around the actual node implementation, providing additional context and metadata required for rule chain execution.

RuleNodeCtx 表示规则引擎中节点组件的实例。 它充当实际节点实现的包装器,提供规则链执行所需的额外上下文和元数据。

Architecture: 架构:

RuleNodeCtx embeds the types.Node interface, allowing it to act as both
a node wrapper and a node implementation. This design provides:
RuleNodeCtx 嵌入 types.Node 接口,允许它既充当节点包装器又充当节点实现。
此设计提供:
- Direct access to node methods through interface embedding  通过接口嵌入直接访问节点方法
- Additional context and configuration management  额外的上下文和配置管理
- Thread-safe operations with mutex protection  使用互斥锁保护的线程安全操作
- Hot reloading capabilities  热重载功能

func InitRuleNodeCtx

func InitRuleNodeCtx(config types.Config, chainCtx *ChainCtx, aspects types.AspectList, selfDefinition *types.BaseInfo) (*RuleNodeCtx, error)

InitRuleNodeCtx initializes a RuleNodeCtx with the given parameters. This is the standard initialization function for regular nodes without network resources.

InitRuleNodeCtx 使用给定参数初始化 RuleNodeCtx。 这是不带网络资源的常规节点的标准初始化函数。

Parameters: 参数:

  • config: Global rule engine configuration 全局规则引擎配置
  • chainCtx: Parent rule chain context 父规则链上下文
  • aspects: List of AOP aspects to apply 要应用的 AOP 切面列表
  • selfDefinition: Node definition and configuration 节点定义和配置

Returns: 返回:

  • *RuleNodeCtx: Initialized node context 已初始化的节点上下文
  • error: Initialization error if any 如果有的话,初始化错误

func (*RuleNodeCtx) Config

func (rn *RuleNodeCtx) Config() types.Config

Config returns the configuration of the rule engine.

func (*RuleNodeCtx) DSL

func (rn *RuleNodeCtx) DSL() []byte

DSL returns the DSL representation of the node.

func (*RuleNodeCtx) Destroy

func (rn *RuleNodeCtx) Destroy()

Destroy safely destroys the embedded node

func (*RuleNodeCtx) Id

func (rn *RuleNodeCtx) Id() string

GetNodeId returns the ID of the node.

func (*RuleNodeCtx) Name

func (rn *RuleNodeCtx) Name() string

GetNodeId returns the ID of the node.

func (*RuleNodeCtx) OnMsg

func (rn *RuleNodeCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)

OnMsg 提供并发安全的消息处理,保护内嵌Node访问 OnMsg provides concurrent-safe message processing with protected access to the embedded Node. This method ensures thread safety during message processing by using read locks to protect against concurrent modifications during hot reloads.

OnMsg 提供并发安全的消息处理,通过使用读锁保护嵌入的 Node 访问。 此方法通过使用读锁防止热重载期间的并发修改,确保消息处理期间的线程安全。

Parameters: 参数:

  • ctx: Rule context for message processing 用于消息处理的规则上下文
  • msg: Message to be processed 要处理的消息

func (*RuleNodeCtx) ReloadSelf

func (rn *RuleNodeCtx) ReloadSelf(_ []byte) error

ReloadSelf reloads the node from a byte slice definition.

func (*RuleNodeCtx) TerminalOnErr

func (rc *RuleNodeCtx) TerminalOnErr() bool

GetNodeById retrieves a node context by its ID

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL