gmqtt

package module
v0.0.0-...-dfa80c7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2019 License: MIT Imports: 20 Imported by: 1

README

中文文档

Gmqtt Build Status codecov Go Report Card

Gmqtt provides:

  • MQTT broker that fully implements the MQTT protocol V3.1.1.
  • Golang MQTT broker package for secondary development.
  • MQTT protocol pack/unpack package for implementing MQTT clients or testing.
  • MQTT broker benchmark tool.

Change Log

2018.12.15

  • Supported go modules.
  • Restructured the package layout.

2018.12.2

  • Optimized data structure of subscriptions that increased QPS & reduced response times.
  • Updated benchmark results of optimization .

2018.11.25

  • Added benchmark tool.
  • Refacotoring & improved performance.
  • Performance improvement on message distributing
  • Added error information in OnClose()

2018.11.18

  • Removed sessions/messages persistence which need a redesign.
  • Added monitor/management API, added cmd/broker/restapi as an example.
  • Added publish/subscribe/unsubscribe API, added cmd/broker/restapi as an example.
  • Added session message queue.
  • Refactoring & bug fixed.

Features

  • Built-in hook methods so you can customized the behaviours of your project(Authentication, ACL, etc..)
  • Support tls/ssl and websocket
  • Provide monitor/management API
  • Provide publish/subscribe/unsubscribe API

Installation

$ go get -u github.com/gbl08ma/gmqtt

Get Started

Build-in MQTT broker

Build-in MQTT broker

Using gmqtt Library for Secondary Development

The features of build-in MQTT broker are not rich enough.It is not implementing some features such as Authentication, ACL etc.. So It is recommend to use gmqtt package to customized your broker:

func main() {

    s := gmqtt.NewServer()
   
    ln, err := net.Listen("tcp", ":1883")
    if err != nil {
        log.Fatalln(err.Error())
        return
    }
    crt, err := tls.LoadX509KeyPair("../testcerts/server.crt", "../testcerts/server.key")
    if err != nil {
        log.Fatalln(err.Error())
        return
    }
    tlsConfig := &tls.Config{}
    tlsConfig.Certificates = []tls.Certificate{crt}
    tlsln, err := tls.Listen("tcp", ":8883", tlsConfig)
    if err != nil {
        log.Fatalln(err.Error())
        return
    }
    s.AddTCPListenner(ln)
    s.AddTCPListenner(tlsln)
    //Configures and registers callback before s.Run()
    s.SetMaxInflightMessages(20)
    s.SetMaxQueueMessages(99999)
    s.RegisterOnSubscribe(func(client *gmqtt.Client, topic packets.Topic) uint8 {
        if topic.Name == "test/nosubscribe" {
            return packets.SUBSCRIBE_FAILURE
        }
        return topic.Qos
    })
    s.Run()
    fmt.Println("started...")
    signalCh := make(chan os.Signal, 1)
    signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
    <-signalCh
    s.Stop(context.Background())
    fmt.Println("stopped")
}

See /examples for more details.

Documentation

godoc

Hooks

Gmqtt implements the following hooks:

  • OnAccept (Only for tcp/ssl, not for ws/wss)
  • OnConnect
  • OnSubscribe
  • OnPublish
  • OnClose
  • OnStop

See /examples/hook for more detail.

OnAccept
// OnAccept will be called after a new connection established in TCP server. If returns false, the connection will be close directly.
type OnAccept func(conn net.Conn) bool

This hook may be used to block some invalid connections.(blacklist, rate-limiting, etc..)

OnConnect()
// OnConnect will be called when a valid connect packet is received.
// It returns the code of the connack packet.
type OnConnect func(client *Client) (code uint8)

This hook may be used to implement Authentication process.For example:

...
server.RegisterOnConnect(func(client *server.Client) (code uint8) {
  username := client.ClientOptions().Username
  password := client.ClientOptions().Password
  if validateUser(username, password) { //Authentication info may save in DB,File System, memory, etc.
    return packets.CodeAccepted
  } else {
    return packets.CodeBadUsernameorPsw
  }
})

OnSubscribe()

This method is called after receiving MQTT SUBSCRIBE packet. It returns the maximum QoS level that was granted to the subscription that was requested by the SUBSCRIBE packet.

/*
OnSubscribe returns the maximum available QoS for the topic:
 0x00 - Success - Maximum QoS 0
 0x01 - Success - Maximum QoS 1
 0x02 - Success - Maximum QoS 2
 0x80 - Failure
*/
type OnSubscribe func(client *Client, topic packets.Topic) uint8

This hook may be used to implement ACL(Access Control List) process.For example:

...
server.RegisterOnSubscribe(func(client *server.Client, topic packets.Topic) uint8 {
  if client.ClientOptions().Username == "root" { //alow root user to subscribe whatever he wants
    return topic.Qos
  } else {
    if topic.Qos <= packets.QOS_1 {
      return topic.Qos
    }
    return packets.QOS_1   //for other users, the maximum QoS level is QoS1
  }
})
OnPublish()

This method will be called after receiving a MQTT PUBLISH packet.

// OnPublish returns whether the publish packet will be delivered or not.
// If returns false, the packet will not be delivered to any clients.
type OnPublish func(client *Client, publish *packets.Publish) bool

For example:

...
server.RegisterOnPublish(func(client *server.Client, publish *packets.Publish)  bool {
  if client.ClientOptions().Username == "subscribeonly" {
    client.Close()  //2.close the Network Connection
    return false
  }
  //Only qos1 & qos0 are acceptable(will be delivered)
	if publish.Qos == packets.QOS_2 {
    return false  //1.make a positive acknowledgement but not going to distribute the packet
  }
  return true
})

If a Server implementation does not authorize a PUBLISH to be performed by a Client; it has no way of informing that Client. It MUST either 1.make a positive acknowledgement, according to the normal QoS rules, or 2.close the Network Connection [MQTT-3.3.5-2].

OnClose()
// OnClose will be called after the tcp connection of the client has been closed
type OnClose func(client *Client, err error)
OnStop()
// OnStop will be called on server.Stop()
type OnStop func()

Server Stop Process

Call server.Stop() to stop the broker gracefully:

  1. Closing all open TCP listeners and shutting down all open websocket servers
  2. Closing all idle connections
  3. Waiting for all connections have been closed
  4. Triggering OnStop()

Test

Unit Test

$ go test -race .
$ cd pkg/packets
$ go test -race .

Integration Test

Pass paho.mqtt.testing.

Benchmark Test

Documentation & Results

TODO

  • Improve documentation
  • Performance comparison [EMQ/Mosquito]

Documentation

Overview

Package gmqtt provides an MQTT v3.1.1 server library.

Example

see /examples for more details.

s := NewServer()
ln, err := net.Listen("tcp", ":1883")
if err != nil {
	fmt.Println(err.Error())
	return
}
s.AddTCPListenner(ln)
ws := &WsServer{
	Server: &http.Server{Addr: ":8080"},
}
s.AddWebSocketServer(ws)

s.RegisterOnConnect(func(client *Client) (code uint8) {
	return packets.CodeAccepted
})
s.RegisterOnSubscribe(func(client *Client, topic packets.Topic) uint8 {
	fmt.Println("register onSubscribe callback")
	return packets.QOS_1
})
//register other callback before s.Run()

s.Run()
fmt.Println("started...")
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
s.Stop(context.Background())
fmt.Println("stopped")
Output:

Index

Examples

Constants

View Source
const (
	Connecting = iota
	Connected
	Switiching
	Disconnected
)

Client status

View Source
const (
	StatusOnline  = "online"
	StatusOffline = "offline"
)

Client Status

View Source
const (
	DefaultDeliveryRetryInterval = 20 * time.Second
	DefaultQueueQos0Messages     = true
	DefaultMaxInflightMessages   = 20
	DefaultMaxQueueMessages      = 2048
	DefaultMsgRouterLen          = 4096
	DefaultRegisterLen           = 2048
	DefaultUnRegisterLen         = 2048
)

Default configration

Variables

View Source
var (
	ErrInvalStatus    = errors.New("invalid connection status")
	ErrConnectTimeOut = errors.New("connect time out")
)

Error

View Source
var (
	// ErrInvalWsMsgType [MQTT-6.0.0-1]
	ErrInvalWsMsgType = errors.New("invalid websocket message type")
)

Functions

func SetLogger

func SetLogger(l *logger.Logger)

SetLogger sets the logger. It is used in DEBUG mode.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a MQTT client

func (*Client) ClientOptions

func (client *Client) ClientOptions() ClientOptions

ClientOptions returns the ClientOptions. This is mainly used in callback functions. See ./example/hook

func (*Client) Close

func (client *Client) Close() <-chan struct{}

Close 关闭客户端连接,连接关闭完毕会将返回的channel关闭。

Close closes the client connection. The returned channel will be closed after unregister process has been done

func (*Client) IsConnected

func (client *Client) IsConnected() bool

IsConnected returns whether the client is connected or not.

func (*Client) SetUserData

func (client *Client) SetUserData(data interface{})

SetUserData is used to bind user data to the client

func (*Client) Status

func (client *Client) Status() int32

Status returns client's status

func (*Client) UserData

func (client *Client) UserData() interface{}

UserData returns the user data

type ClientInfo

type ClientInfo struct {
	ClientID     string    `json:"client_id"`
	Username     string    `json:"username"`
	RemoteAddr   string    `json:"remote_addr"`
	CleanSession bool      `json:"clean_session"`
	KeepAlive    uint16    `json:"keep_alive"`
	ConnectedAt  time.Time `json:"connected_at"`
}

ClientInfo represents a connected client

type ClientList

type ClientList []ClientInfo

ClientList represents ClientInfo slice

func (ClientList) Len

func (c ClientList) Len() int

func (ClientList) Less

func (c ClientList) Less(i, j int) bool

func (ClientList) Swap

func (c ClientList) Swap(i, j int)

type ClientOptions

type ClientOptions struct {
	ClientID     string
	Username     string
	Password     string
	KeepAlive    uint16
	CleanSession bool
	WillFlag     bool
	WillRetain   bool
	WillQos      uint8
	WillTopic    string
	WillPayload  []byte
}

ClientOptions is mainly used in callback functions. See ClientOptions()

type InflightElem

type InflightElem struct {
	//At is the entry time
	At time.Time
	//Pid is the packetID
	Pid packets.PacketID
	//Packet represents Publish packet
	Packet *packets.Publish
	Step   int
}

InflightElem is the element type in inflight queue

type Monitor

type Monitor struct {
	sync.Mutex
	Repository MonitorRepository
}

Monitor is used internally to save and get monitor data

func (*Monitor) ClientSubscriptions

func (m *Monitor) ClientSubscriptions(clientID string) SubscriptionList

ClientSubscriptions returns the subscription info for the given clientID

func (*Monitor) Clients

func (m *Monitor) Clients() ClientList

Clients returns the info for all connected clients

func (*Monitor) GetClient

func (m *Monitor) GetClient(clientID string) (ClientInfo, bool)

GetClient returns the client info for the given clientID

func (*Monitor) GetSession

func (m *Monitor) GetSession(clientID string) (SessionInfo, bool)

GetSession returns the session info for the given clientID

func (*Monitor) Sessions

func (m *Monitor) Sessions() SessionList

Sessions returns the session info for all sessions

func (*Monitor) Subscriptions

func (m *Monitor) Subscriptions() SubscriptionList

Subscriptions returns all subscription info

type MonitorRepository

type MonitorRepository interface {
	//Open opens the repository
	Open() error
	//Close closes the repository
	Close() error
	//PutClient puts a ClientInfo into the repository when the client connects
	PutClient(info ClientInfo)
	//GetClient returns the ClientInfo for the given clientID
	GetClient(clientID string) (ClientInfo, bool)
	//Clients returns ClientList which is the list for all connected clients, this method should be idempotency
	Clients() ClientList
	//DelClient deletes the ClientInfo from repository
	DelClient(clientID string)
	//PutSession puts a SessionInfo into monitor repository when the client is connects
	PutSession(info SessionInfo)
	//GetSession returns the SessionInfo for the given clientID
	GetSession(clientID string) (SessionInfo, bool)
	//Sessions returns SessionList which is the list for all sessions including online sessions and offline sessions, this method should be idempotency
	Sessions() SessionList
	//DelSession deletes the SessionInfo from repository
	DelSession(clientID string)
	//ClientSubscriptions returns the SubscriptionList for given clientID, this method should be idempotency
	ClientSubscriptions(clientID string) SubscriptionList
	//DelClientSubscriptions deletes the subscription info for given clientID from the repository
	DelClientSubscriptions(clientID string)
	//PutSubscription puts the SubscriptionsInfo into the repository when a new subscription is made
	PutSubscription(info SubscriptionsInfo)
	//DelSubscription deletes the topic for given clientID from repository
	DelSubscription(clientID string, topicName string)
	//Subscriptions returns all  subscriptions of the server
	Subscriptions() SubscriptionList
}

MonitorRepository is an interface which can be used to provide a persistence mechanics for the monitor data

type MonitorStore

type MonitorStore struct {
	// contains filtered or unexported fields
}

MonitorStore implements the MonitorRepository interface to provide an in-memory monitor repository

func (*MonitorStore) ClientSubscriptions

func (m *MonitorStore) ClientSubscriptions(clientID string) SubscriptionList

ClientSubscriptions returns the SubscriptionList for given clientID, this method should be idempotency

func (*MonitorStore) Clients

func (m *MonitorStore) Clients() ClientList

Clients returns ClientList which is the list for all connected clients, this method should be idempotency

func (*MonitorStore) Close

func (m *MonitorStore) Close() error

Close close the repository

func (*MonitorStore) DelClient

func (m *MonitorStore) DelClient(clientID string)

DelClient deletes the ClientInfo from repository

func (*MonitorStore) DelClientSubscriptions

func (m *MonitorStore) DelClientSubscriptions(clientID string)

DelClientSubscriptions deletes the subscription info for given clientID from the repository

func (*MonitorStore) DelSession

func (m *MonitorStore) DelSession(clientID string)

DelSession deletes the SessionInfo from repository

func (*MonitorStore) DelSubscription

func (m *MonitorStore) DelSubscription(clientID string, topicName string)

DelSubscription deletes the topic for given clientID from repository

func (*MonitorStore) GetClient

func (m *MonitorStore) GetClient(clientID string) (ClientInfo, bool)

GetClient returns the ClientInfo for the given clientID

func (*MonitorStore) GetSession

func (m *MonitorStore) GetSession(clientID string) (SessionInfo, bool)

GetSession returns the SessionInfo for the given clientID

func (*MonitorStore) Open

func (m *MonitorStore) Open() error

Open opens the repository

func (*MonitorStore) PutClient

func (m *MonitorStore) PutClient(info ClientInfo)

PutClient puts a ClientInfo into the repository when the client connects

func (*MonitorStore) PutSession

func (m *MonitorStore) PutSession(info SessionInfo)

PutSession puts a SessionInfo into monitor repository when the client is connects

func (*MonitorStore) PutSubscription

func (m *MonitorStore) PutSubscription(info SubscriptionsInfo)

PutSubscription puts the SubscriptionsInfo into the repository when a new subscription is made

func (*MonitorStore) Sessions

func (m *MonitorStore) Sessions() SessionList

Sessions returns SessionList which is the list for all sessions including online sessions and offline sessions, this method should be idempotency

func (*MonitorStore) Subscriptions

func (m *MonitorStore) Subscriptions() SubscriptionList

Subscriptions returns all subscriptions of the server

type OnAccept

type OnAccept func(conn net.Conn) bool

OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接

OnAccept will be called after a new connection established in TCP server. If returns false, the connection will be close directly.

type OnClose

type OnClose func(client *Client, err error)

OnClose tcp连接关闭之后触发

OnClose will be called after the tcp connection of the client has been closed

type OnConnect

type OnConnect func(client *Client) (code uint8)

OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码

OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet

type OnPublish

type OnPublish func(client *Client, publish *packets.Publish) bool

OnPublish 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发

OnPublish returns whether the publish packet will be delivered or not. If returns false, the packet will not be delivered to any clients.

type OnStop

type OnStop func()

OnStop will be called on server.Stop()

type OnSubscribe

type OnSubscribe func(client *Client, topic packets.Topic) uint8

OnSubscribe 返回topic允许订阅的最高QoS等级

OnSubscribe returns the maximum available QoS for the topic:

0x00 - Success - Maximum QoS 0
0x01 - Success - Maximum QoS 1
0x02 - Success - Maximum QoS 2
0x80 - Failure

type Server

type Server struct {

	//Monitor
	Monitor *Monitor
	// contains filtered or unexported fields
}

Server represents a mqtt server instance. Create an instance of Server, by using NewServer()

func NewServer

func NewServer() *Server

NewServer returns a default gmqtt server instance

func (*Server) AddTCPListenner

func (srv *Server) AddTCPListenner(ln ...net.Listener)

AddTCPListenner adds tcp listeners to mqtt server. This method enables the mqtt server to serve on multiple ports.

func (*Server) AddWebSocketServer

func (srv *Server) AddWebSocketServer(Server ...*WsServer)

AddWebSocketServer adds websocket server to mqtt server.

func (*Server) Broadcast

func (srv *Server) Broadcast(publish *packets.Publish, clientIDs ...string)

Broadcast 广播一个消息,此消息不受主题限制。默认广播到所有的客户端中去,如果clientIDs有设置,则只会广播到clientIDs指定的客户端。

Broadcast broadcasts the message to all clients. If the second param clientIDs is set, the message will only send to the clients specified by the clientIDs.

Notice: This method will not trigger the onPublish callback

func (*Server) Client

func (srv *Server) Client(clientID string) *Client

Client returns all the connected clients

func (*Server) Publish

func (srv *Server) Publish(publish *packets.Publish, clientIDs ...string)

Publish 主动发布一个主题,如果clientIDs没有设置,则默认会转发到所有有匹配主题的客户端,如果clientIDs有设置,则只会转发到clientIDs指定的有匹配主题的客户端。

Publish publishs a message to the broker. If the second param is not set, the message will be distributed to any clients that has matched subscriptions. If the second param clientIDs is set, the message will only try to distributed to the clients specified by the clientIDs

Notice: This method will not trigger the onPublish callback

func (*Server) RegisterOnAccept

func (srv *Server) RegisterOnAccept(callback OnAccept)

RegisterOnAccept registers a onAccept callback. A panic will cause if any RegisterOnXXX is called after server.Run()

func (*Server) RegisterOnClose

func (srv *Server) RegisterOnClose(callback OnClose)

RegisterOnClose registers a onClose callback.

func (*Server) RegisterOnConnect

func (srv *Server) RegisterOnConnect(callback OnConnect)

RegisterOnConnect registers a onConnect callback.

func (*Server) RegisterOnPublish

func (srv *Server) RegisterOnPublish(callback OnPublish)

RegisterOnPublish registers a onPublish callback.

func (*Server) RegisterOnStop

func (srv *Server) RegisterOnStop(callback OnStop)

RegisterOnStop registers a onStop callback.

func (*Server) RegisterOnSubscribe

func (srv *Server) RegisterOnSubscribe(callback OnSubscribe)

RegisterOnSubscribe registers a onSubscribe callback.

func (*Server) Run

func (srv *Server) Run()

Run starts the mqtt server. This method is non-blocking

func (*Server) SetDeliveryRetryInterval

func (srv *Server) SetDeliveryRetryInterval(duration time.Duration)

SetDeliveryRetryInterval sets the delivery retry interval.

func (*Server) SetMaxInflightMessages

func (srv *Server) SetMaxInflightMessages(i int)

SetMaxInflightMessages sets the maximum inflight messages.

func (*Server) SetMaxQueueMessages

func (srv *Server) SetMaxQueueMessages(nums int)

SetMaxQueueMessages sets the maximum queue messages.

func (*Server) SetMsgRouterLen

func (srv *Server) SetMsgRouterLen(i int)

SetMsgRouterLen sets the length of msgRouter channel.

func (*Server) SetQueueQos0Messages

func (srv *Server) SetQueueQos0Messages(b bool)

SetQueueQos0Messages sets whether to queue QoS 0 messages. Default to true.

func (*Server) SetRegisterLen

func (srv *Server) SetRegisterLen(i int)

SetRegisterLen sets the length of register channel.

func (*Server) SetUnregisterLen

func (srv *Server) SetUnregisterLen(i int)

SetUnregisterLen sets the length of unregister channel.

func (*Server) Status

func (srv *Server) Status() int32

Status returns the server status

func (*Server) Stop

func (srv *Server) Stop(ctx context.Context) error

Stop gracefully stops the mqtt server by the following steps:

  1. Closing all open TCP listeners and shutting down all open websocket servers
  2. Closing all idle connections
  3. Waiting for all connections have been closed
  4. Triggering OnStop()

func (*Server) Subscribe

func (srv *Server) Subscribe(clientID string, topics []packets.Topic)

Subscribe 为某一个客户端订阅主题

Subscribe subscribes topics for the client specified by clientID.

Notice: This method will not trigger the onSubscribe callback

func (*Server) UnSubscribe

func (srv *Server) UnSubscribe(clientID string, topics []string)

UnSubscribe 为某一个客户端取消订阅某个主题

UnSubscribe unsubscribes topics for the client specified by clientID.

type SessionInfo

type SessionInfo struct {
	ClientID        string    `json:"client_id"`
	Status          string    `json:"status"`
	RemoteAddr      string    `json:"remote_addr"`
	CleanSession    bool      `json:"clean_session"`
	Subscriptions   int       `json:"subscriptions"`
	MaxInflight     int       `json:"max_inflight"`
	InflightLen     int       `json:"inflight_len"`
	MaxMsgQueue     int       `json:"max_msg_queue"`
	MsgQueueLen     int       `json:"msg_queue_len"`
	MsgQueueDropped int       `json:"msg_queue_dropped"`
	ConnectedAt     time.Time `json:"connected_at"`
	OfflineAt       time.Time `json:"offline_at,omitempty"`
}

SessionInfo represents a session

type SessionList

type SessionList []SessionInfo

SessionList represent SessionInfo slice

func (SessionList) Len

func (s SessionList) Len() int

func (SessionList) Less

func (s SessionList) Less(i, j int) bool

func (SessionList) Swap

func (s SessionList) Swap(i, j int)

type SubscriptionList

type SubscriptionList []SubscriptionsInfo

SubscriptionList is SubscriptionsInfo slice

func (SubscriptionList) Len

func (s SubscriptionList) Len() int

func (SubscriptionList) Less

func (s SubscriptionList) Less(i, j int) bool

func (SubscriptionList) Swap

func (s SubscriptionList) Swap(i, j int)

type SubscriptionsInfo

type SubscriptionsInfo struct {
	ClientID string    `json:"client_id"`
	Qos      uint8     `json:"qos"`
	Name     string    `json:"name"`
	At       time.Time `json:"at"`
}

SubscriptionsInfo represents a subscription of a session

type WsServer

type WsServer struct {
	Server   *http.Server
	CertFile string //TLS configration
	KeyFile  string //TLS configration
}

WsServer is used to build websocket server

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL