Documentation
¶
Index ¶
- Constants
- Variables
- func NewChainAggregationEngine(def []byte, opts ...types.EngineOption) (types.Engine, error)
- func NewChainEngine(def []byte, opts ...types.EngineOption) (types.Engine, error)
- func NewConfig(opts ...types.Option) types.Config
- func WithAspects(aspects ...types.Aspect) types.EngineOption
- func WithConfig(config types.Config) types.EngineOption
- type ChainAggregationCtx
- func (rc *ChainAggregationCtx) Config() types.Config
- func (rc *ChainAggregationCtx) DSL() []byte
- func (rc *ChainAggregationCtx) Destroy()
- func (rc *ChainAggregationCtx) GetChainById(id string) (types.ChainCtx, bool)
- func (rc *ChainAggregationCtx) Id() string
- func (rc *ChainAggregationCtx) Init(_ types.Config, configuration types.Configuration) error
- func (rc *ChainAggregationCtx) Name() string
- func (rc *ChainAggregationCtx) New() types.Node
- func (rc *ChainAggregationCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)
- func (rc *ChainAggregationCtx) TerminalOnErr() bool
- func (rc *ChainAggregationCtx) Type() types.NodeType
- type ChainAggregationEngine
- func (e *ChainAggregationEngine) DSL() []byte
- func (e *ChainAggregationEngine) GetAspects() types.AspectList
- func (e *ChainAggregationEngine) GetMetrics() *metrics.EngineMetrics
- func (e *ChainAggregationEngine) Id() string
- func (e *ChainAggregationEngine) Name() string
- func (e *ChainAggregationEngine) OnMsg(ctx context.Context, msg types.RuleMsg) error
- func (e *ChainAggregationEngine) ReloadSelf(dsl []byte) error
- func (e *ChainAggregationEngine) SetAspects(aspects ...types.Aspect)
- func (e *ChainAggregationEngine) SetConfig(config types.Config)
- func (e *ChainAggregationEngine) Stop()
- func (rc *ChainAggregationEngine) TerminalOnErr() bool
- type ChainCtx
- func (rc *ChainCtx) Config() types.Config
- func (rc *ChainCtx) DSL() []byte
- func (rc *ChainCtx) Destroy()
- func (rc *ChainCtx) GetNodeById(id string) (types.NodeCtx, bool)
- func (rc *ChainCtx) GetNodeRoutes(id string) ([]types.RuleNodeRelation, bool)
- func (rc *ChainCtx) Id() string
- func (rc *ChainCtx) Init(_ types.Config, configuration types.Configuration) error
- func (rc *ChainCtx) Name() string
- func (rc *ChainCtx) New() types.Node
- func (rc *ChainCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)
- func (rc *ChainCtx) TerminalOnErr() bool
- func (rc *ChainCtx) Type() types.NodeType
- type ChainEngine
- func (e *ChainEngine) DSL() []byte
- func (e *ChainEngine) GetAspects() types.AspectList
- func (e *ChainEngine) Id() string
- func (rc *ChainEngine) Name() string
- func (e *ChainEngine) OnMsg(ctx context.Context, msg types.RuleMsg) error
- func (e *ChainEngine) ReloadSelf(dsl []byte) error
- func (e *ChainEngine) SetAspects(aspects ...types.Aspect)
- func (e *ChainEngine) SetConfig(config types.Config)
- func (e *ChainEngine) Stop()
- func (rc *ChainEngine) TerminalOnErr() bool
- type DefaultChainContext
- func (ctx *DefaultChainContext) From() types.NodeCtx
- func (rCtx *DefaultChainContext) GetSelfId() string
- func (ctx *DefaultChainContext) Self() types.NodeCtx
- func (rCtx *DefaultChainContext) Tell(ctx context.Context, msg types.RuleMsg, relationType string) error
- func (rCtx *DefaultChainContext) TellNext(ctx context.Context, msg types.RuleMsg, relationType string) error
- type JsonParser
- func (p *JsonParser) DecodeChain(chainDef []byte) (types.Chain, error)
- func (p *JsonParser) DecodeChainAggregation(chainAggregationDef []byte) (types.ChainAggregation, error)
- func (p *JsonParser) DecodeRule(ruleDef []byte) (types.BaseInfo, error)
- func (p *JsonParser) EncodeChain(def interface{}) ([]byte, error)
- func (p *JsonParser) EncodeChainAggregation(def interface{}) ([]byte, error)
- func (p *JsonParser) EncodeRule(def interface{}) ([]byte, error)
- type RuleComponentRegistry
- func (r *RuleComponentRegistry) GetComponents() map[types.NodeType]types.Node
- func (r *RuleComponentRegistry) NewNode(componentType types.NodeType) (types.Node, error)
- func (r *RuleComponentRegistry) Register(node types.Node) error
- func (r *RuleComponentRegistry) Unregister(componentType types.NodeType) error
- type RuleNodeCtx
- func (rn *RuleNodeCtx) Config() types.Config
- func (rn *RuleNodeCtx) DSL() []byte
- func (rn *RuleNodeCtx) Destroy()
- func (rn *RuleNodeCtx) Id() string
- func (rn *RuleNodeCtx) Name() string
- func (rn *RuleNodeCtx) OnMsg(ctx context.Context, msg types.RuleMsg) (string, error)
- func (rn *RuleNodeCtx) ReloadSelf(_ []byte) error
- func (rc *RuleNodeCtx) TerminalOnErr() bool
Constants ¶
const PluginsSymbol = "Plugins"
PluginsSymbol is the symbol used to identify plugins in a Go plugin file.
Variables ¶
var BuiltinsAspects = []types.Aspect{&aspect.ChainAggregationValidator{}, &aspect.ChainValidator{}, &aspect.MetricsAspect{}}
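These aspects are automatically added to the rule engine during initialization via initBuiltinsAspects(). If custom aspects are provided, the built-in aspects are still included unless an aspect of the same type already exists in the custom list. This ensures that basic functionality is always available without explicit configuration.
这些切面在初始化期间通过 initBuiltinsAspects() 方法自动添加到规则引擎中。 如果提供了自定义切面,除非自定义列表中已存在相同类型的切面,否则仍会包含 内置切面。这确保基本功能始终可用,无需显式配置。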
var Registry = new(RuleComponentRegistry)
Registry is the default registry for rule engine components.
Functions ¶
func NewChainEngine ¶
func NewConfig ¶
NewConfig creates a new Config and applies the options. It initializes all necessary components with sensible defaults.
NewConfig 创建新的配置并应用选项。 它使用合理的默认值初始化所有必要的组件。
Parameters: 参数:
- opts: Optional configuration functions 可选的配置函数
Returns: 返回:
- types.Config: Initialized configuration 已初始化的配置
Default components include: 默认组件包括:
- JSON parser for rule chain definitions 规则链定义的 JSON 解析器
- Default component registry with built-in components 包含内置组件的默认组件注册表
- User-defined functions registry 用户定义函数注册表
- Default cache implementation 默认缓存实现
func WithAspects ¶
func WithAspects(aspects ...types.Aspect) types.EngineOption
WithAspects creates a RuleEngineOption to set the aspects of a RuleEngine. Aspects provide AOP (Aspect-Oriented Programming) capabilities for cross-cutting concerns like logging, metrics, validation, and debugging.
WithAspects 创建一个 RuleEngineOption 来设置 RuleEngine 的切面。 切面为日志记录、指标、验证和调试等横切关注点提供 AOP(面向切面编程)功能。
func WithConfig ¶
func WithConfig(config types.Config) types.EngineOption
WithConfig is an option that sets the Config of the RuleEngine. WithConfig 是设置 RuleEngine 配置的选项。
Types ¶
type ChainAggregationCtx ¶
type ChainAggregationCtx struct {
Aggregation types.ChainCtx
// contains filtered or unexported fields
}
func InitChainAggregationCtx ¶
func InitChainAggregationCtx(config types.Config, aspects types.AspectList, chainAggregationDef *types.ChainAggregation) (*ChainAggregationCtx, error)
func (*ChainAggregationCtx) Config ¶
func (rc *ChainAggregationCtx) Config() types.Config
Config returns the configuration of the rule chain context
func (*ChainAggregationCtx) DSL ¶
func (rc *ChainAggregationCtx) DSL() []byte
DSL returns the rule chain definition as a byte slice
func (*ChainAggregationCtx) Destroy ¶
func (rc *ChainAggregationCtx) Destroy()
Destroy cleans up resources and executes destroy aspects
func (*ChainAggregationCtx) GetChainById ¶
func (rc *ChainAggregationCtx) GetChainById(id string) (types.ChainCtx, bool)
GetChainById retrieves a chain context by its ID
func (*ChainAggregationCtx) Id ¶
func (rc *ChainAggregationCtx) Id() string
Id returns the identifier of the chain aggregation context
func (*ChainAggregationCtx) Init ¶
func (rc *ChainAggregationCtx) Init(_ types.Config, configuration types.Configuration) error
Init initializes the rule chain context
func (*ChainAggregationCtx) Name ¶
func (rc *ChainAggregationCtx) Name() string
Name returns the name of the chain aggregation context
func (*ChainAggregationCtx) New ¶
func (rc *ChainAggregationCtx) New() types.Node
New creates a new instance (not supported for ChainAggregationCtx)
func (*ChainAggregationCtx) TerminalOnErr ¶
func (rc *ChainAggregationCtx) TerminalOnErr() bool
TerminalOnErr reports whether processing terminates when an error occurs
func (*ChainAggregationCtx) Type ¶
func (rc *ChainAggregationCtx) Type() types.NodeType
Type returns the component type
type ChainAggregationEngine ¶
type ChainAggregationEngine struct {
// contains filtered or unexported fields
}
func (*ChainAggregationEngine) DSL ¶
func (e *ChainAggregationEngine) DSL() []byte
DSL returns the current rule chain configuration in its original format. DSL 返回原始格式的当前规则链配置。
func (*ChainAggregationEngine) GetAspects ¶
func (e *ChainAggregationEngine) GetAspects() types.AspectList
GetAspects returns a copy of the current aspects list to avoid data races. GetAspects 返回当前切面列表的副本以避免数据竞争。
func (*ChainAggregationEngine) GetMetrics ¶
func (e *ChainAggregationEngine) GetMetrics() *metrics.EngineMetrics
GetMetrics returns engine metrics if the metrics aspect is enabled. GetMetrics 如果启用了指标切面,则返回引擎指标。
func (*ChainAggregationEngine) Name ¶
func (e *ChainAggregationEngine) Name() string
Name 返回规则引擎实例的唯一标识符。
func (*ChainAggregationEngine) OnMsg ¶
OnMsg asynchronously processes a message using the rule engine. It accepts optional RuleContextOption parameters to customize the execution context.
OnMsg 使用规则引擎异步处理消息。 它接受可选的 RuleContextOption 参数来自定义执行上下文。
func (*ChainAggregationEngine) ReloadSelf ¶
func (e *ChainAggregationEngine) ReloadSelf(dsl []byte) error
ReloadSelf reloads the rule chain with new definition and options. This method supports hot reloading of rule configurations without stopping the engine. It implements a two-phase graceful reload process:
Phase 1: Preparation (设置阶段) - Apply configuration options 应用配置选项 - Wait for any ongoing reload to complete 等待任何正在进行的重载完成 - Set reloading state to block new messages 设置重载状态以阻塞新消息 - Wait for active messages to complete 等待活跃消息完成
Phase 2: Reload (重载阶段) - Parse new rule chain definition 解析新的规则链定义 - Update or create rule chain context 更新或创建规则链上下文 - Update atomic aspect pointers 更新原子切面指针 - Resume normal operation 恢复正常运行
ReloadSelf 使用新定义和选项重新加载规则链。 此方法支持在不停止引擎的情况下热重载规则配置。 它实现了两阶段优雅重载过程:
Parameters: 参数:
- dsl: Rule chain definition in byte format 字节格式的规则链定义
- opts: Optional configuration functions 可选的配置函数
Returns: 返回:
- error: Reload error if any 如果有的话,重载错误
func (*ChainAggregationEngine) SetAspects ¶
func (e *ChainAggregationEngine) SetAspects(aspects ...types.Aspect)
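SetAspects updates the list of aspects used by the rule engine. Aspects provide cross-cutting functionality like logging and validation. SetAspects 更新规则引擎使用的切面列表。 切面提供如日志和验证等横切功能。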
func (*ChainAggregationEngine) SetConfig ¶
func (e *ChainAggregationEngine) SetConfig(config types.Config)
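SetConfig updates the configuration of the rule engine. This should be called before initialization for best results. SetConfig 更新规则引擎的配置。 为了获得最佳效果,应在初始化前调用。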
func (*ChainAggregationEngine) Stop ¶
func (e *ChainAggregationEngine) Stop()
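Stop shuts down the rule engine and releases all resources. It implements a two-phase graceful shutdown strategy. Stop 关闭规则引擎并释放所有资源。 实现两阶段优雅停机策略: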
func (*ChainAggregationEngine) TerminalOnErr ¶
func (rc *ChainAggregationEngine) TerminalOnErr() bool
TerminalOnErr reports whether the engine terminates when an error occurs. TerminalOnErr 是否出错退出
type ChainCtx ¶
type ChainCtx struct {
// contains filtered or unexported fields
}
func InitChainCtx ¶
func (*ChainCtx) Destroy ¶
func (rc *ChainCtx) Destroy()
Destroy cleans up resources and executes destroy aspects
func (*ChainCtx) GetNodeById ¶
GetNodeById retrieves a node context by its ID
func (*ChainCtx) GetNodeRoutes ¶
func (rc *ChainCtx) GetNodeRoutes(id string) ([]types.RuleNodeRelation, bool)
GetNodeRoutes retrieves the routes for a given node ID
func (*ChainCtx) TerminalOnErr ¶
TerminalOnErr reports whether processing terminates when an error occurs
type ChainEngine ¶
type ChainEngine struct {
// contains filtered or unexported fields
}
func (*ChainEngine) DSL ¶
func (e *ChainEngine) DSL() []byte
DSL returns the current rule chain configuration in its original format. DSL 返回原始格式的当前规则链配置。
func (*ChainEngine) GetAspects ¶
func (e *ChainEngine) GetAspects() types.AspectList
GetAspects returns a copy of the current aspects list to avoid data races. GetAspects 返回当前切面列表的副本以避免数据竞争。
func (*ChainEngine) Id ¶
func (e *ChainEngine) Id() string
Id returns the unique identifier of the rule engine instance. Id 返回规则引擎实例的唯一标识符。
func (*ChainEngine) Name ¶
func (rc *ChainEngine) Name() string
Name returns the name of the rule engine instance
func (*ChainEngine) OnMsg ¶
OnMsg asynchronously processes a message using the rule engine. It accepts optional RuleContextOption parameters to customize the execution context.
OnMsg 使用规则引擎异步处理消息。 它接受可选的 RuleContextOption 参数来自定义执行上下文。
func (*ChainEngine) ReloadSelf ¶
func (e *ChainEngine) ReloadSelf(dsl []byte) error
ReloadSelf reloads the rule chain with new definition and options. This method supports hot reloading of rule configurations without stopping the engine. It implements a two-phase graceful reload process:
Phase 1: Preparation (设置阶段) - Apply configuration options 应用配置选项 - Wait for any ongoing reload to complete 等待任何正在进行的重载完成 - Set reloading state to block new messages 设置重载状态以阻塞新消息 - Wait for active messages to complete 等待活跃消息完成
Phase 2: Reload (重载阶段) - Parse new rule chain definition 解析新的规则链定义 - Update or create rule chain context 更新或创建规则链上下文 - Update atomic aspect pointers 更新原子切面指针 - Resume normal operation 恢复正常运行
ReloadSelf 使用新定义和选项重新加载规则链。 此方法支持在不停止引擎的情况下热重载规则配置。 它实现了两阶段优雅重载过程:
Parameters: 参数:
- dsl: Rule chain definition in byte format 字节格式的规则链定义
- opts: Optional configuration functions 可选的配置函数
Returns: 返回:
- error: Reload error if any 如果有的话,重载错误
func (*ChainEngine) SetAspects ¶
func (e *ChainEngine) SetAspects(aspects ...types.Aspect)
SetAspects updates the list of aspects used by the rule engine. Aspects provide cross-cutting functionality like logging and validation. SetAspects 更新规则引擎使用的切面列表。 切面提供如日志和验证等横切功能。
func (*ChainEngine) SetConfig ¶
func (e *ChainEngine) SetConfig(config types.Config)
SetConfig updates the configuration of the rule engine. This should be called before initialization for best results. SetConfig 更新规则引擎的配置。 为了获得最佳效果,应在初始化前调用。
func (*ChainEngine) TerminalOnErr ¶
func (rc *ChainEngine) TerminalOnErr() bool
TerminalOnErr reports whether the engine terminates when an error occurs
type DefaultChainContext ¶
type DefaultChainContext struct {
// contains filtered or unexported fields
}
DefaultChainContext is the default context for message processing in the rule engine.
func NewChainContext ¶
func NewChainContext(self types.ChainCtx) *DefaultChainContext
NewChainContext creates a new instance of the default rule engine message processing context.
func (*DefaultChainContext) From ¶
func (ctx *DefaultChainContext) From() types.NodeCtx
func (*DefaultChainContext) GetSelfId ¶
func (rCtx *DefaultChainContext) GetSelfId() string
func (*DefaultChainContext) Self ¶
func (ctx *DefaultChainContext) Self() types.NodeCtx
type JsonParser ¶
type JsonParser struct {
}
JsonParser encodes and decodes chain, chain aggregation, and rule definitions as JSON.
func (*JsonParser) DecodeChain ¶
func (p *JsonParser) DecodeChain(chainDef []byte) (types.Chain, error)
DecodeChain parses a rule chain structure from JSON. DecodeChain 通过json解析规则链结构体
func (*JsonParser) DecodeChainAggregation ¶
func (p *JsonParser) DecodeChainAggregation(chainAggregationDef []byte) (types.ChainAggregation, error)
DecodeChainAggregation parses a chain aggregation structure from JSON. DecodeChainAggregation 通过json解析规则链聚合结构体
func (*JsonParser) DecodeRule ¶
func (p *JsonParser) DecodeRule(ruleDef []byte) (types.BaseInfo, error)
DecodeRule parses a node structure from JSON. DecodeRule 通过json解析节点结构体
func (*JsonParser) EncodeChain ¶
func (p *JsonParser) EncodeChain(def interface{}) ([]byte, error)
func (*JsonParser) EncodeChainAggregation ¶
func (p *JsonParser) EncodeChainAggregation(def interface{}) ([]byte, error)
func (*JsonParser) EncodeRule ¶
func (p *JsonParser) EncodeRule(def interface{}) ([]byte, error)
type RuleComponentRegistry ¶
type RuleComponentRegistry struct {
// RWMutex is a read/write mutex lock.
sync.RWMutex
// contains filtered or unexported fields
}
RuleComponentRegistry is a registry for rule engine components.
func (*RuleComponentRegistry) GetComponents ¶
func (r *RuleComponentRegistry) GetComponents() map[types.NodeType]types.Node
GetComponents returns a map of all registered components.
func (*RuleComponentRegistry) NewNode ¶
NewNode creates a new instance of a rule engine node component by its type.
func (*RuleComponentRegistry) Register ¶
func (r *RuleComponentRegistry) Register(node types.Node) error
Register adds a rule engine node component to the registry.
func (*RuleComponentRegistry) Unregister ¶
func (r *RuleComponentRegistry) Unregister(componentType types.NodeType) error
Unregister removes a component from the registry by its type or plugin name.
type RuleNodeCtx ¶
type RuleNodeCtx struct {
// types.Node is the embedded node implementation providing the core functionality.
// This embedding allows RuleNodeCtx to act as a node while adding wrapper capabilities.
// types.Node 是嵌入的节点实现,提供核心功能。
// 这种嵌入允许 RuleNodeCtx 在添加包装器功能的同时充当节点。
types.Node
// contains filtered or unexported fields
}
RuleNodeCtx represents an instance of a node component within the rule engine. It acts as a wrapper around the actual node implementation, providing additional context and metadata required for rule chain execution.
RuleNodeCtx 表示规则引擎中节点组件的实例。 它充当实际节点实现的包装器,提供规则链执行所需的额外上下文和元数据。
Architecture: 架构:
RuleNodeCtx embeds the types.Node interface, allowing it to act as both a node wrapper and a node implementation. This design provides: RuleNodeCtx 嵌入 types.Node 接口,允许它既充当节点包装器又充当节点实现。 此设计提供: - Direct access to node methods through interface embedding 通过接口嵌入直接访问节点方法 - Additional context and configuration management 额外的上下文和配置管理 - Thread-safe operations with mutex protection 使用互斥锁保护的线程安全操作 - Hot reloading capabilities 热重载功能
func InitRuleNodeCtx ¶
func InitRuleNodeCtx(config types.Config, chainCtx *ChainCtx, aspects types.AspectList, selfDefinition *types.BaseInfo) (*RuleNodeCtx, error)
InitRuleNodeCtx initializes a RuleNodeCtx with the given parameters. This is the standard initialization function for regular nodes without network resources.
InitRuleNodeCtx 使用给定参数初始化 RuleNodeCtx。 这是不带网络资源的常规节点的标准初始化函数。
Parameters: 参数:
- config: Global rule engine configuration 全局规则引擎配置
- chainCtx: Parent rule chain context 父规则链上下文
- aspects: List of AOP aspects to apply 要应用的 AOP 切面列表
- selfDefinition: Node definition and configuration 节点定义和配置
Returns: 返回:
- *RuleNodeCtx: Initialized node context 已初始化的节点上下文
- error: Initialization error if any 如果有的话,初始化错误
func (*RuleNodeCtx) Config ¶
func (rn *RuleNodeCtx) Config() types.Config
Config returns the configuration of the rule engine.
func (*RuleNodeCtx) DSL ¶
func (rn *RuleNodeCtx) DSL() []byte
DSL returns the DSL representation of the node.
func (*RuleNodeCtx) Destroy ¶
func (rn *RuleNodeCtx) Destroy()
Destroy safely destroys the embedded node
func (*RuleNodeCtx) Name ¶
func (rn *RuleNodeCtx) Name() string
Name returns the name of the node.
func (*RuleNodeCtx) OnMsg ¶
OnMsg 提供并发安全的消息处理,保护内嵌Node访问 OnMsg provides concurrent-safe message processing with protected access to the embedded Node. This method ensures thread safety during message processing by using read locks to protect against concurrent modifications during hot reloads.
OnMsg 提供并发安全的消息处理,通过使用读锁保护嵌入的 Node 访问。 此方法通过使用读锁防止热重载期间的并发修改,确保消息处理期间的线程安全。
Parameters: 参数:
- ctx: Rule context for message processing 用于消息处理的规则上下文
- msg: Message to be processed 要处理的消息
func (*RuleNodeCtx) ReloadSelf ¶
func (rn *RuleNodeCtx) ReloadSelf(_ []byte) error
ReloadSelf reloads the node from a byte slice definition.
func (*RuleNodeCtx) TerminalOnErr ¶
func (rc *RuleNodeCtx) TerminalOnErr() bool
TerminalOnErr reports whether processing terminates when an error occurs