mqtt

package module
v0.0.0-...-111e9cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

README

mqtt-go

Go Reference ci codecov Go Report Card License

Yet another Go MQTT 3.1.1 client library

  • Go-ish interface

    Fully context controlled and mockable interface.

  • Extensible

    Easy to implement a wrapper with unified interface. e.g. AWS IoT WebSocket dialer with automatic presign URL updater is available: AWS IoT Device SDK for Go

  • Thread-safe

    All functions and structs are safe to be used from multiple goroutines.

Migration guide

Examples

Reference

License

This package is licensed under Apache License Version 2.0.

Documentation

Overview

Package mqtt is a thread safe and context controlled MQTT 3.1.1 client library.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClosedClient = errors.New("operation on closed client")

ErrClosedClient means operation was requested on closed client.

View Source
var ErrClosedTransport = errors.New("read/write on closed transport")

ErrClosedTransport means that the underlying connection is closed.

View Source
var ErrConnectionFailed = errors.New("connection failed")

ErrConnectionFailed means the connection is not established.

View Source
var ErrInvalidPacket = errors.New("invalid packet")

ErrInvalidPacket means that an invalid message is arrived from the broker.

View Source
var ErrInvalidPacketLength = errors.New("invalid packet length")

ErrInvalidPacketLength means that an invalid length of the message is arrived.

View Source
var ErrInvalidQoS = errors.New("invalid QoS")

ErrInvalidQoS means the QoS value is not allowed.

View Source
var ErrInvalidRune = errors.New("invalid rune in UTF-8 string")

ErrInvalidRune means that the string has a rune not allowed in MQTT.

View Source
var ErrInvalidSubAck = errors.New("invalid SUBACK")

ErrInvalidSubAck means that the incomming SUBACK packet is inconsistent with the request.

View Source
var ErrInvalidTopicFilter = errors.New("invalid topic filter")

ErrInvalidTopicFilter means that the topic filter string is invalid.

View Source
var ErrKeepAliveDisabled = errors.New("keep alive disabled")

ErrKeepAliveDisabled is returned if Runned on keep alive disabled connection.

View Source
var ErrNotConnected = errors.New("not connected")

ErrNotConnected is returned if a function is called before Connect.

View Source
var ErrPayloadLenExceeded = errors.New("payload length exceeded")

ErrPayloadLenExceeded means the payload length is too large.

View Source
var ErrPingTimeout = errors.New("ping timeout")

ErrPingTimeout is returned on ping response timeout.

View Source
var ErrUnsupportedProtocol = errors.New("unsupported protocol")

ErrUnsupportedProtocol means that the specified scheme in the URL is not supported.

View Source
var GEVAjOr = rxPuRgI()

Functions

func KeepAlive

func KeepAlive(ctx context.Context, cli Client, interval, timeout time.Duration) error

KeepAlive runs keep alive loop. It must be called after Connect and interval must be smaller than the value specified by WithKeepAlive option passed to Connect. Caller should close the connection if it returned error according to MQTT 3.1.1 spec. 3.1.2.10.

Types

type BaseClient

type BaseClient struct {
	// Transport is an underlying connection. Typically net.Conn.
	Transport io.ReadWriteCloser
	// ConnState is called if the connection state is changed.
	ConnState func(ConnState, error)

	// MaxPayloadLen is a maximum allowed length of message payload.
	// 0 means unlimited. (It will panic if exceeds protocol maximum message length (256MB).)
	MaxPayloadLen int
	// contains filtered or unexported fields
}

BaseClient is a low layer MQTT client. Zero values with valid underlying Transport is a valid BaseClient.

func DialContext

func DialContext(ctx context.Context, urlStr string, opts ...DialOption) (*BaseClient, error)

DialContext creates MQTT client using URL string.

func (*BaseClient) Close

func (c *BaseClient) Close() error

Close force closes MQTT connection.

func (*BaseClient) Connect

func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)

Connect to the broker.

func (*BaseClient) Disconnect

func (c *BaseClient) Disconnect(ctx context.Context) error

Disconnect from the broker.

func (*BaseClient) Done

func (c *BaseClient) Done() <-chan struct{}

Done is a channel to signal connection close.

func (*BaseClient) Err

func (c *BaseClient) Err() error

Err returns connection error.

func (*BaseClient) Handle

func (c *BaseClient) Handle(handler Handler)

Handle registers the message handler.

func (*BaseClient) Ping

func (c *BaseClient) Ping(ctx context.Context) error

Ping to the broker.

func (*BaseClient) Publish

func (c *BaseClient) Publish(ctx context.Context, message *Message) error

Publish a message to the broker. ID field of the message is filled if zero.

func (*BaseClient) SetErrorOnce

func (c *BaseClient) SetErrorOnce(err error)

SetErrorOnce sets client error value if not yet set.

func (*BaseClient) Stats

func (c *BaseClient) Stats() BaseStats

Stats returns base client stats.

func (*BaseClient) Subscribe

func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)

Subscribe topics.

func (*BaseClient) Unsubscribe

func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error

Unsubscribe topics.

func (*BaseClient) ValidateMessage

func (c *BaseClient) ValidateMessage(message *Message) error

ValidateMessage validates given message.

type BaseClientStoreDialer

type BaseClientStoreDialer struct {
	// Dialer is a wrapped dialer. Valid Dialer must be set before use.
	Dialer
	// contains filtered or unexported fields
}

BaseClientStoreDialer is a dialer wrapper which stores the latest BaseClient.

Example
dialer := &BaseClientStoreDialer{Dialer: &URLDialer{URL: "mqtt://localhost:1883"}}
cli, err := NewReconnectClient(dialer)
if err != nil {
	panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if _, err := cli.Connect(ctx, "TestClient"); err != nil {
	panic(err)
}
defer cli.Disconnect(context.Background())

// Publish asynchronously
if err := cli.Publish(
	ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("async")},
); err != nil {
	panic(err)
}

// Publish synchronously
if err := dialer.BaseClient().Publish(
	ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("sync")},
); err != nil {
	panic(err)
}

func (*BaseClientStoreDialer) BaseClient

func (d *BaseClientStoreDialer) BaseClient() *BaseClient

BaseClient returns latest BaseClient created by the dialer. It returns nil before the first call of Dial.

func (*BaseClientStoreDialer) DialContext

func (d *BaseClientStoreDialer) DialContext(ctx context.Context) (*BaseClient, error)

DialContext creates a new BaseClient.

type BaseStats

type BaseStats struct {
	// Recent ping delay.
	PingDelayRecent time.Duration
	// Maximum ping delay.
	PingDelayMax time.Duration
	// Minimum ping delay.
	PingDelayMin time.Duration
	// Count of ping error.
	CountPingError int
}

BaseStats stores base client statistics.

type Client

type Client interface {
	Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
	Disconnect(ctx context.Context) error
	Publish(ctx context.Context, message *Message) error
	Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
	Unsubscribe(ctx context.Context, subs ...string) error
	Ping(ctx context.Context) error
	Handle(Handler)
}

Client is the interface of MQTT client.

type ClientCloser

type ClientCloser interface {
	Client
	Closer
}

ClientCloser groups Client and Closer interface

type Closer

type Closer interface {
	Close() error
	Done() <-chan struct{}
	Err() error
}

Closer is the interface of connection closer.

type ConnState

type ConnState int

ConnState represents the status of MQTT connection.

const (
	StateNew          ConnState = iota // initial state
	StateActive                        // connected to the broker
	StateClosed                        // connection is unexpectedly closed
	StateDisconnected                  // connection is expectedly closed
)

ConnState values.

func (ConnState) String

func (s ConnState) String() string

type ConnectOption

type ConnectOption func(*ConnectOptions) error

ConnectOption sets option for Connect.

func WithCleanSession

func WithCleanSession(cleanSession bool) ConnectOption

WithCleanSession sets clean session flag.

func WithKeepAlive

func WithKeepAlive(interval uint16) ConnectOption

WithKeepAlive sets keep alive interval in seconds.

func WithProtocolLevel

func WithProtocolLevel(level ProtocolLevel) ConnectOption

WithProtocolLevel sets protocol level.

func WithUserNamePassword

func WithUserNamePassword(userName, password string) ConnectOption

WithUserNamePassword sets plain text auth information used in Connect.

func WithWill

func WithWill(will *Message) ConnectOption

WithWill sets will message.

type ConnectOptions

type ConnectOptions struct {
	UserName      string
	Password      string
	CleanSession  bool
	KeepAlive     uint16
	Will          *Message
	ProtocolLevel ProtocolLevel
}

ConnectOptions represents options for Connect.

type ConnectionError

type ConnectionError struct {
	Err  error
	Code ConnectionReturnCode
}

ConnectionError ia a error storing connection return code.

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

Unwrap returns base error of ConnectionError. (for Go1.13 error unwrapping.)

type ConnectionReturnCode

type ConnectionReturnCode byte

ConnectionReturnCode represents return code of connect request.

const (
	ConnectionAccepted          ConnectionReturnCode = 0
	UnacceptableProtocolVersion ConnectionReturnCode = 1
	IdentifierRejected          ConnectionReturnCode = 2
	ServerUnavailable           ConnectionReturnCode = 3
	BadUserNameOrPassword       ConnectionReturnCode = 4
	NotAuthorized               ConnectionReturnCode = 5
)

Connection acceptance/rejection code.

func (ConnectionReturnCode) String

func (c ConnectionReturnCode) String() string

type DialOption

type DialOption func(*DialOptions) error

DialOption sets option for Dial.

func WithConnStateHandler

func WithConnStateHandler(handler func(ConnState, error)) DialOption

WithConnStateHandler sets connection state change handler.

func WithDialer

func WithDialer(dialer *net.Dialer) DialOption

WithDialer sets dialer.

func WithMaxPayloadLen

func WithMaxPayloadLen(l int) DialOption

WithMaxPayloadLen sets maximum payload length of the BaseClient.

func WithTLSCertFiles

func WithTLSCertFiles(host, caFile, certFile, privateKeyFile string) DialOption

WithTLSCertFiles loads certificate files

func WithTLSConfig

func WithTLSConfig(config *tls.Config) DialOption

WithTLSConfig sets TLS configuration.

type DialOptions

type DialOptions struct {
	Dialer        *net.Dialer
	TLSConfig     *tls.Config
	ConnState     func(ConnState, error)
	MaxPayloadLen int
}

DialOptions stores options for Dial.

type Dialer

type Dialer interface {
	DialContext(context.Context) (*BaseClient, error)
}

Dialer is an interface to create connection.

type DialerFunc

type DialerFunc func(ctx context.Context) (*BaseClient, error)

DialerFunc type is an adapter to use functions as MQTT connection dialer.

func (DialerFunc) DialContext

func (d DialerFunc) DialContext(ctx context.Context) (*BaseClient, error)

DialContext calls d().

type Error

type Error struct {
	Err     error
	Failure string
	File    string
	Line    int
}

Error records a failed parsing.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Is

func (e *Error) Is(target error) bool

Is reports whether chained error contains target. This is for Go1.13 error unwrapping.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the reason of the failure. This is for Go1.13 error unwrapping.

type ErrorWithRetry

type ErrorWithRetry interface {
	error
	Retry(context.Context, *BaseClient) error
}

ErrorWithRetry is a error with packets which should be retransmitted.

type Handler

type Handler interface {
	Serve(*Message)
}

Handler receives an MQTT message.

type HandlerFunc

type HandlerFunc func(*Message)

HandlerFunc type is an adapter to use functions as MQTT message handler.

func (HandlerFunc) Serve

func (h HandlerFunc) Serve(message *Message)

Serve calls h(message).

type Message

type Message struct {
	Topic   string
	ID      uint16
	QoS     QoS
	Retain  bool
	Dup     bool
	Payload []byte
}

Message represents MQTT message.

type NoContextDialer

type NoContextDialer struct {
	NoContextDialerIface
}

NoContextDialer is a wrapper to use Dialer of mqtt-go<1.14 as mqtt-go>=1.14 Dialer.

WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.

Example
d := oldDialer()
cli, err := NewReconnectClient(&NoContextDialer{d})
if err != nil {
	fmt.Println("error:", err.Error())
	return
}
cli.Handle(HandlerFunc(func(*Message) {}))
fmt.Println("ok")
Output:

ok

func (*NoContextDialer) DialContext

func (d *NoContextDialer) DialContext(context.Context) (*BaseClient, error)

DialContext wraps Dial without context.

WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.

type NoContextDialerIface

type NoContextDialerIface interface {
	Dial() (*BaseClient, error)
}

NoContextDialerIface is a Dialer interface of mqtt-go<1.14.

type ProtocolLevel

type ProtocolLevel byte

ProtocolLevel represents MQTT protocol level.

const (
	ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1
	ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default)
)

ProtocolLevel values.

type QoS

type QoS uint8

QoS represents quality of service level.

const (
	QoS0             QoS = 0x00 // At most once delivery
	QoS1             QoS = 0x01 // At least once delivery
	QoS2             QoS = 0x02 // Exactly once delivery
	SubscribeFailure QoS = 0x80 // Rejected to subscribe
)

QoS values.

type ReconnectClient

type ReconnectClient interface {
	Client
	Retryer
}

ReconnectClient is a Client with reconnect and retry features.

func NewReconnectClient

func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient, error)

NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features.

type ReconnectOption

type ReconnectOption func(*ReconnectOptions) error

ReconnectOption sets option for Connect.

func WithAlwaysResubscribe

func WithAlwaysResubscribe(always bool) ReconnectOption

WithAlwaysResubscribe enables or disables re-subscribe on reconnect. Default value is false. This option can be used to ensure all subscriptions are restored even if the server is buggy.

func WithPingInterval

func WithPingInterval(interval time.Duration) ReconnectOption

WithPingInterval sets ping request interval. Default value is KeepAlive value set by ConnectOption.

func WithReconnectWait

func WithReconnectWait(base, max time.Duration) ReconnectOption

WithReconnectWait sets parameters of incremental reconnect wait.

func WithRetryClient

func WithRetryClient(cli *RetryClient) ReconnectOption

WithRetryClient sets RetryClient. Default value is zero RetryClient.

func WithTimeout

func WithTimeout(timeout time.Duration) ReconnectOption

WithTimeout sets timeout duration of server response. Default value is PingInterval.

type ReconnectOptions

type ReconnectOptions struct {
	ConnectOptions    []ConnectOption
	Timeout           time.Duration
	ReconnectWaitBase time.Duration
	ReconnectWaitMax  time.Duration
	PingInterval      time.Duration
	RetryClient       *RetryClient
	AlwaysResubscribe bool
}

ReconnectOptions represents options for Connect.

type RequestTimeoutError

type RequestTimeoutError struct {
	// contains filtered or unexported fields
}

RequestTimeoutError is a context deadline exceeded error caused by RetryClient.ResponseTimeout.

func (*RequestTimeoutError) Error

func (e *RequestTimeoutError) Error() string

Error implements error.

type RetryClient

type RetryClient struct {

	// Maximum duration to wait for acknoledge response.
	// Messages with QoS1 and QoS2 will be retried.
	ResponseTimeout time.Duration

	// Directly publish QoS0 messages without queuing.
	// It will cause inorder of the messages but performance may be increased.
	DirectlyPublishQoS0 bool

	// Callback to receive background errors on raw message publish/subscribe operations.
	OnError func(error)
	// contains filtered or unexported fields
}

RetryClient queues unacknowledged messages and retry on reconnect.

func (*RetryClient) Client

func (c *RetryClient) Client() *BaseClient

Client returns the base client.

func (*RetryClient) Connect

func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)

Connect to the broker.

func (*RetryClient) Disconnect

func (c *RetryClient) Disconnect(ctx context.Context) error

Disconnect from the broker.

func (*RetryClient) Handle

func (c *RetryClient) Handle(handler Handler)

Handle registers the message handler.

func (*RetryClient) Ping

func (c *RetryClient) Ping(ctx context.Context) error

Ping to the broker.

func (*RetryClient) Publish

func (c *RetryClient) Publish(ctx context.Context, message *Message) error

Publish tries to publish the message and immediately returns. If it is not acknowledged to be published, the message will be queued.

func (*RetryClient) Resubscribe

func (c *RetryClient) Resubscribe(ctx context.Context)

Resubscribe subscribes all established subscriptions.

func (*RetryClient) Retry

func (c *RetryClient) Retry(ctx context.Context)

Retry all queued publish/subscribe requests.

func (*RetryClient) SetClient

func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)

SetClient sets the new BaseClient. Call Retry() and Resubscribe() to process queued messages and subscriptions. The BaseClient must be unconnected when it is passed to the RetryClient.

func (*RetryClient) Stats

func (c *RetryClient) Stats() RetryStats

Stats returns retry stats.

func (*RetryClient) Subscribe

func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)

Subscribe tries to subscribe the topic and immediately return nil. If it is not acknowledged to be subscribed, the request will be queued. First return value ([]Subscription) is always nil.

func (*RetryClient) Unsubscribe

func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error

Unsubscribe tries to unsubscribe the topic and immediately return nil. If it is not acknowledged to be unsubscribed, the request will be queued.

type RetryStats

type RetryStats struct {
	// Number of queued tasks.
	QueuedTasks int
	// Number of queued messages and subscriptions.
	QueuedRetries int
	// Total number of proceeded tasks.
	TotalTasks int
	// Total number of retries.
	TotalRetries int
	// Count of SetClient.
	CountSetClient int
	// Count of Connect.
	CountConnect int
	// Count of error on Connect.
	CountConnectError int
}

RetryStats stores retry statistics.

type Retryer

type Retryer interface {
	// SetClient sets the new BaseClient.
	// Call Retry() and Resubscribe() to process queued messages and subscriptions.
	// The BaseClient must be unconnected when it is passed to the RetryClient.
	SetClient(ctx context.Context, cli *BaseClient)
	// Client returns the base client.
	Client() *BaseClient
	// Resubscribe subscribes all established subscriptions.
	Resubscribe(ctx context.Context)
	// Retry all queued publish/subscribe requests.
	Retry(ctx context.Context)
	// Stat returns retry stats.
	Stats() RetryStats
}

Retryer is an interface to control message retrying.

type ServeAsync

type ServeAsync struct {
	// Handler is an underlying handler.
	// Handler.Serve() will be called asynchronously.
	Handler
}

ServeAsync is a MQTT message handler to process messages asynchronously.

func (*ServeAsync) Serve

func (m *ServeAsync) Serve(message *Message)

Serve the message in a new goroutine.

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is a MQTT message handler multiplexer. The idea is very similar to http.ServeMux.

func (*ServeMux) Handle

func (m *ServeMux) Handle(filter string, handler Handler) error

Handle registers the handler for the given pattern.

func (*ServeMux) HandleFunc

func (m *ServeMux) HandleFunc(filter string, handler func(*Message)) error

HandleFunc registers the handler function for the given pattern.

func (*ServeMux) Serve

func (m *ServeMux) Serve(message *Message)

Serve dispatches the message to the registered handlers.

type Subscription

type Subscription struct {
	Topic string
	QoS   QoS
}

Subscription represents MQTT subscription target.

type URLDialer

type URLDialer struct {
	URL     string
	Options []DialOption
}

URLDialer is a Dialer using URL string.

func (*URLDialer) DialContext

func (d *URLDialer) DialContext(ctx context.Context) (*BaseClient, error)

DialContext creates connection using its values.

Directories

Path Synopsis
examples
internal
filteredpipe
Package filteredpipe provides pipes with interceptor for testing.
Package filteredpipe provides pipes with interceptor for testing.
Package mockmqtt provides simple standalone mock of mqtt.Client.
Package mockmqtt provides simple standalone mock of mqtt.Client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL