Version: v0.0.0-...-5387cfa

This package is not in the latest version of its module.

Published: May 18, 2018 License: BSD-3-Clause Imports: 13 Imported by: 106



MQTT Clients, Servers and Load Testers in Go

For docs, see:

For a little discussion of this code see:


At this time, the following limitations apply:

  • QoS level 0 only; messages are only stored in RAM
  • Retain works, but at QoS level 0. Retained messages are lost on server restart.
  • Last will messages are not implemented.
  • Keepalive and timeouts are not implemented.


The example MQTT servers are in directories mqttsrv and smqttsrv (secured with TLS).

Benchmarking Tools

To use the benchmarking tools, cd into pingtest, loadtest, or many and type "go build". The tools have reasonable defaults, but you'll also want to use the -help flag to find out what can be tuned.

All benchmarks suck, and these three suck in different ways.

pingtest simulates a number of pairs of clients who are bouncing messages between them as fast as possible. It aims to measure latency of messages through the system when under load.
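The bouncing pattern can be illustrated without any MQTT machinery. The following is a rough sketch only: it uses two goroutines and plain channels in place of real clients and a server, and the name bounce is hypothetical (it is not part of pingtest). It shows how a mean round-trip time falls out of timing a fixed number of bounces.

```go
package main

import (
	"fmt"
	"time"
)

// bounce runs one hypothetical client pair. One side sends a message and
// waits for it to come back; the other side echoes whatever it receives.
// It returns the mean round-trip time over the given number of rounds.
func bounce(rounds int) time.Duration {
	if rounds <= 0 {
		return 0
	}
	ping := make(chan struct{})
	pong := make(chan struct{})

	// The echoing peer: reply to every ping until the channel is closed.
	go func() {
		for range ping {
			pong <- struct{}{}
		}
	}()

	start := time.Now()
	for i := 0; i < rounds; i++ {
		ping <- struct{}{}
		<-pong
	}
	close(ping)
	return time.Since(start) / time.Duration(rounds)
}

func main() {
	fmt.Println("mean round trip:", bounce(10000))
}
```

In a real measurement the channels would be replaced by publishes and subscriptions going through the server under test, and many pairs would run at once to generate load.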

loadtest simulates a number of pairs of clients where one is sending as fast as possible to the other. Realistically, this ends up testing the ability of the system to decode and queue messages, because any slight imbalance in scheduling of readers causes a pile up of messages from the writer slamming them down the throat of the server.

many simulates a large number of clients, each sending at a low transaction rate. The goal is to eventually use this to achieve 1 million (and more?) concurrent, active MQTT sessions in one server. So far, mqttsrv has survived a load of 40k concurrent connections from many.



Package mqtt implements MQTT clients and servers.





var ConnectionErrors = [6]error{
	nil, // return code 0: connection accepted (not an error)
	errors.New("Connection Refused: unacceptable protocol version"),
	errors.New("Connection Refused: identifier rejected"),
	errors.New("Connection Refused: server unavailable"),
	errors.New("Connection Refused: bad user name or password"),
	errors.New("Connection Refused: not authorized"),
}

ConnectionErrors is an array of errors corresponding to the Connect return codes specified in the specification.
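As an illustration of how such an array can be indexed, here is a minimal sketch that maps a CONNACK return code to an error. It uses a local copy of the array rather than the package itself, and it assumes that index 0 is nil, since return code 0 means the connection was accepted. The helper connectError is hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// connectionErrors is a local copy for illustration. Index 0 is assumed to
// be nil because MQTT return code 0 means "connection accepted".
var connectionErrors = [6]error{
	nil,
	errors.New("Connection Refused: unacceptable protocol version"),
	errors.New("Connection Refused: identifier rejected"),
	errors.New("Connection Refused: server unavailable"),
	errors.New("Connection Refused: bad user name or password"),
	errors.New("Connection Refused: not authorized"),
}

// connectError is a hypothetical helper that converts a return code into an
// error, guarding against codes outside the array.
func connectError(code uint8) error {
	if int(code) >= len(connectionErrors) {
		return fmt.Errorf("unknown connect return code %d", code)
	}
	return connectionErrors[code]
}

func main() {
	for _, code := range []uint8{0, 4, 9} {
		fmt.Printf("code %d: %v\n", code, connectError(code))
	}
}
```

The bounds check matters because indexing an array with an out-of-range value panics at run time.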




type ClientConn

type ClientConn struct {
	ClientId string              // May be set before the call to Connect.
	Dump     bool                // When true, dump the messages in and out.
	Incoming chan *proto.Publish // Incoming messages arrive on this channel.
	// contains filtered or unexported fields
}

A ClientConn holds all the state associated with a connection to an MQTT server. It should be allocated via NewClientConn. Concurrent access to a ClientConn is NOT safe.

func NewClientConn

func NewClientConn(c net.Conn) *ClientConn

NewClientConn allocates a new ClientConn.

func (*ClientConn) Connect

func (c *ClientConn) Connect(user, pass string) error

Connect sends the CONNECT message to the server. If the ClientId is not already set, use a default (a 63-bit decimal random number). The "clean session" bit is always set.

func (*ClientConn) Disconnect

func (c *ClientConn) Disconnect()

Disconnect sends a DISCONNECT message to the server. This function blocks until the disconnect message is actually sent, and the connection is closed.

func (*ClientConn) Publish

func (c *ClientConn) Publish(m *proto.Publish)

Publish publishes the given message to the MQTT server. The QosLevel of the message must be QosAtMostOnce (QoS level 0) for now.

func (*ClientConn) Subscribe

func (c *ClientConn) Subscribe(tqs []proto.TopicQos) *proto.SubAck

Subscribe subscribes this connection to a list of topics. Messages will be delivered on the Incoming channel.

type Server

type Server struct {
	Done          chan struct{}
	StatsInterval time.Duration // Defaults to 10 seconds. Must be set using sync/atomic.StoreInt64().
	Dump          bool          // When true, dump the messages in and out.
	// contains filtered or unexported fields
}

A Server holds all the state associated with an MQTT server.

func NewServer

func NewServer(l net.Listener) *Server

NewServer creates a new MQTT server, which accepts connections from the given listener. When the server is stopped (for instance by another goroutine closing the net.Listener), channel Done will become readable.
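The shutdown behaviour described above can be sketched with the standard library alone. In this illustration, acceptLoop and runAndStop are hypothetical names, and the sketch assumes that "readable" is achieved by closing the channel; the package's own mechanism is not stated here. Closing the listener makes Accept return an error, which ends the loop and signals done.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// acceptLoop is a hypothetical stand-in for a server's accept loop. It
// closes done as soon as Accept fails, for example because the listener
// was closed.
func acceptLoop(l net.Listener, done chan struct{}) {
	defer close(done)
	for {
		c, err := l.Accept()
		if err != nil {
			return
		}
		c.Close() // a real server would hand c to a connection handler
	}
}

// runAndStop starts the loop on a loopback listener, closes the listener,
// and waits for done to become readable.
func runAndStop() error {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	done := make(chan struct{})
	go acceptLoop(l, done)

	l.Close() // stop the loop from another goroutine
	select {
	case <-done:
		return nil
	case <-time.After(2 * time.Second):
		return errors.New("accept loop did not stop")
	}
}

func main() {
	if err := runAndStop(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("server stopped")
}
```

A caller of the real package would wait on the Server's Done channel in the same way after closing the net.Listener it passed to NewServer.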

func (*Server) Start

func (s *Server) Start()

Start makes the Server start accepting and handling connections.
