mqtt

Version: v0.19.2
Published: Apr 1, 2024 License: Apache-2.0 Imports: 18 Imported by: 30

README

mqtt-go


Yet another Go MQTT 3.1.1 client library

  • Go-ish interface

    Fully context controlled and mockable interface.

  • Extensible

    It is easy to implement a wrapper with a unified interface. For example, an AWS IoT WebSocket dialer with an automatic presigned URL updater is available in AWS IoT Device SDK for Go.

  • Thread-safe

    All functions and structs are safe to be used from multiple goroutines.

License

This package is licensed under Apache License Version 2.0.

Documentation

Overview

Package mqtt is a thread-safe and context-controlled MQTT 3.1.1 client library.

Variables

var ErrClosedClient = errors.New("operation on closed client")

ErrClosedClient means an operation was requested on a closed client.

var ErrClosedTransport = errors.New("read/write on closed transport")

ErrClosedTransport means that the underlying connection is closed.

var ErrConnectionFailed = errors.New("connection failed")

ErrConnectionFailed means the connection is not established.

var ErrInvalidPacket = errors.New("invalid packet")

ErrInvalidPacket means that an invalid message arrived from the broker.

var ErrInvalidPacketLength = errors.New("invalid packet length")

ErrInvalidPacketLength means that a message with an invalid length arrived.

var ErrInvalidQoS = errors.New("invalid QoS")

ErrInvalidQoS means the QoS value is not allowed.

var ErrInvalidRune = errors.New("invalid rune in UTF-8 string")

ErrInvalidRune means that the string has a rune not allowed in MQTT.

var ErrInvalidSubAck = errors.New("invalid SUBACK")

ErrInvalidSubAck means that the incoming SUBACK packet is inconsistent with the request.

var ErrInvalidTopicFilter = errors.New("invalid topic filter")

ErrInvalidTopicFilter means that the topic filter string is invalid.

var ErrKeepAliveDisabled = errors.New("keep alive disabled")

ErrKeepAliveDisabled is returned if KeepAlive is run on a connection with keep alive disabled.

var ErrNotConnected = errors.New("not connected")

ErrNotConnected is returned if a function is called before Connect.

var ErrPayloadLenExceeded = errors.New("payload length exceeded")

ErrPayloadLenExceeded means the payload length is too large.

var ErrPingTimeout = errors.New("ping timeout")

ErrPingTimeout is returned on ping response timeout.

var ErrUnsupportedProtocol = errors.New("unsupported protocol")

ErrUnsupportedProtocol means that the specified scheme in the URL is not supported.

Functions

func KeepAlive added in v0.1.0

func KeepAlive(ctx context.Context, cli Client, interval, timeout time.Duration) error

KeepAlive runs the keep alive loop. It must be called after Connect, and interval must be smaller than the value specified by the WithKeepAlive option passed to Connect. If KeepAlive returns an error, the caller should close the connection, in accordance with section 3.1.2.10 of the MQTT 3.1.1 specification.

Types

type BaseClient

type BaseClient struct {
	// Transport is an underlying connection. Typically net.Conn.
	Transport io.ReadWriteCloser
	// ConnState is called if the connection state is changed.
	ConnState func(ConnState, error)

	// MaxPayloadLen is a maximum allowed length of message payload.
	// 0 means unlimited. (It will panic if exceeds protocol maximum message length (256MB).)
	MaxPayloadLen int
	// contains filtered or unexported fields
}

BaseClient is a low-layer MQTT client. A zero value with a valid underlying Transport is a valid BaseClient.

func DialContext added in v0.14.0

func DialContext(ctx context.Context, urlStr string, opts ...DialOption) (*BaseClient, error)

DialContext creates MQTT client using URL string.

func (*BaseClient) Close

func (c *BaseClient) Close() error

Close forcibly closes the MQTT connection.

func (*BaseClient) Connect

func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)

Connect to the broker.

func (*BaseClient) Disconnect

func (c *BaseClient) Disconnect(ctx context.Context) error

Disconnect from the broker.

func (*BaseClient) Done added in v0.1.0

func (c *BaseClient) Done() <-chan struct{}

Done is a channel to signal connection close.

func (*BaseClient) Err added in v0.1.0

func (c *BaseClient) Err() error

Err returns connection error.

func (*BaseClient) Handle added in v0.1.0

func (c *BaseClient) Handle(handler Handler)

Handle registers the message handler.

func (*BaseClient) Ping

func (c *BaseClient) Ping(ctx context.Context) error

Ping to the broker.

func (*BaseClient) Publish

func (c *BaseClient) Publish(ctx context.Context, message *Message) error

Publish a message to the broker. ID field of the message is filled if zero.

func (*BaseClient) SetErrorOnce added in v0.11.1

func (c *BaseClient) SetErrorOnce(err error)

SetErrorOnce sets client error value if not yet set.

func (*BaseClient) Stats added in v0.19.0

func (c *BaseClient) Stats() BaseStats

Stats returns base client stats.

func (*BaseClient) Subscribe

func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)

Subscribe topics.

func (*BaseClient) Unsubscribe

func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error

Unsubscribe topics.

func (*BaseClient) ValidateMessage added in v0.10.0

func (c *BaseClient) ValidateMessage(message *Message) error

ValidateMessage validates given message.

type BaseClientStoreDialer added in v0.13.0

type BaseClientStoreDialer struct {
	// Dialer is a wrapped dialer. Valid Dialer must be set before use.
	Dialer
	// contains filtered or unexported fields
}

BaseClientStoreDialer is a dialer wrapper which stores the latest BaseClient.

Example
dialer := &BaseClientStoreDialer{Dialer: &URLDialer{URL: "mqtt://localhost:1883"}}
cli, err := NewReconnectClient(dialer)
if err != nil {
	panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if _, err := cli.Connect(ctx, "TestClient"); err != nil {
	panic(err)
}
defer cli.Disconnect(context.Background())

// Publish asynchronously
if err := cli.Publish(
	ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("async")},
); err != nil {
	panic(err)
}

// Publish synchronously
if err := dialer.BaseClient().Publish(
	ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("sync")},
); err != nil {
	panic(err)
}
Output:

func (*BaseClientStoreDialer) BaseClient added in v0.13.0

func (d *BaseClientStoreDialer) BaseClient() *BaseClient

BaseClient returns the latest BaseClient created by the dialer. It returns nil before the first call of DialContext.

func (*BaseClientStoreDialer) DialContext added in v0.14.0

func (d *BaseClientStoreDialer) DialContext(ctx context.Context) (*BaseClient, error)

DialContext creates a new BaseClient.

type BaseStats added in v0.19.0

type BaseStats struct {
	// Recent ping delay.
	PingDelayRecent time.Duration
	// Maximum ping delay.
	PingDelayMax time.Duration
	// Minimum ping delay.
	PingDelayMin time.Duration
	// Count of ping error.
	CountPingError int
}

BaseStats stores base client statistics.

type Client

type Client interface {
	Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
	Disconnect(ctx context.Context) error
	Publish(ctx context.Context, message *Message) error
	Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
	Unsubscribe(ctx context.Context, subs ...string) error
	Ping(ctx context.Context) error
	Handle(Handler)
}

Client is the interface of MQTT client.

type ClientCloser added in v0.1.0

type ClientCloser interface {
	Client
	Closer
}

ClientCloser groups the Client and Closer interfaces.

type Closer added in v0.1.0

type Closer interface {
	Close() error
	Done() <-chan struct{}
	Err() error
}

Closer is the interface of connection closer.

type ConnState

type ConnState int

ConnState represents the status of MQTT connection.

const (
	StateNew          ConnState = iota // initial state
	StateActive                        // connected to the broker
	StateClosed                        // connection is unexpectedly closed
	StateDisconnected                  // connection is expectedly closed
)

ConnState values.

func (ConnState) String added in v0.4.7

func (s ConnState) String() string

type ConnectOption

type ConnectOption func(*ConnectOptions) error

ConnectOption sets option for Connect.

func WithCleanSession

func WithCleanSession(cleanSession bool) ConnectOption

WithCleanSession sets clean session flag.

func WithKeepAlive added in v0.1.0

func WithKeepAlive(interval uint16) ConnectOption

WithKeepAlive sets keep alive interval in seconds.

func WithProtocolLevel added in v0.3.0

func WithProtocolLevel(level ProtocolLevel) ConnectOption

WithProtocolLevel sets protocol level.

func WithUserNamePassword

func WithUserNamePassword(userName, password string) ConnectOption

WithUserNamePassword sets plain text auth information used in Connect.

func WithWill

func WithWill(will *Message) ConnectOption

WithWill sets will message.

type ConnectOptions

type ConnectOptions struct {
	UserName      string
	Password      string
	CleanSession  bool
	KeepAlive     uint16
	Will          *Message
	ProtocolLevel ProtocolLevel
}

ConnectOptions represents options for Connect.

type ConnectionError added in v0.4.4

type ConnectionError struct {
	Err  error
	Code ConnectionReturnCode
}

ConnectionError is an error storing the connection return code.

func (*ConnectionError) Error added in v0.4.4

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap added in v0.4.4

func (e *ConnectionError) Unwrap() error

Unwrap returns base error of ConnectionError. (for Go1.13 error unwrapping.)

type ConnectionReturnCode

type ConnectionReturnCode byte

ConnectionReturnCode represents return code of connect request.

const (
	ConnectionAccepted          ConnectionReturnCode = 0
	UnacceptableProtocolVersion ConnectionReturnCode = 1
	IdentifierRejected          ConnectionReturnCode = 2
	ServerUnavailable           ConnectionReturnCode = 3
	BadUserNameOrPassword       ConnectionReturnCode = 4
	NotAuthorized               ConnectionReturnCode = 5
)

Connection acceptance/rejection code.

func (ConnectionReturnCode) String

func (c ConnectionReturnCode) String() string

type DialOption

type DialOption func(*DialOptions) error

DialOption sets option for Dial.

func WithConnStateHandler added in v0.6.0

func WithConnStateHandler(handler func(ConnState, error)) DialOption

WithConnStateHandler sets connection state change handler.

func WithDialer

func WithDialer(dialer *net.Dialer) DialOption

WithDialer sets dialer.

func WithMaxPayloadLen added in v0.10.0

func WithMaxPayloadLen(l int) DialOption

WithMaxPayloadLen sets maximum payload length of the BaseClient.

func WithTLSCertFiles added in v0.9.0

func WithTLSCertFiles(host, caFile, certFile, privateKeyFile string) DialOption

WithTLSCertFiles loads certificate files.

func WithTLSConfig

func WithTLSConfig(config *tls.Config) DialOption

WithTLSConfig sets TLS configuration.

type DialOptions

type DialOptions struct {
	Dialer        *net.Dialer
	TLSConfig     *tls.Config
	ConnState     func(ConnState, error)
	MaxPayloadLen int
}

DialOptions stores options for Dial.

type Dialer added in v0.1.0

type Dialer interface {
	DialContext(context.Context) (*BaseClient, error)
}

Dialer is an interface to create connection.

type DialerFunc added in v0.4.1

type DialerFunc func(ctx context.Context) (*BaseClient, error)

DialerFunc type is an adapter to use functions as MQTT connection dialer.

func (DialerFunc) DialContext added in v0.14.0

func (d DialerFunc) DialContext(ctx context.Context) (*BaseClient, error)

DialContext calls d().

type Error added in v0.6.0

type Error struct {
	Err     error
	Failure string
	File    string
	Line    int
}

Error records a failed parsing.

func (*Error) Error added in v0.6.0

func (e *Error) Error() string

func (*Error) Is added in v0.6.0

func (e *Error) Is(target error) bool

Is reports whether chained error contains target. This is for Go1.13 error unwrapping.

func (*Error) Unwrap added in v0.6.0

func (e *Error) Unwrap() error

Unwrap returns the reason of the failure. This is for Go1.13 error unwrapping.

type ErrorWithRetry added in v0.12.0

type ErrorWithRetry interface {
	error
	Retry(context.Context, *BaseClient) error
}

ErrorWithRetry is an error with packets which should be retransmitted.

type Handler

type Handler interface {
	Serve(*Message)
}

Handler receives an MQTT message.

type HandlerFunc

type HandlerFunc func(*Message)

HandlerFunc type is an adapter to use functions as MQTT message handler.

func (HandlerFunc) Serve

func (h HandlerFunc) Serve(message *Message)

Serve calls h(message).
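
The adapter works the same way as http.HandlerFunc: a plain function gains a Serve method and therefore satisfies Handler. The sketch below re-declares the types locally so that it is self-contained (Message is trimmed to two fields), and collect is a hypothetical helper used only for illustration.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented declarations.
// Message is trimmed to the fields used here.
type Message struct {
	Topic   string
	Payload []byte
}

type Handler interface {
	Serve(*Message)
}

type HandlerFunc func(*Message)

// Serve calls h(message), which makes any func(*Message) a Handler.
func (h HandlerFunc) Serve(message *Message) { h(message) }

// collect is a hypothetical helper returning a Handler that records
// the topic of every message it receives.
func collect(out *[]string) Handler {
	return HandlerFunc(func(m *Message) {
		*out = append(*out, m.Topic)
	})
}

func main() {
	var topics []string
	h := collect(&topics)
	h.Serve(&Message{Topic: "test", Payload: []byte("hello")})
	fmt.Println(topics)
}
```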

type Message

type Message struct {
	Topic   string
	ID      uint16
	QoS     QoS
	Retain  bool
	Dup     bool
	Payload []byte
}

Message represents MQTT message.

type NoContextDialer added in v0.14.0

type NoContextDialer struct {
	NoContextDialerIface
}

NoContextDialer is a wrapper to use a Dialer of mqtt-go<0.14 as a mqtt-go>=0.14 Dialer.

WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.

Example
d := oldDialer()
cli, err := NewReconnectClient(&NoContextDialer{d})
if err != nil {
	fmt.Println("error:", err.Error())
	return
}
cli.Handle(HandlerFunc(func(*Message) {}))
fmt.Println("ok")
Output:

ok

func (*NoContextDialer) DialContext added in v0.14.0

func (d *NoContextDialer) DialContext(context.Context) (*BaseClient, error)

DialContext wraps Dial without context.

WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.

type NoContextDialerIface added in v0.14.0

type NoContextDialerIface interface {
	Dial() (*BaseClient, error)
}

NoContextDialerIface is the Dialer interface of mqtt-go<0.14.

type ProtocolLevel added in v0.3.0

type ProtocolLevel byte

ProtocolLevel represents MQTT protocol level.

const (
	ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1
	ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default)
)

ProtocolLevel values.

type QoS

type QoS uint8

QoS represents quality of service level.

const (
	QoS0             QoS = 0x00 // At most once delivery
	QoS1             QoS = 0x01 // At least once delivery
	QoS2             QoS = 0x02 // Exactly once delivery
	SubscribeFailure QoS = 0x80 // Rejected to subscribe
)

QoS values.
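
SubscribeFailure corresponds to the 0x80 failure return code of an MQTT 3.1.1 SUBACK. A minimal sketch of how a caller might detect rejected subscriptions follows. It assumes that the []Subscription returned by Subscribe carries the granted QoS for each topic. The types are local stand-ins and rejectedTopics is a hypothetical helper.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented declarations.
type QoS uint8

const (
	QoS0             QoS = 0x00 // At most once delivery
	QoS1             QoS = 0x01 // At least once delivery
	QoS2             QoS = 0x02 // Exactly once delivery
	SubscribeFailure QoS = 0x80 // Rejected to subscribe
)

type Subscription struct {
	Topic string
	QoS   QoS
}

// rejectedTopics is a hypothetical helper that returns the topics whose
// granted QoS is SubscribeFailure.
func rejectedTopics(granted []Subscription) []string {
	var rejected []string
	for _, s := range granted {
		if s.QoS == SubscribeFailure {
			rejected = append(rejected, s.Topic)
		}
	}
	return rejected
}

func main() {
	granted := []Subscription{
		{Topic: "sensors/temp", QoS: QoS1},
		{Topic: "admin/#", QoS: SubscribeFailure},
	}
	fmt.Println(rejectedTopics(granted))
}
```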

type ReconnectClient added in v0.15.0

type ReconnectClient interface {
	Client
	Retryer
}

ReconnectClient is a Client with reconnect and retry features.

func NewReconnectClient added in v0.1.0

func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient, error)

NewReconnectClient creates an MQTT client with re-connect/re-publish/re-subscribe features.

type ReconnectOption added in v0.4.0

type ReconnectOption func(*ReconnectOptions) error

ReconnectOption sets option for Connect.

func WithAlwaysResubscribe added in v0.18.0

func WithAlwaysResubscribe(always bool) ReconnectOption

WithAlwaysResubscribe enables or disables re-subscribe on reconnect. Default value is false. This option can be used to ensure all subscriptions are restored even if the server is buggy.

func WithPingInterval added in v0.4.0

func WithPingInterval(interval time.Duration) ReconnectOption

WithPingInterval sets ping request interval. Default value is KeepAlive value set by ConnectOption.

func WithReconnectWait added in v0.4.0

func WithReconnectWait(base, max time.Duration) ReconnectOption

WithReconnectWait sets parameters of incremental reconnect wait.

func WithRetryClient added in v0.16.0

func WithRetryClient(cli *RetryClient) ReconnectOption

WithRetryClient sets RetryClient. Default value is zero RetryClient.

func WithTimeout added in v0.4.0

func WithTimeout(timeout time.Duration) ReconnectOption

WithTimeout sets timeout duration of server response. Default value is PingInterval.

type ReconnectOptions added in v0.4.0

type ReconnectOptions struct {
	ConnectOptions    []ConnectOption
	Timeout           time.Duration
	ReconnectWaitBase time.Duration
	ReconnectWaitMax  time.Duration
	PingInterval      time.Duration
	RetryClient       *RetryClient
	AlwaysResubscribe bool
}

ReconnectOptions represents options for Connect.

type RequestTimeoutError added in v0.16.0

type RequestTimeoutError struct {
	// contains filtered or unexported fields
}

RequestTimeoutError is a context deadline exceeded error caused by RetryClient.ResponseTimeout.

func (*RequestTimeoutError) Error added in v0.16.0

func (e *RequestTimeoutError) Error() string

Error implements error.

type RetryClient added in v0.1.0

type RetryClient struct {

	// Maximum duration to wait for acknowledge response.
	// Messages with QoS1 and QoS2 will be retried.
	ResponseTimeout time.Duration

	// Directly publish QoS0 messages without queuing.
	// It may cause messages to be delivered out of order, but performance may be increased.
	DirectlyPublishQoS0 bool

	// Callback to receive background errors on raw message publish/subscribe operations.
	OnError func(error)
	// contains filtered or unexported fields
}

RetryClient queues unacknowledged messages and retries them on reconnect.

func (*RetryClient) Client added in v0.11.1

func (c *RetryClient) Client() *BaseClient

Client returns the base client.

func (*RetryClient) Connect added in v0.1.0

func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)

Connect to the broker.

func (*RetryClient) Disconnect added in v0.4.3

func (c *RetryClient) Disconnect(ctx context.Context) error

Disconnect from the broker.

func (*RetryClient) Handle added in v0.1.0

func (c *RetryClient) Handle(handler Handler)

Handle registers the message handler.

func (*RetryClient) Ping added in v0.4.3

func (c *RetryClient) Ping(ctx context.Context) error

Ping to the broker.

func (*RetryClient) Publish added in v0.1.0

func (c *RetryClient) Publish(ctx context.Context, message *Message) error

Publish tries to publish the message and immediately returns. If it is not acknowledged to be published, the message will be queued.

func (*RetryClient) Resubscribe added in v0.1.0

func (c *RetryClient) Resubscribe(ctx context.Context)

Resubscribe subscribes all established subscriptions.

func (*RetryClient) Retry added in v0.4.7

func (c *RetryClient) Retry(ctx context.Context)

Retry all queued publish/subscribe requests.

func (*RetryClient) SetClient added in v0.1.0

func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)

SetClient sets the new BaseClient. Call Retry() and Resubscribe() to process queued messages and subscriptions. The BaseClient must be unconnected when it is passed to the RetryClient.

func (*RetryClient) Stats added in v0.15.0

func (c *RetryClient) Stats() RetryStats

Stats returns retry stats.

func (*RetryClient) Subscribe added in v0.1.0

func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)

Subscribe tries to subscribe to the topic and immediately returns nil. If the subscription is not acknowledged, the request will be queued. The first return value ([]Subscription) is always nil.

func (*RetryClient) Unsubscribe added in v0.4.3

func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error

Unsubscribe tries to unsubscribe from the topic and immediately returns nil. If the unsubscription is not acknowledged, the request will be queued.

type RetryStats added in v0.15.0

type RetryStats struct {
	// Number of queued tasks.
	QueuedTasks int
	// Number of queued messages and subscriptions.
	QueuedRetries int
	// Total number of processed tasks.
	TotalTasks int
	// Total number of retries.
	TotalRetries int
	// Count of SetClient.
	CountSetClient int
	// Count of Connect.
	CountConnect int
	// Count of error on Connect.
	CountConnectError int
}

RetryStats stores retry statistics.

type Retryer added in v0.15.0

type Retryer interface {
	// SetClient sets the new BaseClient.
	// Call Retry() and Resubscribe() to process queued messages and subscriptions.
	// The BaseClient must be unconnected when it is passed to the RetryClient.
	SetClient(ctx context.Context, cli *BaseClient)
	// Client returns the base client.
	Client() *BaseClient
	// Resubscribe subscribes all established subscriptions.
	Resubscribe(ctx context.Context)
	// Retry all queued publish/subscribe requests.
	Retry(ctx context.Context)
	// Stats returns retry stats.
	Stats() RetryStats
}

Retryer is an interface to control message retrying.

type ServeAsync added in v0.7.0

type ServeAsync struct {
	// Handler is an underlying handler.
	// Handler.Serve() will be called asynchronously.
	Handler
}

ServeAsync is an MQTT message handler to process messages asynchronously.

func (*ServeAsync) Serve added in v0.7.0

func (m *ServeAsync) Serve(message *Message)

Serve the message in a new goroutine.
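
The idea can be sketched as a wrapper that hands each message to the embedded Handler in its own goroutine. The code below is a stand-in written for illustration, not the package's source: asyncHandler and countAsync are hypothetical names, and the WaitGroup exists only so that the sketch can wait for completion. Note that the wrapped handler must itself be safe for concurrent use.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins; Message is trimmed to one field.
type Message struct{ Topic string }

type Handler interface{ Serve(*Message) }

type HandlerFunc func(*Message)

func (h HandlerFunc) Serve(m *Message) { h(m) }

// asyncHandler is a hypothetical stand-in illustrating the idea behind
// ServeAsync: Serve returns immediately and the embedded Handler runs
// in a new goroutine.
type asyncHandler struct {
	Handler
	wg sync.WaitGroup // only used so the sketch can wait for completion
}

func (a *asyncHandler) Serve(m *Message) {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		a.Handler.Serve(m)
	}()
}

// countAsync serves n messages asynchronously and returns how many
// were handled once all goroutines have finished.
func countAsync(n int) int {
	var mu sync.Mutex
	handled := 0
	a := &asyncHandler{Handler: HandlerFunc(func(*Message) {
		mu.Lock() // the handler is called concurrently
		handled++
		mu.Unlock()
	})}
	for i := 0; i < n; i++ {
		a.Serve(&Message{Topic: "test"})
	}
	a.wg.Wait()
	return handled
}

func main() {
	fmt.Println(countAsync(10))
}
```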

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is an MQTT message handler multiplexer. The idea is very similar to http.ServeMux.

func (*ServeMux) Handle

func (m *ServeMux) Handle(filter string, handler Handler) error

Handle registers the handler for the given pattern.

func (*ServeMux) HandleFunc

func (m *ServeMux) HandleFunc(filter string, handler func(*Message)) error

HandleFunc registers the handler function for the given pattern.

func (*ServeMux) Serve

func (m *ServeMux) Serve(message *Message)

Serve dispatches the message to the registered handlers.
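
Dispatching by filter relies on the MQTT 3.1.1 wildcard rules: "+" matches exactly one topic level, and a trailing "#" matches any number of remaining levels, including none. The sketch below illustrates these rules. matchFilter is a hypothetical function, not the ServeMux implementation. It does not validate the filter and ignores the special handling of topics beginning with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter is a hypothetical, simplified matcher for MQTT topic filters.
// "+" matches exactly one level; "#" matches all remaining levels
// (including zero) and is only honored as the last level of the filter.
func matchFilter(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// Multi-level wildcard must be the final filter level.
			return i == len(f)-1
		}
		if i >= len(t) {
			// Filter has more levels than the topic.
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	// Without "#", filter and topic must have the same number of levels.
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchFilter("sensors/+/temp", "sensors/room1/temp"))
	fmt.Println(matchFilter("sensors/#", "sensors"))
	fmt.Println(matchFilter("sensors/+", "sensors/room1/temp"))
}
```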

type Subscription

type Subscription struct {
	Topic string
	QoS   QoS
}

Subscription represents MQTT subscription target.

type URLDialer added in v0.1.0

type URLDialer struct {
	URL     string
	Options []DialOption
}

URLDialer is a Dialer using URL string.

func (*URLDialer) DialContext added in v0.14.0

func (d *URLDialer) DialContext(ctx context.Context) (*BaseClient, error)

DialContext creates connection using its values.

Directories

Path Synopsis
examples
internal
filteredpipe
Package filteredpipe provides pipes with interceptor for testing.
Package mockmqtt provides simple standalone mock of mqtt.Client.
paho module
