node_pool

package
v0.0.0-...-56eedac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package node_pool provides shared node resource management for efficient connection reuse across different rule chains and components. It enables network connection-type components to share their instantiated connection resources (clients) with other components, achieving the purpose of saving system resources.

Package node_pool 提供共享节点资源管理,实现不同规则链和组件间的高效连接复用。 它使网络连接类型的组件能够将其实例化的连接资源(客户端)与其他组件共享, 达到节省系统资源的目的。

Connection Reuse Scenarios: 连接复用场景:

Multiple components can reuse the same connection resources: 多个组件可以复用相同的连接资源:

  • Multiple MQTT clients sharing the same MQTT connection 多个MQTT客户端共享同一个MQTT连接
  • Multiple database operations sharing the same database connection 多个数据库操作共享同一个数据库连接
  • Multiple HTTP endpoints sharing the same port 多个HTTP端点共享同一个端口
  • Message queue clients sharing connection pools 消息队列客户端共享连接池

SharedNode Interface Requirement: SharedNode 接口要求:

Components that support connection sharing must implement the SharedNode interface: 支持连接共享的组件必须实现 SharedNode 接口:

type SharedNode interface {
	GetInstance() (interface{}, error)
	// ... other methods
}

Most officially provided network connection components support this pattern. 大部分官方提供的网络连接组件都支持这种模式。

Usage Pattern: 使用模式:

  1. Initialize shared resource nodes by loading a rule chain definition: 通过加载规则链定义初始化共享资源节点:

    node_pool.DefaultNodePool.Load(dsl []byte)

  2. Reference shared resources using the ref://{resourceId} pattern: 使用 ref://{resourceId} 模式引用共享资源:

    { "id": "node_2", "type": "mqttClient", "configuration": { "server": "ref://local_mqtt_client", "topic": "/device/msg" } }

Node Pool Configuration Example: 节点池配置示例:

{
	"ruleChain": {
		"id": "default_node_pool",
		"name": "全局共享节点池"
	},
	"metadata": {
		"endpoints": [...],
		"nodes": [
			{
				"id": "local_mqtt_client",
				"type": "mqttClient",
				"configuration": {
					"server": "127.0.0.1:1883"
				}
			}
		]
	}
}

Difference from Node Reference: 与节点引用的区别:

  • Node Reference: Completely references the specified node instance, including all configurations 节点引用:完全引用指定节点实例,包括节点所有配置

  • Shared Resource Node: Reuses the node's connection instance, but other configurations are independent 共享资源节点:复用节点的连接实例,但是节点的其他配置是独立的

For example, with MQTT client nodes: 例如,对于MQTT客户端节点:

  • Shared: Connection configuration (MQTT address, reconnection interval, etc.) 共享:连接类配置(MQTT地址、重连间隔等)
  • Independent: Other configurations like publish topics for each node 独立:其他配置如每个节点的发布主题

RuleGo-Server Integration: RuleGo-Server 集成:

In RuleGo-Server, configure the node pool file in config.conf: 在 RuleGo-Server 中,在 config.conf 中配置节点池文件:

node_pool_file=./node_pool.json

Thread Safety: 线程安全:

The node pool implementation is thread-safe and supports concurrent access from multiple goroutines. All operations are protected by appropriate synchronization mechanisms. 节点池实现是线程安全的,支持多个 goroutine 的并发访问。 所有操作都受适当的同步机制保护。

Index

Constants

This section is empty.

Variables

View Source
var DefaultNodePool = NewNodePool(engine.NewConfig())

DefaultNodePool is the global default component resource pool manager. It provides a convenient singleton instance for managing shared node resources across the entire application. Most applications can use this default instance without creating custom node pools.

DefaultNodePool 是全局默认组件资源池管理器。 它为整个应用程序管理共享节点资源提供便捷的单例实例。 大多数应用程序可以使用此默认实例,无需创建自定义节点池。

Usage: 使用:

// Load shared nodes from JSON configuration
// 从JSON配置加载共享节点
DefaultNodePool.Load(jsonConfig)

// Get a shared connection instance
// 获取共享连接实例
if instance, err := DefaultNodePool.GetInstance("local_mqtt_client"); err == nil {
    // Use the shared MQTT client
}
View Source
var (
	// ErrNotImplemented is returned when a component does not implement the SharedNode interface.
	// Only components implementing SharedNode can be added to the node pool for connection sharing.
	//
	// ErrNotImplemented 当组件未实现 SharedNode 接口时返回。
	// 只有实现了 SharedNode 的组件才能添加到节点池中进行连接共享。
	ErrNotImplemented = errors.New("not SharedNode")
)

Functions

This section is empty.

Types

type NodePool

type NodePool struct {
	// Config provides the rule engine configuration used for creating and managing shared nodes.
	// This configuration determines how nodes are parsed, initialized, and managed.
	//
	// Config 提供用于创建和管理共享节点的规则引擎配置。
	// 此配置决定节点如何解析、初始化和管理。
	Config types.Config
	// contains filtered or unexported fields
}

NodePool is a thread-safe component resource pool manager that manages shared node instances and their connection resources. It enables efficient reuse of network connections and other expensive resources across multiple rule chains and components.

NodePool 是线程安全的组件资源池管理器,管理共享节点实例及其连接资源。 它使网络连接和其他昂贵资源能够在多个规则链和组件间高效复用。

Key Features: 主要功能:

  • Thread-safe concurrent access to shared resources 线程安全的共享资源并发访问
  • Support for both endpoint and rule node sharing 支持端点和规则节点的共享
  • Automatic resource lifecycle management 自动资源生命周期管理
  • JSON-based configuration loading 基于JSON的配置加载
  • Dynamic resource addition and removal 动态资源添加和删除

Resource Management: 资源管理: The pool maintains a mapping of resource IDs to shared node contexts, allowing components to reference shared resources by ID using the ref://{resourceId} pattern. 池维护资源ID到共享节点上下文的映射,允许组件使用ref://{resourceId}模式通过ID引用共享资源。

func NewNodePool

func NewNodePool(config types.Config) *NodePool

NewNodePool creates a new node pool instance with the specified configuration. The configuration determines how nodes are parsed, initialized, and managed within the pool.

NewNodePool 使用指定配置创建新的节点池实例。 配置决定节点在池中如何解析、初始化和管理。

Parameters: 参数:

  • config: Rule engine configuration for node management 用于节点管理的规则引擎配置

Returns: 返回:

  • *NodePool: New node pool instance 新的节点池实例

Usage: 使用:

config := engine.NewConfig()
pool := NewNodePool(config)

func (*NodePool) AddNode

func (n *NodePool) AddNode(node types.Node) (types.SharedNodeCtx, error)

func (*NodePool) Del

func (n *NodePool) Del(id string)

Del deletes a SharedNode instance by its ID.

func (*NodePool) Get

func (n *NodePool) Get(id string) (types.SharedNodeCtx, bool)

Get retrieves a SharedNode by its ID.

func (*NodePool) GetAll

func (n *NodePool) GetAll() []types.SharedNodeCtx

GetAll get all SharedNode instances

func (*NodePool) GetAllDef

func (n *NodePool) GetAllDef() (map[string][]*types.RuleNode, error)

func (*NodePool) GetInstance

func (n *NodePool) GetInstance(id string) (interface{}, error)

GetInstance retrieves a net client or server connection by its ID.

func (*NodePool) Load

func (n *NodePool) Load(dsl []byte) (types.NodePool, error)

Load parses and loads shared node definitions from JSON/DSL configuration data. This is the primary method for initializing a node pool from configuration files. The DSL should contain a rule chain definition with endpoints and nodes sections.

Load 从JSON/DSL配置数据解析并加载共享节点定义。 这是从配置文件初始化节点池的主要方法。 DSL应包含带有端点和节点部分的规则链定义。

Parameters: 参数:

  • dsl: JSON configuration data defining the shared nodes 定义共享节点的JSON配置数据

Returns: 返回:

  • types.NodePool: The node pool instance (self) for method chaining 用于方法链的节点池实例(自身)
  • error: Parse or initialization error if any 解析或初始化错误(如果有)

Configuration Format: 配置格式:

{
    "ruleChain": {"id": "pool_id", "name": "Pool Name"},
    "metadata": {
        "endpoints": [{"id": "ep1", "type": "endpoint/type", "configuration": {...}}],
        "nodes": [{"id": "node1", "type": "nodeType", "configuration": {...}}]
    }
}

Error Conditions: 错误条件:

  • Invalid JSON format 无效的JSON格式
  • Node type not found in registry 注册表中未找到节点类型
  • Duplicate node IDs 重复的节点ID
  • Component doesn't implement SharedNode interface 组件未实现SharedNode接口

func (*NodePool) LoadFromRuleChain

func (n *NodePool) LoadFromRuleChain(def types.RuleChain) (types.NodePool, error)

func (*NodePool) NewFromEndpoint

func (n *NodePool) NewFromEndpoint(def types.EndpointDsl) (types.SharedNodeCtx, error)

func (*NodePool) NewFromRuleNode

func (n *NodePool) NewFromRuleNode(def types.RuleNode) (types.SharedNodeCtx, error)

func (*NodePool) Range

func (n *NodePool) Range(f func(key, value any) bool)

Range iterates over all SharedNode instances in the pool.

func (*NodePool) Stop

func (n *NodePool) Stop()

Stop stops and releases all SharedNode instances.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL