libmqtt

package module
v0.0.0-...-6e21248 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2018 License: Apache-2.0 Imports: 21 Imported by: 0

README

libmqtt

Build Status GoDoc GoReportCard codecov

Feature rich modern MQTT library in pure Go, for Go, C/C++, Java

Table of contents

Features

  1. MQTT v3.1.1/v5.0 client support (async only)
  2. High performance and less memory footprint (see Benchmark)
  3. Customizable topic routing (see Topic Routing)
  4. Multiple Builtin session persist methods (see Session Persist)
  5. C/C++ lib, Java lib, Command line client support
  6. Idiomatic Go

Usage

This package can be used as

As a Go lib
Prerequisite
  • Go 1.9+ (with GOPATH configured)
Steps
  1. Go get this project
go get github.com/goiiot/libmqtt
  1. Import this package in your project file
import "github.com/goiiot/libmqtt"
  1. Create a custom client
client, err := libmqtt.NewClient(
    // server address(es)
    libmqtt.WithServer("localhost:1883"),
)
if err != nil {
    // handle client creation error
}

Notice: If you would like to explore all the options available, please refer to GoDoc#Option

  1. Register the handlers and Connect, then you are ready to pub/sub with server

We recommend you to register handlers for pub, sub, unsub, net error and persist error, for they can provide you more controllability of the lifecycle of the client

// register handler for pub success/fail (optional, but recommended)
client.HandlePub(PubHandler)

// register handler for sub success/fail (optional, but recommended)
client.HandleSub(SubHandler)

// register handler for unsub success/fail (optional, but recommended)
client.HandleUnSub(UnSubHandler)

// register handler for net error (optional, but recommended)
client.HandleNet(NetHandler)

// register handler for persist error (optional, but recommended)
client.HandlePersist(PersistHandler)

// define your topic handlers like a golang http server
client.Handle("foo", func(topic string, qos libmqtt.QosLevel, msg []byte) {
    // handle the topic message
})

client.Handle("bar", func(topic string, qos libmqtt.QosLevel, msg []byte) {
    // handle the topic message
})
// connect to server
client.Connect(func(server string, code libmqtt.ConnAckCode, err error) {
    if err != nil {
        // failed
        panic(err)
    }

    if code != libmqtt.ConnAccepted {
        // server rejected or in error
        panic(code)
    }

    // success
    // you are now connected to the `server`
    // (the `server` is one of your provided `servers` when create the client)
    // start your business logic here or send a signal to your logic to start

    // subscribe some topic(s)
    client.Subscribe(
        &libmqtt.Topic{Name: "foo"},
        &libmqtt.Topic{Name: "bar", Qos: libmqtt.Qos1},
        // ...
    )

    // publish some topic message(s)
    client.Publish(
        &libmqtt.PublishPacket{
            TopicName: "foo",
            Qos:       libmqtt.Qos0,
            Payload:   []byte("foo data"),
        },
        &libmqtt.PublishPacket{
            TopicName: "bar",
            Qos:       libmqtt.Qos1,
            Payload:   []byte("bar data"),
        },
        // ...
    )
})
  1. Unsubscribe topic(s)
client.UnSubscribe("foo", "bar")
  1. Destroy the client when you would like to
// use true for a immediate disconnect to server
// use false to send a DisConn packet to server before disconnect
client.Destroy(true)
As a C/C++ lib

Please refer to c - README.md

As a Java lib

Please refer to java - README.md

As a command line client

Please refer to cmd/libmqtt - README.md

As MQTT infrastructure

This package can also be used as MQTT packet encoder and decoder

// decode one mqtt 3.1.1 packet from reader
packet, err := libmqtt.Decode(libmqtt.V311, reader)
// ...

// encode one mqtt packet to buffered writer
err := libmqtt.Encode(packet, bufferWriter)
// ...

Topic Routing

Routing topics is one of the most important thing when it comes to business logic, we currently have built two TopicRouters which is ready to use, they are TextRouter and RegexRouter

  • TextRouter will match the exact same topic which was registered to client by Handle method. (this is the default router in a client)
  • RegexRouter will go through all the registered topic handlers, and use regular expression to test whether that is matched and should dispatch to the handler

If you would like to apply other routing strategy to the client, you can provide this option when creating the client

client, err := libmqtt.NewClient(
    // ...
    // e.g. use `RegexRouter`
    libmqtt.WithRouter(libmqtt.NewRegexRouter()),
    // ...
)

Session Persist

Per MQTT Specification, session state should be persisted and be recovered when next time connected to server without clean session flag set, currently we provide persist method as following:

  1. NonePersist - no session persist
  2. memPersist - in memory session persist
  3. filePersist - files session persist (with write barrier)
  4. redisPersist - redis session persist (available inside github.com/goiiot/libmqtt/extension package)

Note: Use RedisPersist if possible.

Benchmark

The procedure of the benchmark is as following:

  1. Create the client
  2. Connect to server
  3. Publish N times to topic foo
  4. Unsubscribe topic (no subscribe, just ensure all pub message has been sent)
  5. Destroy client (without disconnect packet)

The benchmark result listed below was taken on a MacBook Pro 13' (Early 2015, macOS 10.13.2), statistics inside which is the value of ten times average

Bench Name Pub Count ns/op B/op allocs/op
BenchmarkLibmqttClient-4 10000 12011 405 5
BenchmarkPahoClient-4 10000 32604 1232 16

You can make the benchmark using source code from benchmark

Extensions

Helpful extensions for libmqtt (see extension)

LICENSE

Copyright Go-IIoT (https://github.com/goiiot)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Documentation

Index

Constants

View Source
const (
	SubOkMaxQos0 = 0    // SubOkMaxQos0 QoS 0 is used by server
	SubOkMaxQos1 = 1    // SubOkMaxQos1 QoS 1 is used by server
	SubOkMaxQos2 = 2    // SubOkMaxQos2 QoS 2 is used by server
	SubFail      = 0x80 // SubFail means that subscription is not successful
)
View Source
const (
	CodeSuccess                             = 0   // Packet: ConnAck, PubAck, PubRecv, PubRel, PubComp, UnSubAck, Auth
	CodeNormalDisconn                       = 0   // Packet: DisConn
	CodeGrantedQos0                         = 0   // Packet: SubAck
	CodeGrantedQos1                         = 1   // Packet: SubAck
	CodeGrantedQos2                         = 2   // Packet: SubAck
	CodeDisconnWithWill                     = 4   // Packet: DisConn
	CodeNoMatchingSubscribers               = 16  // Packet: PubAck, PubRecv
	CodeNoSubscriptionExisted               = 17  // Packet: UnSubAck
	CodeContinueAuth                        = 24  // Packet: Auth
	CodeReAuth                              = 25  // Packet: Auth
	CodeUnspecifiedError                    = 128 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeMalformedPacket                     = 129 // Packet: ConnAck, DisConn
	CodeProtoError                          = 130 // Packet: ConnAck, DisConn
	CodeImplementationSpecificError         = 131 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeUnsupportedProtoVersion             = 132 // Packet: ConnAck
	CodeClientIdNotValid                    = 133 // Packet: ConnAck
	CodeBadUserPass                         = 134 // Packet: ConnAck
	CodeNotAuthorized                       = 135 // Packet: ConnAck, PubAck, PubRecv, SubAck, UnSubAck, DisConn
	CodeServerUnavail                       = 136 // Packet: ConnAck
	CodeServerBusy                          = 137 // Packet: ConnAck, DisConn
	CodeBanned                              = 138 // Packet: ConnAck
	CodeServerShuttingDown                  = 139 // Packet: DisConn
	CodeBadAuthenticationMethod             = 140 // Packet: ConnAck, DisConn
	CodeKeepaliveTimeout                    = 141 // Packet: DisConn
	CodeSessionTakenOver                    = 142 // Packet: DisConn
	CodeTopicFilterInvalid                  = 143 // Packet: SubAck, UnSubAck, DisConn
	CodeTopicNameInvalid                    = 144 // Packet: ConnAck, PubAck, PubRecv, DisConn
	CodePacketIdentifierInUse               = 145 // Packet: PubAck, PubRecv, PubAck, UnSubAck
	CodePacketIdentifierNotFound            = 146 // Packet: PubRel, PubComp
	CodeReceiveMaxExceeded                  = 147 // Packet: DisConn
	CodeTopicAliasInvalid                   = 148 // Packet: DisConn
	CodePacketTooLarge                      = 149 // Packet: ConnAck, DisConn
	CodeMessageRateTooHigh                  = 150 // Packet: DisConn
	CodeQuotaExceeded                       = 151 // Packet: ConnAck, PubAck, PubRec, SubAck, DisConn
	CodeAdministrativeAction                = 152 // Packet: DisConn
	CodePayloadFormatInvalid                = 153 // Packet: ConnAck, PubAck, PubRecv, DisConn
	CodeRetainNotSupported                  = 154 // Packet: ConnAck, DisConn
	CodeQosNoSupported                      = 155 // Packet: ConnAck, DisConn
	CodeUseAnotherServer                    = 156 // Packet: ConnAck, DisConn
	CodeServerMoved                         = 157 // Packet: ConnAck, DisConn
	CodeSharedSubscriptionNotSupported      = 158 // Packet: SubAck, DisConn
	CodeConnectionRateExceeded              = 159 // Packet: ConnAck, DisConn
	CodeMaxConnectTime                      = 160 // Packet: DisConn
	CodeSubscriptionIdentifiersNotSupported = 161 // Packet: SubAck, DisConn
	CodeWildcardSubscriptionNotSupported    = 162 // Packet: SubAck, DisConn
)

Variables

View Source
var (
	// ErrDecodeBadPacket is the error happened when trying to decode a none MQTT packet
	ErrDecodeBadPacket = errors.New("try decoding none MQTT packet ")

	// ErrDecodeNoneV311Packet is the error happened when
	// trying to decode mqtt 3.1.1 packet but got other mqtt packet ProtoVersion
	ErrDecodeNoneV311Packet = errors.New("try decoding none MQTT v3.1.1 packet ")

	// ErrDecodeNoneV5Packet is the error happened when
	// trying to decode mqtt 5 packet but got other mqtt packet ProtoVersion
	ErrDecodeNoneV5Packet = errors.New("try decoding none MQTT v5 packet ")
)
View Source
var (
	// ErrUnsupportedVersion unsupported mqtt ProtoVersion
	ErrUnsupportedVersion = errors.New("trying encode/decode packet with unsupported MQTT version ")

	// ErrEncodeBadPacket happens when trying to encode none MQTT packet
	ErrEncodeBadPacket = errors.New("trying encode none MQTT packet ")

	// ErrEncodeLargePacket happens when MQTT packet is too large according to MQTT spec
	ErrEncodeLargePacket = errors.New("MQTT packet too large")
)
View Source
var (
	// PingReqPacket is the final instance of pingReqPacket
	PingReqPacket = &pingReqPacket{}
	// PingRespPacket is the final instance of pingRespPacket
	PingRespPacket = &pingRespPacket{}
)
View Source
var (
	// ErrPacketDroppedByStrategy used when persist store packet while strategy
	// don't allow that persist
	ErrPacketDroppedByStrategy = errors.New("packet persist dropped by strategy ")
)
View Source
var (
	// ErrTimeOut connection timeout error
	ErrTimeOut = errors.New("connection timeout ")
)
View Source
var NonePersist = &nonePersist{}

NonePersist defines no persist storage

Functions

func Encode

func Encode(packet Packet, w BufferedWriter) error

Encode MQTT packet to bytes according to protocol ProtoVersion

Types

type AuthPacket

type AuthPacket struct {
	BasePacket
	Code  byte       // the authentication result code
	Props *AuthProps // authentication properties
}

AuthPacket Client <-> Server as part of an extended authentication exchange, such as challenge / response authentication.

It is a Protocol Error for the Client or Server to send an AUTH packet if the ConnPacket did not contain the same Authentication Method

func (*AuthPacket) Bytes

func (a *AuthPacket) Bytes() []byte

func (*AuthPacket) Type

func (a *AuthPacket) Type() CtrlType

Type of AuthPacket is CtrlAuth

type AuthProps

type AuthProps struct {
	AuthMethod string
	AuthData   []byte
	Reason     string
	UserProps  UserProps
}

AuthProps properties of AuthPacket

type BasePacket

type BasePacket struct {
	ProtoVersion ProtoVersion
}

BasePacket for packet encoding and MQTT version note

func (*BasePacket) Version

func (b *BasePacket) Version() ProtoVersion

Version is the MQTT version of this packet

type BufferedReader

type BufferedReader interface {
	io.Reader
	io.ByteReader
}

BufferedReader buffered reader, e.g. bufio.Reader, bytes.Buffer

type BufferedWriter

type BufferedWriter interface {
	io.Writer
	io.ByteWriter
}

BufferedWriter buffered writer, e.g. bufio.Writer, bytes.Buffer

type Client

type Client interface {
	// Handle register topic handlers, mostly used for RegexHandler, RestHandler
	// the default handler inside the client is TextHandler, which match the exactly same topic
	Handle(topic string, h TopicHandler)

	// Connect to all specified server with client options
	Connect(ConnHandler)

	// Publish a message for the topic
	Publish(packets ...*PublishPacket)

	// Subscribe topic(s)
	Subscribe(topics ...*Topic)

	// UnSubscribe topic(s)
	UnSubscribe(topics ...string)

	// Wait will wait until all connection finished
	Wait()

	// Destroy all client connection
	Destroy(force bool)

	// handlers
	HandlePub(PubHandler)
	HandleSub(SubHandler)
	HandleUnSub(UnSubHandler)
	HandleNet(NetHandler)
	HandlePersist(PersistHandler)
}

Client act as a mqtt client

func NewClient

func NewClient(options ...Option) (Client, error)

NewClient create a new mqtt client

type ConnAckPacket

type ConnAckPacket struct {
	BasePacket
	Present bool
	Code    byte
	Props   *ConnAckProps
}

ConnAckPacket is the packet sent by the Server in response to a ConnPacket received from a Client.

The first packet sent from the Server to the Client MUST be a ConnAckPacket

func (*ConnAckPacket) Bytes

func (c *ConnAckPacket) Bytes() []byte

func (*ConnAckPacket) Type

func (c *ConnAckPacket) Type() CtrlType

Type ConnAckPacket's type is CtrlConnAck

type ConnAckProps

type ConnAckProps struct {
	// If the Session Expiry Interval is absent the value in the ConnPacket used.
	// The server uses this property to inform the Client that it is using
	// a value other than that sent by the Client in the ConnAck
	SessionExpiryInterval uint32

	// The Server uses this value to limit the number of QoS 1 and QoS 2 publications
	// that it is willing to process concurrently for the Client.
	//
	// It does not provide a mechanism to limit the QoS 0 publications that
	// the Client might try to send
	MaxRecv uint16

	MaxQos QosLevel

	// Declares whether the Server supports retained messages.
	// true means that retained messages are not supported.
	// false means retained messages are supported
	RetainAvail bool

	// Maximum Packet Size the Server is willing to accept.
	// If the Maximum Packet Size is not present, there is no limit on the
	// packet size imposed beyond the limitations in the protocol as a
	// result of the remaining length encoding and the protocol header sizes
	MaxPacketSize uint32

	// The Client Identifier which was assigned by the Server
	// because a zero length Client Identifier was found in the ConnPacket
	AssignedClientID string

	// This value indicates the highest value that the Server will accept
	// as a Topic Alias sent by the Client.
	//
	// The Server uses this value to limit the number of Topic Aliases
	// that it is willing to hold on this Connection.
	MaxTopicAlias uint16

	// Human readable string designed for diagnostics
	Reason string

	// User defines Properties
	UserProps UserProps

	// Whether the Server supports Wildcard Subscriptions.
	// false means that Wildcard Subscriptions are not supported.
	// true means Wildcard Subscriptions are supported.
	//
	// default is true
	WildcardSubAvail bool // 40

	// Whether the Server supports Subscription Identifiers.
	// false means that Subscription Identifiers are not supported.
	// true means Subscription Identifiers are supported.
	//
	// default is true
	SubIDAvail bool

	// Whether the Server supports Shared Subscriptions.
	// false means that Shared Subscriptions are not supported.
	// true means Shared Subscriptions are supported
	//
	// default is true
	SharedSubAvail bool

	// Keep Alive time assigned by the Server
	ServerKeepalive uint16

	// Response Information
	RespInfo string

	// Can be used by the Client to identify another Server to use
	ServerRef string

	// The name of the authentication method
	AuthMethod string

	// The contents of this data are defined by the authentication method.
	AuthData []byte
}

ConnAckProps defines connect acknowledge properties

type ConnHandler

type ConnHandler func(server string, code byte, err error)

ConnHandler is the handler which tend to the Connect result server is the server address provided by user in client creation call code is the ConnResult code err is the error happened when connect to server, if a error happened, the code value will max byte value (255)

type ConnPacket

type ConnPacket struct {
	BasePacket
	ProtoName string

	CleanSession bool
	IsWill       bool
	WillQos      QosLevel
	WillRetain   bool

	// Properties
	Props *ConnProps

	// Payloads
	Username    string
	Password    string
	ClientID    string
	Keepalive   uint16
	WillTopic   string
	WillMessage []byte
}

ConnPacket is the first packet sent by Client to Server

func (*ConnPacket) Bytes

func (c *ConnPacket) Bytes() []byte

func (*ConnPacket) Type

func (c *ConnPacket) Type() CtrlType

Type ConnPacket's type is CtrlConn

type ConnProps

type ConnProps struct {
	// If the Session Expiry Interval is absent the value 0 is used.
	// If it is set to 0, or is absent, the Session ends when the Network Connection is closed.
	// If the Session Expiry Interval is 0xFFFFFFFF (UINT_MAX), the Session does not expire.
	SessionExpiryInterval uint32

	// The Client uses this value to limit the number of QoS 1 and QoS 2 publications
	// that it is willing to process concurrently.
	//
	// There is no mechanism to limit the QoS 0 publications that the Server might try to send.
	//
	// The value of Receive Maximum applies only to the current Network Connection.
	// If the Receive Maximum value is absent then its value defaults to 65,535
	MaxRecv uint16

	// The Maximum Packet Size the Client is willing to accept
	//
	// If the Maximum Packet Size is not present,
	// no limit on the packet size is imposed beyond the limitations in the protocol as a result of the remaining length encoding and the protocol header sizes
	MaxPacketSize uint32

	// This value indicates the highest value that the Client will accept
	// as a Topic Alias sent by the Server.
	//
	// The Client uses this value to limit the number of Topic Aliases that
	// it is willing to hold on this Connection.
	MaxTopicAlias uint16

	// The Client uses this value to request the Server to return Response
	// Information in the ConnAckPacket
	ReqRespInfo bool

	// The Client uses this value to indicate whether the Reason String
	// or User Properties are sent in the case of failures.
	ReqProblemInfo bool

	// User defined Properties
	UserProps UserProps

	// If Authentication Method is absent, extended authentication is not performed.
	//
	// If a Client sets an Authentication Method in the ConnPacket,
	// the Client MUST NOT send any packets other than AuthPacket or DisConn packets
	// until it has received a ConnAck packet
	AuthMethod string

	// The contents of this data are defined by the authentication method.
	AuthData []byte
}

ConnProps defines connect packet properties

type CtrlType

type CtrlType byte

CtrlType is MQTT Control packet type

const (
	CtrlConn      CtrlType = 1  // Connect
	CtrlConnAck   CtrlType = 2  // connect ack
	CtrlPublish   CtrlType = 3  // publish
	CtrlPubAck    CtrlType = 4  // publish ack
	CtrlPubRecv   CtrlType = 5  // publish received
	CtrlPubRel    CtrlType = 6  // publish release
	CtrlPubComp   CtrlType = 7  // publish complete
	CtrlSubscribe CtrlType = 8  // subscribe
	CtrlSubAck    CtrlType = 9  // subscribe ack
	CtrlUnSub     CtrlType = 10 // unsubscribe
	CtrlUnSubAck  CtrlType = 11 // unsubscribe ack
	CtrlPingReq   CtrlType = 12 // ping request
	CtrlPingResp  CtrlType = 13 // ping response
	CtrlDisConn   CtrlType = 14 // disconnect
	CtrlAuth      CtrlType = 15 // authentication (since MQTT 5)
)

type DisConnPacket

type DisConnPacket struct {
	BasePacket
	Code  byte
	Props *DisConnProps
}

DisConnPacket is the final Control Packet sent from the Client to the Server. It indicates that the Client is disconnecting cleanly.

func (*DisConnPacket) Bytes

func (d *DisConnPacket) Bytes() []byte

func (*DisConnPacket) Type

func (d *DisConnPacket) Type() CtrlType

Type of DisConnPacket is CtrlDisConn

type DisConnProps

type DisConnProps struct {
	// Session Expiry Interval in seconds
	// If the Session Expiry Interval is absent, the Session Expiry Interval in the CONNECT packet is used
	//
	// The Session Expiry Interval MUST NOT be sent on a DISCONNECT by the Server
	SessionExpiryInterval uint32

	// Human readable, designed for diagnostics and SHOULD NOT be parsed by the receiver
	Reason string

	// User defines Properties
	UserProps UserProps

	// Used by the Client to identify another Server to use
	ServerRef string
}

DisConnProps properties for DisConnPacket

type LogLevel

type LogLevel byte

LogLevel is used to set log level in client creation

const (
	Silent  LogLevel = iota // Silent no log
	Verbose                 // Verbose log all
	Debug                   // Debug log with debug and above
	Info                    // Info log with info and above
	Warning                 // Warning log with warning and above
	Error                   // Error log error only
)

type NetHandler

type NetHandler func(server string, err error)

NetHandler handles the error occurred when net broken

type Option

type Option func(*client) error

Option is client option for connection options

func WithBackoffStrategy

func WithBackoffStrategy(firstDelay, maxDelay time.Duration, factor float64) Option

WithBackoffStrategy will set reconnect backoff strategy firstDelay is the time to wait before retrying after the first failure maxDelay defines the upper bound of backoff delay factor is applied to the backoff after each retry.

e.g. FirstDelay = 1s and Factor = 2, then the SecondDelay is 2s, the ThirdDelay is 4s

func WithBuf

func WithBuf(sendBuf, recvBuf int) Option

WithBuf designate the channel size of send and recv

func WithCleanSession

func WithCleanSession(f bool) Option

WithCleanSession will set clean flag in connect packet

func WithClientID

func WithClientID(clientID string) Option

WithClientID set the client id for connection

func WithCustomTLS

func WithCustomTLS(config *tls.Config) Option

WithCustomTLS replaces the TLS options with a custom tls.Config

func WithDialTimeout

func WithDialTimeout(timeout uint16) Option

WithDialTimeout for connection time out (time in second)

func WithIdentity

func WithIdentity(username, password string) Option

WithIdentity for username and password

func WithKeepalive

func WithKeepalive(keepalive uint16, factor float64) Option

WithKeepalive set the keepalive interval (time in second)

func WithLog

func WithLog(l LogLevel) Option

WithLog will create basic logger for log

func WithPersist

func WithPersist(method PersistMethod) Option

WithPersist defines the persist method to be used

func WithRouter

func WithRouter(r TopicRouter) Option

WithRouter set the router for topic dispatch

func WithServer

func WithServer(servers ...string) Option

WithServer adds servers as client server Just use `ip:port` or `domain.name:port`, only TCP connection supported for now

func WithTLS

func WithTLS(certFile, keyFile string, caCert string, serverNameOverride string, skipVerify bool) Option

WithTLS for client tls certification

func WithVersion

func WithVersion(version ProtoVersion, compromise bool) Option

WithVersion defines the mqtt protocol ProtoVersion in use

func WithWill

func WithWill(topic string, qos QosLevel, retain bool, payload []byte) Option

WithWill mark this connection as a will teller

type Packet

type Packet interface {
	// Type return the packet type
	Type() CtrlType

	// Bytes encode packet to bytes
	Bytes() []byte

	// Version MQTT version of the packet
	Version() ProtoVersion
}

Packet is MQTT control packet

func Decode

func Decode(version ProtoVersion, reader BufferedReader) (Packet, error)

Decode will decode one mqtt packet

type PersistHandler

type PersistHandler func(err error)

PersistHandler handles err happened when persist process has trouble

type PersistMethod

type PersistMethod interface {
	// Name of what persist strategy used
	Name() string

	// Store a packet with key
	Store(key string, p Packet) error

	// Load a packet from stored data according to the key
	Load(key string) (Packet, bool)

	// Range over data stored, return false to break the range
	Range(func(key string, p Packet) bool)

	// Delete
	Delete(key string) error

	// Destroy stored data
	Destroy() error
}

PersistMethod defines the behavior of persist methods

func NewFilePersist

func NewFilePersist(dirPath string, strategy *PersistStrategy) PersistMethod

NewFilePersist will create a file persist method with provided dirPath and strategy, if no strategy provided (nil), then the default strategy will be used

func NewMemPersist

func NewMemPersist(strategy *PersistStrategy) PersistMethod

NewMemPersist create a in memory persist method with provided strategy if no strategy provided (nil), then the default strategy will be used

type PersistStrategy

type PersistStrategy struct {
	// Interval applied to file/database persist
	// if this field is set to 0, means do persist per action
	// default value is 1s
	Interval time.Duration

	// MaxCount applied to all persist method
	// if this field set to 0, means no persist limit
	// for memory persist, means max in memory count
	// for file/database persist, means max entry in file/memory
	// default value is 0
	MaxCount uint32

	// DropOnExceed defines how to tackle with packets incoming
	// when max count is reached, default value is false
	DropOnExceed bool

	// DuplicateReplace defines whether duplicated key should
	// override previous one, default value is true
	DuplicateReplace bool
}

PersistStrategy defines the details to be complied in persist methods

type ProtoVersion

type ProtoVersion byte

ProtoVersion MQTT Protocol ProtoVersion

const (
	V311 ProtoVersion = 4 // V311 means MQTT 3.1.1
	V5   ProtoVersion = 5 // V5 means MQTT 5
)

type PubAckPacket

type PubAckPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubAckProps
}

PubAckPacket is the response to a PublishPacket with QoS level 1.

func (*PubAckPacket) Bytes

func (p *PubAckPacket) Bytes() []byte

func (*PubAckPacket) Type

func (p *PubAckPacket) Type() CtrlType

Type of PubAckPacket is CtrlPubAck

type PubAckProps

type PubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubAckProps properties for PubAckPacket

type PubCompPacket

type PubCompPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubCompProps
}

PubCompPacket is the response to a PubRelPacket. It is the fourth and final packet of the QoS 892 2 protocol exchange. 893

func (*PubCompPacket) Bytes

func (p *PubCompPacket) Bytes() []byte

func (*PubCompPacket) Type

func (p *PubCompPacket) Type() CtrlType

Type of PubCompPacket is CtrlPubComp

type PubCompProps

type PubCompProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubCompProps properties for PubCompPacket

type PubHandler

type PubHandler func(topic string, err error)

PubHandler handles the error occurred when publish some message if err is not nil, that means a error occurred when sending pub msg

type PubRecvPacket

type PubRecvPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubRecvProps
}

PubRecvPacket is the response to a PublishPacket with QoS 2. It is the second packet of the QoS 2 protocol exchange.

func (*PubRecvPacket) Bytes

func (p *PubRecvPacket) Bytes() []byte

func (*PubRecvPacket) Type

func (p *PubRecvPacket) Type() CtrlType

Type of PubRecvPacket is CtrlPubRecv

type PubRecvProps

type PubRecvProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubRecvProps properties for PubRecvPacket

type PubRelPacket

type PubRelPacket struct {
	BasePacket
	PacketID uint16
	Code     byte
	Props    *PubRelProps
}

PubRelPacket is the response to a PubRecvPacket. It is the third packet of the QoS 2 protocol exchange.

func (*PubRelPacket) Bytes

func (p *PubRelPacket) Bytes() []byte

func (*PubRelPacket) Type

func (p *PubRelPacket) Type() CtrlType

Type of PubRelPacket is CtrlPubRel

type PubRelProps

type PubRelProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

PubRelProps properties for PubRelPacket

type PublishPacket

type PublishPacket struct {
	BasePacket
	IsDup     bool
	Qos       QosLevel
	IsRetain  bool
	TopicName string
	Payload   []byte
	PacketID  uint16
	Props     *PublishProps
}

PublishPacket is sent from a Client to a Server or from Server to a Client to transport an Application Message.

func (*PublishPacket) Bytes

func (p *PublishPacket) Bytes() []byte

func (*PublishPacket) Type

func (p *PublishPacket) Type() CtrlType

Type of PublishPacket is CtrlPublish

type PublishProps

type PublishProps struct {
	// PayloadFormat Indicator
	// 0, Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator
	// 1, Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload
	PayloadFormat byte // required in server

	// MessageExpiryInterval
	// Lifetime of the Application Message in seconds
	// If absent, the Application Message does not expire
	MessageExpiryInterval uint32

	// A Topic Alias is an integer value that is used to identify the Topic
	// instead of using the Topic Name.
	//
	// This reduces the size of the PUBLISH packet, and is useful when the
	// Topic Names are long and the same Topic Names are used repetitively
	// within a Network Connection
	TopicAlias uint16

	// RespTopic Used as the Topic Name for a response message
	RespTopic string

	// CorrelationData used by the sender of the Request Message to identify which request the Response Message is for when it is received
	CorrelationData []byte

	// User defined Properties
	UserProps UserProps

	// SubIDs the identifier of the subscription (always no 0)
	//
	// Multiple Subscription Identifiers will be included if the publication
	// is the result of a match to more than one subscription, in this case
	// their order is not significant
	SubIDs []int

	// ContentType describe the content of the Application Message
	ContentType string
}

PublishProps properties for PublishPacket

type QosLevel

type QosLevel = byte

QosLevel is either 0, 1, 2

const (
	Qos0 QosLevel = 0x00 // Qos0 = 0
	Qos1 QosLevel = 0x01 // Qos1 = 1
	Qos2 QosLevel = 0x02 // Qos2 = 2
)

type RegexRouter

type RegexRouter struct {
	// contains filtered or unexported fields
}

RegexRouter use regex to match topic messages

func NewRegexRouter

func NewRegexRouter() *RegexRouter

NewRegexRouter will create a regex router

func (*RegexRouter) Dispatch

func (r *RegexRouter) Dispatch(p *PublishPacket)

Dispatch the received packet

func (*RegexRouter) Handle

func (r *RegexRouter) Handle(topicRegex string, h TopicHandler)

Handle will register the topic with handler

func (*RegexRouter) Name

func (r *RegexRouter) Name() string

Name is the name of router

type StandardRouter

type StandardRouter struct {
	// contains filtered or unexported fields
}

StandardRouter implements standard MQTT routing behaviour

func NewStandardRouter

func NewStandardRouter() *StandardRouter

NewStandardRouter will create a standard mqtt router

func (*StandardRouter) Dispatch

func (s *StandardRouter) Dispatch(p *PublishPacket)

Dispatch defines the action to dispatch published packet

func (*StandardRouter) Handle

func (s *StandardRouter) Handle(topic string, h TopicHandler)

Handle defines how to register topic with handler

func (*StandardRouter) Name

func (s *StandardRouter) Name() string

Name is the name of router

type SubAckPacket

type SubAckPacket struct {
	BasePacket
	PacketID uint16
	Codes    []byte
	Props    *SubAckProps
}

SubAckPacket is sent by the Server to the Client to confirm receipt and processing of a SubscribePacket.

SubAckPacket contains a list of return codes, that specify the maximum QoS level that was granted in each Subscription that was requested by the SubscribePacket.

func (*SubAckPacket) Bytes

func (s *SubAckPacket) Bytes() []byte

func (*SubAckPacket) Type

func (s *SubAckPacket) Type() CtrlType

Type of SubAckPacket is CtrlSubAck

type SubAckProps

type SubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

SubAckProps properties for SubAckPacket

type SubHandler

type SubHandler func(topics []*Topic, err error)

SubHandler handles the error occurred when subscribe some topic if err is not nil, that means a error occurred when sending sub msg

type SubscribePacket

type SubscribePacket struct {
	BasePacket
	PacketID uint16
	Topics   []*Topic
	Props    *SubscribeProps
}

SubscribePacket is sent from the Client to the Server to create one or more Subscriptions.

Each Subscription registers a Client's interest in one or more TopicNames. The Server sends PublishPackets to the Client in order to forward Application Messages that were published to TopicNames that match these Subscriptions. The SubscribePacket also specifies (for each Subscription) the maximum QoS with which the Server can send Application Messages to the Client

func (*SubscribePacket) Bytes

func (s *SubscribePacket) Bytes() []byte

func (*SubscribePacket) Type

func (s *SubscribePacket) Type() CtrlType

Type of SubscribePacket is CtrlSubscribe

type SubscribeProps

type SubscribeProps struct {
	// SubID identifier of the subscription
	SubID uint32
	// UserProps User defined Properties
	UserProps UserProps
}

SubscribeProps properties for SubscribePacket

type TextRouter

type TextRouter struct {
	// contains filtered or unexported fields
}

TextRouter uses plain string comparison to dispatch topic message this is the default router in client

func NewTextRouter

func NewTextRouter() *TextRouter

NewTextRouter will create a text based router

func (*TextRouter) Dispatch

func (r *TextRouter) Dispatch(p *PublishPacket)

Dispatch the received packet

func (*TextRouter) Handle

func (r *TextRouter) Handle(topic string, h TopicHandler)

Handle will register the topic with handler

func (*TextRouter) Name

func (r *TextRouter) Name() string

Name of TextRouter is "TextRouter"

type Topic

type Topic struct {
	Name string
	Qos  QosLevel
}

Topic for both topic name and topic qos

func (*Topic) String

func (t *Topic) String() string

type TopicHandler

type TopicHandler func(topic string, qos QosLevel, msg []byte)

TopicHandler handles topic sub message topic is the client user provided topic code can be SubOkMaxQos0, SubOkMaxQos1, SubOkMaxQos2, SubFail

type TopicRouter

type TopicRouter interface {
	// Name is the name of router
	Name() string
	// Handle defines how to register topic with handler
	Handle(topic string, h TopicHandler)
	// Dispatch defines the action to dispatch published packet
	Dispatch(p *PublishPacket)
}

TopicRouter defines how to route the topic message to handler

type UnSubAckPacket

type UnSubAckPacket struct {
	BasePacket
	PacketID uint16
	Props    *UnSubAckProps
}

UnSubAckPacket is sent by the Server to the Client to confirm receipt of an UnSubPacket

func (*UnSubAckPacket) Bytes

func (s *UnSubAckPacket) Bytes() []byte

func (*UnSubAckPacket) Type

func (s *UnSubAckPacket) Type() CtrlType

Type of UnSubAckPacket is CtrlUnSubAck

type UnSubAckProps

type UnSubAckProps struct {
	// Human readable string designed for diagnostics
	Reason string

	// UserProps User defined Properties
	UserProps UserProps
}

UnSubAckProps properties for UnSubAckPacket

type UnSubHandler

type UnSubHandler func(topic []string, err error)

UnSubHandler handles the error occurred when publish some message

type UnSubPacket

type UnSubPacket struct {
	BasePacket
	PacketID   uint16
	TopicNames []string
	Props      *UnSubProps
}

UnSubPacket is sent by the Client to the Server, to unsubscribe from topics.

func (*UnSubPacket) Bytes

func (s *UnSubPacket) Bytes() []byte

func (*UnSubPacket) Type

func (s *UnSubPacket) Type() CtrlType

Type of UnSubPacket is CtrlUnSub

type UnSubProps

type UnSubProps struct {
	// UserProps User defined Properties
	UserProps UserProps
}

UnSubProps properties for UnSubPacket

type UserProps

type UserProps map[string][]string

UserProps contains user defined properties

Directories

Path Synopsis
c
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL