mqtt

v0.0.0-...-a2ebb16 Latest
Published: Feb 26, 2019 License: BSD-3-Clause Imports: 13 Imported by: 2

README

Native Go MQTT Library

Fluux MQTT is an MQTT v3.1.1 client library written in Go.

The library has been tested with the following MQTT servers:

Features

  • MQTT v3.1.1, QOS 0
  • Client manager to support auto-reconnect with exponential backoff.
  • TLS Support

Short term tasks

Implement support for QOS 1 and 2 (with storage backend interface and default backends).

Running tests

You can launch unit tests with:

go test ./...

Testing with Fluux public MQTT server

We encourage you to experiment and test on a public Fluux test server. It is available on mqtt.fluux.io (on ports 1883 for cleartext and 8883 for TLS).

Here is example code for a simple client:

package main

import (
	"log"
	"time"

	"gosrc.io/mqtt"
)

func main() {
	client := mqtt.NewClient("tls://mqtt.fluux.io:8883")
	client.ClientID = "MQTT-Sub"
	log.Printf("Connecting on: %s\n", client.Address)

	messages := make(chan mqtt.Message)
	client.Messages = messages

	postConnect := func(c *mqtt.Client) {
		log.Println("Connected")
		name := "/mremond/test-topic-1"
		topic := mqtt.Topic{Name: name, QOS: 0}
		c.Subscribe(topic)
	}

	cm := mqtt.NewClientManager(client, postConnect)
	cm.Start()

	for m := range messages {
		log.Printf("Received message from MQTT server on topic %s: %+v\n", m.Topic, m.Payload)
	}
}

Setting up Mosquitto on OSX for testing

If you want to test the Go MQTT library locally, you can install Mosquitto.

Mosquitto can be installed from homebrew:

brew install mosquitto
...
mosquitto has been installed with a default configuration file.
You can make changes to the configuration by editing:
    /usr/local/etc/mosquitto/mosquitto.conf

To have launchd start mosquitto at login:
  ln -sfv /usr/local/opt/mosquitto/*.plist ~/Library/LaunchAgents
Then to load mosquitto now:
  launchctl load ~/Library/LaunchAgents/homebrew.mxcl.mosquitto.plist
Or, if you don't want/need launchctl, you can just run:
  mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

The default configuration file can be customized in /usr/local/etc/mosquitto/mosquitto.conf. However, the default configuration should be fine for testing.

You can launch the Mosquitto broker with the command:

/usr/local/sbin/mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

The following command can be used to subscribe a client:

mosquitto_sub -v -t 'test/topic'

You can publish a payload on a topic with:

mosquitto_pub -t "test/topic" -m "message payload" -q 1

Setting up Mosquitto for testing on Windows 10

After you have installed the official Mosquitto build from the main site, you can run the broker with the command:

.\mosquitto.exe -v -c .\mosquitto.conf

You can subscribe with:

.\mosquitto_sub.exe -h 127.0.0.1 -v -t 'test/topic'

You can test publish with:

.\mosquitto_pub.exe -h 127.0.0.1 -t "test/topic" -m "message payload" -q 1

Documentation

Overview

Package mqtt implements the MQTT client protocol. It can be used as a client library to write MQTT clients in Go.

You can use the MQTT client directly at a low level and handle connection events in your own code (or ignore them). If you want sane default behaviour for handling reconnection, you can rely directly on the connection manager struct.

Messages are received on a message channel, which can be buffered. The main goal of the channel is to handle back pressure and make sure the client does not read messages faster than it is able to process them.
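The back pressure described above can be illustrated with a small standalone sketch. It does not use the mqtt package: the message type and the produce and consume functions are hypothetical stand-ins written only for this example. With a buffer of 2, the producer blocks on the third send until the consumer has read something.

```go
package main

import "fmt"

// message mirrors the shape of the package's Message type for this sketch only.
type message struct {
	Topic   string
	Payload []byte
}

// produce sends n messages and then closes the channel.
// Each send blocks while the channel buffer is full, which is the
// back pressure: a slow consumer slows the producer down.
func produce(out chan<- message, n int) {
	for i := 0; i < n; i++ {
		out <- message{Topic: "test/topic", Payload: []byte{byte(i)}}
	}
	close(out)
}

// consume reads until the channel is closed and returns how many
// messages it processed.
func consume(in <-chan message) int {
	count := 0
	for range in {
		count++
	}
	return count
}

func main() {
	// The buffer size is chosen by the caller to suit expected throughput.
	messages := make(chan message, 2)
	go produce(messages, 5)
	fmt.Println("processed:", consume(messages))
}
```

A larger buffer absorbs bursts at the cost of memory; an unbuffered channel makes every send wait for the consumer.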

Index

Constants

const (
	ConnAccepted                     = 0x00
	ConnRefusedBadProtocolVersion    = 0x01
	ConnRefusedIDRejected            = 0x02
	ConnRefusedServerUnavailable     = 0x03
	ConnRefusedBadUsernameOrPassword = 0x04
	ConnRefusedNotAuthorized         = 0x05
)

MQTT error codes returned on CONNECT.

const (
	ProtocolName    = "MQTT"
	ProtocolLevel   = 4 // This is MQTT v3.1.1
	DefaultClientID = "Fluux-MQTT"
)

Default protocol values

const (
	// DefaultMQTTServer is a shortcut to define connection to local
	// server
	DefaultMQTTServer = "tcp://localhost:1883"
)

Variables

var (
	ErrMalformedLength                  = errors.New("malformed mqtt packet remaining length")
	ErrConnRefusedBadProtocolVersion    = errors.New("connection refused, unacceptable protocol version")
	ErrConnRefusedIDRejected            = errors.New("connection refused, identifier rejected")
	ErrConnRefusedServerUnavailable     = errors.New("connection refused, server unavailable")
	ErrConnRefusedBadUsernameOrPassword = errors.New("connection refused, bad user name or password")
	ErrConnRefusedNotAuthorized         = errors.New("connection refused, not authorized")
	ErrConnUnknown                      = errors.New("connection refused, unknown error")
)

Errors the MQTT client can return.

var (
	// ErrIncorrectConnectResponse is triggered on CONNECT when server
	// does not reply with CONNACK packet.
	ErrIncorrectConnectResponse = errors.New("incorrect mqtt connect response")
)

Functions

func ConnAckError

func ConnAckError(returnCode int) error

ConnAckError translates an MQTT ConnAck error into a Go error.
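As a rough sketch of what such a translation might look like, the following standalone program maps the CONNECT return codes listed under Constants to error values. The function name connAckErrorSketch and the lower-case error variables are hypothetical; the real ConnAckError may differ, in particular in what it returns for an accepted connection (nil is assumed here).

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented error messages, declared here only so
// that the sketch compiles on its own.
var (
	errBadProtocolVersion    = errors.New("connection refused, unacceptable protocol version")
	errIDRejected            = errors.New("connection refused, identifier rejected")
	errServerUnavailable     = errors.New("connection refused, server unavailable")
	errBadUsernameOrPassword = errors.New("connection refused, bad user name or password")
	errNotAuthorized         = errors.New("connection refused, not authorized")
	errUnknown               = errors.New("connection refused, unknown error")
)

// connAckErrorSketch maps a CONNACK return code to an error.
// Assumption: code 0x00 (accepted) maps to nil.
func connAckErrorSketch(returnCode int) error {
	switch returnCode {
	case 0x00:
		return nil
	case 0x01:
		return errBadProtocolVersion
	case 0x02:
		return errIDRejected
	case 0x03:
		return errServerUnavailable
	case 0x04:
		return errBadUsernameOrPassword
	case 0x05:
		return errNotAuthorized
	default:
		return errUnknown
	}
}

func main() {
	for _, code := range []int{0x00, 0x04, 0x7f} {
		fmt.Printf("0x%02x -> %v\n", code, connAckErrorSketch(code))
	}
}
```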

Types

type Backoff

type Backoff struct {
	// contains filtered or unexported fields
}

Backoff provides a duration that increases with the number of attempts performed. The structure is used to support exponential backoff on connection attempts, to avoid hammering the server we are connecting to.
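The idea of exponential backoff can be sketched without the package. The function delayForAttempt and the baseDelay and maxDelay constants below are assumptions made for illustration; the fields and defaults of the real Backoff type are unexported and may differ (production implementations often add random jitter as well).

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values, chosen for illustration only.
const (
	baseDelay = 1 * time.Second
	maxDelay  = 60 * time.Second
)

// delayForAttempt doubles the delay for each attempt already performed
// and caps the result at maxDelay. Attempt 0 yields baseDelay.
func delayForAttempt(attempt int) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	d := baseDelay
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for attempt := 0; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, delayForAttempt(attempt))
	}
}
```

Because the function depends only on the attempt number, it is stateless in the same sense as DurationForAttempt; a stateful wrapper would simply keep a counter and reset it after a successful connection.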

func (*Backoff) Duration

func (b *Backoff) Duration() time.Duration

Duration returns the duration to apply to the current attempt.

func (*Backoff) DurationForAttempt

func (b *Backoff) DurationForAttempt(attempt int) time.Duration

DurationForAttempt returns a duration for an attempt number, in a stateless way.

func (*Backoff) Reset

func (b *Backoff) Reset()

Reset sets the number of attempts back to 0. It is to be called after a successful operation has been performed, to reset the exponential backoff interval.

func (*Backoff) Wait

func (b *Backoff) Wait()

Wait sleeps for the backoff duration of the current attempt.

type Client

type Client struct {
	Config

	Handler  EventHandler
	Messages chan<- Message
	// contains filtered or unexported fields
}

Client is the main structure used to connect as a client to an MQTT server.

func NewClient

func NewClient(address string) *Client

NewClient generates a new MQTT client with default parameters. The address must be set, as there is no relevant default value for the server. The address is of the form tcp://hostname:port for a cleartext connection or tls://hostname:port for a TLS connection. TODO: Should the messages channel be set in NewClient?
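To make the address format concrete, here is a minimal sketch that splits such an address with the standard net/url package. The parseAddress helper is hypothetical and is not how the mqtt package is known to parse addresses; it only shows that the two documented schemes fit the usual URL shape.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseAddress is a hypothetical helper, not part of the mqtt package.
// It splits an address such as "tls://hostname:port" into its scheme and
// host:port parts, accepting only the two documented schemes.
func parseAddress(address string) (scheme, hostPort string, err error) {
	u, err := url.Parse(address)
	if err != nil {
		return "", "", err
	}
	switch u.Scheme {
	case "tcp", "tls":
		// Supported: cleartext or TLS connection.
	default:
		return "", "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if u.Host == "" {
		return "", "", fmt.Errorf("missing host in %q", address)
	}
	return u.Scheme, u.Host, nil
}

func main() {
	scheme, hostPort, err := parseAddress("tls://mqtt.fluux.io:8883")
	fmt.Println(scheme, hostPort, err)
}
```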

func (*Client) Connect

func (c *Client) Connect(defaultMsgChannel chan<- Message) error

Connect initiates a synchronous connection to the MQTT server and performs the MQTT connect handshake.

We must have a default channel for the client to work: if the connection is persistent, we may receive messages coming from a previous connection even if we do not subscribe to anything in the current session of the client. Having a default channel makes sure we always have a way to receive all messages.

The channel will be closed when the session is closed and no further automatic reconnection will be attempted. You can use that close signal to reconnect the client if you wish to, immediately or after a delay.

The channel is expected to be passed by the caller because it allows the caller to pass a channel with a buffer size suiting its own use case and expected throughput.

func (*Client) Disconnect

func (c *Client) Disconnect()

Disconnect sends a DISCONNECT MQTT packet to the other party and cleans up the client state.

func (*Client) Publish

func (c *Client) Publish(topic string, payload []byte)

Publish sends a PUBLISH MQTT control packet.

func (*Client) String

func (c *Client) String() string

String returns a printable version of the client state.

func (*Client) Subscribe

func (c *Client) Subscribe(topic Topic)

Subscribe sends a SUBSCRIBE MQTT control packet. At the moment, subscription state is not kept in the client state and is lost on reconnection.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string)

Unsubscribe sends an UNSUBSCRIBE MQTT control packet.

type ClientManager

type ClientManager struct {
	Client      *Client
	PostConnect postConnect
}

ClientManager supervises an MQTT client connection. Its role is to handle connection events and apply reconnection strategy.

func NewClientManager

func NewClientManager(client *Client, pc postConnect) *ClientManager

NewClientManager creates a new client manager structure, intended to handle MQTT client state change events and to trigger reconnection automatically based on the ClientManager configuration.

func (*ClientManager) Start

func (cm *ClientManager) Start()

Start launches the connection loop.

func (*ClientManager) Stop

func (cm *ClientManager) Stop()

Stop cancels pending operations and terminates the existing MQTT client.

type Config

type Config struct {
	Address string

	// *************************************************************************
	// ** Not Required, optional                                              **
	// *************************************************************************
	OptConnect
	OptTCP
}

Config provides a data structure of required configuration parameters for an MQTT connection.

type ConnAckPacket

type ConnAckPacket struct {
	SessionPresent bool
	ReturnCode     int
}

ConnAckPacket is the control packet sent as a reply to a CONNECT packet. It contains the result of the CONNECT operation.

func (ConnAckPacket) Marshall

func (connack ConnAckPacket) Marshall() []byte

Marshall serializes a CONNACK struct as an MQTT control packet.
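For reference, a CONNACK packet in MQTT v3.1.1 is always four bytes long. The sketch below builds those bytes from the two ConnAckPacket fields; marshalConnAck is a hypothetical standalone function written for this example and is not the package's Marshall method.

```go
package main

import "fmt"

// marshalConnAck builds the four bytes of an MQTT v3.1.1 CONNACK packet.
// Hypothetical helper for illustration only.
func marshalConnAck(sessionPresent bool, returnCode int) []byte {
	var ackFlags byte
	if sessionPresent {
		ackFlags = 0x01
	}
	return []byte{
		0x20,             // packet type 2 (CONNACK), reserved flags 0
		0x02,             // remaining length: always 2 for CONNACK
		ackFlags,         // bit 0 carries the Session Present flag
		byte(returnCode), // connect return code, 0x00 means accepted
	}
}

func main() {
	fmt.Printf("% x\n", marshalConnAck(true, 0x00))
}
```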

func (ConnAckPacket) PayloadSize

func (connack ConnAckPacket) PayloadSize() int

type ConnState

type ConnState int

ConnState represents the current connection state.

const (
	StateDisconnected ConnState = iota
)

This is the list of events happening on the connection that the client can be notified about.

type ConnectPacket

type ConnectPacket struct {
	ProtocolName  string
	ProtocolLevel int
	Keepalive     int
	ClientID      string
	CleanSession  bool

	// TODO: Should 'Will' be a sub-struct ?
	WillFlag    bool
	WillTopic   string
	WillMessage string
	WillQOS     int
	WillRetain  bool
	Username    string
	Password    string
}

ConnectPacket is the control packet sent from the client to log into an MQTT server.

func (ConnectPacket) Marshall

func (connect ConnectPacket) Marshall() []byte

Marshall serializes a CONNECT struct as an MQTT control packet.

func (ConnectPacket) PayloadSize

func (connect ConnectPacket) PayloadSize() int

PayloadSize calculates the variable-length part of CONNECT MQTT packets.

func (*ConnectPacket) SetWill

func (connect *ConnectPacket) SetWill(topic string, message string, qos int)

SetWill defines all the will values of the CONNECT control packet at once, for consistency.

type DisconnectPacket

type DisconnectPacket struct{}

DisconnectPacket is the control packet sent from the client to notify the server of disconnection.

func (DisconnectPacket) Marshall

func (DisconnectPacket) Marshall() []byte

Marshall serializes a DISCONNECT struct as an MQTT control packet.

type Event

type Event struct {
	State       ConnState
	Description string
}

Event is a structure used to convey changes related to client state. For example, it is used to notify the client when it gets disconnected.

type EventHandler

type EventHandler func(Event)

EventHandler is used to pass events about the state of the connection to the client implementation.

type Marshaller

type Marshaller interface {
	Marshall() []byte
}

The Marshaller interface is shared by all MQTT control packets.

func Decode

func Decode(packetType int, fixedHeaderFlags int, payload []byte) Marshaller

Decode returns a parsed struct from a byte array. It assumes the payload does not contain the MQTT control packet fixed header, because the fixed header must already have been parsed to extract the packet type code that tells us what to decode.
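The fixed header mentioned above carries the packet type in the high nibble of the first byte, the flags in the low nibble, and then a variable-length "remaining length" of one to four bytes. The following standalone sketch parses it according to the MQTT v3.1.1 specification. The parseFixedHeader function and its error variables are hypothetical and are not the package's own parser.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errTruncated       = errors.New("truncated fixed header")
	errMalformedLength = errors.New("malformed mqtt packet remaining length")
)

// parseFixedHeader extracts the packet type, the flags, and the remaining
// length from the start of data. headerLen is the number of bytes used by
// the fixed header, so data[headerLen:] is what a decoder would receive.
func parseFixedHeader(data []byte) (packetType, flags, remaining, headerLen int, err error) {
	if len(data) < 2 {
		return 0, 0, 0, 0, errTruncated
	}
	packetType = int(data[0] >> 4)
	flags = int(data[0] & 0x0F)

	// Remaining length: 7 bits per byte, least significant group first,
	// with the top bit set on every byte except the last. At most 4 bytes.
	multiplier := 1
	for i := 1; i <= 4; i++ {
		if i >= len(data) {
			return 0, 0, 0, 0, errTruncated
		}
		b := data[i]
		remaining += int(b&0x7F) * multiplier
		if b&0x80 == 0 {
			return packetType, flags, remaining, i + 1, nil
		}
		multiplier *= 128
	}
	return 0, 0, 0, 0, errMalformedLength
}

func main() {
	// 0x30 is PUBLISH with all flags cleared; 0xC1 0x02 encodes 321.
	pt, flags, remaining, headerLen, err := parseFixedHeader([]byte{0x30, 0xC1, 0x02})
	fmt.Println(pt, flags, remaining, headerLen, err)
}
```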

func PacketRead

func PacketRead(r io.Reader) (Marshaller, error)

PacketRead returns an unmarshalled packet from an io.Reader stream.

type Message

type Message struct {
	Topic   string
	Payload []byte
}

Message encapsulates the Publish MQTT payload from the MQTT client perspective. Message is used to hide the details of the MQTT protocol from the developer.

type OptConnect

type OptConnect struct {
	ProtocolLevel int
	ClientID      string
	Keepalive     int // TODO Keepalive should also probably be a time.Duration for more flexibility
	CleanSession  bool
	Username      string
	Password      string
}

OptConnect defines optional MQTT connection parameters. MQTT client libraries will default to sensible values. TODO Should this be called OptMQTT?

type OptTCP

type OptTCP struct {
	ConnectTimeout time.Duration
}

OptTCP defines TCP/IP related parameters. They are used to configure the low-level TCP client connection. The defaults should be fine for standard cases.

type PingReqPacket

type PingReqPacket struct{}

PingReqPacket is the control packet sent from the client for connection keepalive. The client expects to receive a PingRespPacket in response.

func (PingReqPacket) Marshall

func (pingreq PingReqPacket) Marshall() []byte

Marshall serializes a PINGREQ struct as an MQTT control packet.

type PingRespPacket

type PingRespPacket struct {
}

PingRespPacket is the control packet sent by the server in response to a client PINGREQ.

func (PingRespPacket) Marshall

func (pdu PingRespPacket) Marshall() []byte

Marshall serializes a PINGRESP struct as an MQTT control packet.

type PubAckPacket

type PubAckPacket struct {
	ID int
}

PubAckPacket is the control packet sent by the client or server in response to a PUBLISH, when the QOS for the publish is 1.

func (PubAckPacket) Marshall

func (puback PubAckPacket) Marshall() []byte

Marshall serializes a PUBACK struct as an MQTT control packet.

func (PubAckPacket) PayloadSize

func (puback PubAckPacket) PayloadSize() int

type PublishPacket

type PublishPacket struct {
	ID      int
	Dup     bool
	Qos     int
	Retain  bool
	Topic   string
	Payload []byte
}

PublishPacket is the control packet sent by the client or server to initiate or deliver a payload broadcast.

func (PublishPacket) Marshall

func (publish PublishPacket) Marshall() []byte

Marshall serializes a PUBLISH struct as an MQTT control packet.
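As an illustration of the wire format, the sketch below encodes a QOS 0 PUBLISH packet as defined by MQTT v3.1.1: a fixed header, a length-prefixed topic name, then the raw payload. At QOS 0 there is no packet identifier. Both marshalPublishQOS0 and encodeRemainingLength are hypothetical helpers for this example, not the package's Marshall method.

```go
package main

import "fmt"

// encodeRemainingLength encodes n using the MQTT variable-length scheme:
// 7 bits per byte, least significant group first, top bit set on every
// byte except the last.
func encodeRemainingLength(n int) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

// marshalPublishQOS0 builds a PUBLISH packet with DUP=0, QOS=0, RETAIN=0.
func marshalPublishQOS0(topic string, payload []byte) []byte {
	// Variable header (2-byte topic length + topic) plus payload.
	remaining := 2 + len(topic) + len(payload)

	buf := []byte{0x30} // packet type 3 (PUBLISH), all flags cleared
	buf = append(buf, encodeRemainingLength(remaining)...)
	buf = append(buf, byte(len(topic)>>8), byte(len(topic))) // big-endian length
	buf = append(buf, topic...)
	buf = append(buf, payload...)
	return buf
}

func main() {
	fmt.Printf("% x\n", marshalPublishQOS0("test/topic", []byte("message payload")))
}
```

Note that the "remaining length" counts the variable header and the payload together, which is the distinction the TODO on PayloadSize refers to.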

func (PublishPacket) PayloadSize

func (publish PublishPacket) PayloadSize() int

TODO: Find a better name. In the spec, Size is not the size of the payload but of the variable header.

type QOSOutPacket

type QOSOutPacket interface {
	PacketID() int
}

type QOSResponse

type QOSResponse interface {
	ResponseID() int
}

type SubAckPacket

type SubAckPacket struct {
	ID          int
	ReturnCodes []int
}

SubAckPacket is the control packet sent by the server to acknowledge a client SUBSCRIBE.

func (SubAckPacket) Marshall

func (suback SubAckPacket) Marshall() []byte

Marshall serializes a SUBACK struct as an MQTT control packet.

func (SubAckPacket) PayloadSize

func (suback SubAckPacket) PayloadSize() int

func (SubAckPacket) ResponseID

func (suback SubAckPacket) ResponseID() int

type SubscribePacket

type SubscribePacket struct {
	ID     int
	Topics []Topic
}

SubscribePacket is the control packet sent by the client to subscribe to one or more topics.

func (SubscribePacket) Marshall

func (subscribe SubscribePacket) Marshall() []byte

Marshall serializes a SUBSCRIBE struct as an MQTT control packet.
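The SUBSCRIBE layout in MQTT v3.1.1 is a fixed header with flags 0010, a two-byte packet identifier, and then one length-prefixed topic filter plus a requested QOS byte per topic. The following sketch encodes it; the topic type, marshalSubscribe, and encodeRemainingLength are hypothetical names written for this example and do not reproduce the package's code.

```go
package main

import "fmt"

// topic mirrors the shape of the package's Topic type for this sketch only.
type topic struct {
	Name string
	QOS  int
}

// encodeRemainingLength encodes n using the MQTT variable-length scheme.
func encodeRemainingLength(n int) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

// marshalSubscribe builds a SUBSCRIBE packet for the given packet
// identifier and topics.
func marshalSubscribe(id int, topics []topic) []byte {
	// 2 bytes of packet identifier, then per topic:
	// 2-byte length + name + 1 byte of requested QOS.
	remaining := 2
	for _, t := range topics {
		remaining += 2 + len(t.Name) + 1
	}

	buf := []byte{0x82} // packet type 8 (SUBSCRIBE), mandatory flags 0010
	buf = append(buf, encodeRemainingLength(remaining)...)
	buf = append(buf, byte(id>>8), byte(id))
	for _, t := range topics {
		buf = append(buf, byte(len(t.Name)>>8), byte(len(t.Name)))
		buf = append(buf, t.Name...)
		buf = append(buf, byte(t.QOS))
	}
	return buf
}

func main() {
	packet := marshalSubscribe(1, []topic{{Name: "/mremond/test-topic-1", QOS: 0}})
	fmt.Printf("% x\n", packet)
}
```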

func (SubscribePacket) PacketID

func (subscribe SubscribePacket) PacketID() int

func (SubscribePacket) PayloadSize

func (subscribe SubscribePacket) PayloadSize() int

type Subscriptions

type Subscriptions map[string]int

Subscriptions keeps track of acknowledged subscriptions.

type Topic

type Topic struct {
	Name string
	QOS  int
}

Topic is a channel used for publish and subscribe in the MQTT protocol.

type UnsubAckPacket

type UnsubAckPacket struct {
	ID int
}

UnsubAckPacket is the control packet sent by the server to acknowledge a client UNSUBSCRIBE.

func (UnsubAckPacket) Marshall

func (unsub UnsubAckPacket) Marshall() []byte

Marshall serializes an UNSUBACK struct as an MQTT control packet.

func (UnsubAckPacket) PayloadSize

func (unsub UnsubAckPacket) PayloadSize() int

func (UnsubAckPacket) ResponseID

func (unsub UnsubAckPacket) ResponseID() int

type UnsubscribePacket

type UnsubscribePacket struct {
	ID     int
	Topics []string
}

UnsubscribePacket is the control packet sent by the client to unsubscribe from one or more topics.

func (UnsubscribePacket) Marshall

func (unsubscribe UnsubscribePacket) Marshall() []byte

Marshall serializes an UNSUBSCRIBE struct as an MQTT control packet.

func (UnsubscribePacket) PacketID

func (unsubscribe UnsubscribePacket) PacketID() int

func (UnsubscribePacket) PayloadSize

func (unsubscribe UnsubscribePacket) PayloadSize() int

Directories

Path Synopsis
examples
