gmqtt

Version: v0.1.3
Published: Sep 29, 2020 License: MIT Imports: 23 Imported by: 11

README

Chinese documentation

Gmqtt

MQTT V5 support is under development (branch v5).

Gmqtt provides:

  • An MQTT broker that fully implements the MQTT protocol V3.1.1.
  • A Go MQTT broker package for building customized brokers.
  • An MQTT protocol pack/unpack package for implementing MQTT clients or for testing.

Installation

$ go get -u github.com/DrmagicE/gmqtt

Features

  • Provides hook methods to customize broker behaviour (authentication, ACL, etc.). See hooks.go for more details.
  • Supports TLS/SSL and websocket.
  • Lets users write plugins. See plugin.go and /plugin for more details.
  • Provides ways for extensions to interact with the server. See the Server interface in server.go and example_test.go for more details.
  • Provides metrics via Prometheus (plugin: prometheus).
  • Provides a RESTful API for interacting with the server (plugin: management).

Limitations

  • Retained messages are not persisted when the server exits.
  • Cluster is not supported.

Get Started

Built-in MQTT broker

$ cd cmd/broker
$ go run main.go

The broker listens on port 1883 for TCP and port 8080 for websocket. It loads the following plugins:

  • management: listens on port 8081 and provides a RESTful API service.
  • prometheus: listens on port 8082 and serves as a Prometheus exporter on the /metrics path.

Docker

$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p  8081:8081 -p 8082:8082 gmqtt

Build with external source code

The built-in MQTT broker has a limited feature set; for example, it does not implement authentication or ACL. However, it is easy to write your own plugins to extend the broker.

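package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/DrmagicE/gmqtt"
	"github.com/DrmagicE/gmqtt/plugin/management"
	"github.com/DrmagicE/gmqtt/plugin/prometheus"
	"go.uber.org/zap"
)

func main() {
	// TCP listener for plain MQTT connections
	ln, err := net.Listen("tcp", ":1883")
	if err != nil {
		log.Fatalln(err)
	}
	// websocket server
	ws := &gmqtt.WsServer{
		Server: &http.Server{Addr: ":8080"},
		Path:   "/ws",
	}

	// use zap.NewDevelopment() instead for more verbose, human-readable logs
	l, err := zap.NewProduction()
	if err != nil {
		log.Fatalln(err)
	}
	s := gmqtt.NewServer(
		gmqtt.WithTCPListener(ln),
		gmqtt.WithWebsocketServer(ws),
		// Add your plugins
		gmqtt.WithPlugin(management.New(":8081", nil)),
		gmqtt.WithPlugin(prometheus.New(&http.Server{
			Addr: ":8082",
		}, "/metrics")),
		gmqtt.WithLogger(l),
	)

	s.Run()
	// block until SIGINT or SIGTERM arrives, then stop gracefully
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	s.Stop(context.Background())
}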

See /examples for more details.

Documentation

godoc

Hooks

Gmqtt implements the following hooks:

  • OnAccept (Only for tcp/ssl, not for ws/wss)
  • OnConnect
  • OnConnected
  • OnSessionCreated
  • OnSessionResumed
  • OnSessionTerminated
  • OnSubscribe
  • OnSubscribed
  • OnUnsubscribe
  • OnUnsubscribed
  • OnMsgArrived
  • OnAcked
  • OnMsgDropped
  • OnDeliver
  • OnClose
  • OnStop

See /examples/hook for more detail.

Stop the Server

Call server.Stop() to stop the broker gracefully. It will:

  1. Close all open TCP listeners and shut down all open websocket servers.
  2. Close all idle connections.
  3. Wait for all connections to be closed.
  4. Trigger OnStop().
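
Because step 3 waits for connections to close, a caller may want to bound how long shutdown can take. The sketch below shows one way to do that with a context deadline. It is self-contained and does not import gmqtt: Stopper, StopWithTimeout and slowBroker are hypothetical names, and it assumes that Stop returns an error and gives up when its context is done.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stopper is a local stand-in for the part of the server used here.
// Assumption: Stop blocks until shutdown completes or ctx is done.
type Stopper interface {
	Stop(ctx context.Context) error
}

// StopWithTimeout bounds a graceful shutdown to the duration d.
func StopWithTimeout(s Stopper, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return s.Stop(ctx)
}

// slowBroker simulates a broker whose connections need `drain` to close.
type slowBroker struct{ drain time.Duration }

func (b slowBroker) Stop(ctx context.Context) error {
	select {
	case <-time.After(b.drain):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	err := StopWithTimeout(slowBroker{drain: 10 * time.Millisecond}, time.Second)
	fmt.Println("clean shutdown:", err == nil)

	err = StopWithTimeout(slowBroker{drain: time.Second}, 10*time.Millisecond)
	fmt.Println("timed out:", errors.Is(err, context.DeadlineExceeded))
}
```

With a real server, the same helper would be called from the signal handler in place of a bare Stop(context.Background()).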

Test

Unit Test

$ go test -race . && go test -race ./pkg/packets
$ cd pkg/packets
$ go test -race .

Integration Test

Gmqtt passes the paho.mqtt.testing suite.

TODO

  • Support MQTT V3 and V5.
  • Support bridge mode and maybe cluster.

Breaking changes may occur when these new features are added.

Documentation

Overview

Package gmqtt provides an MQTT v3.1.1 server library.

Example

See /examples for more details.

ln, err := net.Listen("tcp", ":1883")
if err != nil {
	fmt.Println(err.Error())
	return
}

ws := &WsServer{
	Server: &http.Server{Addr: ":8080"},
	Path:   "/",
}
l, _ := zap.NewProduction()
srv := NewServer(
	WithTCPListener(ln),
	WithWebsocketServer(ws),

	// add config
	WithConfig(DefaultConfig),
	// add plugins
	// WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")),
	// add Hook
	WithHook(Hooks{
		OnConnect: func(ctx context.Context, client Client) (code uint8) {
			return packets.CodeAccepted
		},
		OnSubscribe: func(ctx context.Context, client Client, topic packets.Topic) (qos uint8) {
			fmt.Println("register onSubscribe callback")
			return packets.QOS_1
		},
	}),
	// add logger
	WithLogger(l),
)

srv.Run()
fmt.Println("started...")
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
srv.Stop(context.Background())
fmt.Println("stopped")

Constants

const (
	Connecting = iota
	Connected
	Switiching
	Disconnected
)

Client status

const (
	DefaultMsgRouterLen  = 4096
	DefaultRegisterLen   = 2048
	DefaultUnRegisterLen = 2048
)

Default configuration

Variables

var (
	ErrInvalStatus    = errors.New("invalid connection status")
	ErrConnectTimeOut = errors.New("connect time out")
)

Error

var DefaultConfig = Config{
	RetryInterval:              20 * time.Second,
	RetryCheckInterval:         20 * time.Second,
	SessionExpiryInterval:      0 * time.Second,
	SessionExpiryCheckInterval: 0 * time.Second,
	QueueQos0Messages:          true,
	MaxInflight:                32,
	MaxAwaitRel:                100,
	MaxMsgQueue:                1000,
	DeliveryMode:               OnlyOnce,
	MsgRouterLen:               DefaultMsgRouterLen,
	RegisterLen:                DefaultRegisterLen,
	UnregisterLen:              DefaultUnRegisterLen,
}

DefaultConfig is the default config used by NewServer().
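
To change only a few settings, one option is to copy the defaults and override individual fields. The sketch below illustrates that pattern with a trimmed local copy of Config so that it runs without importing gmqtt. TunedConfig and the values 64 and 5000 are hypothetical choices, not recommendations.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a trimmed local copy of a few documented fields.
type Config struct {
	RetryInterval time.Duration
	MaxInflight   int
	MaxMsgQueue   int
}

// DefaultConfig repeats the documented defaults for those fields.
var DefaultConfig = Config{
	RetryInterval: 20 * time.Second,
	MaxInflight:   32,
	MaxMsgQueue:   1000,
}

// TunedConfig copies the defaults and overrides selected fields.
// Config is a plain struct, so the assignment makes a copy and
// DefaultConfig itself is left untouched.
func TunedConfig() Config {
	cfg := DefaultConfig
	cfg.MaxInflight = 64   // hypothetical value
	cfg.MaxMsgQueue = 5000 // hypothetical value
	return cfg
}

func main() {
	cfg := TunedConfig()
	fmt.Println(cfg.MaxInflight, cfg.MaxMsgQueue, cfg.RetryInterval)
	fmt.Println(DefaultConfig.MaxInflight)
}
```

With the real package, the resulting value would be passed to NewServer through WithConfig(cfg).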

var (
	// ErrInvalWsMsgType [MQTT-6.0.0-1]
	ErrInvalWsMsgType = errors.New("invalid websocket message type")
)

Functions

func LoggerWithField

func LoggerWithField(fields ...zap.Field) *zap.Logger

LoggerWithField adds fields to a new logger. Plugins can use this function to add a plugin name field.

func NewMessage

func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message

NewMessage creates a message for publish service.

func NewServer

func NewServer(opts ...Options) *server

NewServer returns a gmqtt server instance with the given options

func Retained

func Retained(retained bool) msgOptions

Retained sets retained flag to the message

Types

type Client

type Client interface {
	// OptionsReader returns ClientOptionsReader for reading options data.
	OptionsReader() ClientOptionsReader
	// IsConnected returns whether the client is connected.
	IsConnected() bool
	// ConnectedAt returns the connected time
	ConnectedAt() time.Time
	// DisconnectedAt returns the disconnected time
	DisconnectedAt() time.Time
	// Connection returns the raw net.Conn
	Connection() net.Conn
	// Close closes the client connection. The returned channel will be closed after the unregister process has completed
	Close() <-chan struct{}

	GetSessionStatsManager() SessionStatsManager
}

Client represents a client connected to the server.

type ClientOptionsReader

type ClientOptionsReader interface {
	ClientID() string
	Username() string
	Password() string
	KeepAlive() uint16
	CleanSession() bool
	WillFlag() bool
	WillRetain() bool
	WillQos() uint8
	WillTopic() string
	WillPayload() []byte
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
}

ClientOptionsReader is mainly used in callback functions.

type ClientStats

type ClientStats struct {
	ConnectedTotal    uint64
	DisconnectedTotal uint64
	// ActiveCurrent is the current number of active sessions.
	ActiveCurrent uint64
	// InactiveCurrent is the current number of inactive sessions.
	InactiveCurrent uint64
	// ExpiredTotal is the number of expired sessions.
	ExpiredTotal uint64
}

ClientStats provides the statistics of client connections.

type Config

type Config struct {
	RetryInterval              time.Duration
	RetryCheckInterval         time.Duration
	SessionExpiryInterval      time.Duration
	SessionExpiryCheckInterval time.Duration
	QueueQos0Messages          bool
	MaxInflight                int
	MaxAwaitRel                int
	MaxMsgQueue                int
	DeliveryMode               DeliveryMode
	MsgRouterLen               int
	RegisterLen                int
	UnregisterLen              int
}

type DeliveryMode

type DeliveryMode int
const (
	Overlap  DeliveryMode = 0
	OnlyOnce DeliveryMode = 1
)

type HookWrapper

type HookWrapper struct {
	OnConnectWrapper           OnConnectWrapper
	OnConnectedWrapper         OnConnectedWrapper
	OnSessionCreatedWrapper    OnSessionCreatedWrapper
	OnSessionResumedWrapper    OnSessionResumedWrapper
	OnSessionTerminatedWrapper OnSessionTerminatedWrapper
	OnSubscribeWrapper         OnSubscribeWrapper
	OnSubscribedWrapper        OnSubscribedWrapper
	OnUnsubscribeWrapper       OnUnsubscribeWrapper
	OnUnsubscribedWrapper      OnUnsubscribedWrapper
	OnMsgArrivedWrapper        OnMsgArrivedWrapper
	OnAckedWrapper             OnAckedWrapper
	OnMsgDroppedWrapper        OnMsgDroppedWrapper
	OnDeliverWrapper           OnDeliverWrapper
	OnCloseWrapper             OnCloseWrapper
	OnAcceptWrapper            OnAcceptWrapper
	OnStopWrapper              OnStopWrapper
}

HookWrapper groups all hook wrapper functions.

type MessageStats

type MessageStats struct {
	Qos0 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	Qos1 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	Qos2 struct {
		DroppedTotal  uint64
		ReceivedTotal uint64
		SentTotal     uint64
	}
	QueuedCurrent uint64
}

MessageStats represents the statistics of PUBLISH packets, separated by QoS.

type OnAccept

type OnAccept func(ctx context.Context, conn net.Conn) bool

OnAccept will be called after a new connection is established. It only takes effect in the TCP server. If it returns false, the connection will be closed immediately.

type OnAcceptWrapper

type OnAcceptWrapper func(OnAccept) OnAccept
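
A wrapper receives the next hook in the chain and returns a new hook, in the manner of HTTP middleware. The sketch below illustrates this with an IP deny list. It repeats the two documented signatures locally so that it runs without importing gmqtt. DenyHosts, fakeConn and connFrom are hypothetical names, and the nil check on next is a defensive assumption, since this page does not say whether the server ever passes a nil hook.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// OnAccept and OnAcceptWrapper repeat the documented signatures.
type OnAccept func(ctx context.Context, conn net.Conn) bool
type OnAcceptWrapper func(OnAccept) OnAccept

// DenyHosts returns a wrapper that rejects connections from the listed IPs
// and defers to the wrapped hook (if any) for everything else.
func DenyHosts(blocked ...string) OnAcceptWrapper {
	set := make(map[string]bool, len(blocked))
	for _, ip := range blocked {
		set[ip] = true
	}
	return func(next OnAccept) OnAccept {
		return func(ctx context.Context, conn net.Conn) bool {
			host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
			if err != nil || set[host] {
				return false
			}
			if next == nil {
				return true
			}
			return next(ctx, conn)
		}
	}
}

// fakeConn overrides RemoteAddr so the hook can be exercised without a socket.
type fakeConn struct {
	net.Conn
	remote net.Addr
}

func (c fakeConn) RemoteAddr() net.Addr { return c.remote }

// connFrom builds a fake connection that appears to come from ip.
func connFrom(ip string) net.Conn {
	return fakeConn{remote: &net.TCPAddr{IP: net.ParseIP(ip), Port: 50000}}
}

func main() {
	hook := DenyHosts("10.0.0.7")(nil)
	fmt.Println(hook(context.Background(), connFrom("10.0.0.7")))
	fmt.Println(hook(context.Background(), connFrom("10.0.0.8")))
}
```

In a plugin, such a wrapper would be returned in the OnAcceptWrapper field of HookWrapper.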

type OnAcked

type OnAcked func(ctx context.Context, client Client, msg packets.Message)

OnAcked will be called when the client acknowledges a published qos1 or qos2 message.

type OnAckedWrapper

type OnAckedWrapper func(OnAcked) OnAcked

type OnClose

type OnClose func(ctx context.Context, client Client, err error)

OnClose will be called after the TCP connection of the client has been closed.

type OnCloseWrapper

type OnCloseWrapper func(OnClose) OnClose

type OnConnect

type OnConnect func(ctx context.Context, client Client) (code uint8)

OnConnect will be called when a valid connect packet is received. It returns the return code for the connack packet.
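
As an illustration of the hook, the sketch below checks a username and password against a static table. It is self-contained and does not import gmqtt. The Client interface here is a simplified stand-in (the real interface exposes these values through OptionsReader()), NewPasswordAuth and fakeClient are hypothetical names, and the return codes are the CONNACK values defined by MQTT 3.1.1.

```go
package main

import (
	"context"
	"fmt"
)

// Client is a simplified local stand-in exposing only what this sketch needs.
type Client interface {
	Username() string
	Password() string
}

// CONNACK return codes defined by MQTT 3.1.1.
const (
	CodeAccepted              uint8 = 0x00
	CodeBadUsernameOrPassword uint8 = 0x04
)

// OnConnect mirrors the documented hook signature.
type OnConnect func(ctx context.Context, client Client) (code uint8)

// NewPasswordAuth returns an OnConnect hook that checks a static credential table.
func NewPasswordAuth(users map[string]string) OnConnect {
	return func(ctx context.Context, client Client) uint8 {
		want, ok := users[client.Username()]
		if !ok || want != client.Password() {
			return CodeBadUsernameOrPassword
		}
		return CodeAccepted
	}
}

// fakeClient lets the hook be exercised without a network connection.
type fakeClient struct{ user, pass string }

func (c fakeClient) Username() string { return c.user }
func (c fakeClient) Password() string { return c.pass }

func main() {
	auth := NewPasswordAuth(map[string]string{"alice": "secret"})
	fmt.Println(auth(context.Background(), fakeClient{"alice", "secret"}))
	fmt.Println(auth(context.Background(), fakeClient{"alice", "wrong"}))
}
```

A production hook would compare password hashes rather than plain text; the table lookup is kept only to make the control flow visible.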

type OnConnectWrapper

type OnConnectWrapper func(OnConnect) OnConnect

type OnConnected

type OnConnected func(ctx context.Context, client Client)

OnConnected will be called when an MQTT client connects successfully.

type OnConnectedWrapper

type OnConnectedWrapper func(OnConnected) OnConnected

type OnDeliver

type OnDeliver func(ctx context.Context, client Client, msg packets.Message)

OnDeliver will be called when publishing a message to a client.

type OnDeliverWrapper

type OnDeliverWrapper func(OnDeliver) OnDeliver

type OnMsgArrived

type OnMsgArrived func(ctx context.Context, client Client, msg packets.Message) (valid bool)

OnMsgArrived returns whether a received publish packet may be delivered. If it returns false, the packet will not be delivered to any clients.
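
The following sketch shows a filter of this kind: it drops publishes to topics under $SYS/ and payloads above a size limit. It is self-contained and does not import gmqtt. Client and Message are local stand-ins (it is an assumption that the real packets.Message offers Topic() and Payload() accessors), and RejectSysAndOversize is a hypothetical name.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Client is a local stand-in; this hook does not inspect the client.
type Client interface{}

// Message is a local stand-in with the two accessors this sketch assumes.
type Message interface {
	Topic() string
	Payload() []byte
}

// OnMsgArrived mirrors the documented hook signature.
type OnMsgArrived func(ctx context.Context, client Client, msg Message) (valid bool)

// RejectSysAndOversize returns a hook that refuses publishes to $SYS/ topics
// and payloads longer than maxPayload bytes.
func RejectSysAndOversize(maxPayload int) OnMsgArrived {
	return func(ctx context.Context, client Client, msg Message) bool {
		if strings.HasPrefix(msg.Topic(), "$SYS/") {
			return false
		}
		return len(msg.Payload()) <= maxPayload
	}
}

// fakeMsg lets the hook be exercised without real packets.
type fakeMsg struct {
	topic   string
	payload []byte
}

func (m fakeMsg) Topic() string   { return m.topic }
func (m fakeMsg) Payload() []byte { return m.payload }

func main() {
	hook := RejectSysAndOversize(4)
	ctx := context.Background()
	fmt.Println(hook(ctx, nil, fakeMsg{"sensors/temp", []byte("21")}))
	fmt.Println(hook(ctx, nil, fakeMsg{"$SYS/uptime", []byte("1")}))
	fmt.Println(hook(ctx, nil, fakeMsg{"sensors/temp", []byte("too long")}))
}
```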

type OnMsgArrivedWrapper

type OnMsgArrivedWrapper func(OnMsgArrived) OnMsgArrived

type OnMsgDropped

type OnMsgDropped func(ctx context.Context, client Client, msg packets.Message)

OnMsgDropped will be called after a message has been dropped.

type OnMsgDroppedWrapper

type OnMsgDroppedWrapper func(OnMsgDropped) OnMsgDropped

type OnSessionCreated

type OnSessionCreated func(ctx context.Context, client Client)

OnSessionCreated will be called when a session is created.

type OnSessionCreatedWrapper

type OnSessionCreatedWrapper func(OnSessionCreated) OnSessionCreated

type OnSessionResumed

type OnSessionResumed func(ctx context.Context, client Client)

OnSessionResumed will be called when a session is resumed.

type OnSessionResumedWrapper

type OnSessionResumedWrapper func(OnSessionResumed) OnSessionResumed

type OnSessionTerminated

type OnSessionTerminated func(ctx context.Context, client Client, reason SessionTerminatedReason)

OnSessionTerminated will be called when a session is terminated.

type OnSessionTerminatedWrapper

type OnSessionTerminatedWrapper func(OnSessionTerminated) OnSessionTerminated

type OnStop

type OnStop func(ctx context.Context)

OnStop will be called on server.Stop()

type OnStopWrapper

type OnStopWrapper func(OnStop) OnStop

type OnSubscribe

type OnSubscribe func(ctx context.Context, client Client, topic packets.Topic) (qos uint8)

OnSubscribe returns the maximum available QoS for the topic:

0x00 - Success - Maximum QoS 0
0x01 - Success - Maximum QoS 1
0x02 - Success - Maximum QoS 2
0x80 - Failure
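
The return codes above can be produced by a small ACL-style hook, sketched below. It is self-contained and does not import gmqtt. Topic is a local stand-in (the Name and Qos fields are an assumption about packets.Topic), and CapQoS is a hypothetical name. Returning the smaller of the requested and allowed QoS stays within the allowed maximum whether or not the server also applies its own cap.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Client is a local stand-in; this hook does not inspect the client.
type Client interface{}

// Topic is a local stand-in for a requested subscription.
type Topic struct {
	Name string
	Qos  uint8
}

// SubscribeFailure is the SUBACK failure code listed above.
const SubscribeFailure uint8 = 0x80

// OnSubscribe mirrors the documented hook signature.
type OnSubscribe func(ctx context.Context, client Client, topic Topic) (qos uint8)

// CapQoS returns a hook that refuses topics under denyPrefix and limits
// every other subscription to at most `limit`.
func CapQoS(limit uint8, denyPrefix string) OnSubscribe {
	return func(ctx context.Context, client Client, topic Topic) uint8 {
		if strings.HasPrefix(topic.Name, denyPrefix) {
			return SubscribeFailure
		}
		if topic.Qos > limit {
			return limit
		}
		return topic.Qos
	}
}

func main() {
	hook := CapQoS(1, "private/")
	ctx := context.Background()
	fmt.Println(hook(ctx, nil, Topic{Name: "sensors/temp", Qos: 2}))
	fmt.Println(hook(ctx, nil, Topic{Name: "sensors/temp", Qos: 0}))
	fmt.Println(hook(ctx, nil, Topic{Name: "private/keys", Qos: 1}) == SubscribeFailure)
}
```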

type OnSubscribeWrapper

type OnSubscribeWrapper func(OnSubscribe) OnSubscribe

type OnSubscribed

type OnSubscribed func(ctx context.Context, client Client, topic packets.Topic)

OnSubscribed will be called after the topic has been subscribed successfully.

type OnSubscribedWrapper

type OnSubscribedWrapper func(OnSubscribed) OnSubscribed

type OnUnsubscribe added in v0.1.3

type OnUnsubscribe func(ctx context.Context, client Client, topicName string)

OnUnsubscribe will be called when the topic is being unsubscribed

type OnUnsubscribeWrapper added in v0.1.3

type OnUnsubscribeWrapper func(OnUnsubscribe) OnUnsubscribe

type OnUnsubscribed

type OnUnsubscribed func(ctx context.Context, client Client, topicName string)

OnUnsubscribed will be called after the topic has been unsubscribed

type OnUnsubscribedWrapper

type OnUnsubscribedWrapper func(OnUnsubscribed) OnUnsubscribed

type Options

type Options func(srv *server)

func WithConfig

func WithConfig(config Config) Options

WithConfig sets the config of the server.

func WithHook

func WithHook(hooks Hooks) Options

WithHook sets the hooks of the server. Note: WithPlugin() will overwrite these hooks.

func WithLogger

func WithLogger(logger *zap.Logger) Options

func WithPlugin

func WithPlugin(plugin ...Plugable) Options

WithPlugin sets the plugin(s) of the server.

func WithTCPListener

func WithTCPListener(lns ...net.Listener) Options

WithTCPListener sets the TCP listener(s) of the server. By default, the server listens on :1883.

func WithWebsocketServer

func WithWebsocketServer(ws ...*WsServer) Options

WithWebsocketServer sets the websocket server(s) of the server.

type PacketBytes

type PacketBytes struct {
	Connect     uint64
	Connack     uint64
	Disconnect  uint64
	Pingreq     uint64
	Pingresp    uint64
	Puback      uint64
	Pubcomp     uint64
	Publish     uint64
	Pubrec      uint64
	Pubrel      uint64
	Suback      uint64
	Subscribe   uint64
	Unsuback    uint64
	Unsubscribe uint64
}

PacketBytes represents the total number of bytes received or sent for each packet type.

type PacketCount

type PacketCount struct {
	Connect     uint64
	Connack     uint64
	Disconnect  uint64
	Pingreq     uint64
	Pingresp    uint64
	Puback      uint64
	Pubcomp     uint64
	Publish     uint64
	Pubrec      uint64
	Pubrel      uint64
	Suback      uint64
	Subscribe   uint64
	Unsuback    uint64
	Unsubscribe uint64
}

PacketCount represents the total number of packets received or sent for each packet type.

type PacketStats

type PacketStats struct {
	BytesReceived *PacketBytes
	ReceivedTotal *PacketCount
	BytesSent     *PacketBytes
	SentTotal     *PacketCount
}

PacketStats represents the statistics of MQTT Packet.

type Plugable

type Plugable interface {
	// Load will be called in server.Run(). If it returns an error, the server will panic.
	Load(service Server) error
	// Unload will be called when the server shuts down. The returned error is only used for logging.
	Unload() error
	// HookWrapper returns all hook wrappers used by the plugin.
	// Return an empty wrapper if the plugin does not need any hooks.
	HookWrapper() HookWrapper
	// Name returns the plugin name.
	Name() string
}

Plugable is the interface that every plugin must implement.
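
A minimal plugin skeleton might look like the sketch below, which counts successful connections through an OnConnected wrapper. It is self-contained and does not import gmqtt. Client, Server and HookWrapper are trimmed local stand-ins for the documented types, ConnCounter is a hypothetical plugin, and the nil check on next is a defensive assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// Client and Server are local stand-ins; the skeleton does not call into them.
type Client interface{}
type Server interface{}

// OnConnected and OnConnectedWrapper repeat the documented signatures.
type OnConnected func(ctx context.Context, client Client)
type OnConnectedWrapper func(OnConnected) OnConnected

// HookWrapper is trimmed to the single wrapper this plugin uses.
type HookWrapper struct {
	OnConnectedWrapper OnConnectedWrapper
}

// Plugable repeats the documented plugin interface.
type Plugable interface {
	Load(service Server) error
	Unload() error
	HookWrapper() HookWrapper
	Name() string
}

// ConnCounter is a hypothetical plugin that counts successful connections.
type ConnCounter struct{ total uint64 }

func (p *ConnCounter) Load(service Server) error { return nil }
func (p *ConnCounter) Unload() error             { return nil }
func (p *ConnCounter) Name() string              { return "conn_counter" }

// Total returns the number of connections seen so far.
func (p *ConnCounter) Total() uint64 { return atomic.LoadUint64(&p.total) }

// HookWrapper increments the counter, then calls the wrapped hook if present.
func (p *ConnCounter) HookWrapper() HookWrapper {
	return HookWrapper{
		OnConnectedWrapper: func(next OnConnected) OnConnected {
			return func(ctx context.Context, client Client) {
				atomic.AddUint64(&p.total, 1)
				if next != nil {
					next(ctx, client)
				}
			}
		},
	}
}

// Compile-time check that ConnCounter satisfies the interface.
var _ Plugable = (*ConnCounter)(nil)

func main() {
	p := &ConnCounter{}
	hook := p.HookWrapper().OnConnectedWrapper(nil)
	hook(context.Background(), nil)
	hook(context.Background(), nil)
	fmt.Println(p.Name(), p.Total())
}
```

The counter uses atomic operations because hooks may be invoked from several connection goroutines at once.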

type PublishService

type PublishService interface {
	// Publish publishes a message to the broker.
	// Calling this method will not trigger the OnMsgArrived hook.
	Publish(message packets.Message)
	// PublishToClient publishes a message to a specific client.
	// If match is true, the message is sent to the client
	// only if the client is subscribed to a topic that matches the message.
	// If match is false, the message is sent to the client directly, even if
	// there are no matching subscriptions.
	// Calling this method will not trigger the OnMsgArrived hook.
	PublishToClient(clientID string, message packets.Message, match bool)
}

PublishService provides the ability to publish messages to the broker.

type Server

type Server interface {
	// SubscriptionStore returns the subscription.Store.
	SubscriptionStore() subscription.Store
	// RetainedStore returns the retained.Store.
	RetainedStore() retained.Store
	// PublishService returns the PublishService
	PublishService() PublishService
	// Client returns the client specified by clientID.
	Client(clientID string) Client
	// GetConfig returns the config of the server
	GetConfig() Config
	// GetStatsManager returns StatsManager
	GetStatsManager() StatsManager
}

Server interface represents an MQTT server instance.

type ServerStats

type ServerStats struct {
	PacketStats       *PacketStats
	ClientStats       *ClientStats
	MessageStats      *MessageStats
	SubscriptionStats *subscription.Stats
}

ServerStats is the collection of global statistics.

type SessionStats

type SessionStats struct {
	// InflightCurrent, the current length of the inflight queue.
	InflightCurrent uint64
	// AwaitRelCurrent, the current length of the awaitRel queue.
	AwaitRelCurrent uint64

	MessageStats
}

SessionStats is the collection of statistics for each session.

type SessionStatsManager

type SessionStatsManager interface {

	// GetStats return the session statistics
	GetStats() *SessionStats
	// contains filtered or unexported methods
}

SessionStatsManager interface provides the ability to access the statistics of the session

type SessionTerminatedReason

type SessionTerminatedReason byte
const (
	NormalTermination SessionTerminatedReason = iota
	ConflictTermination
	ExpiredTermination
)
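
When logging from an OnSessionTerminated hook, a readable label for the reason can be useful. The sketch below maps the three documented constants to strings. It redeclares the type locally so that it runs without importing gmqtt, and ReasonLabel is a hypothetical helper. The labels simply follow the constant names, since this page does not describe the reasons in more detail.

```go
package main

import "fmt"

// SessionTerminatedReason and its constants repeat the documented declaration.
type SessionTerminatedReason byte

const (
	NormalTermination SessionTerminatedReason = iota
	ConflictTermination
	ExpiredTermination
)

// ReasonLabel returns a short label for a termination reason,
// or "unknown" for values outside the documented set.
func ReasonLabel(r SessionTerminatedReason) string {
	switch r {
	case NormalTermination:
		return "normal"
	case ConflictTermination:
		return "conflict"
	case ExpiredTermination:
		return "expired"
	default:
		return "unknown"
	}
}

func main() {
	for _, r := range []SessionTerminatedReason{
		NormalTermination, ConflictTermination, ExpiredTermination,
	} {
		fmt.Println(ReasonLabel(r))
	}
}
```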

type StatsManager

type StatsManager interface {

	// GetStats return the server statistics
	GetStats() *ServerStats
	// contains filtered or unexported methods
}

StatsManager interface provides the ability to access the statistics of the server

type WsServer

type WsServer struct {
	Server   *http.Server
	Path     string // URL path
	CertFile string // TLS configuration
	KeyFile  string // TLS configuration
}

WsServer is used to build websocket server

Directories

Path Synopsis
cmd
examples
pkg
plugin
