types

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0 Imports: 9 Imported by: 10

Documentation

Index

Constants

View Source
const (
	CallbackFuncOnRuleChainCompleted = "onRuleChainCompleted"
	CallbackFuncOnNodeCompleted      = "onNodeCompleted"
	CallbackFuncDebug                = "onDebug"
)
View Source
const (
	Global  = "global"
	Vars    = "vars"
	Secrets = "secrets"
)
View Source
const (
	JSON   = DataType("JSON")
	TEXT   = DataType("TEXT")
	BINARY = DataType("BINARY")
)
View Source
const (
	MsgKey      = "msg"
	MetadataKey = "metadata"
	MsgTypeKey  = "msgType"
	DataTypeKey = "dataType"
)
View Source
const (
	Success = "Success"
	Failure = "Failure"
	True    = "True"
	False   = "False"
)

关系 节点与节点连接的关系,以下是常用的关系,可以自定义 relation types

View Source
const (
	In  = "IN"
	Out = "OUT"
	Log = "Log"
)

flow direction type 流向 消息流入、流出节点方向

View Source
const (
	Js     = "Js"
	Lua    = "Lua"
	Python = "Python"
)

脚本类型

View Source
const ScriptFuncSeparator = "#"

ScriptFuncSeparator 脚本函数名分割符

Variables

View Source
var EmptyRuleNodeId = RuleNodeId{}

EmptyRuleNodeId 空节点ID

View Source
var OnDebug func(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)

Functions

func DefaultLogger

func DefaultLogger() *log.Logger

DefaultLogger returns a `Logger` implementation

Types

type AfterAspect added in v0.18.0

type AfterAspect interface {
	NodeAspect
	//After is the advice that executes after the node OnMsg method. The returned Msg will be used as the input for the next advice and the next node OnMsg method.
	//After 节点 OnMsg 方法执行之后的增强点。返回的Msg将作为下一个增强点和下一个节点 OnMsg 方法的入参。
	After(ctx RuleContext, msg RuleMsg, err error, relationType string) RuleMsg
}

AfterAspect is the interface for node post-execution advice AfterAspect 节点 OnMsg 方法执行后置增强点接口

type AroundAspect added in v0.18.0

type AroundAspect interface {
	NodeAspect
	//Around is the advice that executes around the node OnMsg method. The returned Msg will be used as the input for the next advice and the next node OnMsg method.
	//Around 节点 OnMsg 方法执行环绕的增强点。返回的Msg将作为下一个增强点和下一个节点 OnMsg 方法的入参。
	//If it returns false: the engine will not call the next node's OnMsg method, and the aspect needs to execute the tellNext method, otherwise the rule chain will not end.
	//如果返回false:引擎不会调用下一个节点的OnMsg方法,需要切面执行tellNext方法,否则规则链不会结束。
	//If it returns true: the engine will call the next node's OnMsg method.
	//如果返回true:引擎会调用下一个节点的OnMsg方法。
	Around(ctx RuleContext, msg RuleMsg, relationType string) (RuleMsg, bool)
}

AroundAspect is the interface for node around-execution advice AroundAspect 节点 OnMsg 方法执行环绕增强点接口

type Aspect added in v0.18.0

type Aspect interface {
	//Order returns the execution order, the smaller the value, the higher the priority
	//Order 返回执行顺序,值越小,优先级越高
	Order() int
}

Aspect is the base interface for advice Aspect 增强点接口的基类

type BeforeAspect added in v0.18.0

type BeforeAspect interface {
	NodeAspect
	// Before is the advice that executes before the node OnMsg method. The returned Msg will be used as the input for the next advice and the node OnMsg method.
	// Before 节点 OnMsg 方法执行之前的增强点。返回的Msg将作为下一个增强点和节点 OnMsg 方法的入参。
	Before(ctx RuleContext, msg RuleMsg, relationType string) RuleMsg
}

BeforeAspect is the interface for node pre-execution advice BeforeAspect 节点 OnMsg 方法执行之前的增强点接口

type CompletedAspect added in v0.18.0

type CompletedAspect interface {
	NodeAspect
	// Completed is the advice that executes after the rule engine OnMsg method and all branch chain execution ends. The returned Msg will be used as the input for the next advice.
	// Completed 规则引擎 OnMsg 方法执行之后,所有分支链执行结束的增强点。返回的Msg将作为下一个增强点的入参。
	Completed(ctx RuleContext, msg RuleMsg) RuleMsg
}

CompletedAspect is the interface for rule engine all branch execution end advice CompletedAspect 规则引擎 OnMsg 方法执行之后,所有分支链执行结束的增强点接口

type ComponentDefGetter added in v0.15.0

type ComponentDefGetter interface {
	Def() ComponentForm
}

ComponentDefGetter 该接口是可选的,组件可以实现该接口,提供可视化需要的信息, 例如:Label,Desc,RelationTypes。否则使用约定规则提供可视化表单定义

type ComponentForm added in v0.15.0

type ComponentForm struct {
	//Type 组件类型
	Type string `json:"type"`
	//Category 组件分类
	Category string `json:"category"`
	//配置字段,获取组件`Config`字段的所有公有字段
	Fields ComponentFormFieldList `json:"fields"`
	//Label 组件展示名称,预留,目前没值
	Label string `json:"label"`
	//Desc 组件说明,预留,目前没值
	Desc string `json:"desc"`
	//Icon 图标,预留,如果没值则取type。
	Icon string `json:"icon"`
	//RelationTypes 和下一个节点能产生的连接名称列表,
	//过滤器节点类型默认是:True/False/Failure;其他节点类型默认是Success/Failure
	//如果是空,表示用户可以自定义连接关系
	RelationTypes *[]string `json:"relationTypes"`
}

ComponentForm 组件表单,用于可视化加载组件表单

type ComponentFormField added in v0.15.0

type ComponentFormField struct {
	//Name 字段名称
	Name string `json:"name"`
	//Type 字段类型
	Type string `json:"type"`
	//默认值,组件实现的方法node.New(), Config对应的字段,提供了默认值会填充到该值
	DefaultValue interface{} `json:"defaultValue"`
	//Label 字段展示名称,通过tag:label获取
	Label string `json:"label"`
	//Desc 字段说明,通过tag:desc获取
	Desc string `json:"desc"`
	//Validate 校验规则,通过tag:validate获取
	Validate string `json:"validate"`
	//Fields 嵌套字段
	Fields ComponentFormFieldList `json:"fields"`
}

ComponentFormField 组件配置字段

type ComponentFormFieldList added in v0.15.0

type ComponentFormFieldList []ComponentFormField

ComponentFormFieldList 字段列表类型

func (ComponentFormFieldList) GetField added in v0.15.0

type ComponentFormList added in v0.15.0

type ComponentFormList map[string]ComponentForm

ComponentFormList 组件表单类别类型

func (ComponentFormList) GetComponent added in v0.15.0

func (c ComponentFormList) GetComponent(name string) (ComponentForm, bool)

func (ComponentFormList) Values added in v0.15.0

func (c ComponentFormList) Values() []ComponentForm

type ComponentRegistry

type ComponentRegistry interface {
	//Register 注册组件,如果`node.Type()`已经存在则返回一个`已存在`错误
	Register(node Node) error
	//RegisterPlugin 通过plugin机制加载外部.so文件注册组件,
	//如果`name`已经存在或者插件提供的组件列表`node.Type()`已经存在则返回一个`已存在`错误
	RegisterPlugin(name string, file string) error
	//Unregister 删除组件或者通过插件名称删除一批组件
	Unregister(componentType string) error
	//NewNode 通过nodeType创建一个新的node实例
	NewNode(nodeType string) (Node, error)
	//GetComponents 获取所有注册组件列表
	GetComponents() map[string]Node
	//GetComponentForms 获取所有注册组件配置表单,用于可视化配置
	GetComponentForms() ComponentFormList
}

ComponentRegistry 节点组件注册器

type ComponentType

type ComponentType int

ComponentType 组件类型:规则节点或者子规则链

const (
	NODE ComponentType = iota
	CHAIN
)

type Config

type Config struct {
	//OnDebug 节点调试信息回调函数,只有节点debugMode=true才会调用
	//ruleChainId 规则链ID
	//flowType IN/OUT,流入(IN)该组件或者流出(OUT)该组件事件类型
	//nodeId 节点ID
	//msg 当前msg
	//relationType 如果flowType=IN,则代表上一个节点和该节点的连接关系,例如(True/False);如果flowType=OUT,则代表该节点和下一个节点的连接关系,例如(True/False)
	//err 错误信息
	OnDebug func(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)
	//Deprecated
	//使用types.WithEndFunc方式代替
	//OnEnd 规则链执行完成回调函数,如果有多个结束点,则执行多次
	OnEnd func(msg RuleMsg, err error)
	//ScriptMaxExecutionTime 脚本执行超时时间,默认2000毫秒
	ScriptMaxExecutionTime time.Duration
	//Pool 协程池接口
	//如果不配置,则使用 go func 方式
	//默认使用`pool.WorkerPool`。兼容ants协程池,可以使用ants协程池实现
	//例如:
	//	pool, _ := ants.NewPool(math.MaxInt32)
	//	config := rulego.NewConfig(types.WithPool(pool))
	Pool Pool
	//ComponentsRegistry 组件库
	//默认使用`rulego.Registry`
	ComponentsRegistry ComponentRegistry
	//规则链解析接口,默认使用:`rulego.JsonParser`
	Parser Parser
	//Logger 日志记录接口,默认使用:`DefaultLogger()`
	Logger Logger
	//Properties 全局属性,key-value形式
	//规则链节点配置可以通过${global.propertyKey}方式替换Properties值
	//节点初始化时候替换,只替换一次
	Properties Metadata
	//Udf 注册自定义Golang函数和原生脚本,js等脚本引擎运行时可以调用
	//不同脚本类型函数名可以重复
	Udf map[string]interface{}
	//Aspects AOP切面列表
	Aspects []Aspect
	// SecretKey AES-256 32长度密钥,用于解密规则链`Secrets`配置
	SecretKey string
}

Config 规则引擎配置

func NewConfig

func NewConfig(opts ...Option) Config

func (*Config) GetChainAspects added in v0.18.0

func (c *Config) GetChainAspects() ([]StartAspect, []EndAspect, []CompletedAspect)

GetChainAspects 获取规则链执行类型增强点切面列表

func (*Config) GetEngineAspects added in v0.18.0

func (c *Config) GetEngineAspects() ([]OnCreatedAspect, []OnReloadAspect, []OnDestroyAspect)

GetEngineAspects 获取规则引擎类型增强点切面列表

func (*Config) GetNodeAspects added in v0.18.0

func (c *Config) GetNodeAspects() ([]AroundAspect, []BeforeAspect, []AfterAspect)

GetNodeAspects 获取节点执行类型增强点切面列表

func (*Config) RegisterUdf added in v0.15.0

func (c *Config) RegisterUdf(name string, value interface{})

RegisterUdf 注册自定义函数 不同脚本类型函数名可以重复

type Configuration

type Configuration map[string]interface{}

Configuration 组件配置类型

type DataType

type DataType string

DataType 消息数据类型

type EndAspect added in v0.18.0

type EndAspect interface {
	NodeAspect
	// End is the advice that executes after the rule engine OnMsg method and the branch chain execution ends. The returned Msg will be used as the input for the next advice.
	// End 规则引擎 OnMsg 方法执行之后,分支链执行结束的增强点。返回的Msg将作为下一个增强点的入参。
	End(ctx RuleContext, msg RuleMsg, err error, relationType string) RuleMsg
}

EndAspect is the interface for rule engine post-execution advice EndAspect 规则引擎 OnMsg 方法执行之后,分支链执行结束的增强点接口

type JsEngine

type JsEngine interface {
	//Execute 执行js脚本指定函数,js脚本在JsEngine实例化的时候进行初始化
	//functionName 执行的函数名
	//argumentList 函数参数列表
	Execute(functionName string, argumentList ...interface{}) (interface{}, error)
	//Stop 释放js引擎资源
	Stop()
}

JsEngine JavaScript脚本引擎

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

func NewLogger

func NewLogger(custom Logger) Logger

type Metadata

type Metadata map[string]string

Metadata 规则引擎消息元数据

func BuildMetadata

func BuildMetadata(data Metadata) Metadata

BuildMetadata 通过map,创建一个新的规则引擎消息元数据实例

func NewMetadata

func NewMetadata() Metadata

NewMetadata 创建一个新的规则引擎消息元数据实例

func (Metadata) Copy

func (md Metadata) Copy() Metadata

Copy 复制

func (Metadata) GetValue

func (md Metadata) GetValue(key string) string

GetValue 通过key获取值

func (Metadata) Has added in v0.13.0

func (md Metadata) Has(key string) bool

Has 是否存在某个key

func (Metadata) PutValue

func (md Metadata) PutValue(key, value string)

PutValue 设置值

func (Metadata) Values

func (md Metadata) Values() map[string]string

Values 获取所有值

type Node

type Node interface {
	//New 创建一个组件新实例
	//每个规则链里的规则节点都会创建一个新的实例,数据是独立的
	New() Node
	//Type 组件类型,类型不能重复。
	//用于规则链,node.type配置,初始化对应的组件
	//建议使用`/`区分命名空间,防止冲突。例如:x/httpClient
	Type() string
	//Init 组件初始化,一般做一些组件参数配置或者客户端初始化操作
	//规则链里的规则节点初始化会调用一次
	Init(ruleConfig Config, configuration Configuration) error
	//OnMsg 处理消息,每条流入组件的数据会经过该函数处理
	//ctx:规则引擎处理消息上下文
	//msg:消息
	//执行完逻辑后,调用ctx.TellSuccess/ctx.TellFailure/ctx.TellNext通知下一个节点,否则会导致规则链无法结束
	OnMsg(ctx RuleContext, msg RuleMsg)
	//Destroy 销毁,做一些资源释放操作
	Destroy()
}

Node 规则引擎节点组件接口 把业务封或者通用逻辑装成组件,然后通过规则链配置方式调用该组件 实现方式参考`components`包 然后注册到`RuleGo`默认注册器 rulego.Registry.Register(&MyNode{})

type NodeAdditionalInfo added in v0.20.0

type NodeAdditionalInfo struct {
	Description string `json:"description"`
	LayoutX     int    `json:"layoutX"`
	LayoutY     int    `json:"layoutY"`
}

NodeAdditionalInfo 用于可视化位置信息(预留字段)

type NodeAspect added in v0.18.0

type NodeAspect interface {
	Aspect
	//PointCut declares a cut-in point, used to determine whether to execute the advice
	//PointCut 声明一个切入点,用于判断是否需要执行增强点
	//For example: specify some component types or relationType to execute the aspect logic;return ctx.Self().Type()=="mqttClient"
	//例如:指定某些组件类型或者relationType才执行切面逻辑;return ctx.Self().Type()=="mqttClient"
	PointCut(ctx RuleContext, msg RuleMsg, relationType string) bool
}

NodeAspect is the base interface for node advice NodeAspect 节点增强点接口的基类

type NodeConnection added in v0.20.0

type NodeConnection struct {
	//连接的源节点的id,应该与nodes数组中的某个节点id匹配。
	FromId string `json:"fromId"`
	//连接的目标节点的id,应该与nodes数组中的某个节点id匹配
	ToId string `json:"toId"`
	//连接的类型,决定了什么时候以及如何把消息从一个节点发送到另一个节点。它应该与源节点类型支持的连接类型之一匹配。
	//例如,一个JS过滤器节点可能支持两种连接类型:"True"和"False",表示消息是否通过或者失败过滤条件。
	Type string `json:"type"`
}

NodeConnection 规则链节点连接定义 每个对象代表规则链中两个节点之间的连接

type NodeCtx

type NodeCtx interface {
	Node
	//IsDebugMode 该节点是否是调试模式
	//True:消息流入和流出该节点,会调用config.OnDebug回调函数,否则不会
	IsDebugMode() bool
	//GetNodeId 获取组件ID
	GetNodeId() RuleNodeId
	//ReloadSelf 刷新该组件配置
	ReloadSelf(def []byte) error
	//ReloadChild
	//如果是子规则链类型,则刷新该子规则链指定ID组件配置
	//如果是节点类型,则不支持该方法
	ReloadChild(nodeId RuleNodeId, def []byte) error
	//GetNodeById
	//如果是子规则链类型,则获取该子规则链指定ID组件配置
	//如果是节点类型,则不支持该方法
	GetNodeById(nodeId RuleNodeId) (NodeCtx, bool)
	//DSL 返回该节点配置DSL
	DSL() []byte
}

NodeCtx 规则节点实例化上下文

type OnCreatedAspect added in v0.18.0

type OnCreatedAspect interface {
	Aspect
	// OnCreated is the advice that executes after the rule engine is successfully created.
	// OnCreated 规则引擎成功创建之后的增强点
	OnCreated(chainCtx NodeCtx)
}

OnCreatedAspect is the interface for rule engine creation success advice OnCreatedAspect 规则引擎成功创建之后增强点接口

type OnDestroyAspect added in v0.18.0

type OnDestroyAspect interface {
	Aspect
	// OnDestroy is the advice that executes after the rule engine instance is destroyed.
	// OnDestroy 规则引擎实例销毁执行之后增强点
	OnDestroy(chainCtx NodeCtx)
}

OnDestroyAspect is the interface for rule engine instance destruction advice OnDestroyAspect 规则引擎实例销毁执行之后增强点接口

type OnEndFunc added in v0.17.0

type OnEndFunc = func(ctx RuleContext, msg RuleMsg, err error, relationType string)

OnEndFunc 规则链分支执行完函数

type OnReloadAspect added in v0.18.0

type OnReloadAspect interface {
	Aspect
	// OnReload is the advice that executes after the rule engine reloads the rule chain or child node configuration.
	// OnReload 规则引擎重新加载规则链或者子节点配置之后的增强点。规则链更新会同时触发OnDestroy和OnReload
	// If the rule chain is updated, then chainCtx=ctx
	// 如果更新规则链,则chainCtx=ctx
	OnReload(parentCtx NodeCtx, ctx NodeCtx, err error)
}

OnReloadAspect is the interface for rule engine reload rule chain or child node configuration advice OnReloadAspect 规则引擎重新加载规则链或者子节点配置之后增强点接口

type Option

type Option func(*Config) error

Option is a function type that modifies the Config.

func WithAspects added in v0.18.0

func WithAspects(aspects ...Aspect) Option

WithAspects is an option that sets the aspects of the Config.

func WithComponentsRegistry

func WithComponentsRegistry(componentsRegistry ComponentRegistry) Option

WithComponentsRegistry is an option that sets the components registry of the Config.

func WithDefaultPool

func WithDefaultPool() Option

func WithLogger

func WithLogger(logger Logger) Option

WithLogger is an option that sets the logger of the Config.

func WithOnDebug

func WithOnDebug(onDebug func(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)) Option

WithOnDebug is an option that sets the on debug callback of the Config.

func WithParser

func WithParser(parser Parser) Option

WithParser is an option that sets the parser of the Config.

func WithPool

func WithPool(pool Pool) Option

WithPool is an option that sets the pool of the Config.

func WithScriptMaxExecutionTime added in v0.17.0

func WithScriptMaxExecutionTime(scriptMaxExecutionTime time.Duration) Option

WithScriptMaxExecutionTime is an option that sets the js max execution time of the Config.

func WithSecretKey added in v0.20.0

func WithSecretKey(secretKey string) Option

WithSecretKey is an option that sets the secret key of the Config.

type Parser

type Parser interface {
	// DecodeRuleChain 从描述文件解析规则链结构体
	//parses a chain from an input source.
	DecodeRuleChain(config Config, dsl []byte) (Node, error)
	// DecodeRuleNode 从描述文件解析规则节点结构体
	//parses a node from an input source.
	DecodeRuleNode(config Config, dsl []byte, chainCtx Node) (Node, error)
	//EncodeRuleChain 把规则链结构体转换成描述文件
	EncodeRuleChain(def interface{}) ([]byte, error)
	//EncodeRuleNode 把规则节点结构体转换成描述文件
	EncodeRuleNode(def interface{}) ([]byte, error)
}

Parser 规则链定义文件DSL解析器 默认使用json方式,如果使用其他方式定义规则链,可以实现该接口 然后通过该方式注册到规则引擎中:`rulego.NewConfig(WithParser(&MyParser{})`

type PluginRegistry

type PluginRegistry interface {
	//Init 初始化
	Init() error
	//Components 组件列表
	Components() []Node
}

PluginRegistry go plugin 方式提供节点组件接口 示例: package main var Plugins MyPlugins// plugin entry point type MyPlugins struct{}

func (p *MyPlugins) Init() error {
	return nil
}

func (p *MyPlugins) Components() []types.Node {
	return []types.Node{&UpperNode{}, &TimeNode{}, &FilterNode{}}//一个插件可以提供多个组件
}

go build -buildmode=plugin -o plugin.so plugin.go # 编译插件,生成plugin.so文件 rulego.Registry.RegisterPlugin("test", "./plugin.so")//注册到RuleGo默认注册器9

type Pool

type Pool interface {
	//Submit 往协程池提交一个任务
	//如果协程池满返回错误
	Submit(task func()) error
	//Release 释放
	Release()
}

Pool 协程池

func DefaultPool

func DefaultPool() Pool

type RuleChain added in v0.20.0

type RuleChain struct {
	//规则链基础信息定义
	RuleChain RuleChainBaseInfo `json:"ruleChain"`
	//包含了规则链中节点和连接的信息
	Metadata RuleMetadata `json:"metadata"`
}

RuleChain 规则链定义

type RuleChainBaseInfo added in v0.20.0

type RuleChainBaseInfo struct {
	//规则链ID
	ID string `json:"id"`
	//Name 规则链的名称
	Name string `json:"name"`
	//表示这个节点是否处于调试模式。如果为真,当节点处理消息时,会触发调试回调函数。
	//该配置会覆盖节点`DebugMode`配置
	DebugMode bool `json:"debugMode"`
	//Root 表示这个规则链是根规则链还是子规则链。(只做标记使用,非应用在实际逻辑)
	Root bool `json:"root"`
	//Configuration 规则链配置信息
	Configuration Configuration `json:"configuration,omitempty"`
	//扩展字段
	AdditionalInfo map[string]string `json:"additionalInfo,omitempty"`
}

RuleChainBaseInfo 规则链基础信息定义

func (RuleChainBaseInfo) GetAdditionalInfo added in v0.20.0

func (r RuleChainBaseInfo) GetAdditionalInfo(key string) (string, bool)

func (RuleChainBaseInfo) PutAdditionalInfo added in v0.20.0

func (r RuleChainBaseInfo) PutAdditionalInfo(key, value string)

type RuleChainConnection added in v0.20.0

type RuleChainConnection struct {
	//连接的源节点的id,应该与nodes数组中的某个节点id匹配。
	FromId string `json:"fromId"`
	//连接的目标子规则链的id,应该与规则引擎中注册的子规则链之一匹配。
	ToId string `json:"toId"`
	//连接的类型,决定了什么时候以及如何把消息从一个节点发送到另一个节点。它应该与源节点类型支持的连接类型之一匹配。
	Type string `json:"type"`
}

RuleChainConnection 子规则链连接定义 每个对象代表规则链中一个节点和一个子规则链之间的连接

type RuleChainRunSnapshot added in v0.20.0

type RuleChainRunSnapshot struct {
	RuleChain
	// Id 执行ID
	Id string `json:"id"`
	// StartTs 执行开始时间
	StartTs int64 `json:"startTs"`
	// EndTs 执行结束时间
	EndTs int64 `json:"endTs"`
	// Logs 每个节点的日志
	Logs []RuleNodeRunLog `json:"logs"`
	//扩展字段
	AdditionalInfo map[string]string `json:"additionalInfo,omitempty"`
}

RuleChainRunSnapshot 规则链运行日志快照

type RuleContext

type RuleContext interface {
	//TellSuccess 通知规则引擎处理当前消息处理成功,并把消息通过`Success`关系发送到下一个节点
	TellSuccess(msg RuleMsg)
	//TellFailure 通知规则引擎处理当前消息处理失败,并把消息通过`Failure`关系发送到下一个节点
	TellFailure(msg RuleMsg, err error)
	//TellNext 使用指定的relationTypes,把消息发送到下一个节点
	//Send the message to the next node
	TellNext(msg RuleMsg, relationTypes ...string)
	//TellSelf 以指定的延迟(毫秒)向当前节点发送消息。
	TellSelf(msg RuleMsg, delayMs int64)
	//TellFlow 执行子规则链
	//ruleChainId 规则链ID
	//onEndFunc 子规则链链分支执行完的回调,并返回该链执行结果,如果同时触发多个分支链,则会调用多次
	//onAllNodeCompleted 所以节点执行完之后的回调,无结果返回
	//如果找不到规则链,并把消息通过`Failure`关系发送到下一个节点
	TellFlow(msg RuleMsg, ruleChainId string, endFunc OnEndFunc, onAllNodeCompleted func())
	//NewMsg 创建新的消息实例
	NewMsg(msgType string, metaData Metadata, data string) RuleMsg
	//GetSelfId 获取当前节点ID
	GetSelfId() string
	//Self 获取当前节点实例
	Self() NodeCtx
	//From 获取消息流入该节点的节点实例
	From() NodeCtx
	//RuleChain 获取当前节点所在的规则链实例
	RuleChain() NodeCtx
	//Config 获取规则引擎配置
	Config() Config
	//SubmitTack 异步执行任务
	SubmitTack(task func())
	//SetEndFunc 设置当前消息处理结束回调函数
	SetEndFunc(f OnEndFunc) RuleContext
	//GetEndFunc 获取当前消息处理结束回调函数
	GetEndFunc() OnEndFunc
	//SetContext 设置用于不同组件实例共享信号量或者数据的上下文
	SetContext(c context.Context) RuleContext
	//GetContext 获取用于不同组件实例共享信号量或者数据的上下文
	GetContext() context.Context
	//SetOnAllNodeCompleted 设置所有节点执行完回调
	SetOnAllNodeCompleted(onAllNodeCompleted func())
	//ExecuteNode 从指定节点开始执行,如果 skipTellNext=true 则只执行当前节点,不通知下一个节点。
	//onEnd 查看获得最终执行结果
	ExecuteNode(chanCtx context.Context, nodeId string, msg RuleMsg, skipTellNext bool, onEnd OnEndFunc)
	//DoOnEnd 触发 OnEnd 回调函数
	DoOnEnd(msg RuleMsg, err error, relationType string)
	//SetCallbackFunc 设置回调函数
	SetCallbackFunc(functionName string, f interface{})
	//GetCallbackFunc 获取回调函数
	GetCallbackFunc(functionName string) interface{}
	//OnDebug 调用配置的OnDebug回调函数
	OnDebug(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)
}

RuleContext 规则引擎消息处理上下文接口 处理把消息流转到下一个或者多个节点逻辑 根据规则链连接关系查找当前节点的下一个或者多个节点,然后调用对应节点:nextNode.OnMsg(ctx, msg)触发下一个节点的业务逻辑 另外处理节点OnDebug和OnEnd回调逻辑

type RuleContextOption added in v0.13.0

type RuleContextOption func(RuleContext)

RuleContextOption 修改RuleContext选项的函数

func WithContext added in v0.13.0

func WithContext(c context.Context) RuleContextOption

WithContext 上下文 用于不同组件实例数据或者信号量共享 用于超时取消

func WithEndFunc added in v0.13.0

func WithEndFunc(endFunc func(ctx RuleContext, msg RuleMsg, err error)) RuleContextOption

WithEndFunc 规则链分支链执行完回调函数 注意:如果规则链有多个结束点,回调函数则会执行多次 Deprecated 使用`types.WithOnEnd`代替

func WithOnAllNodeCompleted added in v0.17.0

func WithOnAllNodeCompleted(onAllNodeCompleted func()) RuleContextOption

WithOnAllNodeCompleted 规则链执行完回调函数

func WithOnEnd

func WithOnEnd(endFunc func(ctx RuleContext, msg RuleMsg, err error, relationType string)) RuleContextOption

WithOnEnd 规则链分支链执行完回调函数 注意:如果规则链有多个结束点,回调函数则会执行多次

func WithOnNodeCompleted added in v0.20.0

func WithOnNodeCompleted(onCallback func(ctx RuleContext, nodeRunLog RuleNodeRunLog)) RuleContextOption

WithOnNodeCompleted 节点执行完回调函数,并收集节点的运行日志

func WithOnNodeDebug added in v0.20.0

func WithOnNodeDebug(onDebug func(ruleChainId string, flowType string, nodeId string, msg RuleMsg, relationType string, err error)) RuleContextOption

WithOnNodeDebug 节点调试日志回调函数,实时异步调用,必须节点配置开启debugMode才会触发

func WithOnRuleChainCompleted added in v0.20.0

func WithOnRuleChainCompleted(onCallback func(ctx RuleContext, snapshot RuleChainRunSnapshot)) RuleContextOption

WithOnRuleChainCompleted 规则链执行完回调函数,并收集每个节点的运行日志

type RuleMetadata added in v0.20.0

type RuleMetadata struct {
	//数据流转的第一个节点,默认:0
	FirstNodeIndex int `json:"firstNodeIndex"`
	//节点组件定义
	//每个对象代表规则链中的一个规则节点
	Nodes []*RuleNode `json:"nodes"`
	//连接定义
	//每个对象代表规则链中两个节点之间的连接
	Connections []NodeConnection `json:"connections"`

	//Deprecated
	//使用 Flow Node代替
	//子规则链链接
	//每个对象代表规则链中一个节点和一个子规则链之间的连接
	RuleChainConnections []RuleChainConnection `json:"ruleChainConnections,omitempty"`
}

RuleMetadata 规则链元数据定义,包含了规则链中节点和连接的信息

type RuleMsg

type RuleMsg struct {
	// 消息时间戳
	Ts int64 `json:"ts"`
	// 消息ID,同一条消息再规则引擎流转,整个过程是唯一的
	Id string `json:"id"`
	//数据类型
	DataType DataType `json:"dataType"`
	//消息类型,规则引擎分发数据的重要字段
	//一般把消息交给规则引擎处理`ruleEngine.OnMsg(msg)`,需要把消息分类并指定其Type
	//例如:POST_TELEMETRY、ACTIVITY_EVENT、INACTIVITY_EVENT、CONNECT_EVENT、DISCONNECT_EVENT
	//ENTITY_CREATED、ENTITY_UPDATED、ENTITY_DELETED、DEVICE_ALARM、POST_DEVICE_DATA
	Type string `json:"type"`
	//消息内容
	Data string `json:"data"`
	//消息元数据
	Metadata Metadata `json:"metadata"`
}

RuleMsg 规则引擎消息

func NewMsg

func NewMsg(ts int64, msgType string, dataType DataType, metaData Metadata, data string) RuleMsg

NewMsg 创建一个新的消息实例,并通过uuid生成消息ID

func (*RuleMsg) Copy

func (m *RuleMsg) Copy() RuleMsg

Copy 复制

type RuleNode added in v0.20.0

type RuleNode struct {
	//节点的唯一标识符,可以是任意字符串
	Id string `json:"id"`
	//扩展字段
	AdditionalInfo NodeAdditionalInfo `json:"additionalInfo,omitempty"`
	//节点的类型,决定了节点的逻辑和行为。它应该与规则引擎中注册的节点类型之一匹配。
	Type string `json:"type"`
	//节点的名称,可以是任意字符串
	Name string `json:"name"`
	//表示这个节点是否处于调试模式。如果为真,当节点处理消息时,会触发调试回调函数。
	//该配置会被RuleChain`DebugMode`配置覆盖
	DebugMode bool `json:"debugMode"`
	//包含了节点的配置参数,具体内容取决于节点类型。
	//例如,一个JS过滤器节点可能有一个`jsScript`字段,定义了过滤逻辑,
	//而一个REST API调用节点可能有一个`restEndpointUrlPattern`字段,定义了要调用的URL。
	Configuration Configuration `json:"configuration"`
}

RuleNode 规则链节点信息定义

type RuleNodeId

type RuleNodeId struct {
	//节点ID
	Id string
	//节点类型,节点/子规则链
	Type ComponentType
}

RuleNodeId 组件ID类型定义

type RuleNodeRelation

type RuleNodeRelation struct {
	//入组件ID
	InId RuleNodeId
	//出组件ID
	OutId RuleNodeId
	//关系 如:True、False、Success、Failure 或者其他自定义关系
	RelationType string
}

RuleNodeRelation 节点与节点之间关系

type RuleNodeRunLog added in v0.20.0

type RuleNodeRunLog struct {
	// Id 节点ID
	Id string `json:"nodeId"`
	// InMsg 输入消息
	InMsg RuleMsg `json:"inMsg"`
	// OutMsg 输出消息
	OutMsg RuleMsg `json:"outMsg"`
	// RelationType 和下一个节点连接类型
	RelationType string `json:"relationType"`
	// Err 错误信息
	Err string `json:"err"`
	// LogItems 执行过程中的日志
	LogItems []string `json:"logItems"`
	// StartTs 执行开始时间
	StartTs int64 `json:"startTs"`
	// EndTs 执行结束时间
	EndTs int64 `json:"endTs"`
}

RuleNodeRunLog 节点日志

type SafeComponentSlice

type SafeComponentSlice struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SafeComponentSlice 安全的组件列表切片

func (*SafeComponentSlice) Add

func (p *SafeComponentSlice) Add(nodes ...Node)

Add 线程安全地添加元素

func (*SafeComponentSlice) Components

func (p *SafeComponentSlice) Components() []Node

Components 获取组件列表

type Script added in v0.17.0

type Script struct {
	//Type 脚本类型,默认Js
	Type string
	//Content 脚本内容或者自定义函数
	Content interface{}
}

Script 脚本 用于注册原生函数或者使用go定义的自定义函数

type StartAspect added in v0.18.0

type StartAspect interface {
	NodeAspect
	//Start is the advice that executes before the rule engine OnMsg method. The returned Msg will be used as the input for the next advice and the next node OnMsg method.
	//Start 规则引擎 OnMsg 方法执行之前的增强点。返回的Msg将作为下一个增强点和下一个节点 OnMsg 方法的入参。
	Start(ctx RuleContext, msg RuleMsg) RuleMsg
}

StartAspect is the interface for rule engine pre-execution advice StartAspect 规则引擎 OnMsg 方法执行之前的增强点接口

type WrapperMsg added in v0.19.0

type WrapperMsg struct {
	//Msg 消息
	Msg RuleMsg `json:"msg"`
	//Err 错误
	Err string `json:"err"`
	//NodeId 结束节点ID
	NodeId string `json:"nodeId"`
}

WrapperMsg 节点执行结果封装,用于封装多个节点执行结果

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL