Documentation
¶
Overview ¶
Package mqtt is a thread safe and context controlled MQTT 3.1.1 client library.
Index ¶
- Variables
- func KeepAlive(ctx context.Context, cli Client, interval, timeout time.Duration) error
- type BaseClient
- func (c *BaseClient) Close() error
- func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
- func (c *BaseClient) Disconnect(ctx context.Context) error
- func (c *BaseClient) Done() <-chan struct{}
- func (c *BaseClient) Err() error
- func (c *BaseClient) Handle(handler Handler)
- func (c *BaseClient) Ping(ctx context.Context) error
- func (c *BaseClient) Publish(ctx context.Context, message *Message) error
- func (c *BaseClient) SetErrorOnce(err error)
- func (c *BaseClient) Stats() BaseStats
- func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
- func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error
- func (c *BaseClient) ValidateMessage(message *Message) error
- type BaseClientStoreDialer
- type BaseStats
- type Client
- type ClientCloser
- type Closer
- type ConnState
- type ConnectOption
- type ConnectOptions
- type ConnectionError
- type ConnectionReturnCode
- type DialOption
- type DialOptions
- type Dialer
- type DialerFunc
- type Error
- type ErrorWithRetry
- type Handler
- type HandlerFunc
- type Message
- type NoContextDialer
- type NoContextDialerIface
- type ProtocolLevel
- type QoS
- type ReconnectClient
- type ReconnectOption
- type ReconnectOptions
- type RequestTimeoutError
- type RetryClient
- func (c *RetryClient) Client() *BaseClient
- func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
- func (c *RetryClient) Disconnect(ctx context.Context) error
- func (c *RetryClient) Handle(handler Handler)
- func (c *RetryClient) Ping(ctx context.Context) error
- func (c *RetryClient) Publish(ctx context.Context, message *Message) error
- func (c *RetryClient) Resubscribe(ctx context.Context)
- func (c *RetryClient) Retry(ctx context.Context)
- func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)
- func (c *RetryClient) Stats() RetryStats
- func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
- func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error
- type RetryStats
- type Retryer
- type ServeAsync
- type ServeMux
- type Subscription
- type URLDialer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosedClient = errors.New("operation on closed client")
ErrClosedClient means operation was requested on closed client.
var ErrClosedTransport = errors.New("read/write on closed transport")
ErrClosedTransport means that the underlying connection is closed.
var ErrConnectionFailed = errors.New("connection failed")
ErrConnectionFailed means the connection is not established.
var ErrInvalidPacket = errors.New("invalid packet")
ErrInvalidPacket means that an invalid message is arrived from the broker.
var ErrInvalidPacketLength = errors.New("invalid packet length")
ErrInvalidPacketLength means that an invalid length of the message is arrived.
var ErrInvalidQoS = errors.New("invalid QoS")
ErrInvalidQoS means the QoS value is not allowed.
var ErrInvalidRune = errors.New("invalid rune in UTF-8 string")
ErrInvalidRune means that the string has a rune not allowed in MQTT.
var ErrInvalidSubAck = errors.New("invalid SUBACK")
ErrInvalidSubAck means that the incomming SUBACK packet is inconsistent with the request.
var ErrInvalidTopicFilter = errors.New("invalid topic filter")
ErrInvalidTopicFilter means that the topic filter string is invalid.
var ErrKeepAliveDisabled = errors.New("keep alive disabled")
ErrKeepAliveDisabled is returned if Runned on keep alive disabled connection.
var ErrNotConnected = errors.New("not connected")
ErrNotConnected is returned if a function is called before Connect.
var ErrPayloadLenExceeded = errors.New("payload length exceeded")
ErrPayloadLenExceeded means the payload length is too large.
var ErrPingTimeout = errors.New("ping timeout")
ErrPingTimeout is returned on ping response timeout.
var ErrUnsupportedProtocol = errors.New("unsupported protocol")
ErrUnsupportedProtocol means that the specified scheme in the URL is not supported.
Functions ¶
Types ¶
type BaseClient ¶
type BaseClient struct { // Transport is an underlying connection. Typically net.Conn. Transport io.ReadWriteCloser // ConnState is called if the connection state is changed. ConnState func(ConnState, error) // MaxPayloadLen is a maximum allowed length of message payload. // 0 means unlimited. (It will panic if exceeds protocol maximum message length (256MB).) MaxPayloadLen int // contains filtered or unexported fields }
BaseClient is a low layer MQTT client. Zero values with valid underlying Transport is a valid BaseClient.
func DialContext ¶ added in v0.14.0
func DialContext(ctx context.Context, urlStr string, opts ...DialOption) (*BaseClient, error)
DialContext creates MQTT client using URL string.
func (*BaseClient) Connect ¶
func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Connect to the broker.
func (*BaseClient) Disconnect ¶
func (c *BaseClient) Disconnect(ctx context.Context) error
Disconnect from the broker.
func (*BaseClient) Done ¶ added in v0.1.0
func (c *BaseClient) Done() <-chan struct{}
Done is a channel to signal connection close.
func (*BaseClient) Err ¶ added in v0.1.0
func (c *BaseClient) Err() error
Err returns connection error.
func (*BaseClient) Handle ¶ added in v0.1.0
func (c *BaseClient) Handle(handler Handler)
Handle registers the message handler.
func (*BaseClient) Publish ¶
func (c *BaseClient) Publish(ctx context.Context, message *Message) error
Publish a message to the broker. ID field of the message is filled if zero.
func (*BaseClient) SetErrorOnce ¶ added in v0.11.1
func (c *BaseClient) SetErrorOnce(err error)
SetErrorOnce sets client error value if not yet set.
func (*BaseClient) Stats ¶ added in v0.19.0
func (c *BaseClient) Stats() BaseStats
Stats returns base client stats.
func (*BaseClient) Subscribe ¶
func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
Subscribe topics.
func (*BaseClient) Unsubscribe ¶
func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error
Unsubscribe topics.
func (*BaseClient) ValidateMessage ¶ added in v0.10.0
func (c *BaseClient) ValidateMessage(message *Message) error
ValidateMessage validates given message.
type BaseClientStoreDialer ¶ added in v0.13.0
type BaseClientStoreDialer struct { // Dialer is a wrapped dialer. Valid Dialer must be set before use. Dialer // contains filtered or unexported fields }
BaseClientStoreDialer is a dialer wrapper which stores the latest BaseClient.
Example ¶
dialer := &BaseClientStoreDialer{Dialer: &URLDialer{URL: "mqtt://localhost:1883"}} cli, err := NewReconnectClient(dialer) if err != nil { panic(err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if _, err := cli.Connect(ctx, "TestClient"); err != nil { panic(err) } defer cli.Disconnect(context.Background()) // Publish asynchronously if err := cli.Publish( ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("async")}, ); err != nil { panic(err) } // Publish synchronously if err := dialer.BaseClient().Publish( ctx, &Message{Topic: "test", QoS: QoS1, Payload: []byte("sync")}, ); err != nil { panic(err) }
Output:
func (*BaseClientStoreDialer) BaseClient ¶ added in v0.13.0
func (d *BaseClientStoreDialer) BaseClient() *BaseClient
BaseClient returns latest BaseClient created by the dialer. It returns nil before the first call of Dial.
func (*BaseClientStoreDialer) DialContext ¶ added in v0.14.0
func (d *BaseClientStoreDialer) DialContext(ctx context.Context) (*BaseClient, error)
DialContext creates a new BaseClient.
type BaseStats ¶ added in v0.19.0
type BaseStats struct { // Recent ping delay. PingDelayRecent time.Duration // Maximum ping delay. PingDelayMax time.Duration // Minimum ping delay. PingDelayMin time.Duration // Count of ping error. CountPingError int }
BaseStats stores base client statistics.
type Client ¶
type Client interface { Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error) Disconnect(ctx context.Context) error Publish(ctx context.Context, message *Message) error Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error) Unsubscribe(ctx context.Context, subs ...string) error Ping(ctx context.Context) error Handle(Handler) }
Client is the interface of MQTT client.
type ClientCloser ¶ added in v0.1.0
ClientCloser groups Client and Closer interface
type ConnState ¶
type ConnState int
ConnState represents the status of MQTT connection.
type ConnectOption ¶
type ConnectOption func(*ConnectOptions) error
ConnectOption sets option for Connect.
func WithCleanSession ¶
func WithCleanSession(cleanSession bool) ConnectOption
WithCleanSession sets clean session flag.
func WithKeepAlive ¶ added in v0.1.0
func WithKeepAlive(interval uint16) ConnectOption
WithKeepAlive sets keep alive interval in seconds.
func WithProtocolLevel ¶ added in v0.3.0
func WithProtocolLevel(level ProtocolLevel) ConnectOption
WithProtocolLevel sets protocol level.
func WithUserNamePassword ¶
func WithUserNamePassword(userName, password string) ConnectOption
WithUserNamePassword sets plain text auth information used in Connect.
type ConnectOptions ¶
type ConnectOptions struct { UserName string Password string CleanSession bool KeepAlive uint16 Will *Message ProtocolLevel ProtocolLevel }
ConnectOptions represents options for Connect.
type ConnectionError ¶ added in v0.4.4
type ConnectionError struct { Err error Code ConnectionReturnCode }
ConnectionError ia a error storing connection return code.
func (*ConnectionError) Error ¶ added in v0.4.4
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶ added in v0.4.4
func (e *ConnectionError) Unwrap() error
Unwrap returns base error of ConnectionError. (for Go1.13 error unwrapping.)
type ConnectionReturnCode ¶
type ConnectionReturnCode byte
ConnectionReturnCode represents return code of connect request.
const ( ConnectionAccepted ConnectionReturnCode = 0 UnacceptableProtocolVersion ConnectionReturnCode = 1 IdentifierRejected ConnectionReturnCode = 2 BadUserNameOrPassword ConnectionReturnCode = 4 NotAuthorized ConnectionReturnCode = 5 )
Connection acceptance/rejection code.
func (ConnectionReturnCode) String ¶
func (c ConnectionReturnCode) String() string
type DialOption ¶
type DialOption func(*DialOptions) error
DialOption sets option for Dial.
func WithConnStateHandler ¶ added in v0.6.0
func WithConnStateHandler(handler func(ConnState, error)) DialOption
WithConnStateHandler sets connection state change handler.
func WithMaxPayloadLen ¶ added in v0.10.0
func WithMaxPayloadLen(l int) DialOption
WithMaxPayloadLen sets maximum payload length of the BaseClient.
func WithTLSCertFiles ¶ added in v0.9.0
func WithTLSCertFiles(host, caFile, certFile, privateKeyFile string) DialOption
WithTLSCertFiles loads certificate files
func WithTLSConfig ¶
func WithTLSConfig(config *tls.Config) DialOption
WithTLSConfig sets TLS configuration.
type DialOptions ¶
type DialOptions struct { Dialer *net.Dialer TLSConfig *tls.Config ConnState func(ConnState, error) MaxPayloadLen int }
DialOptions stores options for Dial.
type Dialer ¶ added in v0.1.0
type Dialer interface {
DialContext(context.Context) (*BaseClient, error)
}
Dialer is an interface to create connection.
type DialerFunc ¶ added in v0.4.1
type DialerFunc func(ctx context.Context) (*BaseClient, error)
DialerFunc type is an adapter to use functions as MQTT connection dialer.
func (DialerFunc) DialContext ¶ added in v0.14.0
func (d DialerFunc) DialContext(ctx context.Context) (*BaseClient, error)
DialContext calls d().
type Error ¶ added in v0.6.0
Error records a failed parsing.
type ErrorWithRetry ¶ added in v0.12.0
type ErrorWithRetry interface { error Retry(context.Context, *BaseClient) error }
ErrorWithRetry is a error with packets which should be retransmitted.
type HandlerFunc ¶
type HandlerFunc func(*Message)
HandlerFunc type is an adapter to use functions as MQTT message handler.
type NoContextDialer ¶ added in v0.14.0
type NoContextDialer struct {
NoContextDialerIface
}
NoContextDialer is a wrapper to use Dialer of mqtt-go<1.14 as mqtt-go>=1.14 Dialer.
WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.
Example ¶
d := oldDialer() cli, err := NewReconnectClient(&NoContextDialer{d}) if err != nil { fmt.Println("error:", err.Error()) return } cli.Handle(HandlerFunc(func(*Message) {})) fmt.Println("ok")
Output: ok
func (*NoContextDialer) DialContext ¶ added in v0.14.0
func (d *NoContextDialer) DialContext(context.Context) (*BaseClient, error)
DialContext wraps Dial without context.
WARNING: passed context is ignored by NoContextDialer. Make sure timeout is handled inside NoContextDialer.
type NoContextDialerIface ¶ added in v0.14.0
type NoContextDialerIface interface {
Dial() (*BaseClient, error)
}
NoContextDialerIface is a Dialer interface of mqtt-go<1.14.
type ProtocolLevel ¶ added in v0.3.0
type ProtocolLevel byte
ProtocolLevel represents MQTT protocol level.
const ( ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1 ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default) )
ProtocolLevel values.
type ReconnectClient ¶ added in v0.15.0
ReconnectClient is a Client with reconnect and retry features.
func NewReconnectClient ¶ added in v0.1.0
func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient, error)
NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features.
type ReconnectOption ¶ added in v0.4.0
type ReconnectOption func(*ReconnectOptions) error
ReconnectOption sets option for Connect.
func WithAlwaysResubscribe ¶ added in v0.18.0
func WithAlwaysResubscribe(always bool) ReconnectOption
WithAlwaysResubscribe enables or disables re-subscribe on reconnect. Default value is false. This option can be used to ensure all subscriptions are restored even if the server is buggy.
func WithPingInterval ¶ added in v0.4.0
func WithPingInterval(interval time.Duration) ReconnectOption
WithPingInterval sets ping request interval. Default value is KeepAlive value set by ConnectOption.
func WithReconnectWait ¶ added in v0.4.0
func WithReconnectWait(base, max time.Duration) ReconnectOption
WithReconnectWait sets parameters of incremental reconnect wait.
func WithRetryClient ¶ added in v0.16.0
func WithRetryClient(cli *RetryClient) ReconnectOption
WithRetryClient sets RetryClient. Default value is zero RetryClient.
func WithTimeout ¶ added in v0.4.0
func WithTimeout(timeout time.Duration) ReconnectOption
WithTimeout sets timeout duration of server response. Default value is PingInterval.
type ReconnectOptions ¶ added in v0.4.0
type ReconnectOptions struct { ConnectOptions []ConnectOption Timeout time.Duration ReconnectWaitBase time.Duration ReconnectWaitMax time.Duration PingInterval time.Duration RetryClient *RetryClient AlwaysResubscribe bool }
ReconnectOptions represents options for Connect.
type RequestTimeoutError ¶ added in v0.16.0
type RequestTimeoutError struct {
// contains filtered or unexported fields
}
RequestTimeoutError is a context deadline exceeded error caused by RetryClient.ResponseTimeout.
func (*RequestTimeoutError) Error ¶ added in v0.16.0
func (e *RequestTimeoutError) Error() string
Error implements error.
type RetryClient ¶ added in v0.1.0
type RetryClient struct { // Maximum duration to wait for acknoledge response. // Messages with QoS1 and QoS2 will be retried. ResponseTimeout time.Duration // Directly publish QoS0 messages without queuing. // It will cause inorder of the messages but performance may be increased. DirectlyPublishQoS0 bool // Callback to receive background errors on raw message publish/subscribe operations. OnError func(error) // contains filtered or unexported fields }
RetryClient queues unacknowledged messages and retry on reconnect.
func (*RetryClient) Client ¶ added in v0.11.1
func (c *RetryClient) Client() *BaseClient
Client returns the base client.
func (*RetryClient) Connect ¶ added in v0.1.0
func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Connect to the broker.
func (*RetryClient) Disconnect ¶ added in v0.4.3
func (c *RetryClient) Disconnect(ctx context.Context) error
Disconnect from the broker.
func (*RetryClient) Handle ¶ added in v0.1.0
func (c *RetryClient) Handle(handler Handler)
Handle registers the message handler.
func (*RetryClient) Ping ¶ added in v0.4.3
func (c *RetryClient) Ping(ctx context.Context) error
Ping to the broker.
func (*RetryClient) Publish ¶ added in v0.1.0
func (c *RetryClient) Publish(ctx context.Context, message *Message) error
Publish tries to publish the message and immediately returns. If it is not acknowledged to be published, the message will be queued.
func (*RetryClient) Resubscribe ¶ added in v0.1.0
func (c *RetryClient) Resubscribe(ctx context.Context)
Resubscribe subscribes all established subscriptions.
func (*RetryClient) Retry ¶ added in v0.4.7
func (c *RetryClient) Retry(ctx context.Context)
Retry all queued publish/subscribe requests.
func (*RetryClient) SetClient ¶ added in v0.1.0
func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient)
SetClient sets the new BaseClient. Call Retry() and Resubscribe() to process queued messages and subscriptions. The BaseClient must be unconnected when it is passed to the RetryClient.
func (*RetryClient) Stats ¶ added in v0.15.0
func (c *RetryClient) Stats() RetryStats
Stats returns retry stats.
func (*RetryClient) Subscribe ¶ added in v0.1.0
func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) ([]Subscription, error)
Subscribe tries to subscribe the topic and immediately return nil. If it is not acknowledged to be subscribed, the request will be queued. First return value ([]Subscription) is always nil.
func (*RetryClient) Unsubscribe ¶ added in v0.4.3
func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error
Unsubscribe tries to unsubscribe the topic and immediately return nil. If it is not acknowledged to be unsubscribed, the request will be queued.
type RetryStats ¶ added in v0.15.0
type RetryStats struct { // Number of queued tasks. QueuedTasks int // Number of queued messages and subscriptions. QueuedRetries int // Total number of proceeded tasks. TotalTasks int // Total number of retries. TotalRetries int // Count of SetClient. CountSetClient int // Count of Connect. CountConnect int // Count of error on Connect. CountConnectError int }
RetryStats stores retry statistics.
type Retryer ¶ added in v0.15.0
type Retryer interface { // SetClient sets the new BaseClient. // Call Retry() and Resubscribe() to process queued messages and subscriptions. // The BaseClient must be unconnected when it is passed to the RetryClient. SetClient(ctx context.Context, cli *BaseClient) // Client returns the base client. Client() *BaseClient // Resubscribe subscribes all established subscriptions. Resubscribe(ctx context.Context) // Retry all queued publish/subscribe requests. Retry(ctx context.Context) // Stat returns retry stats. Stats() RetryStats }
Retryer is an interface to control message retrying.
type ServeAsync ¶ added in v0.7.0
type ServeAsync struct { // Handler is an underlying handler. // Handler.Serve() will be called asynchronously. Handler }
ServeAsync is a MQTT message handler to process messages asynchronously.
func (*ServeAsync) Serve ¶ added in v0.7.0
func (m *ServeAsync) Serve(message *Message)
Serve the message in a new goroutine.
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is a MQTT message handler multiplexer. The idea is very similar to http.ServeMux.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given pattern.
type Subscription ¶
Subscription represents MQTT subscription target.
type URLDialer ¶ added in v0.1.0
type URLDialer struct { URL string Options []DialOption }
URLDialer is a Dialer using URL string.
func (*URLDialer) DialContext ¶ added in v0.14.0
func (d *URLDialer) DialContext(ctx context.Context) (*BaseClient, error)
DialContext creates connection using its values.
Source Files
¶
- atomic.go
- client.go
- conn.go
- connack.go
- connect.go
- dialer.go
- disconnect.go
- error.go
- filter.go
- keepalive.go
- message.go
- mqtt.go
- packet.go
- pingreq.go
- pingresp.go
- puback.go
- pubcomp.go
- publish.go
- pubrec.go
- pubrel.go
- reconnclient.go
- retryclient.go
- serve.go
- serveasync.go
- servemux.go
- suback.go
- subscribe.go
- subscriptions.go
- uniqid.go
- unsuback.go
- unsubscribe.go
Directories
¶
Path | Synopsis |
---|---|
examples
|
|
internal
|
|
filteredpipe
Package filteredpipe provides pipes with interceptor for testing.
|
Package filteredpipe provides pipes with interceptor for testing. |
Package mockmqtt provides simple standalone mock of mqtt.Client.
|
Package mockmqtt provides simple standalone mock of mqtt.Client. |
paho
module
|