Documentation ¶
Overview ¶
Package mq provides a lightweight, idiomatic MQTT v5.0 and v3.1.1 client library for Go.
The library is designed with zero external dependencies (only Go standard library) and provides a clean, functional options-based API for connecting to MQTT servers, publishing messages, and subscribing to topics.
Features ¶
- Full MQTT v5.0 and v3.1.1 support
- (v5.0) User Properties & Packet Properties
- (v5.0) Topic Aliases (auto-managed)
- (v5.0) Request/Response pattern support
- (v5.0) Session & Message Expiry
- (v5.0) Shared Subscriptions
- (v5.0) Reason Codes & Enhanced Error Handling
- TLS/SSL encrypted connections
- Automatic reconnection with exponential backoff
- Clean, idiomatic Go API with functional options
- Context-based cancellation and timeouts
- Zero external dependencies (main library)
Unified API Philosophy ¶
The library exposes a single, unified API that embraces modern MQTT v5.0 concepts (Properties, Reason Codes, Session Expiry). When connecting to an MQTT v3.1.1 server, these v5-specific features are handled gracefully: they are simply ignored during packet encoding. This lets you write code once using modern idioms while staying compatible with older servers.
Quick Start ¶
Connect to a server and publish a message:
client, err := mq.Dial("tcp://localhost:1883",
	mq.WithClientID("my-client"),
	mq.WithProtocolVersion(mq.ProtocolV50)) // Use MQTT v5.0
if err != nil {
	log.Fatal(err)
}
defer client.Disconnect(context.Background())
token := client.Publish("sensors/temperature", []byte("22.5"), mq.WithQoS(1))
// Wait blocks until the publish completes; 'select' is also supported, see further down
if err := token.Wait(context.Background()); err != nil {
	log.Printf("Publish failed: %v", err)
}
Subscribe to a topic:
client.Subscribe("sensors/+/temperature", mq.AtLeastOnce,
	func(c *mq.Client, msg mq.Message) {
		fmt.Printf("%s: %s\n", msg.Topic, string(msg.Payload))
	})
Connection Options ¶
The Dial and DialContext functions accept various options to configure the client:
- WithProtocolVersion(v) - Set MQTT version (ProtocolV50 or ProtocolV311)
- WithAutoProtocolVersion(bool) - Enable automatic protocol version negotiation (default: true)
- WithClientID(id) - Set the MQTT client identifier
- WithCredentials(user, pass) - Set username and password
- WithKeepAlive(duration) - Set keepalive interval (default: 60s)
- WithCleanSession(bool) - Set clean start/session flag
- WithSessionExpiryInterval(secs) - Set session expiry (v5.0)
- WithConnectUserProperties(map) - Set user properties for CONNECT (v5.0)
- WithAutoReconnect(bool) - Enable auto-reconnect (default: true)
- WithTLS(config) - Enable TLS encryption
- WithWill(topic, payload, qos, retained) - Set Last Will and Testament
- WithOutgoingQueueSize(int) - Set internal outgoing buffer size
- WithIncomingQueueSize(int) - Set internal incoming buffer size
- WithQoS0LimitPolicy(policy) - Set reliability policy for QoS 0
- WithHandlerInterceptor(interceptor) - Add an interceptor for incoming messages
- WithPublishInterceptor(interceptor) - Add an interceptor for outgoing messages
Interceptors (Middleware) ¶
The library supports an interceptor pattern (middleware) for both incoming and outgoing messages. This is useful for cross-cutting concerns like logging, metrics, tracing (OpenTelemetry), or message auditing.
Handler Interceptors wrap message handlers for incoming messages:
loggingInterceptor := func(next mq.MessageHandler) mq.MessageHandler {
	return func(c *mq.Client, m mq.Message) {
		log.Printf("Received: %s", m.Topic)
		next(c, m)
	}
}
client, _ := mq.Dial(uri, mq.WithHandlerInterceptor(loggingInterceptor))
Publish Interceptors wrap the Publish call for outgoing messages:
tracingInterceptor := func(next mq.PublishFunc) mq.PublishFunc {
	return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
		// Inject tracing headers into opts here
		return next(topic, payload, opts...)
	}
}
client, _ := mq.Dial(uri, mq.WithPublishInterceptor(tracingInterceptor))
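To illustrate how several interceptors can compose, here is a self-contained sketch using simplified local types. The message, handler, interceptor, chain, and tag names are hypothetical, and the ordering (first interceptor runs outermost) is an assumption of the sketch; the library's own ordering is not documented here.

```go
package main

import "fmt"

// message and handler are simplified stand-ins for mq.Message and
// mq.MessageHandler (the *mq.Client argument is omitted for brevity).
type message struct {
	Topic   string
	Payload []byte
}

type handler func(m message)

// interceptor wraps a handler and returns a new handler.
type interceptor func(next handler) handler

// chain wraps h so that the first interceptor runs outermost.
func chain(h handler, interceptors ...interceptor) handler {
	for i := len(interceptors) - 1; i >= 0; i-- {
		h = interceptors[i](h)
	}
	return h
}

// tag returns an interceptor that records its name before and after
// calling the next handler.
func tag(name string, trace *[]string) interceptor {
	return func(next handler) handler {
		return func(m message) {
			*trace = append(*trace, name+":before")
			next(m)
			*trace = append(*trace, name+":after")
		}
	}
}

func main() {
	var trace []string
	final := func(m message) { trace = append(trace, "handle:"+m.Topic) }
	h := chain(final, tag("log", &trace), tag("metrics", &trace))
	h(message{Topic: "sensors/temp"})
	fmt.Println(trace)
}
```

Each interceptor sees the message before and after the inner handler runs, which is what makes the pattern suitable for timing, logging, and tracing.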
TLS Connections ¶
The library supports TLS/SSL encrypted connections:
client, err := mq.Dial("tls://server:8883",
	mq.WithClientID("secure-client"),
	mq.WithTLS(&tls.Config{
		InsecureSkipVerify: false,
	}))
Supported URL schemes: tcp://, mqtt://, tls://, ssl://, mqtts://
Quality of Service ¶
The library supports all three MQTT QoS levels:
- QoS 0 (mq.AtMostOnce): At most once delivery (fire and forget)
- QoS 1 (mq.AtLeastOnce): At least once delivery (acknowledged)
- QoS 2 (mq.ExactlyOnce): Exactly once delivery (assured)
Example:
// Using named constants (recommended)
client.Publish("topic", []byte("data"), mq.WithQoS(mq.AtLeastOnce))
// Using numeric values
client.Publish("topic", []byte("data"), mq.WithQoS(1))
Wildcard Subscriptions ¶
MQTT supports two wildcard characters in topic filters:
- '+' matches a single level (e.g., "sensors/+/temperature")
- '#' matches multiple levels (e.g., "sensors/#")
Example:
// Subscribe to all temperature sensors
client.Subscribe("sensors/+/temperature", mq.AtLeastOnce, handler)
// Subscribe to all sensor data
client.Subscribe("sensors/#", mq.AtMostOnce, handler)
MQTT v5.0 Properties ¶
MQTT v5.0 introduces "Properties" that can be attached to packets. This library provides a clean API for using common properties:
client.Publish("sensors/temp", payload,
	mq.WithContentType("application/json"),
	mq.WithUserProperty("sensor-id", "temp-01"),
	mq.WithMessageExpiry(3600)) // Expire in 1 hour
Supported properties include:
- ContentType: Specifies the MIME type of the payload
- MessageExpiry: How long, in seconds, the server should keep the message
- UserProperties: Custom key-value pairs (metadata)
- ConnectionUserProperties: Custom metadata received from the server on connect (v5.0)
- ResponseTopic & CorrelationData: For request/response patterns
Topic Aliases ¶
Topic Aliases (v5.0) reduce bandwidth by sending a short numeric ID in place of the full topic string on repeated publications.
// Enable topic alias for this publication
client.Publish("very/long/topic/name/for/bandwidth/saving", data,
	mq.WithAlias())
The library automatically manages alias assignment and mapping.
Subscription Options (v5.0) ¶
MQTT v5.0 adds options to control subscription behavior:
- WithNoLocal: Don't receive messages you published yourself
- WithRetainAsPublished: Keep the original retain flag from the publisher
- WithRetainHandling: Control when the server sends retained messages
- WithSubscriptionIdentifier: Set a numeric identifier for the subscription
- WithSubscribeUserProperty: Add custom metadata to the subscription
Example:
client.Subscribe("chat/room", mq.AtLeastOnce, handler,
	mq.WithNoLocal(true))
Client-side Session Persistence ¶
The library supports pluggable session persistence to save pending messages (QoS 1 & 2) and subscriptions across restarts.
store, _ := mq.NewFileStore("/path/to/persist", "client-id")
client, _ := mq.Dial(server,
	mq.WithClientID("client-id"),
	mq.WithCleanSession(false),
	mq.WithSessionStore(store),
	// persistent subscription
	mq.WithSubscription("topic", handler),
)
Error Handling ¶
Operations return a Token that can be used for both blocking and non-blocking error handling. In MQTT v5.0, errors often include Reason Codes.
// Blocking with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
	// Check for specific reason codes (v5.0) using errors.Is
	if errors.Is(err, mq.ReasonCodeUnspecifiedError) {
		log.Printf("Server rejected operation: %v", err)
	}
}
// Non-blocking with select
select {
case <-token.Done():
	if err := token.Error(); err != nil {
		log.Printf("Failed: %v", err)
	}
case <-time.After(5 * time.Second):
	log.Println("Timeout")
}
// Connection can be closed with a specific reason code and properties (MQTT v5.0):
expiry := uint32(3600)
client.Disconnect(ctx,
	mq.WithReason(mq.ReasonCodeNormalDisconnect),
	mq.WithDisconnectProperties(&mq.Properties{
		SessionExpiryInterval: &expiry,
		ReasonString:          "Shutting down",
	}),
)
The client handles reconnection automatically unless configured otherwise.
Index ¶
- Constants
- Variables
- func MatchTopic(filter, topic string) bool
- type Authenticator
- type Client
- func (c *Client) AssignedClientID() string
- func (c *Client) ConnectionUserProperties() map[string]string
- func (c *Client) Disconnect(ctx context.Context, opts ...DisconnectOption) error
- func (c *Client) GetStats() ClientStats
- func (c *Client) IsConnected() bool
- func (c *Client) Publish(topic string, payload []byte, opts ...PublishOption) Token
- func (c *Client) Reauthenticate(ctx context.Context) error
- func (c *Client) ResponseInformation() string
- func (c *Client) ServerCapabilities() ServerCapabilities
- func (c *Client) ServerKeepAlive() uint16
- func (c *Client) ServerReference() string
- func (c *Client) SessionExpiryInterval() uint32
- func (c *Client) Subscribe(topic string, qos QoS, handler MessageHandler, opts ...SubscribeOption) Token
- func (c *Client) Unsubscribe(topic string, opts ...UnsubscribeOption) Token
- type ClientStats
- type ContextDialer
- type DialFunc
- type DisconnectError
- type DisconnectOption
- type DisconnectOptions
- type FileStore
- func (f *FileStore) Clear() error
- func (f *FileStore) ClearPendingPublishes() error
- func (f *FileStore) ClearReceivedQoS2() error
- func (f *FileStore) ClientID() string
- func (f *FileStore) DeletePendingPublish(packetID uint16) error
- func (f *FileStore) DeleteReceivedQoS2(packetID uint16) error
- func (f *FileStore) DeleteSubscription(topic string) error
- func (f *FileStore) LoadPendingPublishes() (map[uint16]*PersistedPublish, error)
- func (f *FileStore) LoadReceivedQoS2() (map[uint16]struct{}, error)
- func (f *FileStore) LoadSubscriptions() (map[string]*PersistedSubscription, error)
- func (f *FileStore) SavePendingPublish(packetID uint16, pub *PersistedPublish) error
- func (f *FileStore) SaveReceivedQoS2(packetID uint16) error
- func (f *FileStore) SaveSubscription(topic string, sub *PersistedSubscription) error
- type FileStoreOption
- type HandlerInterceptor
- type LimitPolicy
- type Message
- type MessageHandler
- type MqttError
- type Option
- func WithAuthenticator(auth Authenticator) Option
- func WithAutoProtocolVersion(auto bool) Option
- func WithAutoReconnect(enable bool) Option
- func WithCleanSession(clean bool) Option
- func WithClientID(id string) Option
- func WithConnectTimeout(duration time.Duration) Option
- func WithConnectUserProperties(props map[string]string) Option
- func WithCredentials(username, password string) Option
- func WithDefaultPublishHandler(handler MessageHandler) Option
- func WithDialer(dialer ContextDialer) Option
- func WithHandlerInterceptor(interceptor HandlerInterceptor) Option
- func WithIncomingQueueSize(size int) Option
- func WithKeepAlive(duration time.Duration) Option
- func WithLogger(logger *slog.Logger) Option
- func WithMaxIncomingPacket(max int) Option
- func WithMaxPayloadSize(max int) Option
- func WithMaxTopicLength(max int) Option
- func WithOnConnect(onConnect func(*Client)) Option
- func WithOnConnectionLost(onConnectionLost func(*Client, error)) Option
- func WithOnServerRedirect(onServerRedirect func(serverURI string)) Option
- func WithOutgoingQueueSize(size int) Option
- func WithProtocolVersion(version uint8) Option
- func WithPublishInterceptor(interceptor PublishInterceptor) Option
- func WithQoS0LimitPolicy(policy QoS0LimitPolicy) Option
- func WithReceiveMaximum(max uint16, policy LimitPolicy) Option
- func WithRequestProblemInformation(request bool) Option
- func WithRequestResponseInformation(request bool) Option
- func WithSessionExpiryInterval(seconds uint32) Option
- func WithSessionStore(store SessionStore) Option
- func WithSubscription(topic string, handler MessageHandler) Option
- func WithTLS(config *tls.Config) Option
- func WithTopicAliasMaximum(max uint16) Option
- func WithWill(topic string, payload []byte, qos uint8, retained bool, ...) Option
- type PersistedPublish
- type PersistedSubscription
- type PersistedSubscriptionOptions
- type Properties
- type PublishFunc
- type PublishInterceptor
- type PublishOption
- func WithAlias() PublishOption
- func WithContentType(contentType string) PublishOption
- func WithCorrelationData(data []byte) PublishOption
- func WithMessageExpiry(seconds uint32) PublishOption
- func WithPayloadFormat(format uint8) PublishOption
- func WithProperties(props *Properties) PublishOption
- func WithQoS(qos QoS) PublishOption
- func WithResponseTopic(topic string) PublishOption
- func WithRetain(retain bool) PublishOption
- func WithUserProperty(key, value string) PublishOption
- type PublishOptions
- type PublishProperties
- type QoS
- type QoS0LimitPolicy
- type ReasonCode
- type ServerCapabilities
- type SessionStore
- type SubscribeOption
- func WithNoLocal(noLocal bool) SubscribeOption
- func WithPersistence(persistence bool) SubscribeOption
- func WithRetainAsPublished(retain bool) SubscribeOption
- func WithRetainHandling(handling uint8) SubscribeOption
- func WithSubscribeUserProperty(key, value string) SubscribeOption
- func WithSubscriptionIdentifier(id int) SubscribeOption
- type SubscribeOptions
- type Token
- type UnsubscribeOption
- type UnsubscribeOptions
Constants ¶
const (
	// ProtocolV311 is MQTT version 3.1.1
	ProtocolV311 uint8 = 4
	// ProtocolV50 is MQTT version 5.0 (default)
	ProtocolV50 uint8 = 5
)
const (
	PayloadFormatBytes uint8 = 0
	PayloadFormatUTF8  uint8 = 1
)
Payload format indicators
const (
	// DefaultMaxTopicLength is the maximum length of an MQTT topic (2 bytes for length prefix)
	DefaultMaxTopicLength = 65535
	// DefaultMaxPayloadSize is the maximum size of an MQTT message payload (256MB)
	DefaultMaxPayloadSize = 268435455 // 256MB - 1
	// DefaultMaxIncomingPacket is the maximum size of an incoming MQTT packet
	DefaultMaxIncomingPacket = 268435455 // 256MB - 1
	// MaxClientIDLength is the recommended maximum client ID length
	MaxClientIDLength = 23
)
MQTT specification limits (defaults when not configured)
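The value 268435455 is the largest number an MQTT Variable Byte Integer can hold: at most four bytes with seven data bits each, giving 2^28 - 1. The sketch below encodes a value in that format to make the limit concrete. The encodeVarInt function is a hypothetical helper written for this illustration and does not reject values above the limit.

```go
package main

import "fmt"

// encodeVarInt encodes n as an MQTT Variable Byte Integer: seven data
// bits per byte, with the high bit set when more bytes follow.
func encodeVarInt(n uint32) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

func main() {
	fmt.Printf("% x\n", encodeVarInt(127))       // 7f
	fmt.Printf("% x\n", encodeVarInt(128))       // 80 01
	fmt.Printf("% x\n", encodeVarInt(268435455)) // ff ff ff 7f
}
```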
Variables ¶
var (
	// ErrConnectionRefused is returned when the server rejects the connection.
	// You can unwrap this error to find the specific reason if available.
	ErrConnectionRefused = errors.New("connection refused")
	// Specific connection refusal reasons (v3.1.1)
	ErrUnacceptableProtocolVersion = errors.New("unacceptable protocol version")
	ErrIdentifierRejected          = errors.New("identifier rejected")
	ErrBadUsernameOrPassword       = errors.New("bad username or password")
	ErrNotAuthorized               = errors.New("not authorized")
	// ErrSubscriptionFailed is returned when the server rejects a subscription.
	ErrSubscriptionFailed = errors.New("subscription failed")
	// ErrClientDisconnected is returned when an operation is cancelled because
	// the client was disconnected or stopped.
	ErrClientDisconnected = errors.New("client disconnected")
)
Standard errors returned by the client
Functions ¶
func MatchTopic ¶ added in v0.9.1
MatchTopic checks if a topic matches a topic filter with MQTT wildcards. It follows MQTT-4.7 rules for wildcard matching.
Supports:
- '+' matches exactly one topic level
- '#' matches multiple topic levels (must be the last character in the filter)
Example ¶
filter := "sensors/+/temperature"
topic1 := "sensors/living-room/temperature"
topic2 := "sensors/kitchen/humidity"
fmt.Printf("%s matches %s: %v\n", topic1, filter, MatchTopic(filter, topic1))
fmt.Printf("%s matches %s: %v\n", topic2, filter, MatchTopic(filter, topic2))
filterHash := "sensors/#"
topic3 := "sensors/basement/temperature/current"
fmt.Printf("%s matches %s: %v\n", topic3, filterHash, MatchTopic(filterHash, topic3))
Output:
sensors/living-room/temperature matches sensors/+/temperature: true
sensors/kitchen/humidity matches sensors/+/temperature: false
sensors/basement/temperature/current matches sensors/#: true
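For readers curious how level-by-level wildcard matching can work, here is a minimal sketch. The lowercase matchTopic below is a hypothetical reimplementation written for illustration, not the library's MatchTopic, and it ignores the special handling the MQTT specification gives to topics beginning with '$'.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches filter, comparing level by level.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// '#' is only valid as the last level; it matches the rest.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	// Without a trailing '#', both must have the same number of levels.
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/living-room/temperature"))
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/kitchen/humidity"))
	fmt.Println(matchTopic("sensors/#", "sensors/basement/temperature/current"))
}
```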
Types ¶
type Authenticator ¶
type Authenticator interface {
	// Method returns the authentication method name.
	//
	// This is sent in the CONNECT packet's AuthenticationMethod property.
	// Common values: "SCRAM-SHA-1", "SCRAM-SHA-256", "OAUTH2", "KERBEROS".
	//
	// The method name should match what the server expects.
	Method() string
	// InitialData returns the initial authentication data to send in CONNECT.
	//
	// This data is included in the CONNECT packet's AuthenticationData property.
	// Return nil or empty slice if no initial data is needed.
	//
	// For SCRAM, this would be the client-first-message.
	// For OAuth, this might be an access token.
	InitialData() ([]byte, error)
	// HandleChallenge processes a challenge from the server and returns response data.
	//
	// This method is called when the client receives an AUTH packet from the server
	// during the authentication exchange. The reasonCode will typically be:
	// - 0x18 (Continue authentication) - Server is continuing the exchange
	// - 0x00 (Success) - Authentication completed successfully
	//
	// Return the response data to send back to the server in an AUTH packet.
	// Return an error if the challenge cannot be processed or authentication fails.
	//
	// IMPORTANT: This method is called synchronously in the packet processing loop.
	// It should complete quickly (< 100ms) to avoid blocking other packets.
	//
	// This is especially critical during re-authentication, where a slow HandleChallenge
	// will block processing of PUBLISH, PUBACK, and other packets, potentially causing
	// timeouts or degraded performance.
	//
	// If you need to perform expensive operations (network calls, heavy crypto), consider:
	// - Pre-computing data in InitialData()
	// - Caching results
	// - Using fast cryptographic libraries
	//
	// For most authentication methods (SCRAM, token-based), this is not an issue.
	HandleChallenge(challengeData []byte, reasonCode uint8) ([]byte, error)
	// Complete is called when authentication succeeds (CONNACK received).
	//
	// This allows the authenticator to perform any cleanup, store tokens, or
	// finalize the authentication state.
	//
	// Return an error if post-authentication setup fails. This will be logged
	// but won't affect the connection (CONNACK was already successful).
	Complete() error
}
Authenticator handles the authentication exchange for a specific authentication method.
Users implement this interface to provide custom authentication logic for MQTT v5.0 Enhanced Authentication (AUTH packet flow). This enables support for challenge/response mechanisms such as SCRAM, OAuth, Kerberos, or custom methods.
The authentication flow:
- InitialData() is called to get data for CONNECT packet
- HandleChallenge() is called for each AUTH packet from server
- Complete() is called when CONNACK is received (authentication succeeded)
Example implementation (simple token):
type TokenAuth struct {
	token string
}
func (t *TokenAuth) Method() string {
	return "TOKEN"
}
func (t *TokenAuth) InitialData() ([]byte, error) {
	return []byte(t.token), nil
}
func (t *TokenAuth) HandleChallenge(data []byte, code uint8) ([]byte, error) {
	return nil, fmt.Errorf("unexpected challenge")
}
func (t *TokenAuth) Complete() error {
	return nil
}
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client represents an MQTT client connection.
func Dial ¶
Dial establishes a connection to an MQTT server and returns a Client.
It is a wrapper around DialContext that uses the configured connection timeout (see WithConnectTimeout) to control the initial handshake.
The server parameter specifies the server address with scheme and port. Supported schemes:
- tcp:// or mqtt:// - Unencrypted connection (default port 1883)
- tls://, ssl://, or mqtts:// - TLS encrypted connection (default port 8883)
Options can be provided to configure the client behavior. Common options include WithClientID, WithCredentials, WithKeepAlive, WithTLS, and WithAutoReconnect.
The function performs the MQTT handshake and starts background goroutines for reading, writing, and managing the connection. If AutoReconnect is enabled (default: true), the client will automatically reconnect on connection loss.
Example (basic connection):
client, err := mq.Dial("tcp://localhost:1883",
	mq.WithClientID("my-client"))
if err != nil {
	log.Fatal(err)
}
defer client.Disconnect(context.Background())
Example (with authentication):
client, err := mq.Dial("tcp://server:1883",
	mq.WithClientID("secure-client"),
	mq.WithCredentials("username", "password"))
Example (TLS connection):
client, err := mq.Dial("tls://server:8883",
	mq.WithClientID("tls-client"),
	mq.WithTLS(&tls.Config{
		InsecureSkipVerify: false,
	}))
Example (all options):
client, err := mq.Dial("tcp://server:1883",
	mq.WithClientID("full-client"),
	mq.WithCredentials("user", "pass"),
	mq.WithKeepAlive(60*time.Second),
	mq.WithCleanSession(true),
	mq.WithAutoReconnect(true),
	mq.WithConnectTimeout(30*time.Second),
	mq.WithWill("status/offline", []byte("disconnected"), 1, true))
func DialContext ¶ added in v0.9.1
DialContext establishes a connection to an MQTT server with a context and returns a Client.
The context is used to control the initial connection establishment, including the network dial, TLS handshake, and MQTT CONNECT handshake. If the context is cancelled or expires before the handshake completes, DialContext returns an error.
When using DialContext, the WithConnectTimeout option is ignored for the initial connection (as the provided context takes precedence), but it is still used for subsequent automatic reconnection attempts.
Once the initial connection is established, the context's expiration has no effect on the ongoing connection or background maintenance.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mq.DialContext(ctx, "tcp://localhost:1883",
	mq.WithClientID("my-client"))
func (*Client) AssignedClientID ¶
AssignedClientID returns the client ID assigned by the server.
When connecting with an empty client ID, the server may assign a unique identifier and return it in the CONNACK packet. This method returns that assigned ID if one was provided.
This is only populated for MQTT v5.0 connections where the client connected with an empty client ID and the server assigned one. For v3.1.1 connections, or when the client provided its own ID, this returns an empty string.
The assigned ID can be useful for:
- Logging and debugging
- Reconnection with the same session
- Understanding what ID the server chose
Example:
client, _ := mq.Dial("tcp://localhost:1883",
	mq.WithClientID(""), // Empty - let server assign
	mq.WithProtocolVersion(mq.ProtocolV50))
assignedID := client.AssignedClientID()
if assignedID != "" {
	fmt.Printf("Server assigned ID: %s\n", assignedID)
}
func (*Client) ConnectionUserProperties ¶ added in v0.9.3
ConnectionUserProperties returns the User Properties received from the server in the CONNACK packet. These are application-specific key-value pairs provided by the server during the connection handshake.
This is only populated for MQTT v5.0 connections. Returns a copy of the map to prevent concurrent modification.
func (*Client) Disconnect ¶
func (c *Client) Disconnect(ctx context.Context, opts ...DisconnectOption) error
Disconnect gracefully disconnects from the server.
It sends a DISCONNECT packet to the server, stops all background goroutines, and closes the network connection. The function blocks until all goroutines have exited or the context is cancelled.
If AutoReconnect is enabled, it will be disabled after calling Disconnect. To reconnect, create a new client with Dial.
If the client is connected with MQTT v5.0, you can provide options such as WithReason to specify the reason code. These options are ignored when using MQTT v3.1.1.
Example:
// Normal disconnect (v3.1.1 or v5.0)
client.Disconnect(context.Background())
// Disconnect with reason (v5.0 only)
client.Disconnect(context.Background(),
	mq.WithReason(mq.ReasonCodeDisconnectWithWill))
func (*Client) GetStats ¶ added in v0.9.2
func (c *Client) GetStats() ClientStats
GetStats returns the current client statistics.
func (*Client) IsConnected ¶
IsConnected returns true if the client is currently connected to the server. This method is safe for concurrent use.
func (*Client) Publish ¶
func (c *Client) Publish(topic string, payload []byte, opts ...PublishOption) Token
Publish publishes a message to the specified topic.
The function returns a Token that can be used to wait for completion. For QoS 0, the token completes immediately after sending. For QoS 1 and 2, the token completes after receiving the appropriate acknowledgment from the server.
Example (QoS 0 - fire and forget):
client.Publish("sensors/temp", []byte("22.5"), mq.WithQoS(0))
Example (QoS 1 - wait for acknowledgment):
token := client.Publish("sensors/temp", []byte("22.5"), mq.WithQoS(1))
if err := token.Wait(context.Background()); err != nil {
	log.Printf("Publish failed: %v", err)
}
Example (retained message):
client.Publish("status/online", []byte("true"),
	mq.WithQoS(1),
	mq.WithRetain(true))
Example (QoS 2 with timeout):
token := client.Publish("critical/alert", []byte("fire"),
	mq.WithQoS(2))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
	log.Printf("Publish timeout or failed: %v", err)
}
func (*Client) Reauthenticate ¶
Reauthenticate initiates re-authentication with the server (MQTT v5.0).
This sends an AUTH packet with reason code 0x19 (Re-authenticate) to start a new authentication exchange. The authenticator's HandleChallenge method will be called for each challenge from the server.
Re-authentication is useful for:
- Refreshing expired tokens
- Rotating credentials
- Periodic security validation
Note: The Client and Server can continue to send and receive other Control Packets (such as PUBLISH) during the re-authentication exchange. The connection remains fully functional.
This method returns immediately. Authentication happens asynchronously. Use the authenticator's Complete method to know when it succeeds.
Returns an error if:
- Not using MQTT v5.0
- No authenticator configured
- Not connected
Example:
// Refresh authentication periodically
ticker := time.NewTicker(30 * time.Minute)
go func() {
	for range ticker.C {
		if err := client.Reauthenticate(context.Background()); err != nil {
			log.Printf("Re-authentication failed: %v", err)
		}
	}
}()
func (*Client) ResponseInformation ¶
ResponseInformation returns the response information string provided by the server.
In MQTT v5.0, the server can provide a ResponseInformation string in the CONNACK packet. This string can be used by the client as the basis for creating response topics in request/response messaging patterns.
This is typically used in multi-tenant or managed cloud environments where the server wants to control topic naming conventions and ensure proper namespace isolation between clients.
This method returns the server's response information if one was provided, or an empty string if:
- The connection is using MQTT v3.1.1 (property not supported)
- The server did not provide response information
- Not yet connected
Example usage:
client, _ := mq.Dial("tcp://localhost:1883",
	mq.WithProtocolVersion(mq.ProtocolV50))
if respInfo := client.ResponseInformation(); respInfo != "" {
	// Use server's suggested prefix for response topics
	responseTopic := respInfo + "my-responses"
	client.Publish("requests/data", payload,
		mq.WithResponseTopic(responseTopic))
}
func (*Client) ServerCapabilities ¶
func (c *Client) ServerCapabilities() ServerCapabilities
ServerCapabilities returns the server capabilities received in the CONNACK packet. This is only populated for MQTT v5.0 connections. For v3.1.1 connections, default values are returned.
func (*Client) ServerKeepAlive ¶
ServerKeepAlive returns the keepalive interval (in seconds) that the server wants the client to use.
In MQTT v5.0, the server can override the client's requested keepalive interval by sending a Server Keep Alive property in the CONNACK packet. When this happens, the client MUST use the server's value instead of its requested value.
This method returns the server's keepalive value if one was provided, or 0 if:
- The connection is using MQTT v3.1.1 (property not supported)
- The server did not override the keepalive (accepted client's request)
- Not yet connected
The library automatically adjusts its internal keepalive timer when the server overrides the value, so users typically don't need to take action. This method is primarily useful for logging and debugging.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
	mq.WithKeepAlive(60*time.Second),
	mq.WithProtocolVersion(mq.ProtocolV50))
if serverKA := client.ServerKeepAlive(); serverKA > 0 {
	fmt.Printf("Server overrode keepalive to %d seconds\n", serverKA)
}
func (*Client) ServerReference ¶
ServerReference returns the server reference URI provided by the server.
In MQTT v5.0, the server can provide a ServerReference string in the CONNACK or DISCONNECT packet to tell the client to connect to a different server. This is used for:
- Load balancing: Distribute clients across multiple servers
- Maintenance: Move clients before server shutdown
- Geographic routing: Direct clients to nearest server
- Failover: Redirect to backup server
IMPORTANT: The library does NOT automatically redirect to the referenced server. Users must either check this value and reconnect manually if desired, or use the WithOnServerRedirect option.
This method returns the server's reference URI if one was provided, or an empty string if:
- The connection is using MQTT v3.1.1 (property not supported)
- The server did not provide a server reference
- Not yet connected
Example usage with callback (recommended):
client, _ := mq.Dial("tcp://server-a.example.com:1883",
	mq.WithOnServerRedirect(func(newServer string) {
		log.Printf("Server suggests: %s", newServer)
		// Decide whether to reconnect
	}))
Example usage with polling:
client, _ := mq.Dial("tcp://server-a.example.com:1883",
	mq.WithProtocolVersion(mq.ProtocolV50))
if ref := client.ServerReference(); ref != "" {
	fmt.Printf("Server suggests redirecting to: %s\n", ref)
	// Application decides whether to reconnect
	client.Disconnect(context.Background())
	newClient, err := mq.Dial(ref, mq.WithProtocolVersion(mq.ProtocolV50))
	if err != nil {
		log.Printf("Redirect failed: %v", err)
	} else {
		client = newClient // use the redirected connection from here on
	}
}
func (*Client) SessionExpiryInterval ¶
SessionExpiryInterval returns the session expiry interval (in seconds) that the server is using for this connection.
For MQTT v5.0, this returns the actual value negotiated with the server. The server can override the client's requested value.
For MQTT v3.1.1:
- If CleanSession is false (persistent), this returns 0xFFFFFFFF (MaxUint32), reflecting that the session does not expire on disconnect.
- If CleanSession is true, this returns 0.
Returns 0 if the session expires immediately on disconnect or the client is not yet connected.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
	mq.WithProtocolVersion(mq.ProtocolV50),
	mq.WithSessionExpiryInterval(3600))
actual := client.SessionExpiryInterval()
if actual != 3600 {
	fmt.Printf("Server overrode session expiry to %d seconds\n", actual)
}
func (*Client) Subscribe ¶
func (c *Client) Subscribe(topic string, qos QoS, handler MessageHandler, opts ...SubscribeOption) Token
Subscribe subscribes to a topic with the specified QoS level.
The handler function is called for each message received on topics matching the subscription filter. If a message matches multiple subscription filters, the handlers for all matching subscriptions will be called.
The handler is called in a separate goroutine, so it should not block for long periods.
Topic filters support MQTT wildcards:
- '+' matches a single level (e.g., "sensors/+/temperature")
- '#' matches multiple levels (e.g., "sensors/#")
The function returns a Token that completes when the subscription is acknowledged by the server.
For persistent sessions (CleanSession=false), it is recommended to use the mq.WithSubscription option during Dial instead. This ensures handlers are automatically re-registered if the session is lost and the client must re-subscribe.
Example (simple subscription):
token := client.Subscribe("sensors/temperature", 1,
	func(c *mq.Client, msg mq.Message) {
		fmt.Printf("Temperature: %s\n", string(msg.Payload))
	})
if err := token.Wait(context.Background()); err != nil {
	log.Fatal(err)
}
Example with options (MQTT v5.0):
client.Subscribe("chat/room", 1, handler, mq.WithNoLocal(true))
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(topic string, opts ...UnsubscribeOption) Token
Unsubscribe unsubscribes from a single topic.
After unsubscribing, the client will no longer receive messages on the specified topic. The function returns a Token that completes when the unsubscription is acknowledged by the server.
This method supports options for MQTT v5.0 features like User Properties.
Example:
token := client.Unsubscribe("sensors/temperature")
token.Wait(context.Background())
Example with options (MQTT v5.0):
client.Unsubscribe("logs", mq.WithUnsubscribeUserProperty("reason", "done"))
type ClientStats ¶ added in v0.9.2
type ClientStats struct {
PacketsSent uint64
PacketsReceived uint64
BytesSent uint64
BytesReceived uint64
ReconnectCount uint64
Connected bool
}
ClientStats holds connection and throughput statistics.
type ContextDialer ¶
type ContextDialer interface {
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}
ContextDialer is an interface for custom network dialing logic. It matches the signature of net.Dialer.DialContext.
type DisconnectError ¶ added in v0.9.3
type DisconnectError struct {
ReasonCode ReasonCode
ReasonString string
SessionExpiryInterval uint32 // 0 if not set
ServerReference string // Empty if not set
UserProperties map[string]string // Nil if not set
}
DisconnectError represents a DISCONNECT packet received from the server, along with any MQTT v5.0 properties it carried.
func (*DisconnectError) Error ¶ added in v0.9.3
func (e *DisconnectError) Error() string
func (*DisconnectError) Is ¶ added in v0.9.3
func (e *DisconnectError) Is(target error) bool
type DisconnectOption ¶
type DisconnectOption func(*DisconnectOptions)
DisconnectOption is a functional option for configuring a disconnection.
func WithDisconnectProperties ¶
func WithDisconnectProperties(props *Properties) DisconnectOption
WithDisconnectProperties sets the MQTT v5.0 properties for the DISCONNECT packet.
This can be used to set:
- Session Expiry Interval (to update expiry on disconnect)
- Reason String (diagnostic information)
- User Properties (custom metadata)
This option is ignored when using MQTT v3.1.1.
func WithReason ¶
func WithReason(code ReasonCode) DisconnectOption
WithReason sets the reason code for the DISCONNECT packet (MQTT v5.0). Common codes include:
- 0x00: Normal Disconnect (default)
- 0x04: Disconnect with Will Message
This option is ignored when using MQTT v3.1.1.
type DisconnectOptions ¶
type DisconnectOptions struct {
ReasonCode ReasonCode
Properties *Properties
}
DisconnectOptions holds configuration for a disconnection.
type FileStore ¶
type FileStore struct {
// contains filtered or unexported fields
}
FileStore implements SessionStore using JSON files on disk. Each client ID gets its own directory containing separate files for pending publishes, subscriptions, and received QoS 2 packet IDs.
File organization:
baseDir/
clientID/
pending_1.json
pending_2.json
subscriptions.json
qos2_received.json
This implementation is synchronous - all operations block until complete. For async/batched writes, users can implement a custom SessionStore.
func NewFileStore ¶
func NewFileStore(baseDir, clientID string, opts ...FileStoreOption) (*FileStore, error)
NewFileStore creates a file-based session store for the specified client ID.
The baseDir will contain a subdirectory for each client ID, allowing multiple clients to share the same base directory.
Example:
store, err := mq.NewFileStore("/var/lib/mqtt", "sensor-1")
if err != nil {
log.Fatal(err)
}
client, err := mq.Dial("tcp://localhost:1883",
mq.WithClientID("sensor-1"),
mq.WithCleanSession(false),
mq.WithSessionStore(store))
func (*FileStore) ClearPendingPublishes ¶
ClearPendingPublishes removes all pending publishes from disk.
func (*FileStore) ClearReceivedQoS2 ¶
ClearReceivedQoS2 removes all received QoS 2 packet IDs.
func (*FileStore) ClientID ¶
ClientID returns the client ID this store is bound to. This can be used to validate that the store matches the client.
func (*FileStore) DeletePendingPublish ¶
DeletePendingPublish removes a pending publish from disk.
func (*FileStore) DeleteReceivedQoS2 ¶
DeleteReceivedQoS2 removes a QoS 2 packet ID.
func (*FileStore) DeleteSubscription ¶
DeleteSubscription removes a subscription from disk.
func (*FileStore) LoadPendingPublishes ¶
func (f *FileStore) LoadPendingPublishes() (map[uint16]*PersistedPublish, error)
LoadPendingPublishes loads all pending publishes from disk.
func (*FileStore) LoadReceivedQoS2 ¶
LoadReceivedQoS2 loads all received QoS 2 packet IDs.
func (*FileStore) LoadSubscriptions ¶
func (f *FileStore) LoadSubscriptions() (map[string]*PersistedSubscription, error)
LoadSubscriptions loads all subscriptions from disk.
func (*FileStore) SavePendingPublish ¶
func (f *FileStore) SavePendingPublish(packetID uint16, pub *PersistedPublish) error
SavePendingPublish stores a pending publish to disk.
func (*FileStore) SaveReceivedQoS2 ¶
SaveReceivedQoS2 marks a QoS 2 packet ID as received.
func (*FileStore) SaveSubscription ¶
func (f *FileStore) SaveSubscription(topic string, sub *PersistedSubscription) error
SaveSubscription stores a subscription to disk.
type FileStoreOption ¶
type FileStoreOption func(*fileStoreConfig)
FileStoreOption configures a FileStore.
func WithPermissions ¶
func WithPermissions(perm os.FileMode) FileStoreOption
WithPermissions sets the file permissions for stored files. Default is 0600 (owner read/write, group/others none).
Example:
store, _ := mq.NewFileStore("/var/lib/mqtt", "sensor-1",
mq.WithPermissions(0600)) // Owner read/write only
type HandlerInterceptor ¶ added in v0.9.4
type HandlerInterceptor func(MessageHandler) MessageHandler
HandlerInterceptor is a function that wraps a MessageHandler. It allows cross-cutting concerns like logging, metrics, or tracing to be applied to all message processing.
Example (Logging):
func LoggingInterceptor(next mq.MessageHandler) mq.MessageHandler {
return func(client *mq.Client, msg mq.Message) {
log.Printf("Received message on topic %s", msg.Topic)
next(client, msg)
}
}
type LimitPolicy ¶ added in v0.9.2
type LimitPolicy int
LimitPolicy determines how the client enforces limits (like ReceiveMaximum).
const (
// LimitPolicyIgnore logs a warning once per connection but continues processing.
LimitPolicyIgnore LimitPolicy = iota
// LimitPolicyStrict sends a DISCONNECT with Reason Code 0x93 (Receive Maximum exceeded)
// when the limit is reached.
//
// Note: Auto-reconnect should be disabled or carefully managed when using this policy,
// as a misbehaving server could cause an infinite loop of connect -> overflow -> disconnect.
LimitPolicyStrict
)
type Message ¶
type Message struct {
// Topic the message was published to
Topic string
// Message payload
Payload []byte
// Quality of Service level
QoS QoS
// Retained message flag
Retained bool
// Duplicate delivery flag
Duplicate bool
// MQTT v5.0 properties.
// This field is nil for MQTT v3.1.1 connections or when no properties are present.
Properties *Properties
}
Message represents an MQTT message received on a subscribed topic.
This struct is designed to be compatible with both MQTT v3.1.1 and v5.0.
The message is passed to subscription handlers and contains all relevant information about the received message including topic, payload, QoS level, and flags.
type MessageHandler ¶
MessageHandler is called when a message is received on a subscribed topic.
type MqttError ¶
type MqttError struct {
ReasonCode ReasonCode
Message string
Parent error
}
MqttError represents an error returned by the MQTT server, including the MQTT v5.0 reason code.
type Option ¶
type Option func(*clientOptions)
Option is a functional option for configuring the client.
func WithAuthenticator ¶
func WithAuthenticator(auth Authenticator) Option
WithAuthenticator sets the authenticator for enhanced authentication (MQTT v5.0).
Enhanced authentication allows challenge/response authentication mechanisms such as SCRAM, OAuth, Kerberos, or custom methods. The authenticator handles the authentication exchange via the AUTH packet flow.
If set, the client will:
- Send AuthenticationMethod + InitialData in CONNECT
- Handle AUTH challenges from the server via HandleChallenge
- Complete authentication when CONNACK is received
This option is only relevant for MQTT v5.0. For v3.1.1, use WithCredentials for simple username/password authentication.
Example:
auth := &MyScramAuthenticator{username: "user", password: "pass"}
client, err := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithAuthenticator(auth))
func WithAutoProtocolVersion ¶ added in v0.9.4
WithAutoProtocolVersion enables or disables automatic protocol version negotiation.
When enabled (default: true), the client will first attempt to connect using MQTT v5.0. If the server rejects the connection with an "Unacceptable Protocol Version" error (0x01), the client will automatically fall back to MQTT v3.1.1 and try again.
This is useful when connecting to unknown servers or during migrations where some brokers might not yet support MQTT v5.0.
Note: This only applies to the initial connection. Once a version is negotiated, it is used for all subsequent automatic reconnections.
func WithAutoReconnect ¶
WithAutoReconnect enables or disables automatic reconnection (default: true).
func WithCleanSession ¶
WithCleanSession sets the clean session flag.
When set to true (default), the server will discard any previous session state and subscriptions for this client ID. Each connection starts fresh.
When set to false, the server maintains session state across disconnections:
- Subscriptions persist and are restored on reconnect
- QoS 1 and 2 messages sent while offline are queued for delivery
- The client MUST use a non-empty client ID (via WithClientID)
- The server will reject the connection if client ID is empty
MQTT v3.1.1 vs v5.0 Differences:
- In v3.1.1: false ("Clean Session") means the session persists indefinitely after disconnect.
- In v5.0: false ("Clean Start") means the session RESUMES on connect, but EXPIRES immediately on disconnect unless WithSessionExpiryInterval is set.
To achieve persistent sessions in MQTT v5.0, you must use both:
mq.WithCleanSession(false)
mq.WithSessionExpiryInterval(0xFFFFFFFF) // Persist indefinitely
Use false for reliable message delivery across network interruptions. Use true for stateless clients or when you don't need message persistence.
Example (persistent session):
client, err := mq.Dial("tcp://localhost:1883",
mq.WithClientID("sensor-1"), // Required for CleanSession=false
mq.WithCleanSession(false))
func WithClientID ¶
WithClientID sets the client identifier.
The client ID uniquely identifies this client to the MQTT server.
Empty client ID behavior (MQTT v3.1.1 spec):
- With CleanSession=true: Server will auto-generate a unique ID
- With CleanSession=false: Server will reject the connection (identifier rejected)
For persistent sessions (CleanSession=false), you MUST provide a non-empty client ID.
func WithConnectTimeout ¶
WithConnectTimeout sets the connection timeout (default: 30s).
func WithConnectUserProperties ¶ added in v0.9.3
WithConnectUserProperties sets the User Properties to be sent in the CONNECT packet.
Only applicable for MQTT v5.0 connections. User Properties are key-value pairs that allow the client to send custom metadata to the server during the connection handshake.
This option is ignored when using MQTT v3.1.1.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithConnectUserProperties(map[string]string{
"region": "us-east-1",
"app-version": "1.0.2",
}))
func WithCredentials ¶
WithCredentials sets the username and password for authentication.
func WithDefaultPublishHandler ¶
func WithDefaultPublishHandler(handler MessageHandler) Option
WithDefaultPublishHandler sets a fallback handler for incoming PUBLISH messages that do not match any registered subscription.
This is useful for:
- Handling messages received during reconnection race conditions
- Handling persistent subscriptions restored without a registered handler (orphans)
- Debugging or logging unexpected messages
- Implementing a catch-all strategy
If not set (default), messages matching no subscription are silently dropped (but still acknowledged to comply with the protocol).
Example:
client, _ := mq.Dial(uri,
mq.WithDefaultPublishHandler(func(c *mq.Client, msg mq.Message) {
log.Printf("Unexpected message on %s: %s", msg.Topic, msg.Payload)
}),
)
func WithDialer ¶
func WithDialer(dialer ContextDialer) Option
WithDialer sets a custom dialer for establishing the network connection. This enables support for alternative transports like WebSockets, Unix sockets, or proxying, without adding dependencies to the core library.
If provided, the library will skip its standard scheme validation and delegate the connection creation entirely to the dialer.
The dialer's DialContext method receives:
- ctx: The context provided to DialContext (or one created from WithConnectTimeout if using Dial)
- network: The scheme from the server URL (e.g. "ws", "tcp", "unix")
- addr: The original server string passed to Dial
Example (WebSockets using nhooyr.io/websocket):
client, _ := mq.Dial("ws://server.example.com/mqtt",
mq.WithDialer(mq.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
c, _, err := websocket.Dial(ctx, addr, &websocket.DialOptions{
Subprotocols: []string{"mqtt"}, // Crucial for MQTT over WebSockets
})
if err != nil {
return nil, err
}
return websocket.NetConn(ctx, c, websocket.MessageBinary), nil
})))
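Because addr is the original server string, a custom dialer usually has to parse it before dialing. The following standard-library sketch shows a Unix socket dialer; unixDialer and socketPath are hypothetical names, and the "unix:///path" URL form is an assumption about how such a server string would be written.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/url"
)

// unixDialer is a hypothetical dialer. Its DialContext method has the
// signature required by the ContextDialer interface.
type unixDialer struct {
	d net.Dialer
}

// socketPath extracts the filesystem path from a "unix://" server string.
func socketPath(addr string) (string, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return "", err
	}
	if u.Scheme != "unix" || u.Path == "" {
		return "", fmt.Errorf("not a unix socket URL: %q", addr)
	}
	return u.Path, nil
}

// DialContext ignores the network argument and always dials a Unix socket.
func (u *unixDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
	path, err := socketPath(addr)
	if err != nil {
		return nil, err
	}
	return u.d.DialContext(ctx, "unix", path)
}

func main() {
	path, err := socketPath("unix:///var/run/mqtt.sock")
	fmt.Println(path, err)
}
```

A value such as &unixDialer{} could then be passed to mq.WithDialer in place of the WebSocket dialer shown above.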
func WithHandlerInterceptor ¶ added in v0.9.4
func WithHandlerInterceptor(interceptor HandlerInterceptor) Option
WithHandlerInterceptor adds an interceptor to the incoming message handler chain. Interceptors are called in the order they are added.
Example (Logging):
client, _ := mq.Dial(uri,
mq.WithHandlerInterceptor(func(next mq.MessageHandler) mq.MessageHandler {
return func(c *mq.Client, m mq.Message) {
log.Printf("Received message on topic %s", m.Topic)
next(c, m)
}
}),
)
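To make the ordering concrete, the sketch below chains two interceptors around a handler using simplified local types. The message, handler, and interceptor types and the chain helper are stand-ins defined only for this example; it assumes the first interceptor added is the outermost wrapper, which is one way to obtain the documented call order.

```go
package main

import "fmt"

// Simplified stand-ins for mq.Message, mq.MessageHandler and
// mq.HandlerInterceptor, defined locally for illustration.
type message struct{ Topic string }
type handler func(msg message)
type interceptor func(handler) handler

// chain wraps h so that interceptors run in the order they were given.
func chain(h handler, interceptors ...interceptor) handler {
	// Wrap from last to first so the first interceptor ends up outermost.
	for i := len(interceptors) - 1; i >= 0; i-- {
		h = interceptors[i](h)
	}
	return h
}

// tag returns an interceptor that records its name before calling next.
func tag(name string, trace *[]string) interceptor {
	return func(next handler) handler {
		return func(msg message) {
			*trace = append(*trace, name)
			next(msg)
		}
	}
}

func main() {
	var trace []string
	final := func(msg message) { trace = append(trace, "handler") }
	h := chain(final, tag("logging", &trace), tag("metrics", &trace))
	h(message{Topic: "sensors/temperature"})
	fmt.Println(trace) // prints [logging metrics handler]
}
```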
func WithIncomingQueueSize ¶ added in v0.9.4
WithIncomingQueueSize sets the size of the internal incoming packet buffer (default: 100).
func WithKeepAlive ¶
WithKeepAlive sets the MQTT keep alive interval (default: 60s).
func WithLogger ¶
WithLogger sets a custom logger for the client. If not provided, the client will use a logger that discards all output. Use this to integrate with your application's logging system.
Example:
logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
client, _ := mq.Dial("tcp://localhost:1883", mq.WithLogger(logger))
func WithMaxIncomingPacket ¶
WithMaxIncomingPacket sets the maximum allowed incoming packet size. Default is 268435455 (256MB, MQTT spec maximum). Set to a lower value to protect against memory exhaustion from large incoming packets. Example: WithMaxIncomingPacket(1024 * 1024) limits incoming packets to 1MB.
func WithMaxPayloadSize ¶
WithMaxPayloadSize sets the maximum allowed outgoing payload size. Default is 268435455 (256MB, MQTT spec maximum). Set to a lower value to prevent sending large messages.
func WithMaxTopicLength ¶
WithMaxTopicLength sets the maximum allowed topic length. Default is 65535 (MQTT spec maximum). Set to a lower value to reject topics exceeding your application's needs.
func WithOnConnect ¶
WithOnConnect sets the handler to be called when the client connects. This is called for the initial connection and every successful reconnection.
The handler is invoked asynchronously in a separate goroutine. This allows implementing complex setup logic (e.g., subscribing, publishing) without blocking the connection process or the client's internal loop.
func WithOnConnectionLost ¶
WithOnConnectionLost sets the handler to be called when the connection is lost. The error parameter provides the reason for disconnection.
The handler is invoked asynchronously in a separate goroutine to ensure it does not block internal cleanup or reconnection attempts.
func WithOnServerRedirect ¶
WithOnServerRedirect sets the handler to be called when the server provides a redirection reference (MQTT v5.0 only).
The server can send a ServerReference in the CONNACK or DISCONNECT packet to suggest that the client connect to a different server. This is used for:
- Load balancing: Distribute clients across multiple servers
- Maintenance: Move clients before server shutdown
- Geographic routing: Direct clients to nearest server
- Failover: Redirect to backup server
The handler receives the server URI as provided by the server. The client does NOT automatically redirect - the handler should decide whether to accept the redirect and manually reconnect if desired.
The handler is invoked asynchronously in a separate goroutine to prevent blocking the processing of the CONNACK packet.
The server reference is also available via the ServerReference() method for polling/checking later.
This option is only relevant for MQTT v5.0 connections. For v3.1.1, the callback will never be invoked.
Example:
client, err := mq.Dial("tcp://server1.example.com:1883",
mq.WithOnServerRedirect(func(newServer string) {
log.Printf("Server suggests redirect to: %s", newServer)
// Application decides whether to reconnect
}))
func WithOutgoingQueueSize ¶ added in v0.9.4
WithOutgoingQueueSize sets the size of the internal outgoing packet buffer (default: 1000).
func WithProtocolVersion ¶
WithProtocolVersion sets the MQTT protocol version to use. Use ProtocolV50 (default) for MQTT v5.0 or ProtocolV311 for MQTT v3.1.1.
Example for v3.1.1 server:
client, _ := mq.Dial("tcp://localhost:1883", mq.WithProtocolVersion(mq.ProtocolV311))
func WithPublishInterceptor ¶ added in v0.9.4
func WithPublishInterceptor(interceptor PublishInterceptor) Option
WithPublishInterceptor adds an interceptor to the outbound publish chain. Interceptors are called in the order they are added.
Example (Tracing):
client, _ := mq.Dial(uri,
mq.WithPublishInterceptor(func(next mq.PublishFunc) mq.PublishFunc {
return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
// Inject tracing headers or log the publish
return next(topic, payload, opts...)
}
}),
)
func WithQoS0LimitPolicy ¶ added in v0.9.4
func WithQoS0LimitPolicy(policy QoS0LimitPolicy) Option
WithQoS0LimitPolicy sets the policy for handling QoS 0 messages when the buffer is full.
The default policy is QoS0LimitPolicyDrop, which ensures the client remains non-blocking and responsive even under extreme network congestion.
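A drop policy of this kind is commonly built on a non-blocking channel send. The sketch below shows that general Go technique; tryEnqueue and the queue are hypothetical, and the library's actual buffering may work differently.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the packet
// was queued. Hypothetical helper, not part of the mq API.
func tryEnqueue(queue chan<- []byte, packet []byte) bool {
	select {
	case queue <- packet:
		return true
	default:
		// Buffer full: drop the packet instead of blocking the caller.
		return false
	}
}

func main() {
	queue := make(chan []byte, 1)
	fmt.Println(tryEnqueue(queue, []byte("first")))  // prints true
	fmt.Println(tryEnqueue(queue, []byte("second"))) // prints false
}
```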
func WithReceiveMaximum ¶
func WithReceiveMaximum(max uint16, policy LimitPolicy) Option
WithReceiveMaximum sets the maximum number of unacknowledged QoS 1 and QoS 2 messages the client is willing to process concurrently.
Only applicable for MQTT v5.0. This value is sent in the CONNECT packet. The default value is 65535 (maximum allowed by spec).
This allows the client to limit the number of messages it has to manage buffering for. If the client cannot process messages fast enough, it can lower this value to apply backpressure to the server.
The policy argument determines behavior when the limit is exceeded:
- LimitPolicyIgnore (recommended): Log a warning once and continue processing. This protects the client from disconnecting due to server bugs, while still processing messages (potentially unbounded).
- LimitPolicyStrict: Disconnect with Reason Code 0x93. Use this if strict flow control compliance is required.
This option is ignored when using MQTT v3.1.1.
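The difference between the two policies can be sketched with a small in-flight counter. All types and names below (receiveWindow, limitPolicy, action) are hypothetical and exist only for this illustration; they are not the library's internals.

```go
package main

import "fmt"

type limitPolicy int

const (
	policyIgnore limitPolicy = iota // warn once, keep processing
	policyStrict                    // disconnect with reason code 0x93
)

type action int

const (
	actionProcess action = iota
	actionWarnAndProcess
	actionDisconnect
)

// receiveWindow tracks unacknowledged inbound QoS 1 and QoS 2 messages.
type receiveWindow struct {
	max      int
	inFlight int
	policy   limitPolicy
	warned   bool
}

// onPublish decides what to do with a newly received QoS 1/2 PUBLISH.
func (w *receiveWindow) onPublish() action {
	if w.inFlight < w.max {
		w.inFlight++
		return actionProcess
	}
	if w.policy == policyStrict {
		return actionDisconnect
	}
	// Ignore policy: keep processing, but warn only the first time.
	w.inFlight++
	if !w.warned {
		w.warned = true
		return actionWarnAndProcess
	}
	return actionProcess
}

// onAck releases a slot once a message has been acknowledged.
func (w *receiveWindow) onAck() {
	if w.inFlight > 0 {
		w.inFlight--
	}
}

func main() {
	w := &receiveWindow{max: 2, policy: policyStrict}
	fmt.Println(w.onPublish() == actionProcess)
	fmt.Println(w.onPublish() == actionProcess)
	fmt.Println(w.onPublish() == actionDisconnect)
}
```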
func WithRequestProblemInformation ¶
WithRequestProblemInformation requests that the server include detailed problem information (ReasonString and UserProperties) in error responses.
Only applicable for MQTT v5.0 connections. When set to true, the server should include diagnostic information in CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, and DISCONNECT packets when errors occur.
This is useful for debugging and understanding server behavior, but may increase bandwidth usage. Most servers send problem information by default.
This option is ignored when using MQTT v3.1.1.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithRequestProblemInformation(true))
func WithRequestResponseInformation ¶
WithRequestResponseInformation requests that the server provide response information in the CONNACK packet.
Only applicable for MQTT v5.0 connections. When set to true, the server may include a ResponseInformation string that the client can use as the basis for creating response topics in request/response patterns.
This is useful in multi-tenant or managed cloud environments where the server controls topic naming conventions.
This option is ignored when using MQTT v3.1.1.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithRequestResponseInformation(true))
if respInfo := client.ResponseInformation(); respInfo != "" {
// Use server's suggested prefix
responseTopic := respInfo + "my-responses"
}
func WithSessionExpiryInterval ¶
WithSessionExpiryInterval sets how long the server should maintain session state after the client disconnects (in seconds).
Only applicable for MQTT v5.0. For v3.1.1, use WithCleanSession instead.
Values:
- 0: Session ends immediately on disconnect (can be explicitly set)
- 1-4294967294: Session persists for this many seconds
- 4294967295 (0xFFFFFFFF): Session never expires
The server may override this value (e.g., to enforce a maximum limit). Use client.SessionExpiryInterval() after connecting to see the actual value negotiated with the server.
Note: This can be combined with WithCleanSession(false) to resume a previous session while also controlling how long it persists.
This option is ignored when using MQTT v3.1.1.
Example (short-lived session for mobile app):
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithCleanSession(false),
mq.WithSessionExpiryInterval(300)) // 5 minutes
Example (long-lived session for IoT device):
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithCleanSession(false),
mq.WithSessionExpiryInterval(86400)) // 24 hours
func WithSessionStore ¶
func WithSessionStore(store SessionStore) Option
WithSessionStore sets a custom session store for persistence.
If set, session state (pending publishes, subscriptions, received QoS 2 IDs) will be persisted across process restarts. This enables the client to resume unacknowledged messages and subscriptions after a crash or reboot.
The store is only loaded when the process starts (not on network reconnects). During normal reconnections, the in-memory state is used directly.
Example with file-based storage:
store, err := mq.NewFileStore("/var/lib/mqtt", "sensor-1")
if err != nil {
log.Fatal(err)
}
client, err := mq.Dial("tcp://localhost:1883",
mq.WithClientID("sensor-1"),
mq.WithCleanSession(false),
mq.WithSessionStore(store))
func WithSubscription ¶
func WithSubscription(topic string, handler MessageHandler) Option
WithSubscription defines a subscription that the client should maintain.
This serves two purposes:
- Registers the MessageHandler locally before connection (preventing race conditions).
- Automatically subscribes to the topic on connection/reconnection if needed.
For persistent sessions (CleanSession=false):
- If SessionPresent=true: The server has the subscription; we just register the handler locally.
- If SessionPresent=false: The client will automatically resubscribe to this topic.
For clean sessions (CleanSession=true):
- The client will automatically subscribe to this topic on every connection.
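The cases above reduce to a single decision on each connection. The function below is a hypothetical summary of those rules, not the library's code.

```go
package main

import "fmt"

// shouldSendSubscribe reports whether a SUBSCRIBE packet must be sent for a
// subscription declared up front. Hypothetical helper for illustration.
func shouldSendSubscribe(cleanSession, sessionPresent bool) bool {
	if cleanSession {
		// Clean sessions start empty, so subscribe on every connection.
		return true
	}
	// Persistent session: resubscribe only if the server lost the session.
	return !sessionPresent
}

func main() {
	fmt.Println(shouldSendSubscribe(true, false))  // prints true
	fmt.Println(shouldSendSubscribe(false, true))  // prints false
	fmt.Println(shouldSendSubscribe(false, false)) // prints true
}
```

In every case the handler is registered locally first; only the network round trip is skipped when the server still holds the subscription.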
func WithTLS ¶
WithTLS sets the TLS configuration for secure connections. Pass nil for default TLS settings, or provide a custom *tls.Config. TLS is normally selected by using a "tls://", "ssl://", or "mqtts://" server URL; setting this option also enables TLS for "tcp://" URLs.
func WithTopicAliasMaximum ¶
WithTopicAliasMaximum sets the maximum number of topic aliases the client will accept from the server when receiving PUBLISH messages.
Only applicable for MQTT v5.0. This value is sent in the CONNECT packet to tell the server how many aliases it can send to the client.
The server will also send its own TopicAliasMaximum in CONNACK, which tells the client how many aliases it can send to the server when publishing. These are independent values - one for each direction.
Topic aliases allow short numeric IDs to be used instead of full topic names, reducing bandwidth usage for frequently published topics.
Values:
- 0: Topic aliases disabled (default)
- 1-65535: Maximum number of aliases to accept from server
This option is ignored when using MQTT v3.1.1.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithTopicAliasMaximum(100)) // Accept up to 100 aliases from server
// Publishing with alias (uses server's limit from CONNACK)
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
mq.WithAlias())
// Receiving with alias (uses our declared limit of 100)
client.Subscribe("sensors/#", mq.AtLeastOnce, func(c *mq.Client, msg mq.Message) {
// Topic is automatically resolved from alias
fmt.Printf("Topic: %s\n", msg.Topic)
})
func WithWill ¶
func WithWill(topic string, payload []byte, qos uint8, retained bool, properties ...*Properties) Option
WithWill sets the Last Will and Testament (LWT) message.
The LWT is a message that the MQTT server will automatically publish on behalf of the client if the client disconnects unexpectedly (e.g., network failure, crash, or power loss). It is NOT sent on graceful disconnects via Disconnect().
This is commonly used to notify other clients that a device has gone offline.
Parameters:
- topic: The topic to publish the will message to
- payload: The message content (e.g., "offline", "disconnected")
- qos: Quality of Service level (0, 1, or 2)
- retained: Whether the will message should be retained by the server
The will message is sent by the server when:
- The client fails to send a PINGREQ within the keepalive period
- The network connection is lost without a proper DISCONNECT packet
- The client crashes or loses power
The will message is NOT sent when:
- The client calls Disconnect() normally
- The connection is closed gracefully
Example (status monitoring):
client, err := mq.Dial("tcp://localhost:1883",
mq.WithClientID("sensor-1"),
mq.WithWill("devices/sensor-1/status", []byte("offline"), 1, true))
Other clients can subscribe to "devices/+/status" to monitor device connectivity. The properties argument is optional and can be used to set Will Properties (MQTT v5.0).
type PersistedPublish ¶ added in v0.9.1
type PersistedPublish struct {
Topic string
Payload []byte
QoS uint8
Retain bool
Properties *PublishProperties
}
PersistedPublish represents a publish for persistence. This is a simplified representation containing only the data needed to restore a pending publish after reconnection.
type PersistedSubscription ¶ added in v0.9.1
type PersistedSubscription struct {
QoS uint8
Options *PersistedSubscriptionOptions
}
PersistedSubscription represents a subscription for persistence. This contains the data needed to restore a subscription after reconnection.
type PersistedSubscriptionOptions ¶ added in v0.9.1
type PersistedSubscriptionOptions struct {
NoLocal bool
RetainAsPublished bool
RetainHandling uint8
SubscriptionID *uint32
UserProperties map[string]string
}
PersistedSubscriptionOptions represents MQTT v5.0 subscription options for persistence.
type Properties ¶
type Properties struct {
// ContentType specifies the MIME content type of the message payload.
// Example: "application/json", "text/plain", "application/octet-stream"
ContentType string
// ResponseTopic specifies the topic for response messages.
// Used in request/response messaging patterns.
ResponseTopic string
// CorrelationData is used to correlate request and response messages.
// Typically used with ResponseTopic for request/response patterns.
CorrelationData []byte
// MessageExpiry specifies the message expiry interval in seconds.
// If set, the message will be discarded if not delivered within this time.
MessageExpiry *uint32
// PayloadFormat indicates the format of the payload.
// 0 = unspecified bytes (default)
// 1 = UTF-8 encoded character data
PayloadFormat *uint8
// SubscriptionIdentifier contains the subscription identifier(s) that matched
// this message. Only present in received messages when the server supports
// subscription identifiers and the subscription was created with an ID.
// This is a receive-only property. If set when publishing, it will be silently
// ignored and not sent to the server.
SubscriptionIdentifier []int
// ReasonString contains a human-readable explanation from the server.
// Typically used for diagnostic purposes when operations fail or behave
// unexpectedly. Common in error responses and server notifications.
// This is a receive-only property. If set when publishing, it will be silently
// ignored and not sent to the server.
ReasonString string
// WillDelayInterval specifies the delay in seconds before the Will Message is sent.
// If the connection is re-established before this time, the Will Message is not sent.
WillDelayInterval *uint32
// SessionExpiryInterval specifies the session expiry interval in seconds.
// Used in DISCONNECT packets to update the expiry interval.
SessionExpiryInterval *uint32
// UserProperties contains application-specific properties as key-value pairs.
// These can be used to pass custom metadata with messages.
UserProperties map[string]string
}
Properties represents MQTT v5.0 properties for messages.
All fields are optional and only used when the protocol version is 5.0. For MQTT v3.1.1 connections, properties are ignored.
Common properties for application use:
- ContentType: MIME type of the payload (e.g., "application/json")
- ResponseTopic: Topic for response messages in request/response pattern
- CorrelationData: Correlation data for matching requests with responses
- UserProperties: Application-specific key-value pairs
- MessageExpiry: Message expiry interval in seconds
func NewProperties ¶
func NewProperties() *Properties
NewProperties creates a new Properties instance with initialized maps.
func (*Properties) GetUserProperty ¶
func (p *Properties) GetUserProperty(key string) string
GetUserProperty retrieves a user property value. Returns empty string if the property doesn't exist.
func (*Properties) SetUserProperty ¶
func (p *Properties) SetUserProperty(key, value string)
SetUserProperty adds or updates a user property.
type PublishFunc ¶ added in v0.9.4
type PublishFunc func(topic string, payload []byte, opts ...PublishOption) Token
PublishFunc matches the signature of Client.Publish.
type PublishInterceptor ¶ added in v0.9.4
type PublishInterceptor func(PublishFunc) PublishFunc
PublishInterceptor is a function that wraps a PublishFunc. It allows cross-cutting concerns to be applied to all outbound messages.
Example (Tracing):
func TracingInterceptor(next mq.PublishFunc) mq.PublishFunc {
return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
// Inject tracing headers into opts or log the publish
return next(topic, payload, opts...)
}
}
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption is a functional option for configuring a PUBLISH packet.
func WithAlias ¶
func WithAlias() PublishOption
WithAlias enables topic alias optimization for this publish.
Only applicable for MQTT v5.0 when WithTopicAliasMaximum() is set. Topic aliases allow the client to send a short alias ID instead of the full topic name, reducing bandwidth usage for frequently published topics.
With WithAlias():
- The first publish to a topic sends the full topic name and assigns an alias ID
- Subsequent publishes to that topic automatically use the alias (the topic name is sent empty)
The library automatically manages alias allocation and tracking. If the alias limit is reached, the client falls back to sending the full topic name.
Example:
client, _ := mq.Dial("tcp://localhost:1883",
mq.WithProtocolVersion(mq.ProtocolV50),
mq.WithTopicAliasMaximum(100))
// First publish - sends full topic + assigns alias
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
mq.WithAlias())
// Subsequent publishes - automatically uses alias (saves ~50 bytes)
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
mq.WithAlias())
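The allocation behaviour described above can be pictured with a small table that maps topics to alias IDs. This is a minimal sketch under assumptions, not the library's internal implementation: aliasTable, newAliasTable and resolve are hypothetical names, and aliases are simply handed out in increasing order starting at 1.

```go
package main

import "fmt"

// aliasTable is a hypothetical tracker, not the library's internal type.
type aliasTable struct {
	max     uint16            // alias limit for this connection
	byTopic map[string]uint16 // topic -> assigned alias
}

func newAliasTable(max uint16) *aliasTable {
	return &aliasTable{max: max, byTopic: make(map[string]uint16)}
}

// resolve returns the topic string and alias to put on the wire.
// An alias of 0 means "send no alias property".
func (t *aliasTable) resolve(topic string) (wireTopic string, alias uint16) {
	if a, ok := t.byTopic[topic]; ok {
		return "", a // alias already known: send an empty topic
	}
	if len(t.byTopic) >= int(t.max) {
		return topic, 0 // limit reached: fall back to the full topic
	}
	a := uint16(len(t.byTopic) + 1) // valid aliases start at 1
	t.byTopic[topic] = a
	return topic, a // first use: full topic plus the new alias
}

func main() {
	t := newAliasTable(1)
	fmt.Println(t.resolve("sensors/a/temp")) // first publish
	fmt.Println(t.resolve("sensors/a/temp")) // later publish, alias only
	fmt.Println(t.resolve("sensors/b/temp")) // table full, full topic
}
```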
func WithContentType ¶
func WithContentType(contentType string) PublishOption
WithContentType sets the MQTT v5.0 content type property. This specifies the MIME type of the message payload. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("data/json", payload, mq.WithContentType("application/json"))
func WithCorrelationData ¶
func WithCorrelationData(data []byte) PublishOption
WithCorrelationData sets correlation data for request/response pattern. Used to match responses with requests. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("requests/data", payload,
mq.WithResponseTopic("responses/data"),
mq.WithCorrelationData([]byte("req-123")))
func WithMessageExpiry ¶
func WithMessageExpiry(seconds uint32) PublishOption
WithMessageExpiry sets the message expiry interval in seconds. The message will be discarded if not delivered within this time. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("events/temp", payload, mq.WithMessageExpiry(300)) // 5 minutes
func WithPayloadFormat ¶
func WithPayloadFormat(format uint8) PublishOption
WithPayloadFormat sets the payload format indicator.
Format values:
- mq.PayloadFormatBytes (0): Unspecified bytes (default)
- mq.PayloadFormatUTF8 (1): UTF-8 encoded character data
If PayloadFormatUTF8 is used, the client will validate that the payload is valid UTF-8 before sending.
Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("data/text", []byte("hello"), mq.WithPayloadFormat(mq.PayloadFormatUTF8))
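The UTF-8 check mentioned above can be expressed with the standard library's unicode/utf8 package. The sketch below is illustrative only: checkPayload, errInvalidUTF8 and the lower-case constants are hypothetical stand-ins, and the error the real client returns is not documented in this section.

```go
package main

import (
	"errors"
	"fmt"
	"unicode/utf8"
)

// Stand-ins for mq.PayloadFormatBytes and mq.PayloadFormatUTF8.
const (
	payloadFormatBytes uint8 = 0
	payloadFormatUTF8  uint8 = 1
)

// errInvalidUTF8 is a hypothetical error value used by this sketch.
var errInvalidUTF8 = errors.New("payload is not valid UTF-8")

// checkPayload rejects a payload only when it is declared as UTF-8
// but contains byte sequences that are not valid UTF-8.
func checkPayload(format uint8, payload []byte) error {
	if format == payloadFormatUTF8 && !utf8.Valid(payload) {
		return errInvalidUTF8
	}
	return nil
}

func main() {
	fmt.Println(checkPayload(payloadFormatUTF8, []byte("hello")))
	fmt.Println(checkPayload(payloadFormatUTF8, []byte{0xff, 0xfe}))
	fmt.Println(checkPayload(payloadFormatBytes, []byte{0xff, 0xfe}))
}
```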
func WithProperties ¶
func WithProperties(props *Properties) PublishOption
WithProperties sets several v5.0 properties at once from a single Properties value. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
props := mq.NewProperties()
props.ContentType = "application/json"
props.SetUserProperty("version", "1.0")
client.Publish("data/json", payload, mq.WithProperties(props))
func WithQoS ¶
func WithQoS(qos QoS) PublishOption
WithQoS sets the Quality of Service level for the publish.
QoS levels:
- 0: At most once delivery (fire and forget)
- 1: At least once delivery (acknowledged)
- 2: Exactly once delivery (assured)
Default is QoS 0.
func WithResponseTopic ¶
func WithResponseTopic(topic string) PublishOption
WithResponseTopic sets the response topic for request/response pattern. The receiver can publish responses to this topic. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("requests/data", payload,
mq.WithResponseTopic("responses/data"),
mq.WithCorrelationData([]byte("req-123")))
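On the requesting side, the response topic and correlation data are typically paired with a table of outstanding requests. The following is a minimal sketch of that bookkeeping, assuming a handler subscribed to the response topic calls deliver with the correlation data and payload of each incoming message; pendingRequests and its methods are hypothetical and not part of mq.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingRequests tracks requests that are still waiting for a response.
type pendingRequests struct {
	mu      sync.Mutex
	waiters map[string]chan []byte // keyed by correlation data
}

func newPendingRequests() *pendingRequests {
	return &pendingRequests{waiters: make(map[string]chan []byte)}
}

// register records a request and returns the channel on which its
// response payload will arrive.
func (p *pendingRequests) register(id []byte) <-chan []byte {
	ch := make(chan []byte, 1) // buffered so deliver never blocks
	p.mu.Lock()
	p.waiters[string(id)] = ch
	p.mu.Unlock()
	return ch
}

// deliver hands a response to the matching waiter. It reports false
// when no request with that correlation data is outstanding.
func (p *pendingRequests) deliver(id, payload []byte) bool {
	p.mu.Lock()
	ch, ok := p.waiters[string(id)]
	delete(p.waiters, string(id))
	p.mu.Unlock()
	if ok {
		ch <- payload
	}
	return ok
}

func main() {
	p := newPendingRequests()
	resp := p.register([]byte("req-123"))
	p.deliver([]byte("req-123"), []byte("22.5"))
	fmt.Println(string(<-resp))
}
```

Removing the entry on delivery means a duplicate response for the same correlation data is reported as unmatched rather than delivered twice.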
func WithRetain ¶
func WithRetain(retain bool) PublishOption
WithRetain sets the retain flag for the publish.
When true, the server stores the message and delivers it to future subscribers of the topic. Only the most recent retained message per topic is stored.
Default is false.
func WithUserProperty ¶
func WithUserProperty(key, value string) PublishOption
WithUserProperty adds a user-defined property key-value pair. Can be called multiple times to add multiple properties. Only used when protocol version is 5.0, ignored for v3.1.1.
Example:
client.Publish("sensors/temp", payload,
mq.WithUserProperty("sensor-id", "temp-01"),
mq.WithUserProperty("location", "warehouse-a"))
type PublishOptions ¶
type PublishOptions struct {
QoS uint8
Retain bool
Properties *Properties
UseAlias bool
}
PublishOptions holds configuration for a publish operation.
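Each PublishOption is a function that mutates a PublishOptions value, so a publish call can start from the defaults and apply the options in order. The sketch below shows that pattern with simplified local stand-ins: the Properties field is omitted, WithQoS takes a plain uint8, WithAlias is assumed to set UseAlias, and buildOptions is a hypothetical helper rather than a documented function.

```go
package main

import "fmt"

// PublishOptions is a reduced copy of the documented struct.
type PublishOptions struct {
	QoS      uint8
	Retain   bool
	UseAlias bool
}

// PublishOption mirrors the documented functional-option type.
type PublishOption func(*PublishOptions)

func WithQoS(qos uint8) PublishOption {
	return func(o *PublishOptions) { o.QoS = qos }
}

func WithRetain(retain bool) PublishOption {
	return func(o *PublishOptions) { o.Retain = retain }
}

// WithAlias is assumed here to do nothing more than set UseAlias.
func WithAlias() PublishOption {
	return func(o *PublishOptions) { o.UseAlias = true }
}

// buildOptions starts from the documented defaults (QoS 0, Retain false)
// and applies each option in order, so later options win.
func buildOptions(opts ...PublishOption) PublishOptions {
	var o PublishOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", buildOptions())
	fmt.Printf("%+v\n", buildOptions(WithQoS(1), WithRetain(true), WithAlias()))
}
```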
type PublishProperties ¶
type PublishProperties struct {
PayloadFormat *uint8
MessageExpiry *uint32
TopicAlias *uint16
ResponseTopic string
CorrelationData []byte
UserProperties map[string]string
SubscriptionIdentifier *uint32
ContentType string
}
PublishProperties represents MQTT v5.0 publish properties for persistence.
type QoS ¶
type QoS uint8
QoS represents the MQTT Quality of Service level.
const (
// AtMostOnce (QoS 0) - Fire and forget delivery.
// The message is delivered at most once, or it may not be delivered at all.
// No acknowledgment is sent by the receiver, and the message is not retried.
AtMostOnce QoS = 0
// AtLeastOnce (QoS 1) - Acknowledged delivery.
// The message is always delivered at least once. The receiver sends an
// acknowledgment (PUBACK), and the sender retries until acknowledged.
// Duplicate messages may occur.
AtLeastOnce QoS = 1
// ExactlyOnce (QoS 2) - Assured delivery.
// The message is always delivered exactly once using a four-step handshake
// (PUBLISH, PUBREC, PUBREL, PUBCOMP). This is the safest but slowest option.
ExactlyOnce QoS = 2
)
MQTT Quality of Service levels.
These constants provide readable names for the three QoS levels defined in the MQTT specification. Using named constants improves code readability compared to numeric literals.
Example:
// More readable
client.Subscribe("sensors/temp", mq.AtLeastOnce, handler)
client.Publish("alert", data, mq.WithQoS(mq.ExactlyOnce))
// vs numeric literals
client.Subscribe("sensors/temp", 1, handler)
client.Publish("alert", data, mq.WithQoS(2))
type QoS0LimitPolicy ¶ added in v0.9.4
type QoS0LimitPolicy int
QoS0LimitPolicy determines how the client handles QoS 0 messages when the internal buffer is full.
const (
// QoS0LimitPolicyDrop drops the QoS 0 message immediately if the internal buffer is full.
// This prevents the caller from blocking and avoids goroutine leaks.
// The token's Dropped() method will return true.
QoS0LimitPolicyDrop QoS0LimitPolicy = iota
// QoS0LimitPolicyBlock blocks the caller until space is available in the internal buffer.
// Use this if reliability is more important than preventing temporary blocking.
// This is safe to use as it still respects client shutdown.
QoS0LimitPolicyBlock
)
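The difference between the two policies comes down to whether the send on a full buffer has a default branch. The sketch below models the internal buffer as a buffered channel and client shutdown as a stop channel; enqueue and both channels are assumptions made for illustration, not the library's internals.

```go
package main

import "fmt"

// Local copy of the documented policy type and constants.
type QoS0LimitPolicy int

const (
	QoS0LimitPolicyDrop QoS0LimitPolicy = iota
	QoS0LimitPolicyBlock
)

// enqueue tries to place msg in buf according to policy.
// dropped is true when the message was discarded because buf was full.
// Both results are false when stop fired while the caller was blocked.
func enqueue(buf chan<- []byte, stop <-chan struct{}, policy QoS0LimitPolicy, msg []byte) (sent, dropped bool) {
	if policy == QoS0LimitPolicyDrop {
		select {
		case buf <- msg:
			return true, false
		default: // buffer full: give up immediately
			return false, true
		}
	}
	select {
	case buf <- msg: // waits for space in the buffer
		return true, false
	case <-stop: // shutdown releases a blocked caller
		return false, false
	}
}

func main() {
	buf := make(chan []byte, 1)
	stop := make(chan struct{})
	fmt.Println(enqueue(buf, stop, QoS0LimitPolicyDrop, []byte("a")))
	fmt.Println(enqueue(buf, stop, QoS0LimitPolicyDrop, []byte("b")))
}
```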
type ReasonCode ¶ added in v0.9.1
type ReasonCode uint8
ReasonCode represents an MQTT v5.0 reason code. It implements the error interface so it can be used with errors.Is.
const (
ReasonCodeNormalDisconnect ReasonCode = 0x00
ReasonCodeDisconnectWithWill ReasonCode = 0x04
ReasonCodeUnspecifiedError ReasonCode = 0x80
ReasonCodeMalformedPacket ReasonCode = 0x81
ReasonCodeProtocolError ReasonCode = 0x82
ReasonCodeImplementationError ReasonCode = 0x83
ReasonCodeNotAuthorized ReasonCode = 0x87
ReasonCodeServerBusy ReasonCode = 0x89
ReasonCodeServerShuttingDown ReasonCode = 0x8B
ReasonCodeKeepAliveTimeout ReasonCode = 0x8D
ReasonCodeSessionTakenOver ReasonCode = 0x8E
ReasonCodeTopicFilterInvalid ReasonCode = 0x90
ReasonCodeTopicNameInvalid ReasonCode = 0x91
ReasonCodeReceiveMaximumExceed ReasonCode = 0x93
ReasonCodeTopicAliasInvalid ReasonCode = 0x94
ReasonCodePacketTooLarge ReasonCode = 0x95
ReasonCodeMessageRateTooHigh ReasonCode = 0x96
ReasonCodeQuotaExceeded ReasonCode = 0x97
ReasonCodeAdministrativeAction ReasonCode = 0x98
ReasonCodePayloadFormatInvalid ReasonCode = 0x99
ReasonCodeRetainNotSupported ReasonCode = 0x9A
ReasonCodeQoSNotSupported ReasonCode = 0x9B
ReasonCodeUseAnotherServer ReasonCode = 0x9C
ReasonCodeServerMoved ReasonCode = 0x9D
ReasonCodeConnectionRateExceed ReasonCode = 0x9F
ReasonCodeMaximumConnectTime ReasonCode = 0xA0
ReasonCodeSubscriptionIDNotSupp ReasonCode = 0xA1
ReasonCodeWildcardSubNotSupp ReasonCode = 0xA2
)
MQTT v5.0 Reason Codes
These constants represent the reason codes defined in the MQTT v5.0 specification. Reason codes are used in DISCONNECT, CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, and UNSUBACK packets to provide detailed information about the outcome of an operation.
Use these constants with errors.Is to check for specific error conditions.
Example (checking for a specific reason code after a publish):
token := client.Publish("topic", data, mq.WithQoS(1))
if err := token.Wait(ctx); err != nil {
if errors.Is(err, mq.ReasonCodeQuotaExceeded) {
log.Println("Server quota exceeded, backing off...")
} else if errors.Is(err, mq.ReasonCodeNotAuthorized) {
log.Println("Not authorized to publish to this topic")
}
}
Reason codes 0x00-0x7F indicate success, while 0x80-0xFF indicate failure.
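A uint8-based type that implements error is comparable, which is why errors.Is can match it even after the error has been wrapped. The sketch below demonstrates this with a local copy of the type; the Error message format, IsFailure, wrap and classify are hypothetical and chosen for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// ReasonCode is a local stand-in for the documented type.
type ReasonCode uint8

const (
	ReasonCodeNotAuthorized ReasonCode = 0x87
	ReasonCodeQuotaExceeded ReasonCode = 0x97
)

// Error makes ReasonCode satisfy the error interface.
// The message format is an assumption of this sketch.
func (r ReasonCode) Error() string {
	return fmt.Sprintf("mqtt reason code 0x%02X", uint8(r))
}

// IsFailure applies the 0x80 boundary described above.
func (r ReasonCode) IsFailure() bool { return r >= 0x80 }

// wrap adds context the way a caller might; %w keeps the code matchable.
func wrap(err error) error {
	return fmt.Errorf("publish failed: %w", err)
}

// classify maps an error to a coarse action using errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ReasonCodeQuotaExceeded):
		return "back off"
	case errors.Is(err, ReasonCodeNotAuthorized):
		return "not authorized"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(wrap(ReasonCodeQuotaExceeded)))
	fmt.Println(classify(ReasonCodeNotAuthorized))
}
```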
func (ReasonCode) Error ¶ added in v0.9.1
func (r ReasonCode) Error() string
type ServerCapabilities ¶
type ServerCapabilities struct {
// MaximumPacketSize is the maximum packet size the server will accept.
// 0 means no limit was specified by the server.
MaximumPacketSize uint32
// ReceiveMaximum is the maximum number of QoS 1 and QoS 2 publications
// the server is willing to process concurrently.
ReceiveMaximum uint16
// TopicAliasMaximum is the maximum topic alias value the server accepts.
// 0 means topic aliases are not supported by the server.
TopicAliasMaximum uint16
// MaximumQoS is the maximum QoS level the server supports (0, 1, or 2).
MaximumQoS uint8
// RetainAvailable indicates if the server supports retained messages.
RetainAvailable bool
// WildcardAvailable indicates if the server supports wildcard subscriptions.
WildcardAvailable bool
// SubscriptionIDAvailable indicates if the server supports subscription identifiers.
SubscriptionIDAvailable bool
// SharedSubscriptionAvailable indicates if the server supports shared subscriptions.
SharedSubscriptionAvailable bool
}
ServerCapabilities represents the capabilities and limits advertised by the MQTT server. These are only available when using MQTT v5.0.
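An application can consult these limits before publishing, for example to cap the requested QoS or to avoid sending a packet the server would reject. The sketch below uses a reduced local copy of the struct; adjust is a hypothetical helper, and how the capabilities are obtained from a connected client is not covered in this section.

```go
package main

import "fmt"

// ServerCapabilities is a reduced copy of the documented struct.
type ServerCapabilities struct {
	MaximumPacketSize uint32 // 0 means no limit was specified
	MaximumQoS        uint8
	RetainAvailable   bool
}

// adjust returns the QoS and retain flag to use for a publish of the
// given encoded size, or an error if the packet cannot be sent at all.
func adjust(caps ServerCapabilities, qos uint8, retain bool, size uint32) (uint8, bool, error) {
	if caps.MaximumPacketSize != 0 && size > caps.MaximumPacketSize {
		return 0, false, fmt.Errorf("packet of %d bytes exceeds server limit of %d",
			size, caps.MaximumPacketSize)
	}
	if qos > caps.MaximumQoS {
		qos = caps.MaximumQoS // cap at what the server supports
	}
	if retain && !caps.RetainAvailable {
		retain = false // server does not store retained messages
	}
	return qos, retain, nil
}

func main() {
	caps := ServerCapabilities{MaximumPacketSize: 1024, MaximumQoS: 1}
	fmt.Println(adjust(caps, 2, true, 200))
	fmt.Println(adjust(caps, 0, false, 4096))
}
```

Silently lowering QoS or clearing the retain flag is one possible policy; a stricter caller might prefer to return an error in those cases as well.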
type SessionStore ¶
type SessionStore interface {
// SavePendingPublish stores an outgoing publish that hasn't been acknowledged.
// Called when a QoS 1/2 publish is sent.
// MAY return immediately and persist asynchronously.
SavePendingPublish(packetID uint16, pub *PersistedPublish) error
// DeletePendingPublish removes a publish after it's been acknowledged.
// Called when PUBACK (QoS 1) or PUBCOMP (QoS 2) is received.
// MAY return immediately and delete asynchronously.
DeletePendingPublish(packetID uint16) error
// LoadPendingPublishes retrieves all pending publishes on reconnect.
// Called once during connection establishment.
// MUST complete synchronously and return actual data.
LoadPendingPublishes() (map[uint16]*PersistedPublish, error)
// ClearPendingPublishes removes all pending publishes.
// Called when SessionPresent=false (server lost our session).
ClearPendingPublishes() error
// SaveSubscription stores an active subscription.
// Called when SUBACK is received.
// MAY return immediately and persist asynchronously.
SaveSubscription(topic string, sub *PersistedSubscription) error
// DeleteSubscription removes a subscription.
// Called when UNSUBACK is received.
// MAY return immediately and delete asynchronously.
DeleteSubscription(topic string) error
// LoadSubscriptions retrieves all subscriptions on reconnect.
// Called once during connection establishment.
//
// Note: Only topic filters and options are restored. The associated MessageHandlers
// are NOT persisted.
// - Callers should use mq.WithSubscription to re-associate handlers with these topics.
// - If no handler is found, messages will fall back to the DefaultPublishHandler if set.
//
// MUST complete synchronously and return actual data.
LoadSubscriptions() (map[string]*PersistedSubscription, error)
// SaveReceivedQoS2 marks a QoS 2 packet ID as received (prevent duplicates).
// Called when QoS 2 PUBLISH is received.
// MAY return immediately and persist asynchronously.
SaveReceivedQoS2(packetID uint16) error
// DeleteReceivedQoS2 removes a QoS 2 packet ID after PUBCOMP sent.
// Called when QoS 2 flow completes.
// MAY return immediately and delete asynchronously.
DeleteReceivedQoS2(packetID uint16) error
// LoadReceivedQoS2 retrieves all received QoS 2 packet IDs.
// Called once during connection establishment.
// MUST complete synchronously and return actual data.
LoadReceivedQoS2() (map[uint16]struct{}, error)
// ClearReceivedQoS2 removes all received QoS 2 packet IDs.
// Called when SessionPresent=false (server lost our session).
ClearReceivedQoS2() error
// Clear removes all session state.
// Called when CleanSession=true or session expires.
Clear() error
}
SessionStore handles persistence of session state across process restarts. This enables session state to survive client restarts, crashes, or reboots.
Note: State is only loaded from the store when the client process starts. During normal network reconnections (when in-memory state is still available), the store is not consulted - the in-memory state is used directly.
What Gets Persisted:
- Pending QoS 1 and QoS 2 publishes (not yet acknowledged by server)
- Active subscriptions (to restore on reconnect)
- Received QoS 2 packet IDs (to prevent duplicate delivery)
What Does NOT Get Persisted:
- QoS 0 publishes (fire-and-forget, no delivery guarantee)
- Messages already acknowledged (PUBACK/PUBCOMP received)
- Connection state (handled by MQTT protocol on reconnect)
Threading Model:
All methods are called from a single goroutine (the client's logic loop). Implementations do NOT need to be thread-safe for concurrent calls from mq.
Async Implementations:
Save/Delete methods MAY return immediately and perform I/O asynchronously in a background goroutine. This allows implementations to batch writes or use async I/O without blocking the client's logic loop.
However, Load methods MUST complete synchronously and return the actual data, as they are called during connection setup when the data is needed immediately.
Error Handling:
- Save/Delete errors are logged but do not fail the operation. The in-memory state is authoritative. Implementations should handle errors gracefully (e.g., retry, log, alert).
- Load errors will cause connection failure, as session state cannot be restored.
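The split between asynchronous Save/Delete and synchronous Load can be sketched with a worker goroutine that owns the data and applies operations in order. This example covers only the three pending-publish methods, keeps everything in memory in place of real disk I/O, and uses a stand-in PersistedPublish type; asyncStore and op are hypothetical names.

```go
package main

import "fmt"

// PersistedPublish is a stand-in; the real type is defined elsewhere in mq.
type PersistedPublish struct {
	Topic   string
	Payload []byte
}

// op is one queued operation: save (pub set), load (reply set), or delete.
type op struct {
	id    uint16
	pub   *PersistedPublish
	reply chan map[uint16]*PersistedPublish
}

type asyncStore struct {
	ops chan op
}

func newAsyncStore() *asyncStore {
	s := &asyncStore{ops: make(chan op, 64)}
	go s.run()
	return s
}

// run owns the data and applies operations strictly in arrival order.
func (s *asyncStore) run() {
	data := make(map[uint16]*PersistedPublish)
	for o := range s.ops {
		switch {
		case o.reply != nil:
			snap := make(map[uint16]*PersistedPublish, len(data))
			for id, p := range data {
				snap[id] = p
			}
			o.reply <- snap
		case o.pub != nil:
			data[o.id] = o.pub
		default:
			delete(data, o.id)
		}
	}
}

// SavePendingPublish queues the write and returns without waiting for it.
func (s *asyncStore) SavePendingPublish(id uint16, pub *PersistedPublish) error {
	s.ops <- op{id: id, pub: pub}
	return nil
}

// DeletePendingPublish queues the delete and returns without waiting for it.
func (s *asyncStore) DeletePendingPublish(id uint16) error {
	s.ops <- op{id: id}
	return nil
}

// LoadPendingPublishes waits until every earlier operation has been
// applied, then returns a snapshot of the actual data.
func (s *asyncStore) LoadPendingPublishes() (map[uint16]*PersistedPublish, error) {
	reply := make(chan map[uint16]*PersistedPublish)
	s.ops <- op{reply: reply}
	return <-reply, nil
}

func main() {
	s := newAsyncStore()
	s.SavePendingPublish(1, &PersistedPublish{Topic: "a"})
	s.DeletePendingPublish(1)
	m, _ := s.LoadPendingPublishes()
	fmt.Println(len(m))
}
```

Because the load request travels through the same queue as the writes, it cannot overtake them, which is what makes the returned snapshot reflect every earlier Save and Delete. Save and Delete only block if the queue of 64 operations is full.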
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption is a functional option for configuring a subscription.
func WithNoLocal ¶
func WithNoLocal(noLocal bool) SubscribeOption
WithNoLocal (MQTT v5.0) prevents the server from sending messages published by this client back to this client.
This option is ignored when using MQTT v3.1.1.
func WithPersistence ¶
func WithPersistence(persistence bool) SubscribeOption
WithPersistence sets whether the subscription should be persisted to the session store. If true (default), the subscription is saved and restored on process restart. If false, the subscription is ephemeral and lost on client restart. This is independent of the MQTT CleanSession/CleanStart flag which controls server-side persistence.
func WithRetainAsPublished ¶
func WithRetainAsPublished(retain bool) SubscribeOption
WithRetainAsPublished (MQTT v5.0) requests that the server keeps the Retain flag as set by the publisher when forwarding the message.
This option is ignored when using MQTT v3.1.1.
func WithRetainHandling ¶
func WithRetainHandling(handling uint8) SubscribeOption
WithRetainHandling (MQTT v5.0) specifies when retained messages are sent.
- 0 = Send retained messages at time of subscribe (default)
- 1 = Send retained messages at subscribe only if subscription doesn't exist
- 2 = Do not send retained messages at time of subscribe
This option is ignored when using MQTT v3.1.1.
func WithSubscribeUserProperty ¶ added in v0.9.1
func WithSubscribeUserProperty(key, value string) SubscribeOption
WithSubscribeUserProperty (MQTT v5.0) adds a user property to the subscription. User properties are key-value pairs that can be used to send metadata to the server.
This option is ignored when using MQTT v3.1.1.
func WithSubscriptionIdentifier ¶
func WithSubscriptionIdentifier(id int) SubscribeOption
WithSubscriptionIdentifier (MQTT v5.0) sets a subscription identifier for this subscription. The identifier will be included in PUBLISH packets that match this subscription, allowing the application to determine which subscription(s) matched the message.
Subscription identifiers must be in the range 1-268,435,455. A value of 0 means no identifier (default).
This is useful when:
- Multiple subscriptions have overlapping topic filters
- You need to route messages differently based on which subscription matched
- Implementing complex message routing logic
The subscription identifier is available in received messages via msg.Properties.SubscriptionIdentifier (a slice, as multiple subscriptions may match).
This option is ignored when using MQTT v3.1.1.
Example:
client.Subscribe("sensors/+/temp", 1, tempHandler,
mq.WithSubscriptionIdentifier(100))
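Routing on the identifier amounts to a lookup from each identifier carried by a message to the logic for that subscription. The sketch below shows the range check and the lookup in isolation; the element type of msg.Properties.SubscriptionIdentifier is assumed to be int here, and validSubscriptionID, route, demoRoutes and dispatch are hypothetical helpers.

```go
package main

import "fmt"

// maxSubscriptionID is the upper bound stated in the documentation above.
const maxSubscriptionID = 268435455

// validSubscriptionID reports whether id may be used as an identifier.
// 0 is excluded because it means "no identifier".
func validSubscriptionID(id int) bool {
	return id >= 1 && id <= maxSubscriptionID
}

// route is a hypothetical per-subscription callback.
type route func(topic string) string

// demoRoutes registers one route per subscription identifier.
func demoRoutes() map[int]route {
	return map[int]route{
		100: func(t string) string { return "temp:" + t },
		200: func(t string) string { return "audit:" + t },
	}
}

// dispatch invokes the route registered for each identifier carried by
// a message, in order, and skips identifiers that have no route.
func dispatch(routes map[int]route, ids []int, topic string) []string {
	var out []string
	for _, id := range ids {
		if r, ok := routes[id]; ok {
			out = append(out, r(topic))
		}
	}
	return out
}

func main() {
	fmt.Println(validSubscriptionID(100))
	fmt.Println(dispatch(demoRoutes(), []int{100, 200}, "sensors/a/temp"))
}
```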
type SubscribeOptions ¶
type SubscribeOptions struct {
NoLocal bool
RetainAsPublished bool
RetainHandling uint8
Persistence bool // Persist to the session store. Enabled by default; the default is applied by the option-handling logic, not by the zero value.
SubscriptionID int // MQTT v5.0: Subscription identifier (1-268435455, 0 = none).
UserProperties map[string]string // MQTT v5.0: User properties
}
SubscribeOptions holds configuration for a subscription.
type Token ¶
type Token interface {
// Wait blocks until the operation completes or the context is cancelled.
// It returns nil if successful, or the error (timeout/nack/connection loss).
Wait(ctx context.Context) error
// Done returns a channel that closes when the operation is complete.
// This allows the token to be used in select statements.
Done() <-chan struct{}
// Error returns the error if finished, mostly for use with Done().
Error() error
// Dropped returns true if the message was dropped due to a full internal buffer (QoS 0).
// This only occurs when using QoS0LimitPolicyDrop.
Dropped() bool
}
Token represents an asynchronous operation that can be waited on.
Tokens are returned by Publish, Subscribe, and Unsubscribe operations. They provide both blocking (Wait) and non-blocking (Done + Error) patterns for handling operation completion.
Example (blocking wait):
token := client.Publish("topic", []byte("data"), mq.WithQoS(1))
if err := token.Wait(context.Background()); err != nil {
log.Printf("Operation failed: %v", err)
}
Example (non-blocking with select):
token := client.Publish("topic", []byte("data"), mq.WithQoS(1))
select {
case <-token.Done():
if err := token.Error(); err != nil {
log.Printf("Failed: %v", err)
}
case <-time.After(5 * time.Second):
log.Println("Timeout")
}
Example (with context timeout):
token := client.Subscribe("topic", 1, handler)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
log.Printf("Subscribe failed or timed out: %v", err)
}
type UnsubscribeOption ¶ added in v0.9.1
type UnsubscribeOption func(*UnsubscribeOptions)
UnsubscribeOption is a functional option for configuring an unsubscription.
func WithUnsubscribeUserProperty ¶ added in v0.9.1
func WithUnsubscribeUserProperty(key, value string) UnsubscribeOption
WithUnsubscribeUserProperty (MQTT v5.0) adds a user property to the unsubscribe packet. User properties are key-value pairs that can be used to send metadata to the server.
This option is ignored when using MQTT v3.1.1.
type UnsubscribeOptions ¶ added in v0.9.1
UnsubscribeOptions holds configuration for an unsubscription.
Source Files ¶
- auth.go
- auth_handler.go
- client.go
- client_persistence.go
- codes.go
- doc.go
- errors.go
- file_store.go
- logic.go
- logic_queue.go
- message.go
- middleware.go
- options.go
- options_limits.go
- properties.go
- properties_convert.go
- publish.go
- publish_alias.go
- qos.go
- reauthenticate.go
- requests.go
- session_store.go
- subscribe.go
- token.go
- topic.go
- topic_alias.go