tinyiotserver

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

README

一个轻量的物联网服务框架

它提供了一种简单的方式进行物联网服务端开发。可用于开发物联网控制中心。

架构图

Architecture Diagram

一、使用示例

我们以温度传感器控制空调为例,演示如何开发服务端的控制逻辑。示例代码在example/example_server

1、首先,创建处理器(handler)
	//处理温度事件,开空调
    //处理逻辑:如果温度>28度,开启空调制冷(制冷温度为26度)
    type TurnOnTheAirConditionerMsgHandler struct {
    }
    // msg 为温度传感器发送来的消息,内容示例:"35C";
    // newMsg 为控制逻辑处理后,生成的后续指令,供空调设备获取
    func (h *TurnOnTheAirConditionerMsgHandler) Handle(msg *tinyiotserver.Message) (newMsg *tinyiotserverMessage) {
        tag := "TurnOnTheAirConditionerMsgHandler"
        tmp := string(msg.Body.Payload)
        tmp = tmp[0 : len(tmp)-1]
        tmpInt, err := strconv.Atoi(tmp)
        if err != nil {
            return tinyiotserver.FailMsg("Temperature content is not a number")
        }
        //温度>28度
        if tmpInt > 28 {
            newMsg = msg.Copy()
            //生成空调消息(指令)
            newMsg.SetTopic("AirConditioner")
            newMsg.SetPayload([]byte("26C"))
            log.Printf("[%s] generate TurnOnAirConditioner msg:\n%s\n", tag, newMsg)
        }
        return
    }
2、注册处理器(handler)并启动服务
    func main() {
        //创建server
        server := tinyiotserver.NewServer(":7070")

        //创建“温度-空调”处理逻辑
        turnOnTheAirConditionerMsgHandler := &TurnOnTheAirConditionerMsgHandler{}
        //注册处理逻辑
        //Temperature是主题(topic)名,每个topic会对应一个消息队列,框架会根据msg中的topic来路由到合适的handler
        server.RegisterHandler("Temperature", turnOnTheAirConditionerMsgHandler)
        //启动服务
        server.Start()
    }
3、模拟温度传感器提交数据
    serverAddress := "127.0.0.1:7070"
    c, err := tinyiotserver.NewClient(serverAddress, 2)
	require.NoError(t, err)
	defer c.Close()

	//10000,单位ms,代表消息时效为10s
	err := c.Submit("Temperature", []byte("35C"), 10000)
    require.NoError(t, err)
4、模拟空调拉取温度调控指令
    serverAddress := "127.0.0.1:7070"
    c, err := tinyiotserver.NewClient(serverAddress, 2)
	require.NoError(t, err)
	defer c.Close()

	res, err = c.Query("AirConditioner", nil)
	require.NoError(t, err)
	log.Println("Query res: ", res)

二、客户端接入授权

示例代码在example/example_server_with_auth。示例中提供了一个简单的鉴权逻辑。大家可以仿照定制。

三、性能

1、硬件配置

HUAWEI nova 2手机

cpu:Hisilicon Kirin 659(arm64:4个2.36GHz A53 + 4个1.7GHz A51);内存:4GB;闪存:64GB

2、benchmark,局域网,示例代码:example/example_bench
    go test -bench=BenchmarkClient_Submit_S -benchtime=5s
    输出:
    BenchmarkClient_Submit_S-4        140514             91199 ns/op

    HUAWEI nova 2 CPU占用最大情况(%CPU总 = 800%):
    USER    %CPU    %MEM    CMDLINE                 VIRT    RES
    shell   103     0.1     tiny-iot-server-arm64   970M    7.2M

四、设计理念

简洁、高效、高度可定制。

Documentation

Index

Constants

View Source
const (
	//请求授权
	REQ_AUTH string = "REQ_AUTH"
	//响应授权
	RES_AUTH string = "RES_AUTH"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Close 关闭连接
	Close() error

	// Submit 发起请求,不需要等待服务器返回处理结果(并行安全),
	// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis,
	// 当expireAfterNowMillis=0时,提交的消息无过期时间。
	Submit(topic string, payload []byte, expireAfterNowMillis uint32) error

	// SubmitWithAsyncCallback 发起请求,不等待服务器返回处理结果,通过callback异步处理服务端响应(并行不安全),
	// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis,
	// 当expireAfterNowMillis=0时,提交的消息无过期时间。
	// callback 异步处理服务端响应。
	SubmitWithAsyncCallback(topic string, payload []byte, expireAfterNowMillis uint32, callback func(msg *common.Msg, occuredErr error)) error

	// SubmitWaitReply 发起请求,需要等待服务器返回处理结果(并行不安全)
	// expireAfterNowMillis用于设置消息的过期时间,单位为ms。过期时间 = 服务端接收消息的时间 + expireAfterNowMillis
	// 当expireAfterNowMillis=0时,提交的消息无过期时间
	SubmitWaitReply(topic string, payload []byte, expireAfterNowMillis uint32) (*common.Msg, error)

	// Query 查询topic对应的消息,不删除消息(并行不安全)
	Query(topic string, payload []byte) (*common.Msg, error)

	// QueryPop查询topic对应的消息,并删除消息(并行不安全)
	QueryPop(topic string, payload []byte) (*common.Msg, error)

	// QueryStream 发起流请求,通过Receive接收服务器返回处理结果,一个消息只能被一个client查询(并行不安全)
	QueryStream(topic string, payload []byte) (receive func() (*common.Msg, error), err error)

	// QueryStreamGroup 发起流请求,通过Receive接收服务器返回处理结果,每个消息可以被多个client查询(并行不安全)
	QueryStreamGroup(topic string, payload []byte) (receive func() (*common.Msg, error), err error)

	// Auth 请求授权
	Auth(payload []byte) error

	SetTimeOut(timeout time.Duration)
}

func NewClient

func NewClient(serverAddress string, clientId uint32) (Client, error)

func NewClientByConn added in v1.0.0

func NewClientByConn(conn TimeoutReadWriteCloser, clientId uint32) Client

func NewUnixClient added in v1.1.0

func NewUnixClient(serverAddress string, clientId uint32) (Client, error)

type HandlerRegistrar

type HandlerRegistrar interface {
	// RegisterGlobalPreHandler 注册全局的消息处理器,优先执行。
	RegisterGlobalPreHandler(handlers ...MessageHandler)

	// RegisterHandler 具体的消息处理器,在全局的消息处理器后执行。
	RegisterHandler(topic string, handlers ...MessageHandler)

	GetHandlers(topic string) []MessageHandler
}

HandlerRegistrar 消息处理器,注册中心。

type Message

type Message struct {
	*common.Msg
	ReceivedTime time.Time
	ExpiredTime  time.Time
	No           int64
}

func FailMsg

func FailMsg(topic string, errorInfo string) *Message

func ResponseMsg

func ResponseMsg(topic string, payload []byte) *Message

func SuccessMsg

func SuccessMsg(topic string) *Message

func (*Message) Copy

func (m *Message) Copy() *Message

func (*Message) IsEmpty added in v1.1.1

func (m *Message) IsEmpty() bool

func (*Message) IsExpired

func (m *Message) IsExpired() bool

func (*Message) SetPayload

func (m *Message) SetPayload(payload []byte)

func (*Message) SetTopic

func (m *Message) SetTopic(topic string)

func (*Message) String

func (msg *Message) String() string

type MessageHandler

type MessageHandler interface {
	// OnReceive 处理接收到的消息。
	// 如果返回的newMessage为nil,则继续执行后续的Handler。
	// 如果newMessage的Type为RESPONSE,则直接将newMessage通知客户端,终止后续handler。
	// 如果newMessage不为nil,则将newMessage视为新的消息(或指令),放入队列。
	OnReceive(requestMsg *Message) (newMessage *Message)

	// OnResponse 当服务端向客户端返回消息时,执行。当存在多个MessageHandler时,按照注册时的反序执行。
	OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message)
}

MessageHandler 消息处理

var DefaultMsgHandler MessageHandler = &defaultMsgHandler{}

DefaultMsgHandler 默认消息处理,什么都不做。

var SuccessResponseMsgHandler MessageHandler = &successResponseMsgHandler{}

SuccessResponseMsgHandler POST请求,默认返回成功。

func NewSimpleAuthMsgHandler

func NewSimpleAuthMsgHandler(authorizeLogic func(msg *Message) bool, authCodeGen func(msg *Message) string) MessageHandler

NewSimpleAuthMsgHandler 授权设备接入。 创建设备接入授权Handler(简单版本,可仿造该版本随意定制)。 authorizeLogic:决定是否授权。 authCodeGen:生成授权码。

type MsgHandlerAdapter added in v1.1.0

type MsgHandlerAdapter struct {
}

MsgHandlerAdapter 实现了默认的OnResponse方法。

func (MsgHandlerAdapter) OnResponse added in v1.1.0

func (h MsgHandlerAdapter) OnResponse(responseMsg *Message, requestMsg *Message) (replyToClientMsg *Message)

type MsgHandlerSelector added in v1.1.1

type MsgHandlerSelector interface {
	// DefineProcessorInTaskChan 为每个processor定义一个输入chan,返回值为输入chan的id
	DefineProcessorInTaskChan() (processorInTaskChanIndex []int)
	// SelectInTaskChanIndex 将task分配给某个chan。
	SelectInTaskChanIndex(topic string, payload []byte) (inTaskChanIndex int)
}

type NetListenHandler

type NetListenHandler struct {
	flowprocess.TaskHandlerAdapter
	// contains filtered or unexported fields
}

Node0,监听端口

func (*NetListenHandler) Handle

func (h *NetListenHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)

type ProcessMessageHandler

type ProcessMessageHandler struct {
	flowprocess.TaskHandlerAdapter
	// contains filtered or unexported fields
}

Node2 处理和回复消息

func (*ProcessMessageHandler) Handle

func (h *ProcessMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error

type Queue

type Queue interface {
	Len() int
	PushFront(msg *Message)
	PushBack(msg *Message)
	PopFront() (*Message, bool)
	PopBack() (*Message, bool)
	Front() (*Message, bool)
	Back() (*Message, bool)
	Wait() <-chan struct{}
	Size() int
}

func NewListQueue added in v1.1.0

func NewListQueue(maxLen int) Queue

func NewOrderQueue added in v1.1.0

func NewOrderQueue(maxLen int) Queue

func NewSyncBlockingQueue added in v1.1.0

func NewSyncBlockingQueue(q Queue) Queue

同步阻塞队列

func NewSyncQueue added in v1.0.0

func NewSyncQueue(q Queue) Queue

同步队列

type QueueBroadcastor added in v1.1.0

type QueueBroadcastor struct {
	// contains filtered or unexported fields
}

QueueBroadcastor 将sourceQueue里的数据,广播给多个接收者。例如: sourceQueue --msgs(goroutine0)--|---msgs---> [splitQueue1] --msgs(goroutine1)--> broadcast to Target1

|---msgs--->  [splitQueue2]  --msgs(goroutine2)-->  broadcast to Target2
|---msgs--->  [splitQueue3]  --msgs(goroutine3)-->  broadcast to Target3

func NewQueueBroadcastor added in v1.1.0

func NewQueueBroadcastor(ctx context.Context, sourceQueue Queue, topic string, heartbeatDuration time.Duration) *QueueBroadcastor

func (*QueueBroadcastor) Broadcast added in v1.1.0

func (qb *QueueBroadcastor) Broadcast(broadcastToTarget func(msg *Message) error, stopCondition func() <-chan struct{}) error

Broadcast 将sourceQueue里的数据,异步广播给broadcastToTarget。当stopCondition通道关闭时,停止广播。

type QueueMessageHandler

type QueueMessageHandler struct {
	flowprocess.TaskHandlerAdapter
	// contains filtered or unexported fields
}

Node3 将消息放入队列, 单goroutine运行,一般不用加锁

func NewQueueMessageHandler added in v1.1.0

func NewQueueMessageHandler(server *server) *QueueMessageHandler

func (*QueueMessageHandler) Handle

func (h *QueueMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) error

type ReadMessageHandler

type ReadMessageHandler struct {
	flowprocess.TaskHandlerAdapter
	// contains filtered or unexported fields
}

Node1,组装消息

func (*ReadMessageHandler) Handle

func (h *ReadMessageHandler) Handle(inTask flowprocess.Task, dispatch func(outTask flowprocess.Task) error) (err error)

type Server

type Server interface {
	HandlerRegistrar
	Start() error
	// Submit 在Server启动后,直接提交消息
	Submit(msg *common.Msg) (*common.Msg, error)
	Close() error
}

func NewServer

func NewServer(address string, options ...ServerOption) Server

type ServerOption

type ServerOption func(s *server)

func WithDefaultQueueSize

func WithDefaultQueueSize(defaultQueueSize int) ServerOption

WithDefaultQueueSize 配置默认的topicQueueSize

func WithHeartbeatDuration added in v1.0.0

func WithHeartbeatDuration(heartbeatDuration time.Duration) ServerOption

WithHeartbeatDuration 心跳周期

func WithMsgHandlerSelector added in v1.1.1

func WithMsgHandlerSelector(msgHandlerSelector MsgHandlerSelector) ServerOption

func WithNetwork added in v1.1.0

func WithNetwork(network string) ServerOption

WithNetwork 必须是 "tcp", "tcp4", "tcp6", "unix", "unixpacket".

func WithSocketWriteDeadline

func WithSocketWriteDeadline(socketWriteDeadline time.Duration) ServerOption

WithSocketWriteDeadline 配置端口超时

func WithTopicQueueSize

func WithTopicQueueSize(topicQueueSize map[string]int) ServerOption

WithTopicQueueSize 配置topicQueueSize

type TimeoutReadWriteCloser added in v1.1.0

type TimeoutReadWriteCloser interface {
	io.ReadWriteCloser
	SetReadTimeout(timeout time.Duration)
	SetWriteTimeout(timeout time.Duration)
}

type TopicQueue added in v1.1.0

type TopicQueue struct {
	Queue
	Topic string
}

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL