types

package
v0.0.0-...-a89011a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JSON   = DataType("JSON")
	TEXT   = DataType("TEXT")
	BINARY = DataType("BINARY")
)
View Source
const (
	MsgKey      = "msg"
	MetadataKey = "metadata"
	MsgTypeKey  = "msgType"
)
View Source
const (
	Success = "Success"
	Failure = "Failure"
	True    = "True"
	False   = "False"
)

关系 节点与节点连接的关系,以下是常用的关系,可以自定义 relation types

View Source
const (
	In  = "IN"
	Out = "OUT"
)

flow direction type 流向 消息流入、流出节点方向

Variables

View Source
var EmptyRuleNodeId = RuleNodeId{}

EmptyRuleNodeId 空节点ID

Functions

func DefaultLogger

func DefaultLogger() *log.Logger

DefaultLogger returns a `Logger` implementation

Types

type ComponentRegistry

type ComponentRegistry interface {
	//Register 注册组件,如果`node.Type()`已经存在则返回一个`已存在`错误
	Register(node Node) error
	//RegisterPlugin 通过plugin机制加载外部.so文件注册组件,
	//如果`name`已经存在或者插件提供的组件列表`node.Type()`已经存在则返回一个`已存在`错误
	RegisterPlugin(name string, file string) error
	//Unregister 删除组件或者通过插件名称删除一批组件
	Unregister(componentType string) error
	//NewNode 通过nodeType创建一个新的node实例
	NewNode(nodeType string) (Node, error)
	//GetComponents 获取所有组件列表
	GetComponents() map[string]Node
}

ComponentRegistry 节点组件注册器

type ComponentType

type ComponentType int

ComponentType 组件类型:规则节点或者子规则链

const (
	NODE ComponentType = iota
	CHAIN
)

type Config

type Config struct {
	//OnDebug 节点调试信息回调函数,只有节点debugMode=true才会调用
	OnDebug func(flowType string, nodeId string, msg RuleMsg, relationType string, err error)
	//OnEnd 规则链执行完成回调函数,如果有多个结束点,则执行多次
	OnEnd func(msg RuleMsg, err error)
	//JsMaxExecutionTime js脚本执行超时时间,默认2000毫秒
	JsMaxExecutionTime time.Duration
	//Pool 协程池接口
	//如果不配置,则使用 go func 方式
	//默认使用`pool.WorkerPool`。兼容ants协程池,可以使用ants协程池实现
	//例如:
	//	pool, _ := ants.NewPool(math.MaxInt32)
	//	config := rulego.NewConfig(types.WithPool(pool))
	Pool Pool
	//ComponentsRegistry 组件库
	//默认使用`rulego.Registry`
	ComponentsRegistry ComponentRegistry
	//规则链解析接口,默认使用:`rulego.JsonParser`
	Parser Parser
	//Logger 日志记录接口,默认使用:`DefaultLogger()`
	Logger Logger
}

Config 规则引擎配置

func NewConfig

func NewConfig(opts ...Option) Config

type Configuration

type Configuration map[string]interface{}

Configuration 组件配置类型

func (Configuration) GetToString

func (c Configuration) GetToString(key string) string

type DataType

type DataType string

DataType 消息数据类型

type JsEngine

type JsEngine interface {
	//Execute 执行js脚本指定函数,js脚本在JsEngine实例化的时候进行初始化
	//functionName 执行的函数名
	//argumentList 函数参数列表
	Execute(functionName string, argumentList ...interface{}) (interface{}, error)
	//Stop 释放js引擎资源
	Stop()
}

JsEngine JavaScript脚本引擎

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

func NewLogger

func NewLogger(custom Logger) Logger

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata 规则引擎消息元数据

func BuildMetadata

func BuildMetadata(data map[string]interface{}) Metadata

BuildMetadata 通过map,创建一个新的规则引擎消息元数据实例

func NewMetadata

func NewMetadata() Metadata

NewMetadata 创建一个新的规则引擎消息元数据实例1

func (*Metadata) Copy

func (md *Metadata) Copy() Metadata

Copy 复制

func (*Metadata) GetValue

func (md *Metadata) GetValue(key string) interface{}

GetValue 通过key获取值

func (*Metadata) Has

func (md *Metadata) Has(key string) bool

Has 是否存在某个key

func (*Metadata) PutValue

func (md *Metadata) PutValue(key string, value interface{})

PutValue 设置值

func (*Metadata) Values

func (md *Metadata) Values() map[string]interface{}

Values 获取所有值

type Node

type Node interface {
	//New 创建一个组件新实例
	New() Node
	//Type 组件类型,类型不能重复。
	//用于规则链,node.type配置,初始化对应的组件
	//建议使用`/`区分命名空间,防止冲突。例如:lala/httpClient
	Type() string
	//Init 组件初始化
	Init(ruleConfig Config, configuration Configuration) error
	//OnMsg 处理消息
	//ctx:规则引擎处理消息上下文
	//msg:消息
	OnMsg(ctx RuleContext, msg RuleMsg) error
	//Destroy 销毁
	Destroy()
}

Node 规则引擎节点组件接口 把业务封或者常用工具装成组件,然后通过规则链配置方式调用该组件 实现方式参考`components`包 然后注册到`RuleGo`默认注册器 rulego.Registry.Register(&MyNode{})

type NodeCtx

type NodeCtx interface {
	Node
	//IsDebugMode 该节点是否是调试模式
	//True:消息流入和流出该节点,会调用config.OnDebug回调函数,否则不会
	IsDebugMode() bool
	//GetNodeId 获取组件ID
	GetNodeId() RuleNodeId
	//ReloadSelf 刷新该组件配置
	ReloadSelf(def []byte) error
	//ReloadChild
	//如果是子规则链类型,则刷新该子规则链指定ID组件配置
	//如果是节点类型,则不支持该方法
	ReloadChild(nodeId RuleNodeId, def []byte) error
	//GetNodeById
	//如果是子规则链类型,则获取该子规则链指定ID组件配置
	//如果是节点类型,则不支持该方法
	GetNodeById(nodeId RuleNodeId) (NodeCtx, bool)
	//DSL 返回该节点配置DSL
	DSL() []byte
}

NodeCtx 规则节点实例化上下文

type Option

type Option func(*Config) error

Option is a function type that modifies the Config.

func WithComponentsRegistry

func WithComponentsRegistry(componentsRegistry ComponentRegistry) Option

WithComponentsRegistry is an option that sets the components registry of the Config.

func WithDefaultPool

func WithDefaultPool() Option

func WithJsMaxExecutionTime

func WithJsMaxExecutionTime(jsMaxExecutionTime time.Duration) Option

WithJsMaxExecutionTime is an option that sets the js max execution time of the Config.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger is an option that sets the logger of the Config.

func WithOnDebug

func WithOnDebug(onDebug func(flowType string, nodeId string, msg RuleMsg, relationType string, err error)) Option

WithOnDebug is an option that sets the on debug callback of the Config.

func WithOnEnd

func WithOnEnd(onEnd func(msg RuleMsg, err error)) Option

WithOnEnd is an option that sets the on end callback of the Config.

func WithParser

func WithParser(parser Parser) Option

WithParser is an option that sets the parser of the Config.

func WithPool

func WithPool(pool Pool) Option

WithPool is an option that sets the pool of the Config.

type Parser

type Parser interface {
	// DecodeRuleChain 从描述文件解析规则链结构体
	//parses a chain from an input source.
	DecodeRuleChain(config Config, dsl []byte) (Node, error)
	// DecodeRuleNode 从描述文件解析规则节点结构体
	//parses a node from an input source.
	DecodeRuleNode(config Config, dsl []byte) (Node, error)
	//EncodeRuleChain 把规则链结构体转换成描述文件
	EncodeRuleChain(def interface{}) ([]byte, error)
	//EncodeRuleNode 把规则节点结构体转换成描述文件
	EncodeRuleNode(def interface{}) ([]byte, error)
}

Parser 规则链定义文件DSL解析器 默认使用json方式,如果使用其他方式定义规则链,可以实现该接口 然后通过该方式注册到规则引擎中:`rulego.NewConfig(WithParser(&MyParser{})`

type PluginRegistry

type PluginRegistry interface {
	//Init 初始化
	Init() error
	//Components 组件列表
	Components() []Node
}

PluginRegistry go plugin 方式提供节点组件接口 示例: package main var Plugins MyPlugins// plugin entry point type MyPlugins struct{}

func (p *MyPlugins) Init() error {
	return nil
}

func (p *MyPlugins) Components() []types.Node {
	return []types.Node{&UpperNode{}, &TimeNode{}, &FilterNode{}}//一个插件可以提供多个组件
}

go build -buildmode=plugin -o plugin.so plugin.go # 编译插件,生成plugin.so文件 rulego.Registry.RegisterPlugin("test", "./plugin.so")//注册到RuleGo默认注册器9

type Pool

type Pool interface {
	//Submit 往协程池提交一个任务
	//如果协程池满返回错误
	Submit(task func()) error
	//Release 释放
	Release()
}

Pool 协程池

func DefaultPool

func DefaultPool() Pool

type RuleContext

type RuleContext interface {
	//TellSuccess 通知规则引擎处理当前消息处理成功,并把消息通过`Success`关系发送到下一个节点
	TellSuccess(msg RuleMsg)
	//TellFailure 通知规则引擎处理当前消息处理失败,并把消息通过`Failure`关系发送到下一个节点
	TellFailure(msg RuleMsg, err error)
	//TellNext 使用指定的relationTypes,发送消息到下一个节点
	//Send the message to the next node
	TellNext(msg RuleMsg, relationTypes ...string)
	//TellSelf 以指定的延迟(毫秒)向当前规则节点发送消息。
	TellSelf(msg RuleMsg, delayMs int64)
	//NewMsg 创建新的消息实例
	NewMsg(msgType string, metaData Metadata, data string) RuleMsg
	//GetSelfId 获取当前节点ID
	GetSelfId() string
	//Config 获取规则引擎配置
	Config() Config
	//SubmitTack 异步执行任务
	SubmitTack(task func())
	//SetEndFunc 设置当前消息处理结束回调函数
	SetEndFunc(f func(msg RuleMsg, err error)) RuleContext
	//GetEndFunc 获取当前消息处理结束回调函数
	GetEndFunc() func(msg RuleMsg, err error)
	//SetContext 设置用于不同组件实例共享信号量或者数据的上下文
	SetContext(c context.Context) RuleContext
	//GetContext 获取用于不同组件实例共享信号量或者数据的上下文
	GetContext() context.Context
}

RuleContext 规则引擎消息处理上下文接口 处理把消息流转到下一个或者多个节点逻辑 根据规则链连接关系查找当前节点的下一个或者多个节点,然后调用对应节点:nextNode.OnMsg(ctx, msg)触发下一个节点的业务逻辑 另外处理节点OnDebug和OnEnd回调逻辑

type RuleContextOption

type RuleContextOption func(RuleContext)

RuleContextOption 修改RuleContext选项的函数

func WithContext

func WithContext(c context.Context) RuleContextOption

func WithEndFunc

func WithEndFunc(endFunc func(msg RuleMsg, err error)) RuleContextOption

type RuleMsg

type RuleMsg struct {
	// 消息时间戳
	Ts int64 `json:"ts"`
	// 消息ID,同一条消息再规则引擎流转,整个过程是唯一的
	Id string `json:"id"`
	//数据类型
	DataType DataType `json:"dataType"`
	//消息类型,规则引擎分发数据的重要字段
	//一般把消息交给规则引擎处理`ruleEngine.OnMsg(msg)`,需要把消息分类并指定其Type
	//例如:POST_TELEMETRY、ACTIVITY_EVENT、INACTIVITY_EVENT、CONNECT_EVENT、DISCONNECT_EVENT
	//ENTITY_CREATED、ENTITY_UPDATED、ENTITY_DELETED、DEVICE_ALARM、POST_DEVICE_DATA
	Type string `json:"type"`
	//消息内容
	Data string `json:"data"`
	//消息元数据
	Metadata Metadata
}

RuleMsg 规则引擎消息

func NewMsg

func NewMsg(ts int64, msgType string, dataType DataType, metaData Metadata, data string) RuleMsg

NewMsg 创建一个新的消息实例,并通过uuid生成消息ID

func (*RuleMsg) Copy

func (m *RuleMsg) Copy() RuleMsg

Copy 复制

type RuleNodeId

type RuleNodeId struct {
	//节点ID
	Id string
	//节点类型,节点/子规则链
	Type ComponentType
}

RuleNodeId 组件ID类型定义

type RuleNodeRelation

type RuleNodeRelation struct {
	//入组件ID
	InId RuleNodeId
	//出组件ID
	OutId RuleNodeId
	//关系 如:True、False、Success、Failure 或者其他自定义关系
	RelationType string
}

RuleNodeRelation 节点与节点之间关系

type SafeComponentSlice

type SafeComponentSlice struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SafeComponentSlice 安全的组件列表切片

func (*SafeComponentSlice) Add

func (p *SafeComponentSlice) Add(nodes ...Node)

Add 线程安全地添加元素

func (*SafeComponentSlice) Components

func (p *SafeComponentSlice) Components() []Node

Components 获取组件列表

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL