engine

package
v0.0.0-...-56eedac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package engine provides the core functionality for the RuleGo rule engine. It includes implementations for rule contexts, rule engines, and related components that enable the execution and management of rule chains.

Package engine 提供 RuleGo 规则引擎的核心功能。 它包括规则上下文、规则引擎和相关组件的实现, 这些组件支持规则链的执行和管理。

The engine package is responsible for: engine 包负责:

  • Defining and managing rule contexts (DefaultRuleContext) 定义和管理规则上下文(DefaultRuleContext)
  • Implementing the main rule engine (RuleEngine) 实现主要的规则引擎(RuleEngine)
  • Handling rule chain execution and flow control 处理规则链执行和流程控制
  • Managing built-in aspects and extensions 管理内置切面和扩展
  • Providing utilities for rule processing and message handling 提供规则处理和消息处理的工具

Key Components: 关键组件:

  • RuleEngine: Main engine instance managing rule chain execution RuleEngine:管理规则链执行的主引擎实例
  • RuleChainCtx: Context for individual rule chains RuleChainCtx:单个规则链的上下文
  • DefaultRuleContext: Context for message processing within rule chains DefaultRuleContext:规则链内消息处理的上下文
  • RuleNodeCtx: Context wrapper for individual node components RuleNodeCtx:单个节点组件的上下文包装器

Architecture Overview: 架构概述:

The engine follows a hierarchical structure where a RuleEngine contains
one root RuleChainCtx, which manages multiple RuleNodeCtx instances.
Message processing flows through DefaultRuleContext instances that
coordinate between nodes and handle aspect-oriented programming features.

引擎遵循分层结构,其中 RuleEngine 包含一个根 RuleChainCtx,
该上下文管理多个 RuleNodeCtx 实例。消息处理通过 DefaultRuleContext
实例流转,这些实例在节点之间协调并处理面向切面编程功能。

This package is central to the RuleGo framework, offering the primary mechanisms for rule-based processing and decision making in various applications.

此包是 RuleGo 框架的核心,为各种应用程序中基于规则的处理和决策制定 提供主要机制。

Index

Constants

View Source
const PluginsSymbol = "Plugins"

PluginsSymbol is the symbol used to identify plugins in a Go plugin file.

Variables

View Source
var BuiltinsAspects = []types.Aspect{&aspect.Validator{}, &aspect.Debug{}, &aspect.MetricsAspect{}}

BuiltinsAspects holds a list of built-in aspects for the rule engine. These aspects provide essential cross-cutting functionality and are automatically integrated into every rule engine instance to ensure consistent behavior.

BuiltinsAspects 保存规则引擎的内置切面列表。 这些切面提供基本的横切功能,并自动集成到每个规则引擎实例中以确保一致的行为。

Built-in Aspects: 内置切面:

  • Validator: Validates node configurations and rule chain definitions before execution to prevent runtime errors. 验证器:在执行前验证节点配置和规则链定义,以防止运行时错误。

  • Debug: Provides debugging capabilities including execution tracing, state inspection, and development-time diagnostics. 调试器:提供调试功能,包括执行跟踪、状态检查和开发时诊断。

  • MetricsAspect: Collects performance metrics, execution statistics, and operational data for monitoring and observability. 指标切面:收集性能指标、执行统计和运营数据,用于监控和可观察性。

Automatic Integration: 自动集成:

These aspects are automatically added to the rule engine during initialization
via the initBuiltinsAspects() method. If custom aspects are provided, the
built-in aspects are still included unless an aspect of the same type already
exists in the custom list. This ensures that essential functionality is always
available without requiring explicit configuration.

这些切面在初始化期间通过 initBuiltinsAspects() 方法自动添加到规则引擎中。
如果提供了自定义切面,除非自定义列表中已存在相同类型的切面,否则仍会包含
内置切面。这确保基本功能始终可用,无需显式配置。
View Source
var DefaultPool = &Pool{}

DefaultPool is the default global instance of the rule engine pool. It provides a singleton pool for managing rule engine instances across the application. DefaultPool 是规则引擎池的默认全局实例。 它提供单例池来管理整个应用程序中的规则引擎实例。

View Source
var ErrDSLEmpty = errors.New("dsl is empty")

ErrDSLEmpty dsl is empty

View Source
var ErrRuleEnginePoolNil = errors.New("rule engine pool is nil")

ErrRuleEnginePoolNil rule engine pool is nil

Registry is the default registry for rule engine components.

Functions

func Del

func Del(id string)

Del deletes a specified ID rule engine instance from the default rule chain pool.

Del 从默认规则链池中删除指定的 ID 规则引擎实例。

Parameters: 参数:

  • id: ID of the rule engine 规则引擎的 ID

Usage: 使用:

Del("engine1")

func Get

func Get(id string) (types.RuleEngine, bool)

Get retrieves a specified ID rule engine instance from the default rule chain pool.

Get 从默认规则链池中检索指定的 ID 规则引擎实例。

Parameters: 参数:

  • id: ID of the rule engine 规则引擎的 ID

Returns: 返回:

  • types.RuleEngine: Rule engine instance 规则引擎实例
  • bool: Existence of the rule engine 规则引擎的存在

Usage: 使用:

engine, exists := Get("engine1")

func Load

func Load(folderPath string, opts ...types.RuleEngineOption) error

Load loads all rule chain configurations from the specified folder and its subfolders into the default rule engine instance pool. The rule chain ID is taken from the configuration file's ruleChain.id.

Load 从指定文件夹及其子文件夹加载所有规则链配置到默认规则引擎实例池中。 规则链 ID 取自配置文件的 ruleChain.id。

Parameters: 参数:

  • folderPath: Path to the folder containing rule chain files 包含规则链文件的文件夹路径
  • opts: Optional configuration functions for the rule engines 规则引擎的可选配置函数

Returns: 返回:

  • error: Loading error if any 如果有的话,加载错误

Usage: 使用:

err := Load("path/to/rulechains", types.WithRuleEnginePool(pool))

func New

func New(id string, rootRuleChainSrc []byte, opts ...types.RuleEngineOption) (types.RuleEngine, error)

New creates a new RuleEngine and stores it in the default rule chain pool.

New 创建新的规则引擎并将其存储在默认规则链池中。

Parameters: 参数:

  • id: ID of the rule engine 规则引擎的 ID
  • rootRuleChainSrc: Raw bytes of the rule chain 规则链的原始字节
  • opts: Optional configuration functions for the rule engine 规则引擎的可选配置函数

Returns: 返回:

  • types.RuleEngine: New rule engine instance 新的规则引擎实例
  • error: Loading error if any 如果有的话,加载错误

Usage: 使用:

engine, err := New("engine1", ruleChainBytes)

func NewConfig

func NewConfig(opts ...types.Option) types.Config

NewConfig creates a new Config and applies the options. It initializes all necessary components with sensible defaults.

NewConfig 创建新的配置并应用选项。 它使用合理的默认值初始化所有必要的组件。

Parameters: 参数:

  • opts: Optional configuration functions 可选的配置函数

Returns: 返回:

  • types.Config: Initialized configuration 已初始化的配置

Default components include: 默认组件包括:

  • JSON parser for rule chain definitions 规则链定义的 JSON 解析器
  • Default component registry with built-in components 包含内置组件的默认组件注册表
  • User-defined functions registry 用户定义函数注册表
  • Default cache implementation 默认缓存实现

func NewCustomComponentRegistry

func NewCustomComponentRegistry(defaultComponents, customComponents types.ComponentRegistry) types.ComponentRegistry

NewCustomComponentRegistry creates a composite registry that checks custom components first, then falls back to default components. Parameters: - defaultComponents: base registry with pre-defined components - customComponents: registry for user-defined components Returns a ComponentRegistry that combines both sources

func OnMsg

func OnMsg(msg types.RuleMsg)

OnMsg calls all rule engine instances in the default rule chain pool to process a message. All rule chains in the rule engine instance pool will attempt to process the message.

OnMsg 调用默认规则链池中的所有规则引擎实例来处理消息。 所有规则引擎实例池中的所有规则链将尝试处理消息。

Parameters: 参数:

  • msg: Rule message to be processed 要处理的消息

Usage: 使用:

OnMsg(ruleMsg)

func Range

func Range(f func(key, value any) bool)

Range iterates over all rule engine instances in the default rule chain pool.

Range 遍历默认规则链池中的所有规则引擎实例。

Parameters: 参数:

  • f: Function to apply to each rule engine instance 要应用于每个规则引擎实例的函数

Usage: 使用:

Range(func(key, value any) bool {
  // Use key and value as needed
  return true
})

func Reload

func Reload(opts ...types.RuleEngineOption)

Reload reloads all rule engine instances in the default rule chain pool.

Reload 重新加载默认规则链池中的所有规则引擎实例。

Parameters: 参数:

  • opts: Optional configuration functions for the rule engines 规则引擎的可选配置函数

Usage: 使用:

Reload(types.WithRuleEnginePool(pool))

func Stop

func Stop()

Stop releases all rule engine instances in the default rule chain pool.

Stop 释放默认规则链池中的所有规则引擎实例。

Usage: 使用:

Stop()

func WithConfig

func WithConfig(config types.Config) types.RuleEngineOption

WithConfig is an option that sets the Config of the RuleEngine. WithConfig 是设置 RuleEngine 配置的选项。

Types

type ContextObserver

type ContextObserver struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ContextObserver tracks the execution state of nodes in the rule chain.

type CustomComponentRegistry

type CustomComponentRegistry struct {
	// contains filtered or unexported fields
}

CustomComponentRegistry combines default and custom component registries

func (*CustomComponentRegistry) CustomComponents

func (r *CustomComponentRegistry) CustomComponents() types.ComponentRegistry

func (*CustomComponentRegistry) DefaultComponents

func (r *CustomComponentRegistry) DefaultComponents() types.ComponentRegistry

func (*CustomComponentRegistry) GetComponentForms

func (r *CustomComponentRegistry) GetComponentForms() types.ComponentFormList

GetComponentForms returns combined component metadata Includes forms from both default and custom components

func (*CustomComponentRegistry) GetComponents

func (r *CustomComponentRegistry) GetComponents() map[string]types.Node

GetComponents returns merged view of all components: Default components are overridden by custom components with same type

func (*CustomComponentRegistry) NewNode

func (r *CustomComponentRegistry) NewNode(componentType string) (types.Node, error)

NewNode creates a component instance with fallback logic: 1. First tries to create from default components 2. If fails, attempts to create from custom components Returns: - component instance if found in either registry - error if not found in both registries

func (*CustomComponentRegistry) Register

func (r *CustomComponentRegistry) Register(node types.Node) error

Register adds a custom component to the registry Returns error if component type already exists

func (*CustomComponentRegistry) RegisterPlugin

func (r *CustomComponentRegistry) RegisterPlugin(name string, file string) error

RegisterPlugin loads a plugin containing components name: plugin name for dependency tracking file: plugin file path (dynamic library)

func (*CustomComponentRegistry) Unregister

func (r *CustomComponentRegistry) Unregister(componentType string) error

Unregister removes a component from the custom registry Returns error if component not found

type DefaultRuleContext

type DefaultRuleContext struct {
	// contains filtered or unexported fields
}

DefaultRuleContext is the default context for message processing in the rule engine.

func NewRuleContext

func NewRuleContext(context context.Context, config types.Config, ruleChainCtx *RuleChainCtx, from types.NodeCtx, self types.NodeCtx, pool types.Pool, onEnd types.OnEndFunc, ruleChainPool types.RuleEnginePool) *DefaultRuleContext

NewRuleContext creates a new instance of the default rule engine message processing context.

func (*DefaultRuleContext) ChainCache

func (ctx *DefaultRuleContext) ChainCache() types.Cache

func (*DefaultRuleContext) Config

func (ctx *DefaultRuleContext) Config() types.Config

func (*DefaultRuleContext) DoOnEnd

func (ctx *DefaultRuleContext) DoOnEnd(msg types.RuleMsg, err error, relationType string)

DoOnEnd 结束规则链分支执行,触发 OnEnd 回调函数

func (*DefaultRuleContext) From

func (ctx *DefaultRuleContext) From() types.NodeCtx

func (*DefaultRuleContext) GetCallbackFunc

func (ctx *DefaultRuleContext) GetCallbackFunc(functionName string) interface{}

func (*DefaultRuleContext) GetContext

func (ctx *DefaultRuleContext) GetContext() context.Context

func (*DefaultRuleContext) GetEndFunc

func (ctx *DefaultRuleContext) GetEndFunc() types.OnEndFunc

func (*DefaultRuleContext) GetEnv

func (ctx *DefaultRuleContext) GetEnv(msg types.RuleMsg, useMetadata bool) map[string]interface{}

GetEnv 获取环境变量和元数据

func (*DefaultRuleContext) GetErr

func (ctx *DefaultRuleContext) GetErr() error

func (*DefaultRuleContext) GetOut

func (ctx *DefaultRuleContext) GetOut() types.RuleMsg

func (*DefaultRuleContext) GetRuleChainPool

func (ctx *DefaultRuleContext) GetRuleChainPool() types.RuleEnginePool

GetRuleChainPool 获取子规则链池

func (*DefaultRuleContext) GetSelfId

func (ctx *DefaultRuleContext) GetSelfId() string

func (*DefaultRuleContext) GlobalCache

func (ctx *DefaultRuleContext) GlobalCache() types.Cache

func (*DefaultRuleContext) IsDebugMode

func (ctx *DefaultRuleContext) IsDebugMode() bool

IsDebugMode 是否调试模式,优先使用规则链指定的调试模式

func (*DefaultRuleContext) NewMsg

func (ctx *DefaultRuleContext) NewMsg(msgType string, metaData *types.Metadata, data string) types.RuleMsg

func (*DefaultRuleContext) NewNextNodeRuleContext

func (ctx *DefaultRuleContext) NewNextNodeRuleContext(nextNode types.NodeCtx) *DefaultRuleContext

NewNextNodeRuleContext creates a new instance of RuleContext for the next node in the rule engine.

func (*DefaultRuleContext) OnDebug

func (ctx *DefaultRuleContext) OnDebug(ruleChainId string, flowType string, nodeId string, msg types.RuleMsg, relationType string, err error)

func (*DefaultRuleContext) RuleChain

func (ctx *DefaultRuleContext) RuleChain() types.NodeCtx

func (*DefaultRuleContext) Self

func (ctx *DefaultRuleContext) Self() types.NodeCtx

func (*DefaultRuleContext) SetCallbackFunc

func (ctx *DefaultRuleContext) SetCallbackFunc(functionName string, f interface{})

func (*DefaultRuleContext) SetContext

func (ctx *DefaultRuleContext) SetContext(c context.Context) types.RuleContext

func (*DefaultRuleContext) SetEndFunc

func (ctx *DefaultRuleContext) SetEndFunc(onEndFunc types.OnEndFunc) types.RuleContext

func (*DefaultRuleContext) SetExecuteNode

func (ctx *DefaultRuleContext) SetExecuteNode(nodeId string, relationTypes ...string)

func (*DefaultRuleContext) SetOnAllNodeCompleted

func (ctx *DefaultRuleContext) SetOnAllNodeCompleted(onAllNodeCompleted func())

SetOnAllNodeCompleted 设置所有节点执行完回调

func (*DefaultRuleContext) SetRuleChainPool

func (ctx *DefaultRuleContext) SetRuleChainPool(ruleChainPool types.RuleEnginePool)

SetRuleChainPool 设置子规则链池

func (*DefaultRuleContext) SubmitTack deprecated

func (ctx *DefaultRuleContext) SubmitTack(task func())

Deprecated: Use Flow SubmitTask instead.

func (*DefaultRuleContext) SubmitTask

func (ctx *DefaultRuleContext) SubmitTask(task func())

func (*DefaultRuleContext) TellChainNode

func (ctx *DefaultRuleContext) TellChainNode(chanCtx context.Context, ruleChainId, nodeId string, msg types.RuleMsg, skipTellNext bool, onEnd types.OnEndFunc, onAllNodeCompleted func())

func (*DefaultRuleContext) TellCollect

func (ctx *DefaultRuleContext) TellCollect(msg types.RuleMsg, callback func(msgList []types.WrapperMsg)) bool

func (*DefaultRuleContext) TellFailure

func (ctx *DefaultRuleContext) TellFailure(msg types.RuleMsg, err error)

func (*DefaultRuleContext) TellFlow

func (ctx *DefaultRuleContext) TellFlow(ruleChainId string, msg types.RuleMsg, opts ...types.RuleContextOption)

TellFlow 执行子规则链,ruleChainId 规则链ID onEndFunc 子规则链链分支执行完的回调,并返回该链执行结果,如果同时触发多个分支链,则会调用多次 onAllNodeCompleted 所以节点执行完触发,无结果返回 如果找不到规则链,并把消息通过`Failure`关系发送到下一个节点

func (*DefaultRuleContext) TellNext

func (ctx *DefaultRuleContext) TellNext(msg types.RuleMsg, relationTypes ...string)

func (*DefaultRuleContext) TellNextOrElse

func (ctx *DefaultRuleContext) TellNextOrElse(msg types.RuleMsg, defaultRelationType string, relationTypes ...string)

func (*DefaultRuleContext) TellNode

func (ctx *DefaultRuleContext) TellNode(chanCtx context.Context, nodeId string, msg types.RuleMsg, skipTellNext bool, onEnd types.OnEndFunc, onAllNodeCompleted func())

TellNode 从指定节点开始执行,如果 skipTellNext=true 则只执行当前节点,不通知下一个节点。 onEnd 查看获得最终执行结果 onAllNodeCompleted 所以节点执行完触发,无结果返回

func (*DefaultRuleContext) TellSelf

func (ctx *DefaultRuleContext) TellSelf(msg types.RuleMsg, delayMs int64)

func (*DefaultRuleContext) TellSuccess

func (ctx *DefaultRuleContext) TellSuccess(msg types.RuleMsg)

type DynamicNode

type DynamicNode struct {
	//ComponentType 组件类型
	ComponentType string
	//Dsl 子规则链 DSL
	Dsl string
	// contains filtered or unexported fields
}

DynamicNode 通过子规则链动态定义节点组件 ruleChain.id: 定义组件类型 ruleChain.name: 定义组件label ruleChain.additionalInfo.category: 定义组件分类 ruleChain.additionalInfo.icon: 定义组件图标 ruleChain.additionalInfo.description: 定义组件描述 ruleChain.additionalInfo.inputSchema: 使用JSON Schema 定义组件的输入参数(组件参数配置) ruleChain.additionalInfo.relationTypes: 定义和下一个节点允许连接关系类型 组件通过 ${vars.xx} 方式获取组件配置参数 使用示例: 通过dsl定义组件: dynamicNode := NewDynamicNode("fahrenheit", `

		 {
		 "ruleChain": {
		   "id": "fahrenheit",
		   "name": "华氏温度转换",
		   "debugMode": false,
		   "root": false,
		   "additionalInfo": {
		     "layoutX": 720,
		     "layoutY": 260,
	         "description":"this is a description",
		     "relationTypes":["Success","Failure"],
		     "inputSchema": {
		       "type": "object",
		       "properties": {
		         "scaleFactor": {
		           "type": "number",
                  "title": "换算系数",
                  "default": 1.8
		         }
		       },
		       "required": ["scaleFactor"]
		     }

		   }
		 },
		 "metadata": {
		   "firstNodeIndex": 0,
		   "nodes": [
		     {
		       "id": "s2",
		       "type": "jsTransform",
		       "name": "摄氏温度转华氏温度",
		       "debugMode": true,
		       "configuration": {
		         "jsScript": "var newMsg={'temperature': msg.temperature*vars.scaleFactor+32};\n return {'msg':newMsg,'metadata':metadata,'msgType':msgType};"
		       }
		     }
		   ],
		   "connections": [
		     {
		     }
		   ]
		 }
		}

	`)
	注册组件
	Registry.Register(dynamicNode)

func NewDynamicNode

func NewDynamicNode(componentType, componentDsl string) *DynamicNode

func (*DynamicNode) Def

func (x *DynamicNode) Def() types.ComponentForm

Def 组件定义

func (*DynamicNode) Destroy

func (x *DynamicNode) Destroy()

Destroy 销毁

func (*DynamicNode) Init

func (x *DynamicNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*DynamicNode) New

func (x *DynamicNode) New() types.Node

func (*DynamicNode) OnMsg

func (x *DynamicNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*DynamicNode) Type

func (x *DynamicNode) Type() string

Type 组件类型

type JsonParser

type JsonParser struct {
}

JsonParser Json

func (*JsonParser) DecodeRuleChain

func (p *JsonParser) DecodeRuleChain(rootRuleChain []byte) (types.RuleChain, error)

DecodeRuleChain 通过json解析规则链结构体

func (*JsonParser) DecodeRuleNode

func (p *JsonParser) DecodeRuleNode(rootRuleChain []byte) (types.RuleNode, error)

DecodeRuleNode 通过json解析节点结构体

func (*JsonParser) EncodeRuleChain

func (p *JsonParser) EncodeRuleChain(def interface{}) ([]byte, error)

func (*JsonParser) EncodeRuleNode

func (p *JsonParser) EncodeRuleNode(def interface{}) ([]byte, error)

type PluginComponentRegistry

type PluginComponentRegistry struct {
	// contains filtered or unexported fields
}

PluginComponentRegistry is an initializer for Go plugin components.

func (*PluginComponentRegistry) Components

func (p *PluginComponentRegistry) Components() []types.Node

Components returns a slice of components provided by the plugin.

func (*PluginComponentRegistry) Init

func (p *PluginComponentRegistry) Init() error

Init initializes the plugin component registry by loading the plugin from a file.

type Pool

type Pool struct {

	// Callbacks provides hooks for rule engine lifecycle events,
	// enabling custom handling of creation, updates, and deletion.
	// Callbacks 为规则引擎生命周期事件提供钩子,
	// 支持创建、更新和删除的自定义处理。
	Callbacks types.Callbacks
	// contains filtered or unexported fields
}

Pool is a pool of rule engine instances. It provides centralized management of multiple rule engines, enabling efficient resource sharing, batch operations, and coordinated lifecycle management.

Pool 是规则引擎实例的池。 它提供多个规则引擎的集中管理,支持高效的资源共享、批量操作和协调的生命周期管理。

Key Features: 主要特性:

  • Concurrent-safe rule engine storage using sync.Map 使用 sync.Map 的并发安全规则引擎存储
  • Automatic rule chain loading from filesystem 从文件系统自动加载规则链
  • Callback-based lifecycle events 基于回调的生命周期事件
  • Batch operations across multiple engines 跨多个引擎的批量操作
  • Dynamic engine creation and management 动态引擎创建和管理

Use Cases: 使用场景:

  • Multi-tenant rule engine management 多租户规则引擎管理
  • Rule chain hot reloading and deployment 规则链热重载和部署
  • Distributed rule processing coordination 分布式规则处理协调
  • Resource sharing between related rule chains 相关规则链之间的资源共享

func NewPool

func NewPool() *Pool

NewPool creates a new instance of a rule engine pool. This function initializes an empty pool ready for use.

NewPool 创建规则引擎池的新实例。 此函数初始化一个准备使用的空池。

Returns: 返回:

  • *Pool: New pool instance 新的池实例

Usage: 使用:

pool := NewPool()
engine, err := pool.New("engine1", ruleChainBytes)

func (*Pool) Del

func (g *Pool) Del(id string)

Del deletes a rule engine instance by its ID.

func (*Pool) Get

func (g *Pool) Get(id string) (types.RuleEngine, bool)

Get retrieves a rule engine instance by its ID.

func (*Pool) Load

func (g *Pool) Load(folderPath string, opts ...types.RuleEngineOption) error

Load loads all rule chain configurations from a specified folder and its subfolders into the rule engine instance pool. The rule chain ID is taken from the configuration file's ruleChain.id.

Load 从指定文件夹及其子文件夹加载所有规则链配置到规则引擎实例池中。 规则链 ID 取自配置文件的 ruleChain.id。

Parameters: 参数:

  • folderPath: Path to the folder containing rule chain files 包含规则链文件的文件夹路径
  • opts: Optional configuration functions for the rule engines 规则引擎的可选配置函数

Returns: 返回:

  • error: Loading error if any 如果有的话,加载错误

File Processing: 文件处理:

  • Supports JSON files (*.json, *.JSON) 支持 JSON 文件(*.json、*.JSON)
  • Recursively processes subdirectories 递归处理子目录
  • Uses glob patterns for file matching 使用 glob 模式进行文件匹配
  • Automatically extracts rule chain ID from file content 自动从文件内容提取规则链 ID

Error Handling: 错误处理:

  • Individual file errors are logged but don't stop the overall process 单个文件错误会被记录但不会停止整个过程
  • Returns error only for critical failures like invalid folder path 仅对关键故障(如无效文件夹路径)返回错误

Callback Integration: 回调集成:

  • Triggers OnNew callback for each successfully loaded rule chain 为每个成功加载的规则链触发 OnNew 回调
  • Enables custom processing and validation of loaded chains 支持已加载链的自定义处理和验证

func (*Pool) New

func (g *Pool) New(id string, rootRuleChainSrc []byte, opts ...types.RuleEngineOption) (types.RuleEngine, error)

New creates a new RuleEngine instance and stores it in the rule chain pool. If the specified id is empty, the ruleChain.id from the rule chain file is used.

func (*Pool) OnMsg

func (g *Pool) OnMsg(msg types.RuleMsg)

OnMsg invokes all rule engine instances to process a message. All rule chains in the rule engine instance pool will attempt to process the message.

func (*Pool) Range

func (g *Pool) Range(f func(key, value any) bool)

Range iterates over all rule engine instances in the pool.

func (*Pool) Reload

func (g *Pool) Reload(opts ...types.RuleEngineOption)

Reload reloads all rule engine instances in the pool with the given options.

func (*Pool) SetCallbacks

func (g *Pool) SetCallbacks(callbacks types.Callbacks)

func (*Pool) Stop

func (g *Pool) Stop()

Stop releases all rule engine instances in the pool.

type RelationCache

type RelationCache struct {
	// contains filtered or unexported fields
}

RelationCache caches the outgoing node relationships based on the incoming node. This structure is used as a key for caching node relationships to improve performance by avoiding repeated lookups of node routing information.

RelationCache 基于传入节点缓存传出节点关系。 此结构用作缓存节点关系的键,通过避免重复查找节点路由信息来提高性能。

Cache Key Structure: 缓存键结构:

  • inNodeId: The source node from which the relationship originates 关系源头的源节点
  • relationType: The type of relationship (e.g., "Success", "Failure", "True", "False") 关系类型(例如,"Success"、"Failure"、"True"、"False")

Usage: 用法:

This cache significantly improves performance in rule chains with complex
routing by avoiding repeated traversal of the node relationship map.
该缓存通过避免重复遍历节点关系映射,显著提高了具有复杂路由的规则链的性能。

type RuleChainCtx

type RuleChainCtx struct {
	// Id is the unique identifier of the rule chain node
	// Id 是规则链节点的唯一标识符
	Id types.RuleNodeId

	// SelfDefinition contains the complete rule chain definition including
	// metadata, nodes, connections, and configuration
	// SelfDefinition 包含完整的规则链定义,包括元数据、节点、连接和配置
	SelfDefinition *types.RuleChain

	// RWMutex provides thread-safe access to the rule chain context,
	// allowing concurrent reads while ensuring exclusive writes
	// RWMutex 为规则链上下文提供线程安全访问,允许并发读取同时确保独占写入
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleChainCtx defines an instance of a rule chain. It initializes all nodes and records the routing relationships between all nodes in the rule chain. This is the core context that manages the execution environment for a complete rule chain.

RuleChainCtx 定义规则链的实例。 它初始化所有节点并记录规则链中所有节点之间的路由关系。 这是管理完整规则链执行环境的核心上下文。

Core Responsibilities: 核心职责:

  • Node lifecycle management 节点生命周期管理
  • Routing relationship management 路由关系管理
  • Configuration and variable handling 配置和变量处理
  • Aspect-oriented programming integration 面向切面编程集成
  • Sub-rule chain orchestration 子规则链编排
  • Thread-safe operations 线程安全操作

Architecture: 架构:

Each RuleChainCtx represents a complete rule chain with:
每个 RuleChainCtx 代表一个完整的规则链,包含:
- Multiple RuleNodeCtx instances (individual nodes)  多个 RuleNodeCtx 实例(单个节点)
- Routing matrix defining node connections  定义节点连接的路由矩阵
- Shared configuration and variables  共享配置和变量
- Root context for message processing  消息处理的根上下文

Performance Features: 性能特性:

  • Relationship caching for fast routing lookups 用于快速路由查找的关系缓存
  • Efficient parent-child node tracking 高效的父子节点跟踪
  • Lock-optimized concurrent access 锁优化的并发访问
  • Variable preprocessing and secret decryption 变量预处理和密钥解密

func InitRuleChainCtx

func InitRuleChainCtx(config types.Config, aspects types.AspectList, ruleChainDef *types.RuleChain, ruleChainPool types.RuleEnginePool) (*RuleChainCtx, error)

InitRuleChainCtx initializes a RuleChainCtx with the given configuration, aspects, and rule chain definition. This function performs the complete initialization of a rule chain context, including node creation, relationship mapping, variable processing, and aspect integration.

InitRuleChainCtx 使用给定的配置、切面和规则链定义初始化 RuleChainCtx。 此函数执行规则链上下文的完整初始化,包括节点创建、关系映射、变量处理和切面集成。

Parameters: 参数:

  • config: Rule engine configuration containing component registry and global settings 包含组件注册表和全局设置的规则引擎配置
  • aspects: List of AOP aspects to be applied to the rule chain 要应用于规则链的 AOP 切面列表
  • ruleChainDef: Complete rule chain definition with nodes and connections 包含节点和连接的完整规则链定义

Returns: 返回:

  • *RuleChainCtx: Fully initialized rule chain context 完全初始化的规则链上下文
  • error: Initialization error if any 如果有的话,初始化错误

Initialization Process: 初始化过程:

  1. Execute before-init aspects 执行初始化前切面
  2. Create and configure RuleChainCtx structure 创建和配置 RuleChainCtx 结构
  3. Process variables and secrets 处理变量和密钥
  4. Initialize all node components 初始化所有节点组件
  5. Build node relationship mappings 构建节点关系映射
  6. Set up sub-rule chain connections 设置子规则链连接
  7. Create root rule context 创建根规则上下文
  8. Handle empty rule chain cases 处理空规则链情况

Error Handling: 错误处理:

  • Aspect initialization failures 切面初始化失败
  • Node component creation errors 节点组件创建错误
  • Variable processing failures 变量处理失败
  • Invalid rule chain definitions 无效的规则链定义

func (*RuleChainCtx) Config

func (rc *RuleChainCtx) Config() types.Config

Config returns the configuration of the rule chain context

func (*RuleChainCtx) Copy

func (rc *RuleChainCtx) Copy(newCtx *RuleChainCtx)

Copy copies the content from another RuleChainCtx

func (*RuleChainCtx) DSL

func (rc *RuleChainCtx) DSL() []byte

DSL returns the rule chain definition as a byte slice

func (*RuleChainCtx) Definition

func (rc *RuleChainCtx) Definition() *types.RuleChain

Definition returns the rule chain definition

func (*RuleChainCtx) Destroy

func (rc *RuleChainCtx) Destroy()

Destroy cleans up resources and executes destroy aspects

func (*RuleChainCtx) GetAspects

func (rc *RuleChainCtx) GetAspects() types.AspectList

GetAspects retrieves the aspects of the rule chain

func (*RuleChainCtx) GetFirstNode

func (rc *RuleChainCtx) GetFirstNode() (types.NodeCtx, bool)

GetFirstNode retrieves the first node, where the message starts flowing. By default, it's the node with index 0

func (*RuleChainCtx) GetNextNodes

func (rc *RuleChainCtx) GetNextNodes(id types.RuleNodeId, relationType string) ([]types.NodeCtx, bool)

GetNextNodes retrieves the child nodes of the current node with the specified relationship This method implements a two-level caching strategy: first checking the relationCache, then building the cache if needed, providing high-performance routing for message flow.

GetNextNodes 检索具有指定关系的当前节点的子节点 此方法实现两级缓存策略:首先检查 relationCache,然后在需要时构建缓存, 为消息流提供高性能路由。

Parameters: 参数:

  • id: Source node identifier 源节点标识符
  • relationType: Type of relationship to follow (e.g., "Success", "Failure", "True", "False") 要遵循的关系类型(例如,"Success"、"Failure"、"True"、"False")

Returns: 返回:

  • []types.NodeCtx: List of child node contexts 子节点上下文列表
  • bool: true if any child nodes found, false otherwise 如果找到任何子节点则为 true,否则为 false

Performance Features: 性能特性:

  • Relationship caching: O(1) lookup time for cached relationships 关系缓存:缓存关系的 O(1) 查找时间
  • Lazy cache building: cache is built only when needed 延迟缓存构建:仅在需要时构建缓存
  • Thread-safe: proper locking for concurrent access 线程安全:适当的锁定以进行并发访问

Cache Strategy: 缓存策略:

  1. Check relationCache for existing entry 检查 relationCache 中的现有条目
  2. If not found, traverse nodeRoutes to find matches 如果未找到,遍历 nodeRoutes 查找匹配项
  3. Build node context list from matching relationships 从匹配的关系构建节点上下文列表
  4. Store result in relationCache for future use 将结果存储在 relationCache 中以供将来使用

func (*RuleChainCtx) GetNodeById

func (rc *RuleChainCtx) GetNodeById(id types.RuleNodeId) (types.NodeCtx, bool)

GetNodeById retrieves a node context by its ID This method supports both regular nodes and sub-rule chains, providing unified access to all types of nodes in the rule chain.

GetNodeById 通过 ID 检索节点上下文 此方法支持常规节点和子规则链,为规则链中所有类型的节点提供统一访问。

Parameters: 参数:

  • id: Node identifier with type information 带有类型信息的节点标识符

Returns: 返回:

  • types.NodeCtx: Node context if found 如果找到,节点上下文
  • bool: true if node exists, false otherwise 如果节点存在则为 true,否则为 false

Behavior: 行为:

  • For CHAIN type: searches in the rule engine pool 对于 CHAIN 类型:在规则引擎池中搜索
  • For NODE type: searches in the local nodes map 对于 NODE 类型:在本地节点映射中搜索
  • Thread-safe: uses read lock for concurrent access 线程安全:使用读锁进行并发访问

func (*RuleChainCtx) GetNodeByIndex

func (rc *RuleChainCtx) GetNodeByIndex(index int) (types.NodeCtx, bool)

GetNodeByIndex retrieves a node context by its index

func (*RuleChainCtx) GetNodeId

func (rc *RuleChainCtx) GetNodeId() types.RuleNodeId

GetNodeId returns the node ID

func (*RuleChainCtx) GetNodeRoutes

func (rc *RuleChainCtx) GetNodeRoutes(id types.RuleNodeId) ([]types.RuleNodeRelation, bool)

GetNodeRoutes retrieves the routes for a given node ID

func (*RuleChainCtx) GetParentNodeIds

func (rc *RuleChainCtx) GetParentNodeIds(id types.RuleNodeId) ([]types.RuleNodeId, bool)

GetParentNodeIds retrieves the parent node IDs for a given node ID

func (*RuleChainCtx) GetRuleEnginePool

func (rc *RuleChainCtx) GetRuleEnginePool() types.RuleEnginePool

GetRuleEnginePool retrieves the sub-rule chain pool

func (*RuleChainCtx) Init

func (rc *RuleChainCtx) Init(_ types.Config, configuration types.Configuration) error

Init initializes the rule chain context

func (*RuleChainCtx) IsDebugMode

func (rc *RuleChainCtx) IsDebugMode() bool

IsDebugMode checks if debug mode is enabled

func (*RuleChainCtx) New

func (rc *RuleChainCtx) New() types.Node

New creates a new instance (not supported for RuleChainCtx)

func (*RuleChainCtx) OnMsg

func (rc *RuleChainCtx) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg processes incoming messages

func (*RuleChainCtx) ReloadChild

func (rc *RuleChainCtx) ReloadChild(ruleNodeId types.RuleNodeId, def []byte) error

ReloadChild reloads a child node

func (*RuleChainCtx) ReloadSelf

func (rc *RuleChainCtx) ReloadSelf(def []byte) error

ReloadSelf reloads the rule chain from a byte slice definition

func (*RuleChainCtx) ReloadSelfFromDef

func (rc *RuleChainCtx) ReloadSelfFromDef(def types.RuleChain) error

ReloadSelfFromDef reloads the rule chain from a RuleChain definition This method performs hot reloading of rule chain configuration, supporting dynamic updates without stopping the rule engine.

ReloadSelfFromDef 从 RuleChain 定义重新加载规则链 此方法执行规则链配置的热重载,支持在不停止规则引擎的情况下进行动态更新。

Parameters: 参数:

  • def: New rule chain definition 新的规则链定义

Returns: 返回:

  • error: Reload error if any 如果有的话,重载错误

Hot Reload Process: 热重载过程:

  1. Check if rule chain is disabled 检查规则链是否被禁用
  2. Initialize new rule chain context 初始化新的规则链上下文
  3. Safely destroy old nodes without holding locks 在不持有锁的情况下安全销毁旧节点
  4. Execute destroy aspects for cleanup 执行销毁切面进行清理
  5. Atomically replace old context with new one 原子性地用新上下文替换旧上下文
  6. Execute reload aspects for post-reload processing 执行重载切面进行重载后处理

Error Handling: 错误处理:

  • Disabled rule chain detection 禁用规则链检测
  • Context initialization failures 上下文初始化失败
  • Aspect execution errors 切面执行错误

func (*RuleChainCtx) SetAspects

func (rc *RuleChainCtx) SetAspects(aspects types.AspectList)

SetAspects sets the aspects for the rule chain

func (*RuleChainCtx) SetRuleEnginePool

func (rc *RuleChainCtx) SetRuleEnginePool(ruleChainPool types.RuleEnginePool)

SetRuleEnginePool sets the sub-rule chain pool

func (*RuleChainCtx) Type

func (rc *RuleChainCtx) Type() string

Type returns the component type

type RuleComponentRegistry

type RuleComponentRegistry struct {

	// RWMutex is a read/write mutex lock.
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleComponentRegistry is a registry for rule engine components.

func (*RuleComponentRegistry) GetComponentForms

func (r *RuleComponentRegistry) GetComponentForms() types.ComponentFormList

GetComponentForms returns a list of component forms for all registered components.

func (*RuleComponentRegistry) GetComponents

func (r *RuleComponentRegistry) GetComponents() map[string]types.Node

GetComponents returns a map of all registered components.

func (*RuleComponentRegistry) NewNode

func (r *RuleComponentRegistry) NewNode(componentType string) (types.Node, error)

NewNode creates a new instance of a rule engine node component by its type.

func (*RuleComponentRegistry) Register

func (r *RuleComponentRegistry) Register(node types.Node) error

Register adds a rule engine node component to the registry.

func (*RuleComponentRegistry) RegisterPlugin

func (r *RuleComponentRegistry) RegisterPlugin(name string, file string) error

RegisterPlugin adds a rule engine node component from a plugin file.

func (*RuleComponentRegistry) Unregister

func (r *RuleComponentRegistry) Unregister(componentType string) error

Unregister removes a component from the registry by its type or plugin name.

type RuleEngine

type RuleEngine struct {
	// Embed graceful shutdown functionality
	// 嵌入优雅停机功能
	base.GracefulShutdown

	// Config is the configuration for the rule engine containing
	// global settings, component registry, and execution parameters
	// Config 是规则引擎的配置,包含全局设置、组件注册表和执行参数
	Config types.Config

	// Aspects is a list of AOP (Aspect-Oriented Programming) aspects
	// that provide cross-cutting concerns like logging, validation, and metrics
	// Aspects 是面向切面编程(AOP)切面列表,提供如日志、验证和指标等横切关注点
	Aspects types.AspectList

	// OnUpdated is a callback function triggered when the rule chain is updated
	// OnUpdated 是规则链更新时触发的回调函数
	OnUpdated func(chainId, nodeId string, dsl []byte)
	// contains filtered or unexported fields
}

RuleEngine is the core structure for a rule engine instance. Each RuleEngine instance manages exactly one root rule chain and provides the primary interface for message processing and rule execution.

RuleEngine 是规则引擎实例的核心结构。 每个 RuleEngine 实例管理恰好一个根规则链,并为消息处理和规则执行提供主要接口。

Architecture & Features: 架构和特性:

  • Single root rule chain management with hot reloading capability 单根规则链管理,支持热重载功能
  • Aspect-oriented programming support for cross-cutting concerns 面向切面编程支持,用于横切关注点
  • Two-phase graceful shutdown for safe resource cleanup 两阶段优雅停机,确保安全的资源清理
  • Deadlock-free reload mechanism with message queuing 无死锁重载机制,支持消息队列
  • Concurrent message processing with atomic operations 并发消息处理,使用原子操作
  • Context-aware processing with shutdown signal integration 上下文感知处理,集成停机信号
  • Sub-rule chain pool integration for nested execution 子规则链池集成,支持嵌套执行
  • Comprehensive metrics and debugging capabilities 全面的指标和调试功能
  • Backpressure control during reload to prevent memory overflow 重载期间的背压控制,防止内存溢出

Lifecycle Management: 生命周期管理:

  1. Creation with NewRuleEngine() and rule chain definition 使用 NewRuleEngine() 和规则链定义创建
  2. Message processing via OnMsg() with concurrent safety 通过 OnMsg() 进行消息处理,具有并发安全性
  3. Optional hot reloading with ReloadSelf() without downtime 使用 ReloadSelf() 进行可选的热重载,无需停机
  4. Graceful cleanup with Stop() and proper resource release 使用 Stop() 进行优雅清理和适当的资源释放

Thread Safety & Concurrency: 线程安全和并发:

RuleEngine is designed for high-concurrency scenarios with:
RuleEngine 设计用于高并发场景,具有:
- Lock-free message processing using atomic operations
  使用原子操作的无锁消息处理
- Safe concurrent access to rule chain definitions
  对规则链定义的安全并发访问
- Coordinated reload operations without blocking message flow
  协调的重载操作,不阻塞消息流
- Graceful shutdown handling for all concurrent operations
  对所有并发操作的优雅停机处理
- Backpressure control to prevent resource exhaustion
  背压控制以防止资源耗尽

Memory Safety During Reload: 重载期间的内存安全:

The engine implements sophisticated backpressure mechanisms to prevent
memory overflow during reload operations:
引擎实现了复杂的背压机制,在重载操作期间防止内存溢出:
- Limited concurrent goroutines waiting for reload completion
  限制并发等待重载完成的goroutine数量
- Fast-fail strategy for excessive reload wait requests
  对过量重载等待请求的快速失败策略
- Configurable memory protection thresholds
  可配置的内存保护阈值
- Automatic degradation to reject mode under high load
  高负载下自动降级到拒绝模式

This design ensures reliable, high-performance rule processing in production environments. 此设计确保在生产环境中可靠的高性能规则处理。

func NewRuleEngine

func NewRuleEngine(id string, def []byte, opts ...types.RuleEngineOption) (*RuleEngine, error)

NewRuleEngine creates a new RuleEngine instance with the given ID and definition. It applies the provided RuleEngineOptions during the creation process.

NewRuleEngine 使用给定的 ID 和定义创建新的 RuleEngine 实例。 它在创建过程中应用提供的 RuleEngineOptions。

Parameters: 参数:

  • id: Unique identifier for the rule engine (can be empty to use chain ID) 规则引擎的唯一标识符(可以为空以使用链 ID)
  • def: Rule chain definition in JSON or other supported format JSON 或其他支持格式的规则链定义
  • opts: Optional configuration functions to customize the engine 可选的配置函数来自定义引擎

Returns: 返回:

  • *RuleEngine: Initialized rule engine instance 已初始化的规则引擎实例
  • error: Initialization error if any 如果有的话,初始化错误

The creation process involves: 创建过程包括:

  1. Parsing the rule chain definition 解析规则链定义
  2. Initializing all components and their relationships 初始化所有组件及其关系
  3. Setting up aspects and callback functions 设置切面和回调函数
  4. Validating the configuration 验证配置
  5. Configuring backpressure control for memory safety 配置背压控制以确保内存安全

func (*RuleEngine) DSL

func (e *RuleEngine) DSL() []byte

DSL returns the current rule chain configuration in its original format. DSL 返回原始格式的当前规则链配置。

func (*RuleEngine) Definition

func (e *RuleEngine) Definition() types.RuleChain

Definition returns the rule chain definition structure. Definition 返回规则链定义结构。

func (*RuleEngine) GetAspects

func (e *RuleEngine) GetAspects() types.AspectList

GetAspects returns a copy of the current aspects list to avoid data races. GetAspects 返回当前切面列表的副本以避免数据竞争。

func (*RuleEngine) GetMetrics

func (e *RuleEngine) GetMetrics() *metrics.EngineMetrics

GetMetrics returns engine metrics if the metrics aspect is enabled. GetMetrics 如果启用了指标切面,则返回引擎指标。

func (*RuleEngine) GetReloadWaitersStats

func (e *RuleEngine) GetReloadWaitersStats() (maxWaiters int64, currentWaiters int64, isReloading bool)

GetReloadWaitersStats returns current reload waiters statistics for monitoring. This provides insight into reload behavior under load.

GetReloadWaitersStats 返回当前重载等待者统计信息用于监控。 这提供了负载下重载行为的洞察。

Returns: 返回:

  • maxWaiters: Maximum allowed concurrent waiters (0 means unlimited) maxWaiters: 最大允许的并发等待者(0 表示无限制)
  • currentWaiters: Current number of goroutines waiting for reload currentWaiters: 当前等待重载的 goroutine 数量
  • isReloading: Whether engine is currently reloading isReloading: 引擎当前是否正在重载

func (*RuleEngine) Id

func (e *RuleEngine) Id() string

Id returns the unique identifier of the rule engine instance. Id 返回规则引擎实例的唯一标识符。

func (*RuleEngine) Initialized

func (e *RuleEngine) Initialized() bool

Initialized returns whether the rule engine has been properly initialized. Initialized 返回规则引擎是否已正确初始化。

func (*RuleEngine) NodeDSL

func (e *RuleEngine) NodeDSL(chainId types.RuleNodeId, childNodeId types.RuleNodeId) []byte

NodeDSL returns the configuration of a specific node within the rule chain. NodeDSL 返回规则链中特定节点的配置。

func (*RuleEngine) OnMsg

func (e *RuleEngine) OnMsg(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsg asynchronously processes a message using the rule engine. It accepts optional RuleContextOption parameters to customize the execution context.

OnMsg 使用规则引擎异步处理消息。 它接受可选的 RuleContextOption 参数来自定义执行上下文。

func (*RuleEngine) OnMsgAndWait

func (e *RuleEngine) OnMsgAndWait(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsgAndWait synchronously processes a message using the rule engine and waits for all nodes in the rule chain to complete before returning. OnMsgAndWait 使用规则引擎同步处理消息,并在返回前等待规则链中的所有节点完成。

func (*RuleEngine) OnMsgWithEndFunc

func (e *RuleEngine) OnMsgWithEndFunc(msg types.RuleMsg, endFunc types.OnEndFunc)

OnMsgWithEndFunc is a deprecated method that asynchronously processes a message using the rule engine. The endFunc callback is used to obtain the results after the rule chain execution is complete. Note: If the rule chain has multiple endpoints, the callback function will be executed multiple times. Deprecated: Use OnMsg instead.

OnMsgWithEndFunc 是一个已弃用的方法,使用规则引擎异步处理消息。 endFunc 回调用于在规则链执行完成后获取结果。 注意:如果规则链有多个端点,回调函数将被执行多次。 已弃用:请改用 OnMsg。

func (*RuleEngine) OnMsgWithOptions

func (e *RuleEngine) OnMsgWithOptions(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsgWithOptions is a deprecated method that asynchronously processes a message using the rule engine. It allows carrying context options and an end callback option. The context is used for sharing data between different component instances. The endFunc callback is used to obtain the results after the rule chain execution is complete. Note: If the rule chain has multiple endpoints, the callback function will be executed multiple times. Deprecated: Use OnMsg instead.

OnMsgWithOptions 是一个已弃用的方法,使用规则引擎异步处理消息。 它允许携带上下文选项和结束回调选项。 上下文用于在不同组件实例之间共享数据。 endFunc 回调用于在规则链执行完成后获取结果。 注意:如果规则链有多个端点,回调函数将被执行多次。 已弃用:请改用 OnMsg。

func (*RuleEngine) Reload

func (e *RuleEngine) Reload(opts ...types.RuleEngineOption) error

Reload reloads the current rule chain with optional new configuration. This is a convenience method that uses the current DSL definition. Reload 使用可选的新配置重载当前规则链。 这是使用当前 DSL 定义的便捷方法。

func (*RuleEngine) ReloadChild

func (e *RuleEngine) ReloadChild(ruleNodeId string, dsl []byte) error

ReloadChild updates a specific node within the root rule chain. If ruleNodeId is empty, it updates the entire root rule chain. It gracefully stops accepting new messages, waits for active messages to complete, performs the reload, and then resumes normal operation.

ReloadChild 更新根规则链中的特定节点。 如果 ruleNodeId 为空,则更新整个根规则链。 它优雅地停止接收新消息,等待活跃消息完成,执行重载,然后恢复正常运行。

Parameters: 参数:

  • ruleNodeId: ID of the node to update (empty for root chain) 要更新的节点 ID(根链为空)
  • dsl: New configuration for the node/chain 节点/链的新配置

Returns: 返回:

  • error: Update error if any 如果有的话,更新错误

func (*RuleEngine) ReloadSelf

func (e *RuleEngine) ReloadSelf(dsl []byte, opts ...types.RuleEngineOption) error

ReloadSelf reloads the rule chain with new definition and options. This method supports hot reloading of rule configurations without stopping the engine. It implements a two-phase graceful reload process:

Phase 1: Preparation (设置阶段) - Apply configuration options 应用配置选项 - Wait for any ongoing reload to complete 等待任何正在进行的重载完成 - Set reloading state to block new messages 设置重载状态以阻塞新消息 - Wait for active messages to complete 等待活跃消息完成

Phase 2: Reload (重载阶段) - Parse new rule chain definition 解析新的规则链定义 - Update or create rule chain context 更新或创建规则链上下文 - Update atomic aspect pointers 更新原子切面指针 - Resume normal operation 恢复正常运行

ReloadSelf 使用新定义和选项重新加载规则链。 此方法支持在不停止引擎的情况下热重载规则配置。 它实现了两阶段优雅重载过程:

Parameters: 参数:

  • dsl: Rule chain definition in byte format 字节格式的规则链定义
  • opts: Optional configuration functions 可选的配置函数

Returns: 返回:

  • error: Reload error if any 如果有的话,重载错误

func (*RuleEngine) RootRuleChainCtx

func (e *RuleEngine) RootRuleChainCtx() types.ChainCtx

RootRuleChainCtx returns the root rule chain context. RootRuleChainCtx 返回根规则链上下文。

func (*RuleEngine) RootRuleContext

func (e *RuleEngine) RootRuleContext() types.RuleContext

RootRuleContext returns the root rule context for advanced operations. RootRuleContext 返回用于高级操作的根规则上下文。

func (*RuleEngine) SetAspects

func (e *RuleEngine) SetAspects(aspects ...types.Aspect)

SetAspects updates the list of aspects used by the rule engine. Aspects provide cross-cutting functionality like logging and validation. SetAspects 更新规则引擎使用的切面列表。 切面提供如日志和验证等横切功能。

func (*RuleEngine) SetConfig

func (e *RuleEngine) SetConfig(config types.Config)

SetConfig updates the configuration of the rule engine. This should be called before initialization for best results. SetConfig 更新规则引擎的配置。 为了获得最佳效果,应在初始化前调用。

func (*RuleEngine) SetMaxReloadWaiters

func (e *RuleEngine) SetMaxReloadWaiters(maxWaiters int64)

SetMaxReloadWaiters configures the maximum number of concurrent goroutines that can wait for reload completion. This prevents memory overflow during high-traffic reload scenarios.

SetMaxReloadWaiters 配置可以等待重载完成的最大并发 goroutine 数量。 这防止高流量重载场景下的内存溢出。

Parameters: 参数:

  • maxWaiters: Maximum number of concurrent goroutines allowed to wait If 0, disables the limit (unlimited waiters) If negative, keeps current setting unchanged maxWaiters: 允许等待的最大并发 goroutine 数量 如果为 0,禁用限制(无限等待者) 如果为负数,保持当前设置不变

Thread Safety: 线程安全:

This method is thread-safe and can be called during message processing.
此方法是线程安全的,可以在消息处理期间调用。

func (*RuleEngine) SetRuleEnginePool

func (e *RuleEngine) SetRuleEnginePool(ruleChainPool types.RuleEnginePool)

SetRuleEnginePool sets the pool used for managing sub-rule chains. This allows for nested rule chain execution and resource sharing. SetRuleEnginePool 设置用于管理子规则链的池。 这允许嵌套规则链执行和资源共享。

func (*RuleEngine) Stop

func (e *RuleEngine) Stop(ctx context.Context)

Stop shuts down the rule engine and releases all resources. Implements a two-phase graceful shutdown strategy:

Phase 1: Graceful Shutdown (优雅停机阶段) - Set shutdown flag to reject new messages 设置停机标志拒绝新消息 - Wait for all active messages to complete naturally 等待所有活跃消息自然完成 - Respect the provided context timeout 遵守提供的上下文超时

Phase 2: Force Shutdown (强制停机阶段) - If timeout exceeded, cancel contexts to interrupt operations 如果超时,取消上下文以中断操作 - Give brief time for operations to respond to cancellation 给操作短暂时间响应取消 - Clean up all resources immediately 立即清理所有资源

Context handling: 上下文处理: - If ctx is provided with deadline: uses that timeout 如果提供了带截止时间的ctx:使用该超时 - If ctx is context.Background(): uses default 10s timeout 如果ctx是context.Background():使用默认10秒超时 - If ctx is nil: performs immediate shutdown 如果ctx为nil:执行立即停机

Concurrent calls handling: 并发调用处理:

  • If graceful shutdown is already in progress, subsequent calls wait for completion 如果优雅停机已在进行中,后续调用等待其完成
  • Only one graceful shutdown process can execute at a time 一次只能执行一个优雅停机过程
  • No forced interruption of ongoing graceful shutdown 不会强制中断正在进行的优雅停机

Stop 关闭规则引擎并释放所有资源。 实现两阶段优雅停机策略:

type RuleNodeCtx

type RuleNodeCtx struct {
	// types.Node is the embedded node implementation providing the core functionality.
	// This embedding allows RuleNodeCtx to act as a node while adding wrapper capabilities.
	// types.Node 是嵌入的节点实现,提供核心功能。
	// 这种嵌入允许 RuleNodeCtx 在添加包装器功能的同时充当节点。
	types.Node

	// ChainCtx provides access to the parent rule chain context,
	// enabling node-to-chain communication and access to shared resources.
	// ChainCtx 提供对父规则链上下文的访问,支持节点到链的通信和对共享资源的访问。
	ChainCtx *RuleChainCtx

	// SelfDefinition contains the configuration and metadata for this specific node,
	// including its type, ID, configuration parameters, and behavioral settings.
	// SelfDefinition 包含此特定节点的配置和元数据,
	// 包括其类型、ID、配置参数和行为设置。
	SelfDefinition *types.RuleNode

	// sync.RWMutex provides thread-safe access to the node context,
	// ensuring concurrent safety during hot reloads and message processing.
	// sync.RWMutex 为节点上下文提供线程安全访问,
	// 确保在热重载和消息处理期间的并发安全。
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleNodeCtx represents an instance of a node component within the rule engine. It acts as a wrapper around the actual node implementation, providing additional context and metadata required for rule chain execution.

RuleNodeCtx 表示规则引擎中节点组件的实例。 它充当实际节点实现的包装器,提供规则链执行所需的额外上下文和元数据。

Architecture: 架构:

RuleNodeCtx embeds the types.Node interface, allowing it to act as both
a node wrapper and a node implementation. This design provides:
RuleNodeCtx 嵌入 types.Node 接口,允许它既充当节点包装器又充当节点实现。
此设计提供:
- Direct access to node methods through interface embedding  通过接口嵌入直接访问节点方法
- Additional context and configuration management  额外的上下文和配置管理
- Thread-safe operations with mutex protection  使用互斥锁保护的线程安全操作
- Hot reloading capabilities  热重载功能

func InitNetResourceNodeCtx

func InitNetResourceNodeCtx(config types.Config, chainCtx *RuleChainCtx, aspects types.AspectList, selfDefinition *types.RuleNode) (*RuleNodeCtx, error)

InitNetResourceNodeCtx initializes a RuleNodeCtx with network resources. This function is used for nodes that require network connectivity and resources.

InitNetResourceNodeCtx 初始化带有网络资源的 RuleNodeCtx。 此函数用于需要网络连接和资源的节点。

Parameters: 参数:

  • config: Global rule engine configuration 全局规则引擎配置
  • chainCtx: Parent rule chain context 父规则链上下文
  • aspects: List of AOP aspects to apply 要应用的 AOP 切面列表
  • selfDefinition: Node definition and configuration 节点定义和配置

Returns: 返回:

  • *RuleNodeCtx: Initialized node context with network resources 已初始化的带网络资源的节点上下文
  • error: Initialization error if any 如果有的话,初始化错误

func InitRuleNodeCtx

func InitRuleNodeCtx(config types.Config, chainCtx *RuleChainCtx, aspects types.AspectList, selfDefinition *types.RuleNode) (*RuleNodeCtx, error)

InitRuleNodeCtx initializes a RuleNodeCtx with the given parameters. This is the standard initialization function for regular nodes without network resources.

InitRuleNodeCtx 使用给定参数初始化 RuleNodeCtx。 这是不带网络资源的常规节点的标准初始化函数。

Parameters: 参数:

  • config: Global rule engine configuration 全局规则引擎配置
  • chainCtx: Parent rule chain context 父规则链上下文
  • aspects: List of AOP aspects to apply 要应用的 AOP 切面列表
  • selfDefinition: Node definition and configuration 节点定义和配置

Returns: 返回:

  • *RuleNodeCtx: Initialized node context 已初始化的节点上下文
  • error: Initialization error if any 如果有的话,初始化错误

func (*RuleNodeCtx) Config

func (rn *RuleNodeCtx) Config() types.Config

Config returns the configuration of the rule engine.

func (*RuleNodeCtx) Copy

func (rn *RuleNodeCtx) Copy(newCtx *RuleNodeCtx)

Copy copies the contents of a new RuleNodeCtx into this one. This method is used for updating node configuration during reloads.

Copy 将新 RuleNodeCtx 的内容复制到当前实例中。 此方法用于在重载期间更新节点配置。

Parameters: 参数:

  • newCtx: New node context to copy from 要复制的新节点上下文

func (*RuleNodeCtx) DSL

func (rn *RuleNodeCtx) DSL() []byte

DSL returns the DSL representation of the node.

func (*RuleNodeCtx) Destroy

func (rn *RuleNodeCtx) Destroy()

Destroy safely destroys the embedded node

func (*RuleNodeCtx) GetNodeById

func (rn *RuleNodeCtx) GetNodeById(_ types.RuleNodeId) (types.NodeCtx, bool)

GetNodeById is not supported for RuleNodeCtx.

func (*RuleNodeCtx) GetNodeId

func (rn *RuleNodeCtx) GetNodeId() types.RuleNodeId

GetNodeId returns the ID of the node.

func (*RuleNodeCtx) IsDebugMode

func (rn *RuleNodeCtx) IsDebugMode() bool

IsDebugMode returns whether the node is in debug mode.

func (*RuleNodeCtx) OnMsg

func (rn *RuleNodeCtx) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 提供并发安全的消息处理,保护内嵌Node访问 OnMsg provides concurrent-safe message processing with protected access to the embedded Node. This method ensures thread safety during message processing by using read locks to protect against concurrent modifications during hot reloads.

OnMsg 提供并发安全的消息处理,通过使用读锁保护嵌入的 Node 访问。 此方法通过使用读锁防止热重载期间的并发修改,确保消息处理期间的线程安全。

Parameters: 参数:

  • ctx: Rule context for message processing 用于消息处理的规则上下文
  • msg: Message to be processed 要处理的消息

func (*RuleNodeCtx) ReloadChild

func (rn *RuleNodeCtx) ReloadChild(_ types.RuleNodeId, _ []byte) error

ReloadChild is not supported for RuleNodeCtx.

func (*RuleNodeCtx) ReloadSelf

func (rn *RuleNodeCtx) ReloadSelf(def []byte) error

ReloadSelf reloads the node from a byte slice definition.

func (*RuleNodeCtx) ReloadSelfFromDef

func (rn *RuleNodeCtx) ReloadSelfFromDef(def types.RuleNode) error

ReloadSelfFromDef reloads the node from a RuleNode definition. This method implements hot reloading for individual nodes, allowing dynamic updates without stopping the entire rule chain.

ReloadSelfFromDef 从 RuleNode 定义重新加载节点。 此方法为单个节点实现热重载,允许在不停止整个规则链的情况下进行动态更新。

Parameters: 参数:

  • def: New node definition 新的节点定义

Returns: 返回:

  • error: Reload error if any 如果有的话,重载错误

type RunSnapshot

type RunSnapshot struct {
	// contains filtered or unexported fields
}

RunSnapshot holds the state and logs for a rule chain execution.

func NewRunSnapshot

func NewRunSnapshot(msgId string, chainCtx *RuleChainCtx, startTs int64) *RunSnapshot

NewRunSnapshot creates a new instance of RunSnapshot with the given parameters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL