Version: v0.0.0-...-c3fc0b7

Published: Jul 17, 2018 License: MIT Imports: 8 Imported by: 3


Golang MQTT-Server

This repo is an implementation of the MQTT protocol for the Go programming language.


What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely
simple and lightweight messaging protocol, designed for constrained devices
and low-bandwidth, high-latency or unreliable networks. The design principles
are to minimise network bandwidth and device resource requirements whilst also
attempting to ensure reliability and some degree of assurance of delivery.
These principles also turn out to make the protocol ideal for the emerging
“machine-to-machine” (M2M) or “Internet of Things” world of connected devices,
and for mobile applications where bandwidth and battery power are at a premium.


Install Go

Go is an open source programming language that makes it easy to build simple,
reliable, and efficient software.

Go is available as executables for Microsoft Windows, Apple macOS and Linux, as well as source code. There are also Go releases for Docker.

Run the MQTT-Server

First of all, grab this repository by running the following line. The go get command will clone this repo into the $GOPATH directory.

go get

Now build and install the server with:

go install

This will compile the server code and move the executable to $GOPATH/bin/.

To start the server:


The default MQTT (TCP) port is :1883. You can now connect with any MQTT client.
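To make "connect with any MQTT client" concrete, here is a minimal sketch of the first packet such a client sends. It assumes MQTT 3.1 framing, based on the 'MQIsdp' protocol name mentioned in this package's error messages. The helper names buildConnect and appendString are hypothetical and not part of this package, and the 23-character client ID limit is taken from the MQTT 3.1 specification, not from this server's code.

```go
package main

import (
	"errors"
	"fmt"
)

// appendString appends an MQTT string: a 2-byte big-endian length, then the bytes.
func appendString(b []byte, s string) []byte {
	b = append(b, byte(len(s)>>8), byte(len(s)))
	return append(b, s...)
}

// buildConnect encodes an MQTT 3.1 CONNECT packet with the clean-session flag
// set and no username, password or will. Hypothetical helper for illustration.
func buildConnect(clientID string, keepAliveSec uint16) ([]byte, error) {
	if len(clientID) == 0 || len(clientID) > 23 {
		return nil, errors.New("client id must be 1 to 23 bytes long")
	}
	var body []byte
	body = appendString(body, "MQIsdp") // protocol name of MQTT 3.1
	body = append(body, 3)              // protocol version 3
	body = append(body, 0x02)           // connect flags: clean session only
	body = append(body, byte(keepAliveSec>>8), byte(keepAliveSec))
	body = appendString(body, clientID)

	// With a client ID of at most 23 bytes the body is at most 37 bytes,
	// so the remaining length always fits into a single byte here.
	packet := []byte{0x10, byte(len(body))} // 0x10 = CONNECT (type 1) in the upper nibble
	return append(packet, body...), nil
}

func main() {
	pkt, err := buildConnect("demo", 60)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("% x\n", pkt)
}
```

Writing these bytes to a TCP connection opened with net.Dial("tcp", "localhost:1883") should, per the MQTT 3.1 specification, be answered with a 4-byte CONNACK packet whose last byte is the return code.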


To run the benchmark tests, use:

npm run test -- -g bench -p 1883 -c 100 -l 10

Parameter  Description
-g         Tests to run. bench means 'benchmark tests'
-p         Server port. Default: 1883
-c         Total number of packages to send. Default: 100
-l         Number of packages to send concurrently. Default: 10

Benchmark results depend on your system and configuration!

The following results were recorded on a Windows 10 x64 machine.

Go MQTT-Server:

[Bench] Network: single publisher -> single subscriber
[Bench] Sending 10000 packages, 10 concurrent ..
[Bench] Time delta: 642.22588 ms
[Bench] Msg/Sec: 15570.845572277469 Msg/s

Eclipse Mosquitto MQTT Broker:

[Bench] Network: single publisher -> single subscriber
[Bench] Sending 10000 packages, 10 concurrent ..
[Bench] Time delta: 645.351073 ms
[Bench] Msg/Sec: 15495.441812025932 Msg/s
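The Msg/Sec figures follow directly from the package count and the time delta. The sketch below reproduces that arithmetic; messagesPerSecond is a hypothetical helper, not part of the benchmark tool.

```go
package main

import "fmt"

// messagesPerSecond converts a message count and an elapsed time in
// milliseconds into a throughput in messages per second.
func messagesPerSecond(count int, deltaMs float64) float64 {
	return float64(count) / (deltaMs / 1000.0)
}

func main() {
	// Inputs are the counts and time deltas from the two runs above.
	fmt.Printf("Go MQTT-Server:    %.1f Msg/s\n", messagesPerSecond(10000, 642.22588))
	fmt.Printf("Eclipse Mosquitto: %.1f Msg/s\n", messagesPerSecond(10000, 645.351073))
}
```

The two rates differ by roughly half a percent, so a single run like this is probably not enough to rank the two brokers.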




const (
	CLOSING    = 3
	CLOSED     = 4
)

const (
	ACCEPTED            = 0
)

CONNACK return codes

const (
	CONNECT     = 1
	CONNACK     = 2
	PUBLISH     = 3
	PUBACK      = 4
	PUBREC      = 5
	PUBREL      = 6
	PUBCOMP     = 7
	SUBACK      = 9
	UNSUBACK    = 11
	PINGREQ     = 12
	PINGRESP    = 13
)

message types
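In MQTT 3.1 the message type occupies the upper four bits of the first byte of every packet, followed by the DUP, QoS and RETAIN flags. The sketch below shows how such a byte could be decoded. The type headerFlags and the function decodeFirstByte are hypothetical names for illustration, and this is not necessarily how this package's FixedHeader parses the byte.

```go
package main

import "fmt"

// headerFlags holds the fields packed into the first byte of an MQTT 3.1 packet.
type headerFlags struct {
	Type   byte // message type, e.g. 3 for PUBLISH
	Dup    bool // duplicate delivery flag
	QoS    byte // quality of service level, 0 to 2
	Retain bool // retain flag
}

// decodeFirstByte splits the first fixed-header byte into its fields.
func decodeFirstByte(b0 byte) headerFlags {
	return headerFlags{
		Type:   b0 >> 4,          // bits 7-4
		Dup:    b0&0x08 != 0,     // bit 3
		QoS:    (b0 >> 1) & 0x03, // bits 2-1
		Retain: b0&0x01 != 0,     // bit 0
	}
}

func main() {
	// 0x32 = 0011 0010: type 3 (PUBLISH), no DUP, QoS 1, no RETAIN.
	fmt.Printf("%+v\n", decodeFirstByte(0x32))
}
```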

var (
	InclompleteHeader       = errors.New("incomplete header")
	MaxMessageLength        = errors.New("message length exceeds server maximum")
	MessageLengthInvalid    = errors.New("message length exceeds maximum")
	IncompleteMessage       = errors.New("incomplete message")
	UnknownMessageType      = errors.New("unknown mqtt message type")
	ReservedMessageType     = errors.New("reserved message type")
	ConnectMsgLacksProtocol = errors.New("connect message has no protocol field")
	ConnectProtocolUnexp    = errors.New("connect message protocol is not 'MQIsdp'")
	TooLongClientID         = errors.New("connect client id is too long")
	UnknownMessageID        = errors.New("unknown message id")
)



func Head(b0 byte, length int, total int) ([]byte, []byte)

Something like Body() or New(), because sometimes a body is needed and sometimes only the head.

func Join

func Join(conn net.Conn, server *Server)

func ListenAndServe

func ListenAndServe(addr string, handler Handler) error
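ListenAndServe takes a Handler, so a caller has to supply the four callbacks of the Handler interface documented on this page. The following is a minimal sketch of such a handler. Because the import path of this package is not given here, Context, Message and Handler are redeclared as local stand-ins that mirror only the documented exported fields and method signatures. logHandler and its credential check are hypothetical, and how the server reacts to a returned error is an assumption, not something this page documents.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's types (assumption: only the exported
// fields shown in this documentation are reproduced).
type Context struct{ ClientID string }

type Message struct {
	Topic string
	Buf   []byte
	QoS   byte
}

// Handler mirrors the method set of the documented Handler interface.
type Handler interface {
	Connect(ctx *Context, username, password string) error
	Disconnect(ctx *Context)
	Publish(ctx *Context, msg *Message) error
	Subscribe(ctx *Context, topic string, qos byte) error
}

// logHandler is a hypothetical handler: it accepts one fixed credential
// pair and logs every event it sees.
type logHandler struct{ user, pass string }

func (h *logHandler) Connect(ctx *Context, username, password string) error {
	if username != h.user || password != h.pass {
		return errors.New("bad credentials")
	}
	fmt.Println("connect:", ctx.ClientID)
	return nil
}

func (h *logHandler) Disconnect(ctx *Context) {
	fmt.Println("disconnect:", ctx.ClientID)
}

func (h *logHandler) Publish(ctx *Context, msg *Message) error {
	fmt.Printf("publish from %s on %q: %d bytes\n", ctx.ClientID, msg.Topic, len(msg.Buf))
	return nil
}

func (h *logHandler) Subscribe(ctx *Context, topic string, qos byte) error {
	fmt.Printf("subscribe from %s to %q at QoS %d\n", ctx.ClientID, topic, qos)
	return nil
}

// Compile-time check that logHandler satisfies the interface.
var _ Handler = (*logHandler)(nil)

func main() {
	h := &logHandler{user: "demo", pass: "secret"}
	ctx := &Context{ClientID: "client-1"}
	fmt.Println("wrong password:", h.Connect(ctx, "demo", "wrong"))
	fmt.Println("right password:", h.Connect(ctx, "demo", "secret"))
	_ = h.Publish(ctx, &Message{Topic: "sensors/temp", Buf: []byte("21.5"), QoS: 1})
}
```

With the real package imported, the stand-in declarations would be dropped and the handler passed on, for example as ListenAndServe(":1883", h).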


type Context

type Context struct {
	//	server   *Server
	ClientID string

	Will *Message
	// contains filtered or unexported fields
}

func NewContext

func NewContext(w io.Writer, c io.Closer, server *Server) *Context

func (*Context) Alive

func (ctx *Context) Alive() bool

func (*Context) Close

func (ctx *Context) Close() error

func (*Context) ConnAck

func (ctx *Context) ConnAck(code byte)

func (*Context) Fail

func (ctx *Context) Fail(err error) error

func (*Context) Failf

func (ctx *Context) Failf(format string, a ...interface{}) error

func (*Context) Get

func (ctx *Context) Get(key string) interface{}

func (*Context) PingResp

func (ctx *Context) PingResp()

func (*Context) Publish

func (ctx *Context) Publish(sub *Subscription, msg *Message)

func (*Context) Read

func (ctx *Context) Read(reader io.Reader)

read a new MQTT message from a reader (input stream)

func (*Context) ReadConnectMessage

func (ctx *Context) ReadConnectMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse CONNECT messages

func (*Context) ReadPubcompMessage

func (ctx *Context) ReadPubcompMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBCOMP message (a response to a PUBREL from a client to this server)

func (*Context) ReadPublishMessage

func (ctx *Context) ReadPublishMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBLISH message and tell the server about it

func (*Context) ReadPubrecMessage

func (ctx *Context) ReadPubrecMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBREC message (a response to a publish from this server to a client on qos 2)

func (*Context) ReadPubrelMessage

func (ctx *Context) ReadPubrelMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBREL message (a response to a PUBREC at QoS 2); the message has already been stored at the previous PUBREC message

func (*Context) ReadSubscribeMessage

func (ctx *Context) ReadSubscribeMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a SUBSCRIBE message and send SUBACK

func (*Context) Set

func (ctx *Context) Set(key string, value interface{})

func (*Context) Subscribe

func (ctx *Context) Subscribe(topic string, qos byte) byte

func (*Context) Unsubscribe

func (ctx *Context) Unsubscribe(topic string)

func (*Context) Write

func (ctx *Context) Write(data []byte) (n int, err error)

type FixedHeader

type FixedHeader struct {
	// contains filtered or unexported fields
}

func (*FixedHeader) Read

func (fh *FixedHeader) Read(reader io.Reader) error
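After the first byte, an MQTT fixed header carries the "remaining length" of the packet in one to four bytes, seven bits per byte, with the top bit marking continuation. The sketch below decodes that field from a byte slice. It follows the MQTT 3.1 specification and is not taken from FixedHeader.Read; the function and error names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errIncomplete = errors.New("incomplete header")
	errTooLong    = errors.New("message length exceeds maximum")
)

// decodeRemainingLength decodes the variable-length "remaining length" field.
// It returns the decoded length and the number of bytes consumed from buf.
func decodeRemainingLength(buf []byte) (length, n int, err error) {
	for shift := uint(0); shift < 28; shift += 7 { // at most four bytes
		if n >= len(buf) {
			return 0, n, errIncomplete
		}
		b := buf[n]
		n++
		length |= int(b&0x7f) << shift // low 7 bits carry data
		if b&0x80 == 0 {               // top bit clear: last byte
			return length, n, nil
		}
	}
	return 0, n, errTooLong // a fifth length byte is not allowed
}

func main() {
	// 0xC1 0x02 encodes 65 + 2*128 = 321.
	fmt.Println(decodeRemainingLength([]byte{0xC1, 0x02})) // prints "321 2 <nil>"
}
```

Four length bytes give a maximum of 268435455, which is the protocol limit; a server may enforce a smaller limit of its own, as the MaxMessageLength error above suggests.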

type Handler

type Handler interface {
	Connect(ctx *Context, username, password string) error
	Disconnect(ctx *Context)
	Publish(ctx *Context, msg *Message) error
	Subscribe(ctx *Context, topic string, qos byte) error
}

type Message

type Message struct {
	Topic string
	Buf   []byte
	QoS   byte
	// contains filtered or unexported fields
}

type Publisher

type Publisher interface {
	Publish(msg *Message)
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(closer io.Closer, handler Handler) *Server

func (*Server) Alive

func (svr *Server) Alive() bool

func (*Server) Close

func (svr *Server) Close()

func (*Server) Publish

func (svr *Server) Publish(ctx *Context, msg *Message)

func (*Server) Run

func (svr *Server) Run()

func (*Server) Serve

func (svr *Server) Serve(rwc io.ReadWriteCloser)

func (*Server) Subscribe

func (svr *Server) Subscribe(ctx *Context, topic string, qos byte) *Subscription

func (*Server) Unsubscribe

func (svr *Server) Unsubscribe(subs *Subscription)

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(ctx *Context, qos byte) *Subscription

func (*Subscription) ChainLength

func (s *Subscription) ChainLength() int

func (*Subscription) Publish

func (s *Subscription) Publish(msg *Message)

func (*Subscription) Unsubscribe

func (sub *Subscription) Unsubscribe()

type SubscriptionChange

type SubscriptionChange struct {
	// contains filtered or unexported fields
}

type SubscriptionHandler

type SubscriptionHandler interface {
	Subscribe(ctx *Context, topic string, qos byte) *Subscription
	Unsubscribe(subs *Subscription)
}

type SubscriptionRequest

type SubscriptionRequest struct {
	// contains filtered or unexported fields
}

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(parent *Topic, name string) *Topic

func (*Topic) Enqueue

func (topic *Topic) Enqueue(queue **Subscription, s *Subscription)

func (*Topic) Find

func (topic *Topic) Find(s []string) *Subscription

func (*Topic) PrintIndent

func (topic *Topic) PrintIndent(builder *strings.Builder, indent string)

func (*Topic) Publish

func (topic *Topic) Publish(s []string, msg *Message)

func (*Topic) Remove

func (topic *Topic) Remove()

func (*Topic) String

func (topic *Topic) String() string

func (*Topic) Subscribe

func (topic *Topic) Subscribe(t []string, sub *Subscription)
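The Topic methods take the topic as a []string, which suggests (this is an assumption, the page does not say so) that topic names are split on "/" and matched level by level in a tree. The sketch below shows the matching rules of the MQTT wildcards "+" and "#" on such split levels. matchTopic is a hypothetical helper, not part of this package, and it does not validate that "#" only appears as the last level of a filter.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a concrete topic name matches a subscription
// filter that may contain the MQTT wildcards "+" (one level) and "#" (rest).
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // matches all remaining levels, including none
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false // literal level differs
		}
	}
	return len(f) == len(t) // no wildcard tail: level counts must agree
}

func main() {
	fmt.Println(matchTopic("home/+/temp", "home/kitchen/temp")) // true
	fmt.Println(matchTopic("home/#", "home/kitchen/temp"))      // true
	fmt.Println(matchTopic("home/+", "home/kitchen/temp"))      // false
}
```

A tree such as the Topic type can answer the same question without rescanning every filter on each publish, which is presumably why the package stores subscriptions per level.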

