rulego

package module
v0.0.0-...-d2e3077 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

RuleGo

English| 中文

RuleGo is a lightweight, high-performance, embedded orchestration rule engine based on Go language. It is also a flexible and highly customizable event processing framework. It can aggregate, distribute, filter, transform, enrich and execute various actions on input messages.

This project is largely inspired by thingsboard .

Documentation

RuleGo documentation is hosted on: rulego.cc .

Features

  • Development language: Go 1.18+
  • Lightweight: No external middleware dependencies, can efficiently process and link data on low-cost devices, suitable for IoT edge computing.
  • High performance: Thanks to the high-performance characteristics of Go, in addition, RuleGo adopts technologies such as coroutine pool and object pool. For 10W data processing JS script filtering->JS script data processing->HTTP push, the average time is 9 seconds.
  • Embedded: Support embedding RuleGo into existing projects, non-intrusively utilizing its features.
  • Componentized: All business logic is componentized and can be flexibly configured and reused.
  • Rule chain: You can flexibly combine and reuse different components to achieve highly customizable and scalable business processes.
  • Process orchestration: Support dynamic orchestration of rule chains, you can encapsulate your business into RuleGo components, and then achieve your highly changing business needs by building blocks.
  • Easy to extend: Provide rich and flexible extension interfaces and hooks, such as: custom components, component registration management, rule chain DSL parser, coroutine pool, rule node message inflow/outflow callback, rule chain processing end callback.
  • Dynamic loading: Support dynamic loading of components and extension components through Go plugin.
  • Built-in common components: Message type Switch,JavaScript Switch,JavaScript filter,JavaScript converter,HTTP push,MQTT push,Send email,Log record and other components. You can extend other components by yourself.
  • Context isolation mechanism: Reliable context isolation mechanism, no need to worry about data streaming in high concurrency situations.

Use Cases

RuleGo is a rule engine based on orchestration, which is best at decoupling your system.

  • If your system is complex and bloated with code
  • If your business scenario is highly customized or frequently changed
  • If your system needs to interface with a large number of third-party systems or protocols
  • Or you need an end-to-end IoT solution
  • Or you need to process data from heterogeneous systems centrally
  • Or you want to try hot deployment in Go language... Then RuleGo framework will be a very good solution.
Typical use cases
  • Edge computing: For example: You can deploy RuleGo on the edge server, preprocess, filter, aggregate or calculate the data before reporting it to the cloud. The data processing rules and distribution rules can be dynamically configured and modified through the rule chain without restarting the system.
  • Internet of Things: For example: Collect device data reporting, and after the rule judgment of the rule chain, trigger one or more actions, such as: send email, send alarm, and link with other devices or systems.
  • Data distribution: For example: You can distribute data to different systems according to different message types, such as HTTP, MQTT or gRPC.
  • Application integration: Use RuleGo as a glue to various different systems or protocols, such as: kafka, message queue, database, chatGPT, third-party systems.
  • Data processing from heterogeneous systems: For example: Receive data from different data sources (such as MQTT, HTTP, etc.), and then filter, format conversion, and then distribute to databases, business systems or dashboards.
  • Highly customized business: For example: Decouple highly customized or frequently changed business and hand it over to RuleGo rule chain for management. Business requirements change without restarting the main program.
  • Complex business orchestration: For example: Encapsulate the business into custom components, and use RuleGo to orchestrate and drive these custom components, and support dynamic adjustment.
  • Microservice orchestration: For example: Use RuleGo to orchestrate and drive microservices, or dynamically call third-party services to process business and return results.
  • Business code and business logic decoupling: For example: User points calculation system, risk control system.
  • Flexible configuration and highly customized event processing framework: For example: Asynchronously or synchronously process different message types.
  • Automation: For example, process automation systems, marketing automation systems.

Installation

Use the go get command to install RuleGo:

go get github.com/xyzbit/rulego

Usage

Use Json format to define the rule chain DSL:
The following example defines 3 rule nodes, and the rule chain logic is as follows: (For more examples, refer to testcases/)

{
  "ruleChain": {
    "name": "Test rule chain",
    "root": true,
    "debugMode": false
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "jsFilter",
        "name": "Filtering Data",
        "debugMode": true,
        "configuration": {
          "jsScript": "return msg!='bb';"
        }
      },
      {
        "id": "s2",
        "type": "jsTransform",
        "name": "Transform Data",
        "debugMode": true,
        "configuration": {
          "jsScript": "metadata['test']='test02';\n metadata['index']=50;\n msgType='TEST_MSG_TYPE2';\n var msg2=JSON.parse(msg);\n msg2['aa']=66;\n return {'msg':msg2,'metadata':metadata,'msgType':msgType};"
        }
      },
      {
        "id": "s3",
        "type": "restApiCall",
        "name": "Call Rest Api Push Data",
        "debugMode": true,
        "configuration": {
          "restEndpointUrlPattern": "http://192.168.216.21:9099/api/socket/msg",
          "requestMethod": "POST",
          "maxParallelRequestsCount": 200
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "True"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "Success"
      }
    ],
    "ruleChainConnections": null
  }
}

Description:

  • ruleChain: The root object of the rule chain definition, which contains the following fields:
    • name: The name of the rule chain, which can be any string.
    • root: A boolean value indicating whether this rule chain is the root rule chain or a sub-rule chain. Only one root rule chain is allowed per rule engine instance.
    • debugMode: A boolean value indicating whether this rule chain is in debug mode or not. If true, the debug callback function will be triggered when the rule chain processes messages.
  • metadata: An object that contains the information of the nodes and connections in the rule chain, which has the following fields:
    • nodes: An array of objects, each representing a rule node in the rule chain. Each node object has the following fields:
      • id: A unique identifier for the node, which can be any string.
      • type: The type of the node, which determines the logic and behavior of the node. It should match one of the registered node types in the rule engine.
      • name: The name of the node, which can be any string.
      • debugMode: A boolean value indicating whether this node is in debug mode or not. If true, the debug callback function will be triggered when the node processes messages.
      • configuration: An object that contains the configuration parameters for the node, which vary depending on the node type. For example, a JS filter node may have a jsScript field that defines the filtering logic, while a REST API call node may have a restEndpointUrlPattern field that defines the URL to call.
    • connections: An array of objects, each representing a connection between two nodes in the rule chain. Each connection object has the following fields:
      • fromId: The id of the source node of the connection, which should match one of the node ids in the nodes array.
      • toId: The id of the destination node of the connection, which should match one of the node ids in the nodes array.
      • type: The type of the connection, which determines when and how messages are sent from one node to another. It should match one of the supported connection types by the source node type. For example, a JS filter node may support two connection types: "True" and "False", indicating whether messages pass or fail the filter condition.
    • ruleChainConnections: An array of objects, each representing a connection between a node and a sub-rule chain in the rule chain. Each rule chain connection object has the following fields:
      • fromId: The id of the source node of the connection, which should match one of the node ids in the nodes array.
      • toId: The id of the destination sub-rule chain of the connection, which should match one of the registered sub-rule chains in the rule engine.
      • type: The type of the connection, which determines when and how messages are sent from one node to another. It should match one of the supported connection types by the source node type.

Import the RuleGo package and create a rule engine instance:

import "github.com/xyzbit/rulego"

//Create a rule engine instance, each rule engine instance has only one root rule chain
ruleEngine, err := rulego.New("rule01", []byte(ruleFile))

Give the message, message type, and message metadata to the rule engine instance for processing:

//Define message metadata
metaData := types.NewMetadata()
metaData.PutValue("productType", "test01")
//Define message and message type
msg := types.NewMsg(0, "TELEMETRY_MSG", types.JSON, metaData, "{\"temperature\":35}")

//Give the message to the rule engine for processing
//The engine will process data based on the configuration of the rule chain, which supports hot updates
ruleEngine.OnMsg(msg)

Update rule chain

//Update root rule chain
err := ruleEngine.ReloadSelf([]byte(ruleFile))
//Update a node of the rule chain
ruleEngine.ReloadChild("rule_chain_test", nodeFile)

Rule engine instance management:

//Get the created rule engine instance by ID
ruleEngine, ok := rulego.Get("rule01")
//Delete the created rule engine instance
rulego.Del("rule01")
Configuration

See types.Config for details

//Create a default configuration
config := rulego.NewConfig()
//Debug node callback, the node configuration must be configured with debugMode:true to trigger the call
//Both node input and output information will call this callback function
config.OnDebug = func (flowType string, nodeId string, msg types.RuleMsg, relationType string, err error) {
}
//Global rule chain end callback
//If you just want to call for a single message, use the ruleEngine.OnMsgWithOptions method
//Note: If the rule chain has multiple branch endpoints, it will be called multiple times
config.OnEnd = func (msg types.RuleMsg, err error) {
}
//Use Configuration 
ruleEngine, err := rulego.New("rule01", []byte(ruleFile), rulego.WithConfig(config))

About rule chain

Rule node

Rule node is the basic component of the rule engine, it processes a single incoming message at a time and generates one or more outgoing messages. Rule node is the main logic unit of the rule engine. Rule nodes can filter, enrich, transform incoming messages, execute actions or communicate with external systems. You can easily encapsulate your business into RuleGo node components, and then flexibly configure and reuse them, like building blocks to achieve your business needs.

Rule chain

Rule chain is a logical group of rule nodes and their relationTypes. It receives outbound messages from nodes and sends them to the next node or nodes through a specified relationship. Here are some common rule chain examples:

Sequential execution:

Asynchronous + sequential execution:

Using sub-rule chain method:

Some complex examples:

Data Integration

RuleGo provides Endpoint module for unified data integration and processing of heterogeneous systems.
For more details, please refer to: Endpoint

Performance

RuleGo almost does not increase system overhead, resource consumption is extremely low, because it uses object coroutine pool and object pool, even higher performance than directly calling business methods, especially suitable for running on edge servers.


Machine: Raspberry Pi 2 (900MHz Cortex-A7*4,1GB LPDDR2)
Data size: 260B
Rule chain: JS script filtering->JS complex transformation->HTTP push
Test results: 100 concurrent and 500 concurrent, memory consumption does not change much around 19M

Contribution

Any form of contribution is welcome, including submitting issues, suggestions, documentation, tests or code. Please follow these steps:

  • Clone the project repository to your local machine
  • Create a new branch and make modifications
  • Submit a merge request to the main branch
  • Wait for review and feedback

License

RuleGo uses Apache 2.0 license, please refer to LICENSE file for details.

Documentation

Index

Constants

View Source
const PluginsSymbol = "Plugins"

PluginsSymbol 插件检查点 Symbol

Variables

View Source
var DefaultRuleGo = &RuleGo{}

Registry 规则引擎组件默认注册器

Functions

func Del

func Del(id string)

Del 删除指定ID规则引擎实例

func Load

func Load(folderPath string, opts ...RuleEngineOption) error

Load 加载指定文件夹及其子文件夹所有规则链配置(与.json结尾文件),到规则引擎实例池 规则链ID,使用文件配置的 ruleChain.id

func NewConfig

func NewConfig(opts ...types.Option) types.Config

NewConfig creates a new Config and applies the options.

func OnMsg

func OnMsg(msg types.RuleMsg)

OnMsg 调用所有规则引擎实例处理消息 规则引擎实例池所有规则链都会去尝试处理该消息

func Stop

func Stop()

Stop 释放所有规则引擎实例

Types

type DefaultRuleContext

type DefaultRuleContext struct {
	// contains filtered or unexported fields
}

DefaultRuleContext 默认规则引擎消息处理上下文

func NewRuleContext

func NewRuleContext(config types.Config, ruleChainCtx *RuleChainCtx, from types.NodeCtx, self types.NodeCtx, pool types.Pool, onEnd func(msg types.RuleMsg, err error), context context.Context) *DefaultRuleContext

NewRuleContext 创建一个默认规则引擎消息处理上下文实例

func (*DefaultRuleContext) Config

func (ctx *DefaultRuleContext) Config() types.Config

func (*DefaultRuleContext) GetContext

func (ctx *DefaultRuleContext) GetContext() context.Context

func (*DefaultRuleContext) GetEndFunc

func (ctx *DefaultRuleContext) GetEndFunc() func(msg types.RuleMsg, err error)

func (*DefaultRuleContext) GetSelfId

func (ctx *DefaultRuleContext) GetSelfId() string

func (*DefaultRuleContext) NewMsg

func (ctx *DefaultRuleContext) NewMsg(msgType string, metaData types.Metadata, data string) types.RuleMsg

func (*DefaultRuleContext) NewNextNodeRuleContext

func (ctx *DefaultRuleContext) NewNextNodeRuleContext(nextNode types.NodeCtx) *DefaultRuleContext

NewNextNodeRuleContext 创建下一个节点的规则引擎消息处理上下文实例RuleContext

func (*DefaultRuleContext) SetAllCompletedFunc

func (ctx *DefaultRuleContext) SetAllCompletedFunc(f func()) types.RuleContext

func (*DefaultRuleContext) SetContext

func (ctx *DefaultRuleContext) SetContext(c context.Context) types.RuleContext

func (*DefaultRuleContext) SetEndFunc

func (ctx *DefaultRuleContext) SetEndFunc(onEndFunc func(msg types.RuleMsg, err error)) types.RuleContext

func (*DefaultRuleContext) SubmitTack

func (ctx *DefaultRuleContext) SubmitTack(task func())

func (*DefaultRuleContext) TellFailure

func (ctx *DefaultRuleContext) TellFailure(msg types.RuleMsg, err error)

func (*DefaultRuleContext) TellNext

func (ctx *DefaultRuleContext) TellNext(msg types.RuleMsg, relationTypes ...string)

func (*DefaultRuleContext) TellSelf

func (ctx *DefaultRuleContext) TellSelf(msg types.RuleMsg, delayMs int64)

func (*DefaultRuleContext) TellSuccess

func (ctx *DefaultRuleContext) TellSuccess(msg types.RuleMsg)

type JsonParser

type JsonParser struct{}

JsonParser Json

func (*JsonParser) DecodeRuleChain

func (p *JsonParser) DecodeRuleChain(config types.Config, dsl []byte) (types.Node, error)

func (*JsonParser) DecodeRuleNode

func (p *JsonParser) DecodeRuleNode(config types.Config, dsl []byte) (types.Node, error)

func (*JsonParser) EncodeRuleChain

func (p *JsonParser) EncodeRuleChain(def interface{}) ([]byte, error)

func (*JsonParser) EncodeRuleNode

func (p *JsonParser) EncodeRuleNode(def interface{}) ([]byte, error)

type NodeAdditionalInfo

type NodeAdditionalInfo struct {
	Description string `json:"description"`
	LayoutX     int    `json:"layoutX"`
	LayoutY     int    `json:"layoutY"`
}

NodeAdditionalInfo 用于可视化位置信息(预留字段)

type NodeConnection

type NodeConnection struct {
	// 连接的源节点的id,应该与nodes数组中的某个节点id匹配。
	FromId string `json:"fromId"`
	// 连接的目标节点的id,应该与nodes数组中的某个节点id匹配
	ToId string `json:"toId"`
	// 连接的类型,决定了什么时候以及如何把消息从一个节点发送到另一个节点。它应该与源节点类型支持的连接类型之一匹配。
	// 例如,一个JS过滤器节点可能支持两种连接类型:"True"和"False",表示消息是否通过或者失败过滤条件。
	Type string `json:"type"`
}

NodeConnection 规则链节点连接定义 每个对象代表规则链中两个节点之间的连接

type PluginComponentRegistry

type PluginComponentRegistry struct {
	// contains filtered or unexported fields
}

PluginComponentRegistry go plugin组件初始化器

func (*PluginComponentRegistry) Components

func (p *PluginComponentRegistry) Components() []types.Node

func (*PluginComponentRegistry) Init

func (p *PluginComponentRegistry) Init() error

type RelationCache

type RelationCache struct {
	// contains filtered or unexported fields
}

type RuleChain

type RuleChain struct {
	// 规则链基础信息定义
	RuleChain RuleChainBaseInfo `json:"ruleChain"`
	// 包含了规则链中节点和连接的信息
	Metadata RuleMetadata `json:"metadata"`
}

RuleChain 规则链定义

func ParserRuleChain

func ParserRuleChain(rootRuleChain []byte) (RuleChain, error)

ParserRuleChain 通过json解析规则链结构体

type RuleChainBaseInfo

type RuleChainBaseInfo struct {
	// 规则链ID
	ID string `json:"id"`
	// 扩展字段
	AdditionalInfo map[string]string `json:"additionalInfo"`
	// Name 规则链的名称
	Name string `json:"name"`
	// 表示这个节点是否处于调试模式。如果为真,当节点处理消息时,会触发调试回调函数。
	// 优先使用子节点的DebugMode配置
	DebugMode bool `json:"debugMode"`
	// Root 表示这个规则链是根规则链还是子规则链。(只做标记使用,非应用在实际逻辑)
	Root bool `json:"root"`
	// Configuration 规则链配置信息
	Configuration types.Configuration `json:"configuration"`
}

RuleChainBaseInfo 规则链基础信息定义

type RuleChainConnection

type RuleChainConnection struct {
	// 连接的源节点的id,应该与nodes数组中的某个节点id匹配。
	FromId string `json:"fromId"`
	// 连接的目标子规则链的id,应该与规则引擎中注册的子规则链之一匹配。
	ToId string `json:"toId"`
	// 连接的类型,决定了什么时候以及如何把消息从一个节点发送到另一个节点。它应该与源节点类型支持的连接类型之一匹配。
	Type string `json:"type"`
}

RuleChainConnection 子规则链连接定义 每个对象代表规则链中一个节点和一个子规则链之间的连接

type RuleChainCtx

type RuleChainCtx struct {
	// 节点ID
	Id types.RuleNodeId
	// 规则链定义
	SelfDefinition *RuleChain
	// 规则引擎配置
	Config types.Config

	sync.RWMutex
	// contains filtered or unexported fields
}

RuleChainCtx 规则链实例定义 初始化所有节点 记录规则链,所有节点路由关系

func InitRuleChainCtx

func InitRuleChainCtx(config types.Config, ruleChainDef *RuleChain) (*RuleChainCtx, error)

InitRuleChainCtx 初始化RuleChainCtx

func (*RuleChainCtx) Copy

func (rc *RuleChainCtx) Copy(newCtx *RuleChainCtx)

Copy 复制

func (*RuleChainCtx) DSL

func (rc *RuleChainCtx) DSL() []byte

func (*RuleChainCtx) Destroy

func (rc *RuleChainCtx) Destroy()

func (*RuleChainCtx) GetFirstNode

func (rc *RuleChainCtx) GetFirstNode() (types.NodeCtx, bool)

GetFirstNode 获取第一个节点,消息从该节点开始流转。默认是index=0的节点

func (*RuleChainCtx) GetNextNodes

func (rc *RuleChainCtx) GetNextNodes(id types.RuleNodeId, relationType string) ([]types.NodeCtx, bool)

GetNextNodes 获取当前节点指定关系的子节点

func (*RuleChainCtx) GetNodeById

func (rc *RuleChainCtx) GetNodeById(id types.RuleNodeId) (types.NodeCtx, bool)

func (*RuleChainCtx) GetNodeByIndex

func (rc *RuleChainCtx) GetNodeByIndex(index int) (types.NodeCtx, bool)

func (*RuleChainCtx) GetNodeId

func (rc *RuleChainCtx) GetNodeId() types.RuleNodeId

func (*RuleChainCtx) GetNodeRoutes

func (rc *RuleChainCtx) GetNodeRoutes(id types.RuleNodeId) ([]types.RuleNodeRelation, bool)

func (*RuleChainCtx) GetRuleChainPool

func (rc *RuleChainCtx) GetRuleChainPool() *RuleGo

GetRuleChainPool 获取子规则链池

func (*RuleChainCtx) Init

func (rc *RuleChainCtx) Init(_ types.Config, configuration types.Configuration) error

Init 初始化

func (*RuleChainCtx) IsDebugMode

func (rc *RuleChainCtx) IsDebugMode() bool

func (*RuleChainCtx) New

func (rc *RuleChainCtx) New() types.Node

func (*RuleChainCtx) OnMsg

func (rc *RuleChainCtx) OnMsg(ctx types.RuleContext, msg types.RuleMsg) error

OnMsg 处理消息

func (*RuleChainCtx) ReloadChild

func (rc *RuleChainCtx) ReloadChild(ruleNodeId types.RuleNodeId, def []byte) error

func (*RuleChainCtx) ReloadSelf

func (rc *RuleChainCtx) ReloadSelf(def []byte) error

func (*RuleChainCtx) SetRuleChainPool

func (rc *RuleChainCtx) SetRuleChainPool(ruleChainPool *RuleGo)

SetRuleChainPool 设置子规则链池

func (*RuleChainCtx) Type

func (rc *RuleChainCtx) Type() string

Type 组件类型

type RuleComponentRegistry

type RuleComponentRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RuleComponentRegistry 组件注册器

func (*RuleComponentRegistry) GetComponentForms

func (r *RuleComponentRegistry) GetComponentForms() types.ComponentFormList

func (*RuleComponentRegistry) GetComponents

func (r *RuleComponentRegistry) GetComponents() map[string]types.Node

func (*RuleComponentRegistry) NewNode

func (r *RuleComponentRegistry) NewNode(nodeType string) (types.Node, error)

NewNode 获取规则引擎节点组件

func (*RuleComponentRegistry) Register

func (r *RuleComponentRegistry) Register(node types.Node) error

Register 注册规则引擎节点组件

func (*RuleComponentRegistry) RegisterPlugin

func (r *RuleComponentRegistry) RegisterPlugin(name string, file string) error

RegisterPlugin 注册规则引擎节点组件

func (*RuleComponentRegistry) Unregister

func (r *RuleComponentRegistry) Unregister(componentType string) error

type RuleEngine

type RuleEngine struct {
	// 规则引擎实例标识
	Id string
	// 配置
	Config types.Config
	// 子规则链池
	RuleChainPool *RuleGo
	// contains filtered or unexported fields
}

RuleEngine 规则引擎 每个规则引擎实例只有一个根规则链,如果没设置规则链则无法处理数据

func Get

func Get(id string) (*RuleEngine, bool)

Get 获取指定ID规则引擎实例

func New

func New(id string, rootRuleChainSrc []byte, opts ...RuleEngineOption) (*RuleEngine, error)

New 创建一个新的RuleEngine并将其存储在RuleGo规则链池中

func (*RuleEngine) DSL

func (e *RuleEngine) DSL() []byte

DSL 获取根规则链配置

func (*RuleEngine) Initialized

func (e *RuleEngine) Initialized() bool

func (*RuleEngine) NodeDSL

func (e *RuleEngine) NodeDSL(chainId types.RuleNodeId, childNodeId types.RuleNodeId) []byte

NodeDSL 获取规则链节点配置

func (*RuleEngine) OnMsg

func (e *RuleEngine) OnMsg(msg types.RuleMsg)

OnMsg 把消息交给规则引擎处理,异步执行 根据规则链节点配置和连接关系处理消息

func (*RuleEngine) OnMsgAndWait

func (e *RuleEngine) OnMsgAndWait(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsgAndWait 把消息交给规则引擎处理,同步执行,等规则链所有节点执行完,返回

func (*RuleEngine) OnMsgWithEndFunc

func (e *RuleEngine) OnMsgWithEndFunc(msg types.RuleMsg, endFunc func(msg types.RuleMsg, err error))

OnMsgWithEndFunc 把消息交给规则引擎处理,异步执行 endFunc 用于数据经过规则链执行完的回调,用于获取规则链处理结果数据。注意:如果规则链有多个结束点,回调函数则会执行多次

func (*RuleEngine) OnMsgWithOptions

func (e *RuleEngine) OnMsgWithOptions(msg types.RuleMsg, opts ...types.RuleContextOption)

OnMsgWithOptions 把消息交给规则引擎处理,异步执行 可以携带context选项和结束回调选项 context 用于不同组件实例数据共享 endFunc 用于数据经过规则链执行完的回调,用于获取规则链处理结果数据。注意:如果规则链有多个结束点,回调函数则会执行多次

func (*RuleEngine) ReloadChild

func (e *RuleEngine) ReloadChild(ruleNodeId string, dsl []byte) error

ReloadChild 更新根规则链或者其下某个节点 如果ruleNodeId为空更新根规则链,否则更新指定的子节点 dsl 根规则链/子节点配置

func (*RuleEngine) ReloadSelf

func (e *RuleEngine) ReloadSelf(def []byte, opts ...RuleEngineOption) error

ReloadSelf 重新加载规则链

func (*RuleEngine) RootRuleChainCtx

func (e *RuleEngine) RootRuleChainCtx() *RuleChainCtx

RootRuleChainCtx 获取根规则链

func (*RuleEngine) Stop

func (e *RuleEngine) Stop()

type RuleEngineOption

type RuleEngineOption func(*RuleEngine) error

RuleEngineOption is a function type that modifies the RuleEngine.

func WithConfig

func WithConfig(config types.Config) RuleEngineOption

WithConfig is an option that sets the Config of the RuleEngine.

func WithRuleChainPool

func WithRuleChainPool(ruleChainPool *RuleGo) RuleEngineOption

WithRuleChainPool 子规则链池

type RuleGo

type RuleGo struct {
	// contains filtered or unexported fields
}

RuleGo 规则引擎实例池

func (*RuleGo) Del

func (g *RuleGo) Del(id string)

Del 删除指定ID规则引擎实例

func (*RuleGo) Get

func (g *RuleGo) Get(id string) (*RuleEngine, bool)

Get 获取指定ID规则引擎实例

func (*RuleGo) Load

func (g *RuleGo) Load(folderPath string, opts ...RuleEngineOption) error

Load 加载指定文件夹及其子文件夹所有规则链配置(与.json结尾文件),到规则引擎实例池 规则链ID,使用规则链文件配置的ruleChain.id

func (*RuleGo) New

func (g *RuleGo) New(id string, rootRuleChainSrc []byte, opts ...RuleEngineOption) (*RuleEngine, error)

New 创建一个新的RuleEngine并将其存储在RuleGo规则链池中 如果指定id="",则使用规则链文件的ruleChain.id

func (*RuleGo) OnMsg

func (g *RuleGo) OnMsg(msg types.RuleMsg)

OnMsg 调用所有规则引擎实例处理消息 规则引擎实例池所有规则链都会去尝试处理该消息

func (*RuleGo) Stop

func (g *RuleGo) Stop()

Stop 释放所有规则引擎实例

type RuleMetadata

type RuleMetadata struct {
	// 数据流转的第一个节点,默认:0
	FirstNodeIndex int `json:"firstNodeIndex"`
	// 节点组件定义
	// 每个对象代表规则链中的一个规则节点
	Nodes []*RuleNode `json:"nodes"`
	// 连接定义
	// 每个对象代表规则链中两个节点之间的连接
	Connections []NodeConnection `json:"connections"`
	// 子规则链链接
	// 每个对象代表规则链中一个节点和一个子规则链之间的连接
	RuleChainConnections []RuleChainConnection `json:"ruleChainConnections"`
}

RuleMetadata 规则链元数据定义,包含了规则链中节点和连接的信息

type RuleNode

type RuleNode struct {
	// 节点的唯一标识符,可以是任意字符串
	Id string `json:"Id"`
	// 扩展字段
	AdditionalInfo NodeAdditionalInfo `json:"additionalInfo"`
	// 节点的类型,决定了节点的逻辑和行为。它应该与规则引擎中注册的节点类型之一匹配。
	Type string `json:"type"`
	// 节点的名称,可以是任意字符串
	Name string `json:"name"`
	// 表示这个节点是否处于调试模式。如果为真,当节点处理消息时,会触发调试回调函数。
	DebugMode bool `json:"debugMode"`
	// 包含了节点的配置参数,具体内容取决于节点类型。
	// 例如,一个JS过滤器节点可能有一个`jsScript`字段,定义了过滤逻辑,
	// 而一个REST API调用节点可能有一个`restEndpointUrlPattern`字段,定义了要调用的URL。
	Configuration types.Configuration `json:"configuration"`
}

RuleNode 规则链节点信息定义

func ParserRuleNode

func ParserRuleNode(rootRuleChain []byte) (RuleNode, error)

ParserRuleNode 通过json解析节点结构体

type RuleNodeCtx

type RuleNodeCtx struct {
	// 组件实例
	types.Node
	// 组件配置
	SelfDefinition *RuleNode
	// 规则引擎配置
	Config types.Config
}

RuleNodeCtx 节点组件实例定义

func InitRuleNodeCtx

func InitRuleNodeCtx(config types.Config, selfDefinition *RuleNode) (*RuleNodeCtx, error)

InitRuleNodeCtx 初始化RuleNodeCtx

func (*RuleNodeCtx) Copy

func (rn *RuleNodeCtx) Copy(newCtx *RuleNodeCtx)

Copy 复制

func (*RuleNodeCtx) DSL

func (rn *RuleNodeCtx) DSL() []byte

func (*RuleNodeCtx) GetNodeById

func (rn *RuleNodeCtx) GetNodeById(_ types.RuleNodeId) (types.NodeCtx, bool)

func (*RuleNodeCtx) GetNodeId

func (rn *RuleNodeCtx) GetNodeId() types.RuleNodeId

func (*RuleNodeCtx) IsDebugMode

func (rn *RuleNodeCtx) IsDebugMode() bool

func (*RuleNodeCtx) ReloadChild

func (rn *RuleNodeCtx) ReloadChild(_ types.RuleNodeId, _ []byte) error

func (*RuleNodeCtx) ReloadSelf

func (rn *RuleNodeCtx) ReloadSelf(def []byte) error

Directories

Path Synopsis
api
components
js
examples
db_client command
delay_node command
functions_node command
hot_loading command
http_endpoint command
js_transform command
mqtt_client command
msg_type_switch command
node_config command
plugin command
router command
server command
ssh_node command
ui_api command
web_api command
Package pool Note: This file is inspired by: Valyala, A. (2023) workerpool.go (Version 1.48.0) [Source code].
Package pool Note: This file is inspired by: Valyala, A. (2023) workerpool.go (Version 1.48.0) [Source code].
testcases
plugin command
utils
fs
str

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL