endpoint

package
v0.0.0-...-a89011a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

Endpoint

English| 中文

endpoint is a package that abstracts different input source data routing and heterogeneous system data integration, it allows you to easily create and start different receiving services, such as HTTP or MQTT, then according to different requests or messages, perform transformation, processing, routing and other operations, and finally hand over to the rule chain or component for processing.

Usage

Create Router

Router is a type that defines routing rules, it can specify input end, transformation function, processing function, output end and so on. You can use NewRouter function to create a Router type pointer, then use From method to specify input end, return a From type pointer.

router := endpoint.NewRouter().From("/api/v1/msg/")
Add processing functions

From type has two methods to add processing functions: Transform and Process. Transform method is used to transform input message into RuleMsg type, Process method is used to process input or output message. These two methods both accept a Process type function as parameter, return a From type pointer. Process type function accepts an Exchange type pointer as parameter, returns a boolean value indicating whether to continue executing the next processing function. Exchange type is a struct that contains an input message and an output message, used to pass data in processing functions.

router := endpoint.NewRouter().From("/api/v1/msg/").Transform(func(exchange *endpoint.Exchange) bool {
    //transformation logic
    return true
}).Process(func(exchange *endpoint.Exchange) bool {
    //processing logic
    return true
})
Response

You can use the Exchange's Out message to respond to the client in the conversion or processing function

//Response StatusCode
exchange.Out.SetStatusCode(http.StatusMethodNotAllowed)
//Response header
exchange.Out.Headers().Set("Content-Type", "application/json")
//Response content
exchange.Out.SetBody([]byte("ok"))

Note: mqtt endpoint calling SetBody() will use the specified topic to public data to the broker, specify the topic using the following method

exchange.Out.Headers().Set("topic", "your topic")
Set output end

From type has two methods to set output end: To and ToComponent. To method is used to specify routing target path or component, ToComponent method is used to specify output component. These two methods both return a To type pointer.

To method's parameter is a string, representing component path, format is {executorType}:{path}. executorType is executor component type, path is component path. For example: "chain:{chainId}" means executing rule chain registered in rulego, "component:{nodeType}" means executing component registered in config.ComponentsRegistry. You can register custom executor component types in DefaultExecutorFactory. To method can also accept some component configuration parameters as optional parameters.

router := endpoint.NewRouter().From("/api/v1/msg/").Transform(func(exchange *endpoint.Exchange) bool {
    //transformation logic
    return true
}).To("chain:default")

ToComponent method's parameter is a types.Node type component, you can customize or use existing components.

router := endpoint.NewRouter().From("/api/v1/msg/").Transform(func(exchange *endpoint.Exchange) bool {
    //transformation logic
    return true
}).ToComponent(func() types.Node {
        //define log component, process data
        var configuration = make(types.Configuration)
        configuration["jsScript"] = `
        return 'log::Incoming message:\n' + JSON.stringify(msg) + '\nIncoming metadata:\n' + JSON.stringify(metadata);
        `
        logNode := &action.LogNode{}
        _ = logNode.Init(config, configuration)
        return logNode
}())

Use To method call component

router := endpoint.NewRouter().From("/api/v1/msg/").Transform(func(exchange *endpoint.Exchange) bool {
    //转换逻辑
    return true
}).To"component:log", types.Configuration{"jsScript": `
		return 'log::Incoming message:\n' + JSON.stringify(msg) + '\nIncoming metadata:\n' + JSON.stringify(metadata);
`})
End routing

To type has a method to end routing: End. End method returns a Router type pointer.

router := endpoint.NewRouter().From("/api/v1/msg/").Transform(func(exchange *endpoint.Exchange) bool {
    //transformation logic
    return true
}).To("chain:default").End()
Create RestEndPoint

RestEndPoint is a type that creates and starts HTTP receiving service, it can register different routes to handle different requests. You can create a Rest type pointer and specify service address and other configurations.

restEndpoint := &rest.Rest{Config: rest.Config{Addr: ":9090"}}

You can use restEndpoint.AddInterceptors method to add global interceptors for permission verification and other logic.

restEndpoint.AddInterceptors(func(exchange *endpoint.Exchange) bool {
		//permission verification logic
		return true
})

You can use restEndpoint.GET or restEndpoint.POST methods to register routes, corresponding to GET or POST request methods. These methods accept one or more Router type pointers as parameters.

restEndpoint.GET(router1, router2)
restEndpoint.POST(router3, router4)

You can use restEndpoint.Start method to start service.

_ = restEndpoint.Start()
Create MqttEndpoint

MqttEndpoint is a type that creates and starts MQTT receiving service, it can subscribe different topics to handle different messages. You can create a Mqtt type pointer and specify service address and other configurations.

mqttEndpoint := &mqtt.Mqtt{
        Config: mqtt.Config{
            Server: "127.0.0.1:1883",
        },
}

You can use mqttEndpoint.AddInterceptors method to add global interceptors for permission verification and other logic.

mqttEndpoint.AddInterceptors(func(exchange *endpoint.Exchange) bool {
        //permission verification logic
	return true
})

You can use the mqttEndpoint.AddRouter method to register routes, this method accepts a Router type pointer as a parameter.

_ = mqttEndpoint.AddRouter(router1)

You can use the mqttEndpoint.Start method to start the service.

_ = mqttEndpoint.Start()

Examples

Here are some examples of using the endpoint package:
RestEndpoint
MqttEndpoint

Extending endpoint

The endpoint package provides some built-in types of receiving services, such as Rest and Mqtt, but you can also customize or extend other types of receiving services, such as Kafka. To achieve this, you need to follow these steps:

  1. Implement the Message interface. The Message interface is an interface that abstracts different input source data, it defines some methods to get or set the message content, header, source, parameter, status code, etc. You need to implement this interface for your receiving service type, so that your message type can interact with other types in the endpoint package.
  2. Implement the EndPoint interface. The EndPoint interface is an interface that defines different receiving service types, it defines some methods to start, stop, add routes and interceptors, etc. You need to implement this interface for your receiving service type, so that your service type can interact with other types in the endpoint package.
  3. Register the Executor type. The Executor interface is an interface that defines different output end executors, it defines some methods to initialize, execute, get path, etc. You can implement this interface for your output end component, and register your Executor type in the DefaultExecutorFactory, so that your component can be called by other types in the endpoint package.

These are the basic steps to extend the endpoint package, you can refer to the existing implementations of Rest and Mqtt types in the endpoint package to write your own code.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultExecutorFactory = new(ExecutorFactory)

DefaultExecutorFactory 默认to端执行器注册器

Functions

This section is empty.

Types

type BaseEndpoint

type BaseEndpoint struct {

	//endpoint 路由存储器
	RouterStorage map[string]*Router
	sync.RWMutex
	// contains filtered or unexported fields
}

BaseEndpoint 基础端点 实现全局拦截器基础方法

func (*BaseEndpoint) AddInterceptors

func (e *BaseEndpoint) AddInterceptors(interceptors ...Process)

AddInterceptors 添加全局拦截器

func (*BaseEndpoint) DoProcess

func (e *BaseEndpoint) DoProcess(router *Router, exchange *Exchange)

func (*BaseEndpoint) OnMsg

func (e *BaseEndpoint) OnMsg(ctx types.RuleContext, msg types.RuleMsg) error

type ChainExecutor

type ChainExecutor struct {
}

ChainExecutor 规则链执行器

func (*ChainExecutor) Execute

func (ce *ChainExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)

func (*ChainExecutor) Init

func (*ChainExecutor) IsPathSupportVar

func (ce *ChainExecutor) IsPathSupportVar() bool

IsPathSupportVar to路径允许带变量

func (*ChainExecutor) New

func (ce *ChainExecutor) New() Executor

type ComponentExecutor

type ComponentExecutor struct {
	// contains filtered or unexported fields
}

ComponentExecutor node组件执行器

func (*ComponentExecutor) Execute

func (ce *ComponentExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)

func (*ComponentExecutor) Init

func (ce *ComponentExecutor) Init(config types.Config, configuration types.Configuration) error

func (*ComponentExecutor) IsPathSupportVar

func (ce *ComponentExecutor) IsPathSupportVar() bool

IsPathSupportVar to路径不允许带变量

func (*ComponentExecutor) New

func (ce *ComponentExecutor) New() Executor

type Endpoint

type Endpoint interface {
	//Node 继承node
	types.Node
	//Id 类型标识
	Id() string
	//Start 启动服务
	Start() error
	//AddRouterWithParams 添加路由,指定参数
	AddRouterWithParams(router *Router, params ...interface{}) error
	//RemoveRouterWithParams 删除路由,指定参数
	RemoveRouterWithParams(from string, params ...interface{}) error
}

type Exchange

type Exchange struct {
	//入数据
	In Message
	//出数据
	Out Message
}

Exchange 包含in 和out message

type Executor

type Executor interface {
	//New 创建新的实例
	New() Executor
	//IsPathSupportVar to路径是否支持${}变量方式,默认不支持
	IsPathSupportVar() bool
	//Init 初始化
	Init(config types.Config, configuration types.Configuration) error
	//Execute 执行逻辑
	Execute(ctx context.Context, router *Router, exchange *Exchange)
}

Executor to端执行器

type ExecutorFactory

type ExecutorFactory struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ExecutorFactory to端执行器工厂

func (*ExecutorFactory) New

func (r *ExecutorFactory) New(name string) (Executor, bool)

New 根据类型创建to端执行器实例

func (*ExecutorFactory) Register

func (r *ExecutorFactory) Register(name string, executor Executor)

Register 注册to端执行器

type From

type From struct {
	//Config 配置
	Config types.Configuration
	//Router router指针
	Router *Router
	//来源路径
	From string
	// contains filtered or unexported fields
}

From from端

func (*From) End

func (f *From) End() *Router

End 结束返回*Router

func (*From) ExecuteProcess

func (f *From) ExecuteProcess(router *Router, exchange *Exchange) bool

ExecuteProcess 执行处理函数 true:执行To端逻辑,否则不执行

func (*From) GetProcessList

func (f *From) GetProcessList() []Process

GetProcessList 获取from端处理器列表

func (*From) GetTo

func (f *From) GetTo() *To

func (*From) Process

func (f *From) Process(process Process) *From

Process from端处理msg

func (*From) To

func (f *From) To(to string, configs ...types.Configuration) *To

To To端 参数是组件路径,格式{executorType}:{path} executorType:执行器组件类型,path:组件路径 如:chain:{chainId} 执行rulego中注册的规则链 component:{nodeType} 执行在config.ComponentsRegistry 中注册的组件 可在DefaultExecutorFactory中注册自定义执行器组件类型 componentConfigs 组件配置参数

func (*From) ToComponent

func (f *From) ToComponent(node types.Node) *To

ToComponent to组件 参数是types.Node类型组件

func (*From) ToString

func (f *From) ToString() string

func (*From) Transform

func (f *From) Transform(transform Process) *From

Transform from端转换msg

type Message

type Message interface {
	//Body message body
	Body() []byte
	Headers() textproto.MIMEHeader
	From() string
	//GetParam http.Request#FormValue
	GetParam(key string) string
	//SetMsg set RuleMsg
	SetMsg(msg *types.RuleMsg)
	//GetMsg 把接收数据转换成 RuleMsg
	GetMsg() *types.RuleMsg
	//SetStatusCode 响应 code
	SetStatusCode(statusCode int)
	//SetBody 响应 body
	SetBody(body []byte)
}

Message 接收端点数据抽象接口 不同输入源数据统一接口

type Process

type Process func(router *Router, exchange *Exchange) bool

Process 处理函数 true:执行下一个处理器,否则不执行

type Router

type Router struct {

	//规则链池,默认使用rulego.DefaultRuleGo
	RuleGo *rulego.RuleGo
	//Config ruleEngine Config
	Config types.Config
	// contains filtered or unexported fields
}

Router 路由,抽象不同输入源数据路由 把消息从输入端(From),经过转换(Transform)成RuleMsg结构,或者处理Process,然后交给规则链处理(To) 或者 把消息从输入端(From),经过转换(Transform),然后处理响应(Process) 用法: http endpoint endpoint.NewRouter().From("/api/v1/msg/").Transform().To("chain:xx") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("chain:xx") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("component:nodeType") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process() mqtt endpoint endpoint.NewRouter().From("#").Transform().Process().To("chain:xx") endpoint.NewRouter().From("topic").Transform().Process().To("chain:xx")

func NewRouter

func NewRouter(opts ...RouterOption) *Router

NewRouter 创建新的路由

func (*Router) Disable

func (r *Router) Disable(disable bool) *Router

Disable 设置状态 true:不可用,false:可以

func (*Router) From

func (r *Router) From(from string, configs ...types.Configuration) *From

func (*Router) FromToString

func (r *Router) FromToString() string

func (*Router) GetFrom

func (r *Router) GetFrom() *From

func (*Router) IsDisable

func (r *Router) IsDisable() bool

IsDisable 是否是不可用状态 true:不可用,false:可以

type RouterOption

type RouterOption func(*Router) error

RouterOption 选项函数

func WithRuleConfig

func WithRuleConfig(config types.Config) RouterOption

WithRuleConfig 更改规则引擎配置

func WithRuleGo

func WithRuleGo(ruleGo *rulego.RuleGo) RouterOption

WithRuleGo 更改规则链池,默认使用rulego.DefaultRuleGo

type To

type To struct {
	//toPath是否有占位符变量
	HasVars bool
	//Config to组件配置
	Config types.Configuration
	//Router router指针
	Router *Router
	//流转目标路径,例如"chain:{chainId}",则是交给规则引擎处理数据
	To string
	//去掉to执行器标记的路径
	ToPath string
	// contains filtered or unexported fields
}

To to端

func (*To) End

func (t *To) End() *Router

End 结束返回*Router

func (*To) Execute

func (t *To) Execute(ctx context.Context, exchange *Exchange)

Execute 执行To端逻辑

func (*To) GetProcessList

func (t *To) GetProcessList() []Process

GetProcessList 获取执行To端逻辑 处理器

func (*To) Process

func (t *To) Process(process Process) *To

Process 执行To端逻辑 后处理

func (*To) ToString

func (t *To) ToString() string

func (*To) ToStringByDict

func (t *To) ToStringByDict(dict map[string]interface{}) string

ToStringByDict 转换路径中的变量,并返回最终字符串

func (*To) Transform

func (t *To) Transform(transform Process) *To

Transform 执行To端逻辑 后转换

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL