README

Chinese documentation

mqtt

mqtt provides:

  • An MQTT broker that fully implements the MQTT protocol V3.1.1.
  • A Go MQTT broker package that can be embedded and extended in your own programs.
  • An MQTT protocol pack/unpack package for implementing MQTT clients or for testing.
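
To give a feel for what packing and unpacking involves, here is a minimal, standard-library-only sketch of the "Remaining Length" field that every MQTT 3.1.1 fixed header carries. The function names are hypothetical and are not taken from the packets package; the encoding rule itself (7 data bits per byte, top bit as a continuation flag, at most four bytes) comes from the MQTT 3.1.1 specification.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value the four-byte MQTT 3.1.1
// variable-length encoding can represent.
const maxRemainingLength = 268435455

// encodeRemainingLength packs n using the MQTT variable-length scheme.
// Hypothetical helper, not part of the packets package.
func encodeRemainingLength(n int) ([]byte, error) {
	if n < 0 || n > maxRemainingLength {
		return nil, errors.New("remaining length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit: more bytes follow
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeRemainingLength unpacks a value and reports how many bytes it used.
func decodeRemainingLength(buf []byte) (value, used int, err error) {
	multiplier := 1
	for i, b := range buf {
		if i == 4 {
			break // the encoding never uses more than four bytes
		}
		value += int(b&0x7F) * multiplier
		if b&0x80 == 0 {
			return value, i + 1, nil
		}
		multiplier *= 128
	}
	return 0, 0, errors.New("malformed or truncated remaining length")
}

func main() {
	enc, _ := encodeRemainingLength(321)
	fmt.Printf("% x\n", enc) // prints "c1 02"
	v, n, _ := decodeRemainingLength(enc)
	fmt.Println(v, n) // prints "321 2"
}
```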

Installation

$ go get -u github.com/DrmagicE/mqtt

Features

  • Provides hooks to customize broker behaviour (authentication, ACL, etc.). See hooks.go for more details.
  • Supports TLS/SSL and websocket.
  • Lets users write plugins. See plugin.go and /plugin for more details.
  • Lets extensions interact with the server. See the Server interface in server.go and example_test.go for more details.
  • Provides metrics via Prometheus. (plugin: prometheus)
  • Provides a RESTful API to interact with the server. (plugin: management)

Limitations

  • Retained messages are not persisted when the server exits.
  • Cluster is not supported.

Get Started

Built-in MQTT broker

$ cd cmd/broker
$ go run main.go

The broker listens on port 1883 for TCP and port 8080 for websocket. It loads the following plugins:

  • management: listens on port 8081 and provides a RESTful API service.
  • prometheus: listens on port 8082 and serves as a Prometheus exporter on the /metrics path.

Docker

$ docker build -t mqtt .
$ docker run -p 1883:1883 -p 8081:8081 -p 8082:8082 mqtt

Build with external source code

The built-in MQTT broker has a limited feature set: it does not implement features such as authentication or ACL. However, it is easy to write your own plugins to extend the broker.

func main() {
	// TCP listener
	ln, err := net.Listen("tcp", ":1883")
	if err != nil {
		log.Fatalln(err) // log.Fatalln exits, so no return is needed
	}
	// websocket server
	ws := &mqtt.WsServer{
		Server: &http.Server{Addr: ":8080"},
		Path:   "/ws",
	}

	l, err := zap.NewProduction()
	// l, err := zap.NewDevelopment()
	if err != nil {
		log.Fatalln(err)
	}
	s := mqtt.NewServer(
		mqtt.WithTCPListener(ln),
		mqtt.WithWebsocketServer(ws),
		// Add your plugins
		mqtt.WithPlugin(management.New(":8081", nil)),
		mqtt.WithPlugin(prometheus.New(&http.Server{
			Addr: ":8082",
		}, "/metrics")),
		mqtt.WithLogger(l),
	)

	s.Run()
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	s.Stop(context.Background())
}

See /examples for more details.

Documentation

godoc

Hooks

mqtt implements the following hooks:

  • OnAccept (Only for tcp/ssl, not for ws/wss)
  • OnConnect
  • OnConnected
  • OnSessionCreated
  • OnSessionResumed
  • OnSessionTerminated
  • OnSubscribe
  • OnSubscribed
  • OnUnsubscribed
  • OnMsgArrived
  • OnAcked
  • OnMsgDropped
  • OnDeliver
  • OnClose
  • OnStop

See /examples/hook for more details.

Stop the Server

Call server.Stop() to stop the broker gracefully. It will:

  1. Close all open TCP listeners and shut down all open websocket servers.
  2. Close all idle connections.
  3. Wait for all connections to be closed.
  4. Trigger OnStop().

Test

Unit Test

$ go test -race . && go test -race ./packets
$ cd packets
$ go test -race .

Integration Test

Passes the paho.mqtt.testing suite.

TODO

  • Support MQTT V3 and V5.
  • Support bridge mode and maybe cluster.

Breaking changes may occur when these new features are added.

Documentation

Overview

Package mqtt provides an MQTT v3.1.1 server library.

Example

See /examples for more details.

Code:

ln, err := net.Listen("tcp", ":1883")
if err != nil {
	fmt.Println(err.Error())
	return
}

ws := &WsServer{
	Server: &http.Server{Addr: ":8080"},
	Path:   "/",
}
l, _ := zap.NewProduction()
srv := NewServer(
	WithTCPListener(ln),
	WithWebsocketServer(ws),

	// add config
	WithConfig(DefaultConfig),
	// add plugins
	// WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")),
	// add Hook
	WithHook(Hooks{
		OnConnect: func(client Client) (code uint8) {
			return packets.CodeAccepted
		},
		OnSubscribe: func(client Client, topic packets.Topic) (qos uint8) {
			fmt.Println("register onSubscribe callback")
			return packets.QOS_1
		},
	}),
	// add logger
	WithLogger(l),
)

srv.Run()
fmt.Println("started...")
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
srv.Stop(context.Background())
fmt.Println("stopped")

Constants

const (
	Connecting = iota
	Connected
	Switiching
	Disconnected
)

Client status


const (
	DefaultMsgRouterLen  = 4096
	DefaultRegisterLen   = 2048
	DefaultUnRegisterLen = 2048
)

Default configuration


Variables

var (
	ErrInvalStatus    = errors.New("invalid connection status")
	ErrConnectTimeOut = errors.New("connect time out")
)

Error


var DefaultConfig = Config{
	RetryInterval:              20 * time.Second,
	RetryCheckInterval:         20 * time.Second,
	SessionExpiryInterval:      0 * time.Second,
	SessionExpiryCheckInterval: 0 * time.Second,
	QueueQos0Messages:          true,
	MaxInflight:                32,
	MaxAwaitRel:                100,
	MaxMsgQueue:                1000,
	DeliveryMode:               OnlyOnce,
	MsgRouterLen:               DefaultMsgRouterLen,
	RegisterLen:                DefaultRegisterLen,
	UnregisterLen:              DefaultUnRegisterLen,
}

DefaultConfig is the default config used by NewServer().


var (
	// ErrInvalWsMsgType [MQTT-6.0.0-1]
	ErrInvalWsMsgType = errors.New("invalid websocket message type")
)

Functions

func LoggerWithField

func LoggerWithField(fields ...zap.Field) *zap.Logger

LoggerWithField returns a new logger with the given fields added. Plugins can use this method to add a plugin name field.

func NewMessage

func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message

NewMessage creates a message for publish service.

func RandomID

func RandomID() string

func Retained

func Retained(retained bool) msgOptions

Retained sets the retained flag of the message.
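
NewMessage and Retained follow the functional options pattern: each option is a function that adjusts the message while it is being built. The sketch below illustrates the pattern with hypothetical lowercase stand-ins (message, msgOption, newMessage, retained); it does not reproduce the package's actual internals.

```go
package main

import "fmt"

// message is a hypothetical stand-in for packets.Message.
type message struct {
	topic    string
	payload  []byte
	qos      uint8
	retained bool
}

// msgOption mutates a message while it is being built.
type msgOption func(*message)

// retained mirrors the shape of Retained(retained bool).
func retained(r bool) msgOption {
	return func(m *message) { m.retained = r }
}

// newMessage mirrors the shape of NewMessage: required arguments
// first, then any number of options applied in order.
func newMessage(topic string, payload []byte, qos uint8, opts ...msgOption) *message {
	m := &message{topic: topic, payload: payload, qos: qos}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newMessage("sensors/temp", []byte("21.5"), 1, retained(true))
	fmt.Println(m.topic, m.qos, m.retained) // prints "sensors/temp 1 true"
}
```

New options can be added later without changing the signature of the constructor, which is the main appeal of this design.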

Types

type Client

type Client interface {
	// OptionsReader returns ClientOptionsReader for reading options data.
	OptionsReader() ClientOptionsReader
	// IsConnected returns whether the client is connected.
	IsConnected() bool
	// ConnectedAt returns the connected time
	ConnectedAt() time.Time
	// DisconnectedAt return the disconnected time
	DisconnectedAt() time.Time
	// Connection returns the raw net.Conn
	Connection() net.Conn
	// Close closes the client connection. The returned channel will be closed after unregister process has been done
	Close() <-chan struct{}

	GetSessionStatsManager() SessionStatsManager

	// Custom data
	Get(key string) (nson.Value, bool)
	Set(key string, value nson.Value)
	Del(key string)
}

Client represents an MQTT client.

type ClientOptionsReader

type ClientOptionsReader interface {
	ClientID() string
	Username() string
	Password() string
	KeepAlive() uint16
	CleanSession() bool
	WillFlag() bool
	WillRetain() bool
	WillQos() uint8
	WillTopic() string
	WillPayload() []byte
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
}

ClientOptionsReader is mainly used in callback functions.

type ClientStats

type ClientStats struct {
	ConnectedTotal    uint64
	DisconnectedTotal uint64
	// ActiveCurrent is the number of current active session.
	ActiveCurrent uint64
	// InactiveCurrent is the number of current inactive session.
	InactiveCurrent uint64
	// ExpiredTotal is the number of expired session.
	ExpiredTotal uint64
}

ClientStats provides the statistics of client connections.

type Config

type Config struct {
	RetryInterval              time.Duration
	RetryCheckInterval         time.Duration
	SessionExpiryInterval      time.Duration
	SessionExpiryCheckInterval time.Duration
	QueueQos0Messages          bool
	MaxInflight                int
	MaxAwaitRel                int
	MaxMsgQueue                int
	DeliveryMode               DeliveryMode
	MsgRouterLen               int
	RegisterLen                int
	UnregisterLen              int
}

type DeliveryMode

type DeliveryMode int
const (
	Overlap  DeliveryMode = 0
	OnlyOnce DeliveryMode = 1
)

type MessageStats

type MessageStats struct {
	Qos0 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	Qos1 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	Qos2 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	QueuedCurrent uint64
}

MessageStats represents the statistics of PUBLISH packet, separated by QOS.

type OnAccept

type OnAccept func(conn net.Conn) bool

OnAccept will be called after a new connection is established. It is only effective in the TCP server. If it returns false, the connection will be closed immediately.

type OnAcked

type OnAcked func(client Client, msg packets.Message)

OnAcked will be called when the ack packet for a published QoS 1 or QoS 2 message is received from the client.

type OnClose

type OnClose func(client Client, err error)

OnClose will be called after the TCP connection of the client has been closed.

type OnConnect

type OnConnect func(client Client) (code uint8)

OnConnect will be called when a valid CONNECT packet is received. It returns the return code of the CONNACK packet.
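
A common use of OnConnect is username/password authentication. The following is a self-contained sketch of the decision logic only, under stated assumptions: credentials is a hypothetical stand-in for the Username and Password methods of ClientOptionsReader, and the constant names are invented for this example. The numeric values 0x00 (accepted) and 0x04 (bad user name or password) are the CONNACK return codes defined by MQTT 3.1.1.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// CONNACK return codes defined by MQTT 3.1.1 (names are local to this sketch).
const (
	codeAccepted              uint8 = 0x00
	codeBadUsernameOrPassword uint8 = 0x04
)

// credentials is a hypothetical stand-in for the Username and Password
// methods of ClientOptionsReader.
type credentials interface {
	Username() string
	Password() string
}

type staticCreds struct{ user, pass string }

func (c staticCreds) Username() string { return c.user }
func (c staticCreds) Password() string { return c.pass }

// newAuthenticator returns a function that maps credentials to a
// CONNACK return code using an in-memory user table.
func newAuthenticator(users map[string]string) func(credentials) uint8 {
	return func(c credentials) uint8 {
		want, ok := users[c.Username()]
		// Compare in constant time to avoid leaking password contents.
		match := subtle.ConstantTimeCompare([]byte(want), []byte(c.Password())) == 1
		if !ok || !match {
			return codeBadUsernameOrPassword
		}
		return codeAccepted
	}
}

func main() {
	auth := newAuthenticator(map[string]string{"alice": "s3cret"})
	fmt.Println(auth(staticCreds{"alice", "s3cret"})) // prints 0
	fmt.Println(auth(staticCreds{"alice", "wrong"}))  // prints 4
}
```

In a real hook, the same function could be called with client.OptionsReader(), and a production setup would store password hashes rather than plain text.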

type OnConnected

type OnConnected func(client Client)

OnConnected will be called when an MQTT client has connected successfully.

type OnDeliver

type OnDeliver func(client Client, msg packets.Message)

OnDeliver will be called when a message is published to a client.

type OnMsgArrived

type OnMsgArrived func(client Client, msg packets.Message) (valid bool)

OnMsgArrived returns whether the received PUBLISH packet may be delivered. If it returns false, the packet will not be delivered to any client.
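
As an illustration of the kind of decision OnMsgArrived can make, the sketch below builds a predicate that drops oversized payloads and publishes to blocked topic prefixes. The publish type, newArrivalFilter, and the policy itself are hypothetical; only the "true delivers, false drops" contract comes from the hook description above.

```go
package main

import (
	"fmt"
	"strings"
)

// publish is a hypothetical stand-in for the fields of packets.Message
// that this sketch needs.
type publish struct {
	topic   string
	payload []byte
}

// newArrivalFilter returns a predicate with the same meaning as the
// OnMsgArrived return value: true to deliver, false to drop.
func newArrivalFilter(maxPayload int, blockedPrefixes ...string) func(publish) bool {
	return func(p publish) bool {
		if len(p.payload) > maxPayload {
			return false // payload too large
		}
		for _, prefix := range blockedPrefixes {
			if strings.HasPrefix(p.topic, prefix) {
				return false // clients may not publish here
			}
		}
		return true
	}
}

func main() {
	allow := newArrivalFilter(8, "$SYS/")
	fmt.Println(allow(publish{"sensors/temp", []byte("21.5")}))      // prints true
	fmt.Println(allow(publish{"$SYS/uptime", []byte("1")}))          // prints false
	fmt.Println(allow(publish{"sensors/temp", []byte("123456789")})) // prints false
}
```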

type OnMsgDropped

type OnMsgDropped func(client Client, msg packets.Message)

OnMsgDropped will be called after a message has been dropped.

type OnSessionCreated

type OnSessionCreated func(client Client)

OnSessionCreated will be called when a session is created.

type OnSessionResumed

type OnSessionResumed func(client Client)

OnSessionResumed will be called when a session is resumed.

type OnSessionTerminated

type OnSessionTerminated func(client Client, reason SessionTerminatedReason)

OnSessionTerminated will be called when a session is terminated.

type OnStop

type OnStop func()

OnStop will be called on server.Stop()

type OnSubscribe

type OnSubscribe func(client Client, topic packets.Topic) (qos uint8)

OnSubscribe returns the maximum QoS level granted for the topic:

0x00 - Success - Maximum QoS 0
0x01 - Success - Maximum QoS 1
0x02 - Success - Maximum QoS 2
0x80 - Failure
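
The return codes above make OnSubscribe a natural place for a subscription ACL. The sketch below shows one hypothetical policy: each rule grants topic filters that start with a prefix, capped at a maximum QoS, and anything unmatched gets 0x80. The rule type and grantQoS are invented for this example, and plain prefix matching is a simplification that rejects broad wildcard filters such as "#".

```go
package main

import (
	"fmt"
	"strings"
)

// subscribeFailure is the SUBACK failure return code.
const subscribeFailure uint8 = 0x80

// rule grants subscriptions whose topic filter starts with prefix,
// capped at maxQoS. Hypothetical type for this sketch.
type rule struct {
	prefix string
	maxQoS uint8
}

// grantQoS returns the granted QoS for the first matching rule,
// or the failure code when no rule matches.
func grantQoS(rules []rule, topicFilter string, requested uint8) uint8 {
	for _, r := range rules {
		if strings.HasPrefix(topicFilter, r.prefix) {
			if requested < r.maxQoS {
				return requested
			}
			return r.maxQoS
		}
	}
	return subscribeFailure
}

func main() {
	rules := []rule{{"public/", 2}, {"sensors/", 1}}
	fmt.Println(grantQoS(rules, "sensors/+/temp", 2)) // prints 1
	fmt.Println(grantQoS(rules, "public/news", 1))    // prints 1
	fmt.Println(grantQoS(rules, "admin/#", 0))        // prints 128
}
```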

type OnSubscribed

type OnSubscribed func(client Client, topic packets.Topic)

OnSubscribed will be called after the topic has been subscribed to successfully.

type OnUnsubscribed

type OnUnsubscribed func(client Client, topicName string)

OnUnsubscribed will be called after the topic has been unsubscribed.

type Options

type Options func(srv *server)

func WithConfig

func WithConfig(config Config) Options

WithConfig sets the config of the server.

func WithHook

func WithHook(hooks Hooks) Options

WithHook sets the hooks of the server. Note: WithPlugin() will overwrite these hooks.

func WithLogger

func WithLogger(logger *zap.Logger) Options

func WithPlugin

func WithPlugin(plugin Plugin) Options

WithPlugin sets the plugin(s) of the server.

func WithTCPListener

func WithTCPListener(listener net.Listener) Options

WithTCPListener sets the TCP listener of the server. By default, the server listens on :1883.

func WithWebsocketServer

func WithWebsocketServer(ws *WsServer) Options

WithWebsocketServer sets the websocket server(s) of the server.

type PacketBytes

type PacketBytes struct {
	Connect     uint64
	Connack     uint64
	Disconnect  uint64
	Pingreq     uint64
	Pingresp    uint64
	Puback      uint64
	Pubcomp     uint64
	Publish     uint64
	Pubrec      uint64
	Pubrel      uint64
	Suback      uint64
	Subscribe   uint64
	Unsuback    uint64
	Unsubscribe uint64
}

PacketBytes represents the total number of bytes received or sent for each packet type.

type PacketCount

type PacketCount struct {
	Connect     uint64
	Connack     uint64
	Disconnect  uint64
	Pingreq     uint64
	Pingresp    uint64
	Puback      uint64
	Pubcomp     uint64
	Publish     uint64
	Pubrec      uint64
	Pubrel      uint64
	Suback      uint64
	Subscribe   uint64
	Unsuback    uint64
	Unsubscribe uint64
}

PacketCount represents the total number of packets received or sent for each packet type.

type PacketStats

type PacketStats struct {
	BytesReceived *PacketBytes
	ReceivedTotal *PacketCount
	BytesSent     *PacketBytes
	SentTotal     *PacketCount
}

PacketStats represents the statistics of MQTT Packet.

type Plugin

type Plugin interface {
	// Load will be called in server.Run(). If it returns an error, the server will panic.
	Load(service Server) error
	// Unload will be called when the server shuts down. The returned error is only used for logging.
	Unload() error
	// Hooks returns all hooks used by the plugin.
	// Return an empty Hooks value if the plugin does not need any hooks.
	Hooks() Hooks
	// Name returns the plugin name.
	Name() string
}

Plugin is the interface that plugins implement.
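
To show the overall shape of a plugin, here is a self-contained sketch of a connection-counting plugin. It compiles on its own because server, client, hooks, and plugin are trimmed, hypothetical stand-ins for the package's Server, Client, Hooks, and Plugin types; connCounter and its Total method are likewise invented for this example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// server, client and hooks are hypothetical, trimmed stand-ins for the
// package's Server, Client and Hooks types.
type server interface{}
type client interface{}
type hooks struct {
	OnConnected func(c client)
}

// plugin mirrors the four methods of the Plugin interface.
type plugin interface {
	Load(s server) error
	Unload() error
	Hooks() hooks
	Name() string
}

// connCounter counts successful connections.
type connCounter struct {
	srv   server
	total uint64
}

// Load keeps a reference to the server for later use.
func (p *connCounter) Load(s server) error { p.srv = s; return nil }

// Unload has nothing to release in this sketch.
func (p *connCounter) Unload() error { return nil }

// Name identifies the plugin, for example in log fields.
func (p *connCounter) Name() string { return "conn_counter" }

// Hooks registers only the hook this plugin needs.
func (p *connCounter) Hooks() hooks {
	return hooks{OnConnected: func(client) { atomic.AddUint64(&p.total, 1) }}
}

// Total returns the number of connections seen so far.
func (p *connCounter) Total() uint64 { return atomic.LoadUint64(&p.total) }

// Compile-time check that connCounter satisfies the interface.
var _ plugin = (*connCounter)(nil)

func main() {
	p := &connCounter{}
	_ = p.Load(nil)
	h := p.Hooks()
	h.OnConnected(nil)
	h.OnConnected(nil)
	fmt.Println(p.Name(), p.Total()) // prints "conn_counter 2"
}
```

The counter uses atomic operations on the assumption that hooks may be invoked from several goroutines at once.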

type PublishService

type PublishService interface {
	// Publish publishes a message to the broker.
	// Calling this method will not trigger the OnMsgArrived hook.
	Publish(message packets.Message)
	// PublishToClient publishes a message to a specific client.
	// If match is true, the message is sent to the client
	// only if the client is subscribed to a topic that matches the message.
	// If match is false, the message is sent to the client directly, even if
	// there are no matching subscriptions.
	// Calling this method will not trigger the OnMsgArrived hook.
	PublishToClient(clientID string, message packets.Message, match bool)
}

PublishService provides the ability to publish messages to the broker.
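
When match is true, delivery depends on whether a subscription's topic filter matches the message's topic name. The sketch below implements the MQTT 3.1.1 matching rules ("+" for one level, "#" for the remaining levels, and no wildcard-first match for topics starting with "$") in a hypothetical topicMatches function. It is not the subscription store's actual implementation, and it assumes the filter is already valid.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether a topic name matches a subscription
// topic filter. The filter is assumed to be valid ('#' only as the
// last level). Hypothetical helper for illustration.
func topicMatches(filter, topic string) bool {
	// Topics beginning with '$' are not matched by filters that
	// start with a wildcard.
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(filter, "#") || strings.HasPrefix(filter, "+")) {
		return false
	}
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // matches the parent and any number of child levels
		}
		if i >= len(t) {
			return false // filter is longer than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(topicMatches("sport/+/player1", "sport/tennis/player1")) // prints true
	fmt.Println(topicMatches("sport/#", "sport"))                        // prints true
	fmt.Println(topicMatches("sport/+", "sport"))                        // prints false
	fmt.Println(topicMatches("#", "$SYS/uptime"))                        // prints false
}
```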

type Server

type Server interface {
	// SubscriptionStore returns the subscription.Store.
	SubscriptionStore() subscription.Store
	// RetainedStore returns the retained.Store.
	RetainedStore() retained.Store
	// PublishService returns the PublishService
	PublishService() PublishService
	// Client return the client specified by clientID.
	Client(clientID string) (Client, bool)
	// GetConfig returns the config of the server
	GetConfig() Config
	// GetStatsManager returns StatsManager
	GetStatsManager() StatsManager
	Init(opts ...Options)
	Run()
	Stop(ctx context.Context) error
}

The Server interface represents an MQTT server instance.

func NewServer

func NewServer(opts ...Options) Server

NewServer returns an MQTT server instance with the given options.

type ServerStats

type ServerStats struct {
	PacketStats       *PacketStats
	ClientStats       *ClientStats
	MessageStats      *MessageStats
	SubscriptionStats *subscription.Stats
}

ServerStats is the collection of global statistics.

type SessionStats

type SessionStats struct {
	// InflightCurrent, the current length of the inflight queue.
	InflightCurrent uint64
	// AwaitRelCurrent, the current length of the awaitRel queue.
	AwaitRelCurrent uint64

	MessageStats
}

SessionStats is the collection of statistics of each session.

type SessionStatsManager

type SessionStatsManager interface {

	// GetStats return the session statistics
	GetStats() *SessionStats
	// contains filtered or unexported methods
}

SessionStatsManager interface provides the ability to access the statistics of the session

type SessionTerminatedReason

type SessionTerminatedReason byte
const (
	NormalTermination SessionTerminatedReason = iota
	ConflictTermination
	ExpiredTermination
)

type StatsManager

type StatsManager interface {

	// GetStats return the server statistics
	GetStats() *ServerStats
	// contains filtered or unexported methods
}

StatsManager interface provides the ability to access the statistics of the server

type WsServer

type WsServer struct {
	Server   *http.Server
	Path     string // URL path
	CertFile string // TLS configuration
	KeyFile  string // TLS configuration
}

WsServer is used to build websocket server