mq

package module
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2026 License: MIT Imports: 20 Imported by: 0

README

MQTT Client Library for Go

Go Reference Tests

A lightweight, idiomatic MQTT client library for Go with full support for v3.1.1 and v5.0 with a unified API, built using only the standard library.

Supported Features

  • MQTT v3.1.1 & v5.0: Full support for both protocol versions
    • Unified API: Write modern v5-style code (Properties, Reason Codes) that automatically degrades on v3 servers.
    • Auto-Negotiation: Automatically falls back to v3.1.1 if v5.0 is not supported by the server.
  • Auto-Reconnect: Built-in exponential backoff (see examples/auto_reconnect)
  • Persistence: Optional Durable Session Persistence (CleanSession=false) (see docs/persistence.md)
  • Transport: TCP and TLS directly, WebSockets via WithDialer (see examples/websocket)
  • Middleware/Interceptors: Intercept inbound/outbound messages for logging, metrics, or tracing.
  • Optimized: High throughput, low memory footprint
  • Thread-Safe: Safe for concurrent use
  • Context Awareness: context.Context support for cancellation/timeouts
  • MQTT v5.0 Features:
    • Message Properties: Content Type, User Properties (see examples/v5_properties), Request/Response (see examples/v5_request_response), Message Expiry
    • Connection Config: Session Expiry, Request Problem/Response Info, User Properties
    • Bandwidth: Topic Aliases (Client & Server) (see examples/topic_aliases)
    • Flow Control: Receive Maximum, Max Packet Size
    • Subscription: NoLocal, RetainAsPublished, RetainHandling, Shared Subscriptions

All features are tested with an extensive unit and integration test suite.

Performance (updated with mosquitto v2.1.0 test results)

The mq library is built for high-performance scenarios, consistently outperforming other Go MQTT clients in throughput and resource efficiency. Key highlights from our benchmarks include:

  • Throughput: Up to 3x faster than Paho v5 and 4x faster than Paho v3 in high-concurrency scenarios. Peak publish rates exceed 1.3 million messages per second with end-to-end delivery of over 710,000 msg/s (Mosquitto v2.1.0).
  • Reliability: 100% message delivery in all tests, while Paho clients frequently dropped messages or stalled due to internal deadlocks under high load.
  • Memory Efficiency: 10x lower memory allocation compared to Paho v5 (108 MiB vs 1,137 MiB in 50-worker small-packet workloads).
  • GC Overhead: Significantly fewer garbage collection cycles (e.g., 56 vs 723 for Paho v5), resulting in more predictable latencies and higher stability.

For a detailed comparative analysis, see the Performance Analysis Report.

Installation

go get github.com/gonzalop/mq

Quick Start

For a detailed walkthrough of connecting, publishing, and subscribing, see the Getting Started & Usage Guide.

package main

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    "time"

    "github.com/gonzalop/mq"
)

func main() {
    // Connect to server
    client, err := mq.Dial(
        "tcp://localhost:1883",
        mq.WithClientID("my-client"),
        mq.WithKeepAlive(60*time.Second),
    )
    if err != nil {
        slog.Error("Failed to connect", "error", err)
        os.Exit(1)
    }
    defer client.Disconnect(context.Background())

    // Subscribe to a topic
    token := client.Subscribe("sensors/+/temperature", mq.AtLeastOnce, func(c *mq.Client, msg mq.Message) {
        fmt.Printf("Topic: %s, Payload: %s\n", msg.Topic, string(msg.Payload))
    })

    // Wait for subscription
    if err := token.Wait(context.Background()); err != nil {
        slog.Error("Subscription failed", "error", err)
        os.Exit(1)
    }

    // Publish a message
    pubToken := client.Publish("sensors/living-room/temperature", []byte("22.5"), mq.WithQoS(mq.AtLeastOnce))
    pubToken.Wait(context.Background())

    time.Sleep(2 * time.Second)
}

Middleware / Interceptors

The library supports an interceptor pattern (middleware) for both incoming and outgoing messages. This is perfect for cross-cutting concerns like logging, metrics, or OpenTelemetry tracing.

// Incoming message logging
client, _ := mq.Dial(uri, mq.WithHandlerInterceptor(func(next mq.MessageHandler) mq.MessageHandler {
    return func(c *mq.Client, m mq.Message) {
        log.Printf("Received: %s", m.Topic)
        next(c, m)
    }
}))

// Outgoing message tracing
client, _ := mq.Dial(uri, mq.WithPublishInterceptor(func(next mq.PublishFunc) mq.PublishFunc {
    return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
        // Add tracing or modify payload
        return next(topic, payload, opts...)
    }
}))

Documentation

  • Getting Started: Detailed guide on Connecting, Publishing, Subscribing, and Options.
  • Best Practices: Production-grade configuration guide (Security, Resource Limits, Session Management). A MUST-read.
  • Troubleshooting: Solutions for common issues like client ID thrashing, zombie messages, and flow control.
  • Persistence: Detailed guide on configuring durable sessions across restarts.
  • Internals: Deep dive into the library's concurrency model.
  • Compliance: MQTT 3.1.1 and MQTT 5.0 compliance reports.

Examples

See the examples directory for various use cases:

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines on development, testing, and submitting pull requests.

Acknowledgements

This library was developed with the assistance of Antigravity, powered by Gemini and Claude.

License

This software is under the MIT License. See LICENSE file for details.

Documentation

Overview

Package mq provides a lightweight, idiomatic MQTT v5.0 and v3.1.1 client library for Go.

The library is designed with zero external dependencies (only Go standard library) and provides a clean, functional options-based API for connecting to MQTT servers, publishing messages, and subscribing to topics.

Features

  • Full MQTT v5.0 and v3.1.1 support
  • (v5.0) User Properties & Packet Properties
  • (v5.0) Topic Aliases (auto-managed)
  • (v5.0) Request/Response pattern support
  • (v5.0) Session & Message Expiry
  • (v5.0) Shared Subscriptions
  • (v5.0) Reason Codes & Enhanced Error Handling
  • TLS/SSL encrypted connections
  • Automatic reconnection with exponential backoff
  • Clean, idiomatic Go API with functional options
  • Context-based cancellation and timeouts
  • Zero external dependencies (main library)

Unified API Philosophy

The library exposes a single, unified API that embraces modern MQTT v5.0 concepts (Properties, Reason Codes, Session Expiry). When connecting to an MQTT v3.1.1 server, these v5-specific features are handled gracefully—they are simply ignored during packet encoding. This allows you to write code once using modern idioms while maintaining compatibility with older servers.

Quick Start

Connect to a server and publish a message:

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("my-client"),
    mq.WithProtocolVersion(mq.ProtocolV50)) // Use MQTT v5.0
if err != nil {
    log.Fatal(err)
}
defer client.Disconnect(context.Background())

token := client.Publish("sensors/temperature", []byte("22.5"), mq.WithQoS(1))
err = token.Wait(context.Background())  // 'select' also supported, see further down

Subscribe to a topic:

client.Subscribe("sensors/+/temperature", mq.AtLeastOnce,
    func(c *mq.Client, msg mq.Message) {
        fmt.Printf("%s: %s\n", msg.Topic, string(msg.Payload))
    })

Connection Options

The Dial and DialContext functions accept various options to configure the client:

  • WithProtocolVersion(v) - Set MQTT version (ProtocolV50 or ProtocolV311)
  • WithAutoProtocolVersion(bool) - Enable automatic protocol version negotiation (default: true)
  • WithClientID(id) - Set the MQTT client identifier
  • WithCredentials(user, pass) - Set username and password
  • WithKeepAlive(duration) - Set keepalive interval (default: 60s)
  • WithCleanSession(bool) - Set clean start/session flag
  • WithSessionExpiryInterval(secs) - Set session expiry (v5.0)
  • WithConnectUserProperties(map) - Set user properties for CONNECT (v5.0)
  • WithAutoReconnect(bool) - Enable auto-reconnect (default: true)
  • WithTLS(config) - Enable TLS encryption
  • WithWill(topic, payload, qos, retained) - Set Last Will and Testament
  • WithOutgoingQueueSize(int) - Set internal outgoing buffer size
  • WithIncomingQueueSize(int) - Set internal incoming buffer size
  • WithQoS0LimitPolicy(policy) - Set reliability policy for QoS 0
  • WithHandlerInterceptor(interceptor) - Add an interceptor for incoming messages
  • WithPublishInterceptor(interceptor) - Add an interceptor for outgoing messages

Interceptors (Middleware)

The library supports an interceptor pattern (middleware) for both incoming and outgoing messages. This is useful for cross-cutting concerns like logging, metrics, tracing (OpenTelemetry), or message auditing.

Handler Interceptors wrap message handlers for incoming messages:

loggingInterceptor := func(next mq.MessageHandler) mq.MessageHandler {
    return func(c *mq.Client, m mq.Message) {
        log.Printf("Received: %s", m.Topic)
        next(c, m)
    }
}

client, _ := mq.Dial(uri, mq.WithHandlerInterceptor(loggingInterceptor))

Publish Interceptors wrap the Publish call for outgoing messages:

tracingInterceptor := func(next mq.PublishFunc) mq.PublishFunc {
    return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
        // Inject tracing headers into opts here
        return next(topic, payload, opts...)
    }
}

client, _ := mq.Dial(uri, mq.WithPublishInterceptor(tracingInterceptor))

TLS Connections

The library supports TLS/SSL encrypted connections:

client, err := mq.Dial("tls://server:8883",
    mq.WithClientID("secure-client"),
    mq.WithTLS(&tls.Config{
        InsecureSkipVerify: false,
    }))

Supported URL schemes: tcp://, mqtt://, tls://, ssl://, mqtts://

Quality of Service

The library supports all three MQTT QoS levels:

  • QoS 0 (mq.AtMostOnce): At most once delivery (fire and forget)
  • QoS 1 (mq.AtLeastOnce): At least once delivery (acknowledged)
  • QoS 2 (mq.ExactlyOnce): Exactly once delivery (assured)

Example:

// Using named constants (recommended)
client.Publish("topic", []byte("data"), mq.WithQoS(mq.AtLeastOnce))

// Using numeric values
client.Publish("topic", []byte("data"), mq.WithQoS(1))

Wildcard Subscriptions

MQTT supports two wildcard characters in topic filters:

  • '+' matches a single level (e.g., "sensors/+/temperature")
  • '#' matches multiple levels (e.g., "sensors/#")

Example:

// Subscribe to all temperature sensors
client.Subscribe("sensors/+/temperature", mq.AtLeastOnce, handler)

// Subscribe to all sensor data
client.Subscribe("sensors/#", mq.AtMostOnce, handler)

MQTT v5.0 Properties

MQTT v5.0 introduces "Properties" that can be attached to packets. This library provides a clean API for using common properties:

client.Publish("sensors/temp", payload,
    mq.WithContentType("application/json"),
    mq.WithUserProperty("sensor-id", "temp-01"),
    mq.WithMessageExpiry(3600)) // Expire in 1 hour

Supported properties include:

  • ContentType: Specifies the MIME type of the payload
  • MessageExpiry: How long the message should be kept by the server
  • UserProperties: Custom key-value pairs (metadata)
  • ConnectionUserProperties: Custom metadata received from the server on connect (v5.0)
  • ResponseTopic & CorrelationData: For request/response patterns

Topic Aliases

Topic Aliases (v5.0) allow reducing bandwidth by using a short numeric ID instead of the full topic string for repeated publications.

// Enable topic alias for this publication
client.Publish("very/long/topic/name/for/bandwidth/saving", data,
    mq.WithAlias())

The library automatically manages alias assignment and mapping.

Subscription Options (v5.0)

MQTT v5.0 adds options to control subscription behavior:

  • WithNoLocal: Don't receive messages you published yourself
  • WithRetainAsPublished: Keep the original retain flag from the publisher
  • WithRetainHandling: Control when the server sends retained messages
  • WithSubscriptionIdentifier: Set a numeric identifier for the subscription
  • WithSubscribeUserProperty: Add custom metadata to the subscription

Example:

client.Subscribe("chat/room", mq.AtLeastOnce, handler,
    mq.WithNoLocal(true))

Client-side Session Persistence

The library supports pluggable session persistence to save pending messages (QoS 1 & 2) and subscriptions across restarts.

store, _ := mq.NewFileStore("/path/to/persist", "client-id")
client, _ := mq.Dial(server,
    mq.WithClientID("client-id"),
    mq.WithCleanSession(false),
    mq.WithSessionStore(store),
    // persistent subscription
    mq.WithSubscription("topic", handler),
)

Error Handling

Operations return a Token that can be used for both blocking and non-blocking error handling. In MQTT v5.0, errors often include Reason Codes.

// Blocking with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
    // Check for specific reason codes (v5.0) using errors.Is
    if errors.Is(err, mq.ReasonCodeUnspecifiedError) {
        log.Printf("Server rejected operation: %v", err)
    }
}

// Non-blocking with select
select {
case <-token.Done():
    if err := token.Error(); err != nil {
        log.Printf("Failed: %v", err)
    }
case <-time.After(5 * time.Second):
    log.Println("Timeout")
}

// Connection can be closed with a specific reason code and properties (MQTT v5.0):

expiry := uint32(3600)
client.Disconnect(ctx,
    mq.WithReason(mq.ReasonCodeNormalDisconnect),
    mq.WithDisconnectProperties(&mq.Properties{
        SessionExpiryInterval: &expiry,
        ReasonString:          "Shutting down",
    }),
)

The client handles reconnection automatically unless configured otherwise.

Index

Examples

Constants

View Source
const (
	// ProtocolV311 is MQTT version 3.1.1
	ProtocolV311 uint8 = 4
	// ProtocolV50 is MQTT version 5.0 (default)
	ProtocolV50 uint8 = 5
)
View Source
const (
	PayloadFormatBytes uint8 = 0
	PayloadFormatUTF8  uint8 = 1
)

Payload format indicators

View Source
const (
	// DefaultMaxTopicLength is the maximum length of an MQTT topic (2 bytes for length prefix)
	DefaultMaxTopicLength = 65535

	// DefaultMaxPayloadSize is the maximum size of an MQTT message payload (256MB)
	DefaultMaxPayloadSize = 268435455 // 256MB - 1

	// DefaultMaxIncomingPacket is the maximum size of an incoming MQTT packet
	DefaultMaxIncomingPacket = 268435455 // 256MB - 1

	// MaxClientIDLength is the recommended maximum client ID length
	MaxClientIDLength = 23
)

MQTT specification limits (defaults when not configured)

Variables

View Source
var (
	// ErrConnectionRefused is returned when the server rejects the connection.
	// You can unwrap this error to find the specific reason if available.
	ErrConnectionRefused = errors.New("connection refused")

	// Specific connection refusal reasons (v3.1.1)
	ErrUnacceptableProtocolVersion = errors.New("unacceptable protocol version")
	ErrIdentifierRejected          = errors.New("identifier rejected")
	ErrServerUnavailable           = errors.New("server unavailable")
	ErrBadUsernameOrPassword       = errors.New("bad username or password")
	ErrNotAuthorized               = errors.New("not authorized")

	// ErrSubscriptionFailed is returned when the server rejects a subscription.
	ErrSubscriptionFailed = errors.New("subscription failed")

	// ErrClientDisconnected is returned when an operation is cancelled because
	// the client was disconnected or stopped.
	ErrClientDisconnected = errors.New("client disconnected")
)

Standard errors returned by the client

Functions

func MatchTopic added in v0.9.1

func MatchTopic(filter, topic string) bool

MatchTopic checks if a topic matches a topic filter with MQTT wildcards. It follows MQTT-4.7 rules for wildcard matching.

Supports: - '+' matches exactly one topic level - '#' matches multiple topic levels (must be the last character in the filter)

Example
filter := "sensors/+/temperature"
topic1 := "sensors/living-room/temperature"
topic2 := "sensors/kitchen/humidity"

fmt.Printf("%s matches %s: %v\n", topic1, filter, MatchTopic(filter, topic1))
fmt.Printf("%s matches %s: %v\n", topic2, filter, MatchTopic(filter, topic2))

filterHash := "sensors/#"
topic3 := "sensors/basement/temperature/current"
fmt.Printf("%s matches %s: %v\n", topic3, filterHash, MatchTopic(filterHash, topic3))
Output:

sensors/living-room/temperature matches sensors/+/temperature: true
sensors/kitchen/humidity matches sensors/+/temperature: false
sensors/basement/temperature/current matches sensors/#: true

Types

type Authenticator

type Authenticator interface {
	// Method returns the authentication method name.
	//
	// This is sent in the CONNECT packet's AuthenticationMethod property.
	// Common values: "SCRAM-SHA-1", "SCRAM-SHA-256", "OAUTH2", "KERBEROS".
	//
	// The method name should match what the server expects.
	Method() string

	// InitialData returns the initial authentication data to send in CONNECT.
	//
	// This data is included in the CONNECT packet's AuthenticationData property.
	// Return nil or empty slice if no initial data is needed.
	//
	// For SCRAM, this would be the client-first-message.
	// For OAuth, this might be an access token.
	InitialData() ([]byte, error)

	// HandleChallenge processes a challenge from the server and returns response data.
	//
	// This method is called when the client receives an AUTH packet from the server
	// during the authentication exchange. The reasonCode will typically be:
	//   - 0x18 (Continue authentication) - Server is continuing the exchange
	//   - 0x00 (Success) - Authentication completed successfully
	//
	// Return the response data to send back to the server in an AUTH packet.
	// Return an error if the challenge cannot be processed or authentication fails.
	//
	// IMPORTANT: This method is called synchronously in the packet processing loop.
	// It should complete quickly (< 100ms) to avoid blocking other packets.
	//
	// This is especially critical during re-authentication, where a slow HandleChallenge
	// will block processing of PUBLISH, PUBACK, and other packets, potentially causing
	// timeouts or degraded performance.
	//
	// If you need to perform expensive operations (network calls, heavy crypto), consider:
	//   - Pre-computing data in InitialData()
	//   - Caching results
	//   - Using fast cryptographic libraries
	//
	// For most authentication methods (SCRAM, token-based), this is not an issue.
	HandleChallenge(challengeData []byte, reasonCode uint8) ([]byte, error)

	// Complete is called when authentication succeeds (CONNACK received).
	//
	// This allows the authenticator to perform any cleanup, store tokens, or
	// finalize the authentication state.
	//
	// Return an error if post-authentication setup fails. This will be logged
	// but won't affect the connection (CONNACK was already successful).
	Complete() error
}

Authenticator handles the authentication exchange for a specific authentication method.

Users implement this interface to provide custom authentication logic for MQTT v5.0 Enhanced Authentication (AUTH packet flow). This enables support for challenge/response mechanisms such as SCRAM, OAuth, Kerberos, or custom methods.

The authentication flow:

  1. InitialData() is called to get data for CONNECT packet
  2. HandleChallenge() is called for each AUTH packet from server
  3. Complete() is called when CONNACK is received (authentication succeeded)

Example implementation (simple token):

type TokenAuth struct {
    token string
}

func (t *TokenAuth) Method() string {
    return "TOKEN"
}

func (t *TokenAuth) InitialData() ([]byte, error) {
    return []byte(t.token), nil
}

func (t *TokenAuth) HandleChallenge(data []byte, code uint8) ([]byte, error) {
    return nil, fmt.Errorf("unexpected challenge")
}

func (t *TokenAuth) Complete() error {
    return nil
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents an MQTT client connection.

func Dial

func Dial(server string, opts ...Option) (*Client, error)

Dial establishes a connection to an MQTT server and returns a Client.

It is a wrapper around DialContext that uses the configured connection timeout (see WithConnectTimeout) to control the initial handshake.

The server parameter specifies the server address with scheme and port. Supported schemes:

  • tcp:// or mqtt:// - Unencrypted connection (default port 1883)
  • tls://, ssl://, or mqtts:// - TLS encrypted connection (default port 8883)

Options can be provided to configure the client behavior. Common options include WithClientID, WithCredentials, WithKeepAlive, WithTLS, and WithAutoReconnect.

The function performs the MQTT handshake and starts background goroutines for reading, writing, and managing the connection. If AutoReconnect is enabled (default: true), the client will automatically reconnect on connection loss.

Example (basic connection):

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("my-client"))
if err != nil {
    log.Fatal(err)
}
defer client.Disconnect(context.Background())

Example (with authentication):

client, err := mq.Dial("tcp://server:1883",
    mq.WithClientID("secure-client"),
    mq.WithCredentials("username", "password"))

Example (TLS connection):

client, err := mq.Dial("tls://server:8883",
    mq.WithClientID("tls-client"),
    mq.WithTLS(&tls.Config{
        InsecureSkipVerify: false,
    }))

Example (all options):

client, err := mq.Dial("tcp://server:1883",
    mq.WithClientID("full-client"),
    mq.WithCredentials("user", "pass"),
    mq.WithKeepAlive(60*time.Second),
    mq.WithCleanSession(true),
    mq.WithAutoReconnect(true),
    mq.WithConnectTimeout(30*time.Second),
    mq.WithWill("status/offline", []byte("disconnected"), 1, true))

func DialContext added in v0.9.1

func DialContext(ctx context.Context, server string, opts ...Option) (*Client, error)

DialContext establishes a connection to an MQTT server with a context and returns a Client.

The context is used to control the initial connection establishment, including the network dial, TLS handshake, and MQTT CONNECT handshake. If the context is cancelled or expires before the handshake completes, DialContext returns an error.

When using DialContext, the WithConnectTimeout option is ignored for the initial connection (as the provided context takes precedence), but it is still used for subsequent automatic reconnection attempts.

Once the initial connection is established, the context's expiration has no effect on the ongoing connection or background maintenance.

Example:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

client, err := mq.DialContext(ctx, "tcp://localhost:1883",
    mq.WithClientID("my-client"))

func (*Client) AssignedClientID

func (c *Client) AssignedClientID() string

AssignedClientID returns the client ID assigned by the server.

When connecting with an empty client ID, the server may assign a unique identifier and return it in the CONNACK packet. This method returns that assigned ID if one was provided.

This is only populated for MQTT v5.0 connections where the client connected with an empty client ID and the server assigned one. For v3.1.1 connections or when the client provided their own ID, this returns an empty string.

The assigned ID can be useful for:

  • Logging and debugging
  • Reconnection with the same session
  • Understanding what ID the server chose

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithClientID(""),  // Empty - let server assign
    mq.WithProtocolVersion(mq.ProtocolV50))

assignedID := client.AssignedClientID()
if assignedID != "" {
    fmt.Printf("Server assigned ID: %s\n", assignedID)
}

func (*Client) ConnectionUserProperties added in v0.9.3

func (c *Client) ConnectionUserProperties() map[string]string

ConnectionUserProperties returns the User Properties received from the server in the CONNACK packet. These are application-specific key-value pairs provided by the server during the connection handshake.

This is only populated for MQTT v5.0 connections. Returns a copy of the map to prevent concurrent modification.

func (*Client) Disconnect

func (c *Client) Disconnect(ctx context.Context, opts ...DisconnectOption) error

Disconnect gracefully disconnects from the server.

It sends a DISCONNECT packet to the server, stops all background goroutines, and closes the network connection. The function blocks until all goroutines have exited or the context is cancelled.

If AutoReconnect is enabled, it will be disabled after calling Disconnect. To reconnect, create a new client with Dial.

If the client is connected with MQTT v5.0, you can provide options such as WithReason to specify the reason code. These options are ignored when using MQTT v3.1.1.

Example:

// Normal disconnect (v3.1.1 or v5.0)
client.Disconnect(context.Background())

// Disconnect with reason (v5.0 only)
client.Disconnect(context.Background(), mq.WithReason(mq.ReasonCodeDisconnectWithWill))

func (*Client) GetStats added in v0.9.2

func (c *Client) GetStats() ClientStats

GetStats returns the current client statistics.

func (*Client) IsConnected

func (c *Client) IsConnected() bool

IsConnected returns true if the client is currently connected to the server. This method is thread-safe.

func (*Client) Publish

func (c *Client) Publish(topic string, payload []byte, opts ...PublishOption) Token

Publish publishes a message to the specified topic.

The function returns a Token that can be used to wait for completion. For QoS 0, the token completes immediately after sending. For QoS 1 and 2, the token completes after receiving the appropriate acknowledgment from the server.

Example (QoS 0 - fire and forget):

client.Publish("sensors/temp", []byte("22.5"), mq.WithQoS(0))

Example (QoS 1 - wait for acknowledgment):

token := client.Publish("sensors/temp", []byte("22.5"), mq.WithQoS(1))
if err := token.Wait(context.Background()); err != nil {
    log.Printf("Publish failed: %v", err)
}

Example (retained message):

client.Publish("status/online", []byte("true"),
    mq.WithQoS(1),
    mq.WithRetain(true))

Example (QoS 2 with timeout):

token := client.Publish("critical/alert", []byte("fire"),
    mq.WithQoS(2))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
    log.Printf("Publish timeout or failed: %v", err)
}

func (*Client) Reauthenticate

func (c *Client) Reauthenticate(ctx context.Context) error

Reauthenticate initiates re-authentication with the server (MQTT v5.0).

This sends an AUTH packet with reason code 0x19 (Re-authenticate) to start a new authentication exchange. The authenticator's HandleChallenge method will be called for each challenge from the server.

Re-authentication is useful for:

  • Refreshing expired tokens
  • Rotating credentials
  • Periodic security validation

Note: The Client and Server can continue to send and receive other Control Packets (such as PUBLISH) during the re-authentication exchange. The connection remains fully functional.

This method returns immediately. Authentication happens asynchronously. Use the authenticator's Complete method to know when it succeeds.

Returns an error if:

  • Not using MQTT v5.0
  • No authenticator configured
  • Not connected

Example:

// Refresh authentication periodically
ticker := time.NewTicker(30 * time.Minute)
go func() {
    for range ticker.C {
        if err := client.Reauthenticate(context.Background()); err != nil {
            log.Printf("Re-authentication failed: %v", err)
        }
    }
}()

func (*Client) ResponseInformation

func (c *Client) ResponseInformation() string

ResponseInformation returns the response information string provided by the server.

In MQTT v5.0, the server can provide a ResponseInformation string in the CONNACK packet. This string can be used by the client as the basis for creating response topics in request/response messaging patterns.

This is typically used in multi-tenant or managed cloud environments where the server wants to control topic naming conventions and ensure proper namespace isolation between clients.

This method returns the server's response information if one was provided, or an empty string if:

  • The connection is using MQTT v3.1.1 (property not supported)
  • The server did not provide response information
  • Not yet connected

Example usage:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50))

if respInfo := client.ResponseInformation(); respInfo != "" {
    // Use server's suggested prefix for response topics
    responseTopic := respInfo + "my-responses"
    client.Publish("requests/data", payload,
        mq.WithResponseTopic(responseTopic))
}

func (*Client) ServerCapabilities

func (c *Client) ServerCapabilities() ServerCapabilities

ServerCapabilities returns the server capabilities received in the CONNACK packet. This is only populated for MQTT v5.0 connections. For v3.1.1 connections, default values are returned.

func (*Client) ServerKeepAlive

func (c *Client) ServerKeepAlive() uint16

ServerKeepAlive returns the keepalive interval (in seconds) that the server wants the client to use.

In MQTT v5.0, the server can override the client's requested keepalive interval by sending a Server Keep Alive property in the CONNACK packet. When this happens, the client MUST use the server's value instead of its requested value.

This method returns the server's keepalive value if one was provided, or 0 if:

  • The connection is using MQTT v3.1.1 (property not supported)
  • The server did not override the keepalive (accepted client's request)
  • Not yet connected

The library automatically adjusts its internal keepalive timer when the server overrides the value, so users typically don't need to take action. This method is primarily useful for logging and debugging.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithKeepAlive(60*time.Second),
    mq.WithProtocolVersion(mq.ProtocolV50))

if serverKA := client.ServerKeepAlive(); serverKA > 0 {
    fmt.Printf("Server overrode keepalive to %d seconds\n", serverKA)
}

func (*Client) ServerReference

func (c *Client) ServerReference() string

ServerReference returns the server reference URI provided by the server.

In MQTT v5.0, the server can provide a ServerReference string in the CONNACK or DISCONNECT packet to tell the client to connect to a different server. This is used for:

  • Load balancing: Distribute clients across multiple servers
  • Maintenance: Move clients before server shutdown
  • Geographic routing: Direct clients to nearest server
  • Failover: Redirect to backup server

IMPORTANT: The library does NOT automatically redirect to the referenced server. Users must check this value and manually reconnect if desired or use the WithOnServerRedirect option.

This method returns the server's reference URI if one was provided, or an empty string if:

  • The connection is using MQTT v3.1.1 (property not supported)
  • The server did not provide a server reference
  • Not yet connected

Example usage with callback (recommended):

client, _ := mq.Dial("tcp://server-a.example.com:1883",
    mq.WithOnServerRedirect(func(newServer string) {
        log.Printf("Server suggests: %s", newServer)
        // Decide whether to reconnect
    }))

Example usage with polling:

client, _ := mq.Dial("tcp://server-a.example.com:1883",
    mq.WithProtocolVersion(mq.ProtocolV50))

if ref := client.ServerReference(); ref != "" {
    fmt.Printf("Server suggests redirecting to: %s\n", ref)
    // Application decides whether to reconnect
    client.Disconnect(context.Background())
    newClient, _ := mq.Dial(ref, mq.WithProtocolVersion(mq.ProtocolV50))
}

func (*Client) SessionExpiryInterval

func (c *Client) SessionExpiryInterval() uint32

SessionExpiryInterval returns the session expiry interval (in seconds) that the server is using for this connection.

For MQTT v5.0, this returns the actual value negotiated with the server. The server can override the client's requested value.

For MQTT v3.1.1:

  • If CleanSession is false (persistent), this returns 0xFFFFFFFF (MaxUint32), reflecting that the session does not expire on disconnect.
  • If CleanSession is true, this returns 0.

Returns 0 if session expires immediately on disconnect or not yet connected.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithSessionExpiryInterval(3600))

actual := client.SessionExpiryInterval()
if actual != 3600 {
    fmt.Printf("Server overrode session expiry to %d seconds\n", actual)
}

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, qos QoS, handler MessageHandler, opts ...SubscribeOption) Token

Subscribe subscribes to a topic with the specified QoS level.

The handler function is called for each message received on topics matching the subscription filter. If a message matches multiple subscription filters, the handlers for all matching subscriptions will be called.

The handler is called in a separate goroutine, so it should not block for long periods.

Topic filters support MQTT wildcards:

  • '+' matches a single level (e.g., "sensors/+/temperature")
  • '#' matches multiple levels (e.g., "sensors/#")

The function returns a Token that completes when the subscription is acknowledged by the server.

For persistent sessions (CleanSession=false), it is recommended to use the mq.WithSubscription option during Dial instead. This ensures handlers are automatically re-registered if the session is lost and the client must re-subscribe.

Example (simple subscription):

token := client.Subscribe("sensors/temperature", 1,
    func(c *mq.Client, msg mq.Message) {
        fmt.Printf("Temperature: %s\n", string(msg.Payload))
    })
if err := token.Wait(context.Background()); err != nil {
    log.Fatal(err)
}

Example with options (MQTT v5.0):

client.Subscribe("chat/room", 1, handler, mq.WithNoLocal(true))

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string, opts ...UnsubscribeOption) Token

Unsubscribe unsubscribes from a single topic.

After unsubscribing, the client will no longer receive messages on the specified topic. The function returns a Token that completes when the unsubscription is acknowledged by the server.

This method supports options for MQTT v5.0 features like User Properties.

Example:

token := client.Unsubscribe("sensors/temperature")
token.Wait(context.Background())

Example with options (MQTT v5.0):

client.Unsubscribe("logs", mq.WithUnsubscribeUserProperty("reason", "done"))

type ClientStats added in v0.9.2

type ClientStats struct {
	PacketsSent     uint64
	PacketsReceived uint64
	BytesSent       uint64
	BytesReceived   uint64
	ReconnectCount  uint64
	Connected       bool
}

ClientStats holds connection and throughput statistics.

type ContextDialer

type ContextDialer interface {
	DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}

ContextDialer is an interface for custom network dialing logic. It matches the signature of net.Dialer.DialContext.

type DialFunc

type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

DialFunc is a helper to convert a function to the ContextDialer interface.

func (DialFunc) DialContext

func (f DialFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error)

DialContext implements ContextDialer.

type DisconnectError added in v0.9.3

type DisconnectError struct {
	ReasonCode            ReasonCode
	ReasonString          string
	SessionExpiryInterval uint32            // 0 if not set
	ServerReference       string            // Empty if not set
	UserProperties        map[string]string // Nil if not set
}

DisconnectError represents a DISCONNECT packet received from the server, containing potential MQTT v5.0 properties.

func (*DisconnectError) Error added in v0.9.3

func (e *DisconnectError) Error() string

func (*DisconnectError) Is added in v0.9.3

func (e *DisconnectError) Is(target error) bool

type DisconnectOption

type DisconnectOption func(*DisconnectOptions)

DisconnectOption is a functional option for configuring a disconnection.

func WithDisconnectProperties

func WithDisconnectProperties(props *Properties) DisconnectOption

WithDisconnectProperties sets the MQTT v5.0 properties for the DISCONNECT packet.

This can be used to set:

  • Session Expiry Interval (to update expiry on disconnect)
  • Reason String (diagnostic information)
  • User Properties (custom metadata)

This option is ignored when using MQTT v3.1.1.

func WithReason

func WithReason(code ReasonCode) DisconnectOption

WithReason sets the reason code for the DISCONNECT packet (MQTT v5.0). Common codes include:

  • 0x00: Normal Disconnect (default)
  • 0x04: Disconnect with Will Message

This option is ignored when using MQTT v3.1.1.

type DisconnectOptions

type DisconnectOptions struct {
	ReasonCode ReasonCode
	Properties *Properties
}

DisconnectOptions holds configuration for a disconnection.

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore implements SessionStore using JSON files on disk. Each client ID gets its own directory containing separate files for pending publishes, subscriptions, and received QoS 2 packet IDs.

File organization:

baseDir/
  clientID/
    pending_1.json
    pending_2.json
    subscriptions.json
    qos2_received.json

This implementation is synchronous - all operations block until complete. For async/batched writes, users can implement a custom SessionStore.

func NewFileStore

func NewFileStore(baseDir, clientID string, opts ...FileStoreOption) (*FileStore, error)

NewFileStore creates a file-based session store for the specified client ID.

The baseDir will contain a subdirectory for each client ID, allowing multiple clients to share the same base directory.

Example:

store, err := mq.NewFileStore("/var/lib/mqtt", "sensor-1")
if err != nil {
    log.Fatal(err)
}

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("sensor-1"),
    mq.WithCleanSession(false),
    mq.WithSessionStore(store))

func (*FileStore) Clear

func (f *FileStore) Clear() error

Clear removes all session state from disk.

func (*FileStore) ClearPendingPublishes

func (f *FileStore) ClearPendingPublishes() error

ClearPendingPublishes removes all pending publishes from disk.

func (*FileStore) ClearReceivedQoS2

func (f *FileStore) ClearReceivedQoS2() error

ClearReceivedQoS2 removes all received QoS 2 packet IDs.

func (*FileStore) ClientID

func (f *FileStore) ClientID() string

ClientID returns the client ID this store is bound to. This can be used to validate that the store matches the client.

func (*FileStore) DeletePendingPublish

func (f *FileStore) DeletePendingPublish(packetID uint16) error

DeletePendingPublish removes a pending publish from disk.

func (*FileStore) DeleteReceivedQoS2

func (f *FileStore) DeleteReceivedQoS2(packetID uint16) error

DeleteReceivedQoS2 removes a QoS 2 packet ID.

func (*FileStore) DeleteSubscription

func (f *FileStore) DeleteSubscription(topic string) error

DeleteSubscription removes a subscription from disk.

func (*FileStore) LoadPendingPublishes

func (f *FileStore) LoadPendingPublishes() (map[uint16]*PersistedPublish, error)

LoadPendingPublishes loads all pending publishes from disk.

func (*FileStore) LoadReceivedQoS2

func (f *FileStore) LoadReceivedQoS2() (map[uint16]struct{}, error)

LoadReceivedQoS2 loads all received QoS 2 packet IDs.

func (*FileStore) LoadSubscriptions

func (f *FileStore) LoadSubscriptions() (map[string]*PersistedSubscription, error)

LoadSubscriptions loads all subscriptions from disk.

func (*FileStore) SavePendingPublish

func (f *FileStore) SavePendingPublish(packetID uint16, pub *PersistedPublish) error

SavePendingPublish stores a pending publish to disk.

func (*FileStore) SaveReceivedQoS2

func (f *FileStore) SaveReceivedQoS2(packetID uint16) error

SaveReceivedQoS2 marks a QoS 2 packet ID as received.

func (*FileStore) SaveSubscription

func (f *FileStore) SaveSubscription(topic string, sub *PersistedSubscription) error

SaveSubscription stores a subscription to disk.

type FileStoreOption

type FileStoreOption func(*fileStoreConfig)

FileStoreOption configures a FileStore.

func WithPermissions

func WithPermissions(perm os.FileMode) FileStoreOption

WithPermissions sets the file permissions for stored files. Default is 0600 (owner read/write, group/others none).

Example:

store, _ := mq.NewFileStore("/var/lib/mqtt", "sensor-1",
    mq.WithPermissions(0600)) // Owner read/write only

type HandlerInterceptor added in v0.9.4

type HandlerInterceptor func(MessageHandler) MessageHandler

HandlerInterceptor is a function that wraps a MessageHandler. It allows cross-cutting concerns like logging, metrics, or tracing to be applied to all message processing.

Example (Logging):

func LoggingInterceptor(next mq.MessageHandler) mq.MessageHandler {
    return func(client *mq.Client, msg mq.Message) {
        log.Printf("Received message on topic %s", msg.Topic)
        next(client, msg)
    }
}

type LimitPolicy added in v0.9.2

type LimitPolicy int

LimitPolicy determines how the client enforces limits (like ReceiveMaximum).

const (
	// LimitPolicyIgnore logs a warning once per connection but continues processing.
	LimitPolicyIgnore LimitPolicy = iota

	// LimitPolicyStrict sends a DISCONNECT with Reason Code 0x93 (Receive Maximum exceeded)
	// when the limit is reached.
	//
	// Note: Auto-reconnect should be disabled or carefully managed when using this policy,
	// as a misbehaving server could cause an infinite loop of connect -> overflow -> disconnect.
	LimitPolicyStrict
)

type Message

type Message struct {
	// Topic the message was published to
	Topic string

	// Message payload
	Payload []byte

	// Quality of Service level
	QoS QoS

	// Retained message flag
	Retained bool

	// Duplicate delivery flag
	Duplicate bool

	// MQTT v5.0 properties.
	// This field is nil for MQTT v3.1.1 connections or when no properties are present.
	Properties *Properties
}

Message represents an MQTT message received on a subscribed topic.

This struct is designed to be compatible with both MQTT v3.1.1 and v5.0.

The message is passed to subscription handlers and contains all relevant information about the received message including topic, payload, QoS level, and flags.

type MessageHandler

type MessageHandler func(*Client, Message)

MessageHandler is called when a message is received on a subscribed topic.

type MqttError

type MqttError struct {
	ReasonCode ReasonCode
	Message    string
	Parent     error
}

MqttError represents an error returned by the MQTT server, including the MQTT v5.0 reason code.

func (*MqttError) Error

func (e *MqttError) Error() string

func (*MqttError) Is added in v0.9.1

func (e *MqttError) Is(target error) bool

Is implements the errors.Is interface, allowing checks against ReasonCode constants.

func (*MqttError) Unwrap

func (e *MqttError) Unwrap() error

type Option

type Option func(*clientOptions)

Option is a functional option for configuring the client.

func WithAuthenticator

func WithAuthenticator(auth Authenticator) Option

WithAuthenticator sets the authenticator for enhanced authentication (MQTT v5.0).

Enhanced authentication allows challenge/response authentication mechanisms such as SCRAM, OAuth, Kerberos, or custom methods. The authenticator handles the authentication exchange via the AUTH packet flow.

If set, the client will:

  1. Send AuthenticationMethod + InitialData in CONNECT
  2. Handle AUTH challenges from the server via HandleChallenge
  3. Complete authentication when CONNACK is received

This option is only relevant for MQTT v5.0. For v3.1.1, use WithCredentials for simple username/password authentication.

Example:

auth := &MyScramAuthenticator{username: "user", password: "pass"}
client, err := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithAuthenticator(auth))

func WithAutoProtocolVersion added in v0.9.4

func WithAutoProtocolVersion(auto bool) Option

WithAutoProtocolVersion enables or disables automatic protocol version negotiation.

When enabled (default: true), the client will first attempt to connect using MQTT v5.0. If the server rejects the connection with an "Unacceptable Protocol Version" error (0x01), the client will automatically fall back to MQTT v3.1.1 and try again.

This is useful when connecting to unknown servers or during migrations where some brokers might not yet support MQTT v5.0.

Note: This only applies to the initial connection. Once a version is negotiated, it is used for all subsequent automatic reconnections.

func WithAutoReconnect

func WithAutoReconnect(enable bool) Option

WithAutoReconnect enables or disables automatic reconnection (default: true).

func WithCleanSession

func WithCleanSession(clean bool) Option

WithCleanSession sets the clean session flag.

When set to true (default), the server will discard any previous session state and subscriptions for this client ID. Each connection starts fresh.

When set to false, the server maintains session state across disconnections:

  • Subscriptions persist and are restored on reconnect
  • QoS 1 and 2 messages sent while offline are queued for delivery
  • The client MUST use a non-empty client ID (via WithClientID)
  • The server will reject the connection if client ID is empty

MQTT v3.1.1 vs v5.0 Differences:

  • In v3.1.1: false ("Clean Session") means the session persists indefinitely after disconnect.
  • In v5.0: false ("Clean Start") means the session RESUMES on connect, but EXPIRES immediately on disconnect unless WithSessionExpiryInterval is set.

To achieve persistent sessions in MQTT v5.0, you must use both:

mq.WithCleanSession(false)
mq.WithSessionExpiryInterval(0xFFFFFFFF) // Persist indefinitely

Use false for reliable message delivery across network interruptions. Use true for stateless clients or when you don't need message persistence.

Example (persistent session):

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("sensor-1"),        // Required for CleanSession=false
    mq.WithCleanSession(false))

func WithClientID

func WithClientID(id string) Option

WithClientID sets the client identifier.

The client ID uniquely identifies this client to the MQTT server.

Empty client ID behavior (MQTT v3.1.1 spec):

  • With CleanSession=true: Server will auto-generate a unique ID
  • With CleanSession=false: Server will reject the connection (identifier rejected)

For persistent sessions (CleanSession=false), you MUST provide a non-empty client ID.

func WithConnectTimeout

func WithConnectTimeout(duration time.Duration) Option

WithConnectTimeout sets the connection timeout (default: 30s).

func WithConnectUserProperties added in v0.9.3

func WithConnectUserProperties(props map[string]string) Option

WithConnectUserProperties sets the User Properties to be sent in the CONNECT packet.

Only applicable for MQTT v5.0 connections. User Properties are key-value pairs that allow the client to send custom metadata to the server during the connection handshake.

This option is ignored when using MQTT v3.1.1.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithConnectUserProperties(map[string]string{
        "region": "us-east-1",
        "app-version": "1.0.2",
    }))

func WithCredentials

func WithCredentials(username, password string) Option

WithCredentials sets the username and password for authentication.

func WithDefaultPublishHandler

func WithDefaultPublishHandler(handler MessageHandler) Option

WithDefaultPublishHandler sets a fallback handler for incoming PUBLISH messages that do not match any registered subscription.

This is useful for:

  • Handling messages received during reconnection race conditions
  • Handling persistent subscriptions restored without a registered handler (orphans)
  • Debugging or logging unexpected messages
  • Implementing a catch-all strategy

If not set (default), messages matching no subscription are silently dropped (but still acknowledged to comply with the protocol).

Example:

client, _ := mq.Dial(uri,
    mq.WithDefaultPublishHandler(func(c *mq.Client, msg mq.Message) {
        log.Printf("Unexpected message on %s: %s", msg.Topic, msg.Payload)
    }),
)

func WithDialer

func WithDialer(dialer ContextDialer) Option

WithDialer sets a custom dialer for establishing the network connection. This enables support for alternative transports like WebSockets, Unix sockets, or proxying, without adding dependencies to the core library.

If provided, the library will skip its standard scheme validation and delegate the connection creation entirely to the dialer.

The dialer's DialContext method receives:

  • ctx: The context provided to DialContext (or one created from WithConnectTimeout if using Dial)
  • network: The scheme from the server URL (e.g. "ws", "tcp", "unix")
  • addr: The original server string passed to Dial

Example (WebSockets using nhooyr.io/websocket):

client, _ := mq.Dial("ws://server.example.com/mqtt",
    mq.WithDialer(mq.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
        c, _, err := websocket.Dial(ctx, addr, &websocket.DialOptions{
            Subprotocols: []string{"mqtt"}, // Crucial for MQTT over WebSockets
        })
        if err != nil {
            return nil, err
        }
        return websocket.NetConn(ctx, c, websocket.MessageBinary), nil
    })))

func WithHandlerInterceptor added in v0.9.4

func WithHandlerInterceptor(interceptor HandlerInterceptor) Option

WithHandlerInterceptor adds an interceptor to the incoming message handler chain. Interceptors are called in the order they are added.

Example (Logging):

client, _ := mq.Dial(uri,
    mq.WithHandlerInterceptor(func(next mq.MessageHandler) mq.MessageHandler {
        return func(c *mq.Client, m mq.Message) {
            log.Printf("Received message on topic %s", m.Topic)
            next(c, m)
        }
    }),
)

func WithIncomingQueueSize added in v0.9.4

func WithIncomingQueueSize(size int) Option

WithIncomingQueueSize sets the size of the internal incoming packet buffer (default: 100).

func WithKeepAlive

func WithKeepAlive(duration time.Duration) Option

WithKeepAlive sets the MQTT keep alive interval (default: 60s).

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets a custom logger for the client. If not provided, the client will use a logger that discards all output. Use this to integrate with your application's logging system.

Example:

logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
    Level: slog.LevelDebug,
}))
client, _ := mq.Dial("tcp://localhost:1883", mq.WithLogger(logger))

func WithMaxIncomingPacket

func WithMaxIncomingPacket(max int) Option

WithMaxIncomingPacket sets the maximum allowed incoming packet size. Default is 268435455 (256MB, MQTT spec maximum). Set to a lower value to protect against memory exhaustion from large incoming packets. Example: WithMaxIncomingPacket(1024 * 1024) limits incoming packets to 1MB.

func WithMaxPayloadSize

func WithMaxPayloadSize(max int) Option

WithMaxPayloadSize sets the maximum allowed outgoing payload size. Default is 268435455 (256MB, MQTT spec maximum). Set to a lower value to prevent sending large messages.

func WithMaxTopicLength

func WithMaxTopicLength(max int) Option

WithMaxTopicLength sets the maximum allowed topic length. Default is 65535 (MQTT spec maximum). Set to a lower value to reject topics exceeding your application's needs.

func WithOnConnect

func WithOnConnect(onConnect func(*Client)) Option

WithOnConnect sets the handler to be called when the client connects. This is called for the initial connection and every successful reconnection.

The handler is invoked asynchronously in a separate goroutine. This allows implementing complex setup logic (e.g., subscribing, publishing) without blocking the connection process or logic loop.

func WithOnConnectionLost

func WithOnConnectionLost(onConnectionLost func(*Client, error)) Option

WithOnConnectionLost sets the handler to be called when the connection is lost. The error parameter provides the reason for disconnection.

The handler is invoked asynchronously in a separate goroutine to ensure it does not block internal cleanup or reconnection attempts.

func WithOnServerRedirect

func WithOnServerRedirect(onServerRedirect func(serverURI string)) Option

WithOnServerRedirect sets the handler to be called when the server provides a redirection reference (MQTT v5.0 only).

The server can send a ServerReference in the CONNACK or DISCONNECT packet to suggest that the client connect to a different server. This is used for:

  • Load balancing: Distribute clients across multiple servers
  • Maintenance: Move clients before server shutdown
  • Geographic routing: Direct clients to nearest server
  • Failover: Redirect to backup server

The handler receives the server URI as provided by the server. The client does NOT automatically redirect - the handler should decide whether to accept the redirect and manually reconnect if desired.

The handler is invoked asynchronously in a separate goroutine to prevent blocking the processing of the CONNACK packet.

The server reference is also available via the ServerReference() method for polling/checking later.

This option is only relevant for MQTT v5.0 connections. For v3.1.1, the callback will never be invoked.

Example:

client, err := mq.Dial("tcp://server1.example.com:1883",
    mq.WithOnServerRedirect(func(newServer string) {
        log.Printf("Server suggests redirect to: %s", newServer)
        // Application decides whether to reconnect
    }))

func WithOutgoingQueueSize added in v0.9.4

func WithOutgoingQueueSize(size int) Option

WithOutgoingQueueSize sets the size of the internal outgoing packet buffer (default: 1000).

func WithProtocolVersion

func WithProtocolVersion(version uint8) Option

WithProtocolVersion sets the MQTT protocol version to use. Use ProtocolV50 (default) for MQTT v5.0 or ProtocolV311 for MQTT v3.1.1.

Example for v3.1.1 server:

client, _ := mq.Dial("tcp://localhost:1883", mq.WithProtocolVersion(mq.ProtocolV311))

func WithPublishInterceptor added in v0.9.4

func WithPublishInterceptor(interceptor PublishInterceptor) Option

WithPublishInterceptor adds an interceptor to the outbound publish chain. Interceptors are called in the order they are added.

Example (Tracing):

client, _ := mq.Dial(uri,
    mq.WithPublishInterceptor(func(next mq.PublishFunc) mq.PublishFunc {
        return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
            // Inject tracing headers or log the publish
            return next(topic, payload, opts...)
        }
    }),
)

func WithQoS0LimitPolicy added in v0.9.4

func WithQoS0LimitPolicy(policy QoS0LimitPolicy) Option

WithQoS0LimitPolicy sets the policy for handling QoS 0 messages when the buffer is full.

The default policy is QoS0LimitPolicyDrop, which ensures the client remains non-blocking and responsive even under extreme network congestion.

func WithReceiveMaximum

func WithReceiveMaximum(max uint16, policy LimitPolicy) Option

WithReceiveMaximum sets the maximum number of unacknowledged QoS 1 and QoS 2 messages the client is willing to process concurrently.

Only applicable for MQTT v5.0. This value is sent in the CONNECT packet. The default value is 65535 (maximum allowed by spec).

This allows the client to limit the number of messages it has to manage buffering for. If the client cannot process messages fast enough, it can lower this value to apply backpressure to the server.

The policy argument determines behavior when the limit is exceeded:

  • LimitPolicyIgnore (recommended): Log a warning once and continue processing. This protects the client from disconnecting due to server bugs, while still processing messages (potentially unbounded).
  • LimitPolicyStrict: Disconnect with Reason Code 0x93. Use this if strict flow control compliance is required.

This option is ignored when using MQTT v3.1.1.

func WithRequestProblemInformation

func WithRequestProblemInformation(request bool) Option

WithRequestProblemInformation requests that the server include detailed problem information (ReasonString and UserProperties) in error responses.

Only applicable for MQTT v5.0 connections. When set to true, the server should include diagnostic information in CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, and DISCONNECT packets when errors occur.

This is useful for debugging and understanding server behavior, but may increase bandwidth usage. Most servers send problem information by default.

This option is ignored when using MQTT v3.1.1.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithRequestProblemInformation(true))

func WithRequestResponseInformation

func WithRequestResponseInformation(request bool) Option

WithRequestResponseInformation requests that the server provide response information in the CONNACK packet.

Only applicable for MQTT v5.0 connections. When set to true, the server may include a ResponseInformation string that the client can use as the basis for creating response topics in request/response patterns.

This is useful in multi-tenant or managed cloud environments where the server controls topic naming conventions.

This option is ignored when using MQTT v3.1.1.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithRequestResponseInformation(true))

if respInfo := client.ResponseInformation(); respInfo != "" {
    // Use server's suggested prefix
    responseTopic := respInfo + "my-responses"
}

func WithSessionExpiryInterval

func WithSessionExpiryInterval(seconds uint32) Option

WithSessionExpiryInterval sets how long the server should maintain session state after the client disconnects (in seconds).

Only applicable for MQTT v5.0. For v3.1.1, use WithCleanSession instead.

Values:

  • 0: Session ends immediately on disconnect (can be explicitly set)

  • 1-4294967294: Session persists for this many seconds

  • 4294967295 (0xFFFFFFFF): Session never expires

The server may override this value (e.g., to enforce a maximum limit). Use client.SessionExpiryInterval() after connecting to see the actual value negotiated with the server.

Note: This can be combined with WithCleanSession(false) to resume a previous session while also controlling how long it persists.

This option is ignored when using MQTT v3.1.1.

Example (short-lived session for mobile app):

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithCleanSession(false),
    mq.WithSessionExpiryInterval(300))  // 5 minutes

Example (long-lived session for IoT device):

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithCleanSession(false),
    mq.WithSessionExpiryInterval(86400))  // 24 hours

func WithSessionStore

func WithSessionStore(store SessionStore) Option

WithSessionStore sets a custom session store for persistence.

If set, session state (pending publishes, subscriptions, received QoS 2 IDs) will be persisted across process restarts. This enables the client to resume unacknowledged messages and subscriptions after a crash or reboot.

The store is only loaded when the process starts (not on network reconnects). During normal reconnections, the in-memory state is used directly.

Example with file-based storage:

store, err := mq.NewFileStore("/var/lib/mqtt", "sensor-1")
if err != nil {
    log.Fatal(err)
}

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("sensor-1"),
    mq.WithCleanSession(false),
    mq.WithSessionStore(store))

func WithSubscription

func WithSubscription(topic string, handler MessageHandler) Option

WithSubscription defines a subscription that the client should maintain.

This serves two purposes:

  1. Registers the MessageHandler locally before connection (preventing race conditions).
  2. Automatically subscribes to the topic on connection/reconnection if needed.

For persistent sessions (CleanSession=false):

  • If SessionPresent=true: The server has the subscription; we just register the handler locally.
  • If SessionPresent=false: The client will automatically resubscribe to this topic.

For clean sessions (CleanSession=true):

  • The client will automatically subscribe to this topic on every connection.

func WithTLS

func WithTLS(config *tls.Config) Option

WithTLS sets the TLS configuration for secure connections. Pass nil for default TLS settings, or provide a custom *tls.Config. The server URL should use "tls://", "ssl://", or "mqtts://" scheme, or this option will enable TLS for "tcp://" URLs as well.

func WithTopicAliasMaximum

func WithTopicAliasMaximum(max uint16) Option

WithTopicAliasMaximum sets the maximum number of topic aliases the client will accept from the server when receiving PUBLISH messages.

Only applicable for MQTT v5.0. This value is sent in the CONNECT packet to tell the server how many aliases it can send to the client.

The server will also send its own TopicAliasMaximum in CONNACK, which tells the client how many aliases it can send to the server when publishing. These are independent values - one for each direction.

Topic aliases allow short numeric IDs to be used instead of full topic names, reducing bandwidth usage for frequently published topics.

Values:

  • 0: Topic aliases disabled (default)
  • 1-65535: Maximum number of aliases to accept from server

This option is ignored when using MQTT v3.1.1.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithTopicAliasMaximum(100))  // Accept up to 100 aliases from server

// Publishing with alias (uses server's limit from CONNACK)
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
    mq.WithAlias())

// Receiving with alias (uses our declared limit of 100)
client.Subscribe("sensors/#", mq.AtLeastOnce, func(c *mq.Client, msg mq.Message) {
    // Topic is automatically resolved from alias
    fmt.Printf("Topic: %s\n", msg.Topic)
})

func WithWill

func WithWill(topic string, payload []byte, qos uint8, retained bool, properties ...*Properties) Option

WithWill sets the Last Will and Testament (LWT) message.

The LWT is a message that the MQTT server will automatically publish on behalf of the client if the client disconnects unexpectedly (e.g., network failure, crash, or power loss). It is NOT sent on graceful disconnects via Disconnect().

This is commonly used to notify other clients that a device has gone offline.

Parameters:

  • topic: The topic to publish the will message to
  • payload: The message content (e.g., "offline", "disconnected")
  • qos: Quality of Service level (0, 1, or 2)
  • retained: Whether the will message should be retained by the server

The will message is sent by the server when:

  • The client fails to send a PINGREQ within the keepalive period
  • The network connection is lost without a proper DISCONNECT packet
  • The client crashes or loses power

The will message is NOT sent when:

  • The client calls Disconnect() normally
  • The connection is closed gracefully

Example (status monitoring):

client, err := mq.Dial("tcp://localhost:1883",
    mq.WithClientID("sensor-1"),
    mq.WithWill("devices/sensor-1/status", []byte("offline"), 1, true))

Other clients can subscribe to "devices/+/status" to monitor device connectivity. WithWill sets the Last Will and Testament message. The properties argument is optional and can be used to set Will Properties (MQTT v5.0).

type PersistedPublish added in v0.9.1

type PersistedPublish struct {
	Topic      string
	Payload    []byte
	QoS        uint8
	Retain     bool
	Properties *PublishProperties
}

PersistedPublish represents a publish for persistence. This is a simplified representation containing only the data needed to restore a pending publish after reconnection.

type PersistedSubscription added in v0.9.1

type PersistedSubscription struct {
	QoS     uint8
	Options *PersistedSubscriptionOptions
}

PersistedSubscription represents a subscription for persistence. This contains the data needed to restore a subscription after reconnection.

type PersistedSubscriptionOptions added in v0.9.1

type PersistedSubscriptionOptions struct {
	NoLocal           bool
	RetainAsPublished bool
	RetainHandling    uint8
	SubscriptionID    *uint32
	UserProperties    map[string]string
}

PersistedSubscriptionOptions represents MQTT v5.0 subscription options for persistence.

type Properties

type Properties struct {
	// ContentType specifies the MIME content type of the message payload.
	// Example: "application/json", "text/plain", "application/octet-stream"
	ContentType string

	// ResponseTopic specifies the topic for response messages.
	// Used in request/response messaging patterns.
	ResponseTopic string

	// CorrelationData is used to correlate request and response messages.
	// Typically used with ResponseTopic for request/response patterns.
	CorrelationData []byte

	// MessageExpiry specifies the message expiry interval in seconds.
	// If set, the message will be discarded if not delivered within this time.
	MessageExpiry *uint32

	// PayloadFormat indicates the format of the payload.
	// 0 = unspecified bytes (default)
	// 1 = UTF-8 encoded character data
	PayloadFormat *uint8

	// SubscriptionIdentifier contains the subscription identifier(s) that matched
	// this message. Only present in received messages when the server supports
	// subscription identifiers and the subscription was created with an ID.
	// This is a receive-only property. If set when publishing, it will be silently
	// ignored and not sent to the server.
	SubscriptionIdentifier []int

	// ReasonString contains a human-readable explanation from the server.
	// Typically used for diagnostic purposes when operations fail or behave
	// unexpectedly. Common in error responses and server notifications.
	// This is a receive-only property. If set when publishing, it will be silently
	// ignored and not sent to the server.
	ReasonString string

	// WillDelayInterval specifies the delay in seconds before the Will Message is sent.
	// If the connection is re-established before this time, the Will Message is not sent.
	WillDelayInterval *uint32

	// SessionExpiryInterval specifies the session expiry interval in seconds.
	// Used in DISCONNECT packets to update the expiry interval.
	SessionExpiryInterval *uint32

	// UserProperties contains application-specific properties as key-value pairs.
	// These can be used to pass custom metadata with messages.
	UserProperties map[string]string
}

Properties represents MQTT v5.0 properties for messages.

All fields are optional and only used when the protocol version is 5.0. For MQTT v3.1.1 connections, properties are ignored.

Common properties for application use:

  • ContentType: MIME type of the payload (e.g., "application/json")
  • ResponseTopic: Topic for response messages in request/response pattern
  • CorrelationData: Correlation data for matching requests with responses
  • UserProperties: Application-specific key-value pairs
  • MessageExpiry: Message expiry interval in seconds

func NewProperties

func NewProperties() *Properties

NewProperties creates a new Properties instance with initialized maps.

func (*Properties) GetUserProperty

func (p *Properties) GetUserProperty(key string) string

GetUserProperty retrieves a user property value. Returns empty string if the property doesn't exist.

func (*Properties) SetUserProperty

func (p *Properties) SetUserProperty(key, value string)

SetUserProperty adds or updates a user property.

type PublishFunc added in v0.9.4

type PublishFunc func(topic string, payload []byte, opts ...PublishOption) Token

PublishFunc matches the signature of Client.Publish.

type PublishInterceptor added in v0.9.4

type PublishInterceptor func(PublishFunc) PublishFunc

PublishInterceptor is a function that wraps a PublishFunc. It allows cross-cutting concerns to be applied to all outbound messages.

Example (Tracing):

func TracingInterceptor(next mq.PublishFunc) mq.PublishFunc {
    return func(topic string, payload []byte, opts ...mq.PublishOption) mq.Token {
        // Inject tracing headers into opts or log the publish
        return next(topic, payload, opts...)
    }
}

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a functional option for configuring a PUBLISH packet.

func WithAlias

func WithAlias() PublishOption

WithAlias enables topic alias optimization for this publish.

Only applicable for MQTT v5.0 when WithTopicAliasMaximum() is set. Topic aliases allow the client to send a short alias ID instead of the full topic name, reducing bandwidth usage for frequently published topics.

On the first publish to a topic with WithAlias():

  • Sends full topic name + assigns an alias ID
  • Subsequent publishes automatically use the alias (sends empty topic)

The library automatically manages alias allocation and tracking. If the alias limit is reached, gracefully falls back to sending the full topic.

Example:

client, _ := mq.Dial("tcp://localhost:1883",
    mq.WithProtocolVersion(mq.ProtocolV50),
    mq.WithTopicAliasMaximum(100))

// First publish - sends full topic + assigns alias
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
    mq.WithAlias())

// Subsequent publishes - automatically uses alias (saves ~50 bytes)
client.Publish("sensors/building-a/floor-3/room-42/temperature", data,
    mq.WithAlias())

func WithContentType

func WithContentType(contentType string) PublishOption

WithContentType sets the MQTT v5.0 content type property. This specifies the MIME type of the message payload. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("data/json", payload, mq.WithContentType("application/json"))

func WithCorrelationData

func WithCorrelationData(data []byte) PublishOption

WithCorrelationData sets correlation data for request/response pattern. Used to match responses with requests. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("requests/data", payload,
    mq.WithResponseTopic("responses/data"),
    mq.WithCorrelationData([]byte("req-123")))

func WithMessageExpiry

func WithMessageExpiry(seconds uint32) PublishOption

WithMessageExpiry sets the message expiry interval in seconds. The message will be discarded if not delivered within this time. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("events/temp", payload, mq.WithMessageExpiry(300)) // 5 minutes

func WithPayloadFormat

func WithPayloadFormat(format uint8) PublishOption

WithPayloadFormat sets the payload format indicator.

Format values:

  • mq.PayloadFormatBytes (0): Unspecified bytes (default)
  • mq.PayloadFormatUTF8 (1): UTF-8 encoded character data

If PayloadFormatUTF8 is used, the client will validate that the payload is valid UTF-8 before sending.

Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("data/text", []byte("hello"), mq.WithPayloadFormat(mq.PayloadFormatUTF8))

func WithProperties

func WithProperties(props *Properties) PublishOption

WithProperties sets multiple v5.0 properties at once. This is a convenience function for setting multiple properties. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

props := mq.NewProperties()
props.ContentType = "application/json"
props.SetUserProperty("version", "1.0")
client.Publish("data/json", payload, mq.WithProperties(props))

func WithQoS

func WithQoS(qos QoS) PublishOption

WithQoS sets the Quality of Service level for the publish.

QoS levels:

  • 0: At most once delivery (fire and forget)
  • 1: At least once delivery (acknowledged)
  • 2: Exactly once delivery (assured)

Default is QoS 0.

func WithResponseTopic

func WithResponseTopic(topic string) PublishOption

WithResponseTopic sets the response topic for request/response pattern. The receiver can publish responses to this topic. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("requests/data", payload,
    mq.WithResponseTopic("responses/data"),
    mq.WithCorrelationData([]byte("req-123")))

func WithRetain

func WithRetain(retain bool) PublishOption

WithRetain sets the retain flag for the publish.

When true, the server stores the message and delivers it to future subscribers of the topic. Only the most recent retained message per topic is stored.

Default is false.

func WithUserProperty

func WithUserProperty(key, value string) PublishOption

WithUserProperty adds a user-defined property key-value pair. Can be called multiple times to add multiple properties. Only used when protocol version is 5.0, ignored for v3.1.1.

Example:

client.Publish("sensors/temp", payload,
    mq.WithUserProperty("sensor-id", "temp-01"),
    mq.WithUserProperty("location", "warehouse-a"))

type PublishOptions

type PublishOptions struct {
	QoS        uint8
	Retain     bool
	Properties *Properties
	UseAlias   bool
}

PublishOptions holds configuration for a publish operation.

type PublishProperties

type PublishProperties struct {
	PayloadFormat          *uint8
	MessageExpiry          *uint32
	TopicAlias             *uint16
	ResponseTopic          string
	CorrelationData        []byte
	UserProperties         map[string]string
	SubscriptionIdentifier *uint32
	ContentType            string
}

PublishProperties represents MQTT v5.0 publish properties for persistence.

type QoS

type QoS uint8

QoS represents the MQTT Quality of Service level.

const (
	// AtMostOnce (QoS 0) - Fire and forget delivery.
	// The message is delivered at most once, or it may not be delivered at all.
	// No acknowledgment is sent by the receiver, and the message is not retried.
	AtMostOnce QoS = 0

	// AtLeastOnce (QoS 1) - Acknowledged delivery.
	// The message is always delivered at least once. The receiver sends an
	// acknowledgment (PUBACK), and the sender retries until acknowledged.
	// Duplicate messages may occur.
	AtLeastOnce QoS = 1

	// ExactlyOnce (QoS 2) - Assured delivery.
	// The message is always delivered exactly once using a four-step handshake
	// (PUBLISH, PUBREC, PUBREL, PUBCOMP). This is the safest but slowest option.
	ExactlyOnce QoS = 2
)

MQTT Quality of Service levels.

These constants provide readable names for the three QoS levels defined in the MQTT specification. Using named constants improves code readability compared to numeric literals.

Example:

// More readable
client.Subscribe("sensors/temp", mq.AtLeastOnce, handler)
client.Publish("alert", data, mq.WithQoS(mq.ExactlyOnce))

// vs numeric literals
client.Subscribe("sensors/temp", 1, handler)
client.Publish("alert", data, mq.WithQoS(2))

type QoS0LimitPolicy added in v0.9.4

type QoS0LimitPolicy int

QoS0LimitPolicy determines how the client handles QoS 0 messages when the internal buffer is full.

const (
	// QoS0LimitPolicyDrop drops the QoS 0 message immediately if the internal buffer is full.
	// This prevents the caller from blocking and avoids goroutine leaks.
	// The token's Dropped() method will return true.
	QoS0LimitPolicyDrop QoS0LimitPolicy = iota

	// QoS0LimitPolicyBlock blocks the caller until space is available in the internal buffer.
	// Use this if reliability is more important than preventing temporary blocking.
	// This is safe to use as it still respects client shutdown.
	QoS0LimitPolicyBlock
)

type ReasonCode added in v0.9.1

type ReasonCode uint8

ReasonCode represents a MQTT v5.0 reason code. It implements the error interface so it can be used with errors.Is.

const (
	ReasonCodeNormalDisconnect      ReasonCode = 0x00
	ReasonCodeDisconnectWithWill    ReasonCode = 0x04
	ReasonCodeUnspecifiedError      ReasonCode = 0x80
	ReasonCodeMalformedPacket       ReasonCode = 0x81
	ReasonCodeProtocolError         ReasonCode = 0x82
	ReasonCodeImplementationError   ReasonCode = 0x83
	ReasonCodeNotAuthorized         ReasonCode = 0x87
	ReasonCodeServerBusy            ReasonCode = 0x89
	ReasonCodeServerShuttingDown    ReasonCode = 0x8B
	ReasonCodeKeepAliveTimeout      ReasonCode = 0x8D
	ReasonCodeSessionTakenOver      ReasonCode = 0x8E
	ReasonCodeTopicFilterInvalid    ReasonCode = 0x90
	ReasonCodeTopicNameInvalid      ReasonCode = 0x91
	ReasonCodeReceiveMaximumExceed  ReasonCode = 0x93
	ReasonCodeTopicAliasInvalid     ReasonCode = 0x94
	ReasonCodePacketTooLarge        ReasonCode = 0x95
	ReasonCodeMessageRateTooHigh    ReasonCode = 0x96
	ReasonCodeQuotaExceeded         ReasonCode = 0x97
	ReasonCodeAdministrativeAction  ReasonCode = 0x98
	ReasonCodePayloadFormatInvalid  ReasonCode = 0x99
	ReasonCodeRetainNotSupported    ReasonCode = 0x9A
	ReasonCodeQoSNotSupported       ReasonCode = 0x9B
	ReasonCodeUseAnotherServer      ReasonCode = 0x9C
	ReasonCodeServerMoved           ReasonCode = 0x9D
	ReasonCodeSharedSubNotSupported ReasonCode = 0x9E
	ReasonCodeConnectionRateExceed  ReasonCode = 0x9F
	ReasonCodeMaximumConnectTime    ReasonCode = 0xA0
	ReasonCodeSubscriptionIDNotSupp ReasonCode = 0xA1
	ReasonCodeWildcardSubNotSupp    ReasonCode = 0xA2
)

MQTT v5.0 Reason Codes

These constants represent the reason codes defined in the MQTT v5.0 specification. Reason codes are used in DISCONNECT, CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, and UNSUBACK packets to provide detailed information about the outcome of an operation.

Use these constants with errors.Is to check for specific error conditions.

Example (checking for specific disconnect reason):

token := client.Publish("topic", data, mq.WithQoS(1))
if err := token.Wait(ctx); err != nil {
    if errors.Is(err, mq.ReasonCodeQuotaExceeded) {
        log.Println("Server quota exceeded, backing off...")
    } else if errors.Is(err, mq.ReasonCodeNotAuthorized) {
        log.Println("Not authorized to publish to this topic")
    }
}

Reason codes 0x00-0x7F indicate success, while 0x80-0xFF indicate failure.

func (ReasonCode) Error added in v0.9.1

func (r ReasonCode) Error() string

type ServerCapabilities

type ServerCapabilities struct {
	// MaximumPacketSize is the maximum packet size the server will accept.
	// 0 means no limit was specified by the server.
	MaximumPacketSize uint32

	// ReceiveMaximum is the maximum number of QoS 1 and QoS 2 publications
	// the server is willing to process concurrently.
	ReceiveMaximum uint16

	// TopicAliasMaximum is the maximum topic alias value the server accepts.
	// 0 means topic aliases are not supported by the server.
	TopicAliasMaximum uint16

	// MaximumQoS is the maximum QoS level the server supports (0, 1, or 2).
	MaximumQoS uint8

	// RetainAvailable indicates if the server supports retained messages.
	RetainAvailable bool

	// WildcardAvailable indicates if the server supports wildcard subscriptions.
	WildcardAvailable bool

	// SubscriptionIDAvailable indicates if the server supports subscription identifiers.
	SubscriptionIDAvailable bool

	// SharedSubscriptionAvailable indicates if the server supports shared subscriptions.
	SharedSubscriptionAvailable bool
}

ServerCapabilities represents the capabilities and limits advertised by the MQTT server. These are only available when using MQTT v5.0.

type SessionStore

type SessionStore interface {
	// SavePendingPublish stores an outgoing publish that hasn't been acknowledged.
	// Called when a QoS 1/2 publish is sent.
	// MAY return immediately and persist asynchronously.
	SavePendingPublish(packetID uint16, pub *PersistedPublish) error

	// DeletePendingPublish removes a publish after it's been acknowledged.
	// Called when PUBACK (QoS 1) or PUBCOMP (QoS 2) is received.
	// MAY return immediately and delete asynchronously.
	DeletePendingPublish(packetID uint16) error

	// LoadPendingPublishes retrieves all pending publishes on reconnect.
	// Called once during connection establishment.
	// MUST complete synchronously and return actual data.
	LoadPendingPublishes() (map[uint16]*PersistedPublish, error)

	// ClearPendingPublishes removes all pending publishes.
	// Called when SessionPresent=false (server lost our session).
	ClearPendingPublishes() error

	// SaveSubscription stores an active subscription.
	// Called when SUBACK is received.
	// MAY return immediately and persist asynchronously.
	SaveSubscription(topic string, sub *PersistedSubscription) error

	// DeleteSubscription removes a subscription.
	// Called when UNSUBACK is received.
	// MAY return immediately and delete asynchronously.
	DeleteSubscription(topic string) error

	// LoadSubscriptions retrieves all subscriptions on reconnect.
	// Called once during connection establishment.
	//
	// Note: Only topic filters and options are restored. The associated MessageHandlers
	// are NOT persisted.
	// - Callers should use mq.WithSubscription to re-associate handlers with these topics.
	// - If no handler is found, messages will fallback to the DefaultPublishHandler if set.
	//
	// MUST complete synchronously and return actual data.
	LoadSubscriptions() (map[string]*PersistedSubscription, error)

	// SaveReceivedQoS2 marks a QoS 2 packet ID as received (prevent duplicates).
	// Called when QoS 2 PUBLISH is received.
	// MAY return immediately and persist asynchronously.
	SaveReceivedQoS2(packetID uint16) error

	// DeleteReceivedQoS2 removes a QoS 2 packet ID after PUBCOMP sent.
	// Called when QoS 2 flow completes.
	// MAY return immediately and delete asynchronously.
	DeleteReceivedQoS2(packetID uint16) error

	// LoadReceivedQoS2 retrieves all received QoS 2 packet IDs.
	// Called once during connection establishment.
	// MUST complete synchronously and return actual data.
	LoadReceivedQoS2() (map[uint16]struct{}, error)

	// ClearReceivedQoS2 removes all received QoS 2 packet IDs.
	// Called when SessionPresent=false (server lost our session).
	ClearReceivedQoS2() error

	// Clear removes all session state.
	// Called when CleanSession=true or session expires.
	Clear() error
}

SessionStore handles persistence of session state across process restarts. This enables session state to survive client restarts, crashes, or reboots.

Note: State is only loaded from the store when the client process starts. During normal network reconnections (when in-memory state is still available), the store is not consulted - the in-memory state is used directly.

What Gets Persisted:

  • Pending QoS 1 and QoS 2 publishes (not yet acknowledged by server)
  • Active subscriptions (to restore on reconnect)
  • Received QoS 2 packet IDs (to prevent duplicate delivery)

What Does NOT Get Persisted:

  • QoS 0 publishes (fire-and-forget, no delivery guarantee)
  • Messages already acknowledged (PUBACK/PUBCOMP received)
  • Connection state (handled by MQTT protocol on reconnect)

Threading Model:

All methods are called from a single goroutine (the client's logic loop). Implementations do NOT need to be thread-safe for concurrent calls from mq.

Async Implementations:

Save/Delete methods MAY return immediately and perform I/O asynchronously in a background goroutine. This allows implementations to batch writes or use async I/O without blocking the client's logic loop.

However, Load methods MUST complete synchronously and return the actual data, as they are called during connection setup when the data is needed immediately.

Error Handling:

  • Save/Delete errors are logged but do not fail the operation. The in-memory state is authoritative. Implementations should handle errors gracefully (e.g., retry, log, alert).
  • Load errors will cause connection failure, as session state cannot be restored.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a functional option for configuring a subscription.

func WithNoLocal

func WithNoLocal(noLocal bool) SubscribeOption

WithNoLocal (MQTT v5.0) prevents the server from sending messages published by this client back to this client.

This option is ignored when using MQTT v3.1.1.

func WithPersistence

func WithPersistence(persistence bool) SubscribeOption

WithPersistence sets whether the subscription should be persisted to the session store. If true (default), the subscription is saved and restored on process restart. If false, the subscription is ephemeral and lost on client restart. This is independent of the MQTT CleanSession/CleanStart flag which controls server-side persistence.

func WithRetainAsPublished

func WithRetainAsPublished(retain bool) SubscribeOption

WithRetainAsPublished (MQTT v5.0) requests that the server keeps the Retain flag as set by the publisher when forwarding the message.

This option is ignored when using MQTT v3.1.1.

func WithRetainHandling

func WithRetainHandling(handling uint8) SubscribeOption

WithRetainHandling (MQTT v5.0) specifies when retained messages are sent. 0 = Send retained messages at time of subscribe (default) 1 = Send retained messages at subscribe only if subscription doesn't exist 2 = Do not send retained messages at time of subscribe

This option is ignored when using MQTT v3.1.1.

func WithSubscribeUserProperty added in v0.9.1

func WithSubscribeUserProperty(key, value string) SubscribeOption

WithSubscribeUserProperty (MQTT v5.0) adds a user property to the subscription. User properties are key-value pairs that can be used to send metadata to the server.

This option is ignored when using MQTT v3.1.1.

func WithSubscriptionIdentifier

func WithSubscriptionIdentifier(id int) SubscribeOption

WithSubscriptionIdentifier (MQTT v5.0) sets a subscription identifier for this subscription. The identifier will be included in PUBLISH packets that match this subscription, allowing the application to determine which subscription(s) matched the message.

Subscription identifiers must be in the range 1-268,435,455. A value of 0 means no identifier (default).

This is useful when:

  • Multiple subscriptions have overlapping topic filters
  • You need to route messages differently based on which subscription matched
  • Implementing complex message routing logic

The subscription identifier is available in received messages via msg.Properties.SubscriptionIdentifier (a slice, as multiple subscriptions may match).

This option is ignored when using MQTT v3.1.1.

Example:

client.Subscribe("sensors/+/temp", 1, tempHandler,
    mq.WithSubscriptionIdentifier(100))

type SubscribeOptions

type SubscribeOptions struct {
	NoLocal           bool
	RetainAsPublished bool
	RetainHandling    uint8
	Persistence       bool              // Persistence enabled by default (must be manually set to true by default logic)
	SubscriptionID    int               // MQTT v5.0: Subscription identifier (1-268435455, 0 = none).
	UserProperties    map[string]string // MQTT v5.0: User properties
}

SubscribeOptions holds configuration for a subscription.

type Token

type Token interface {
	// Wait blocks until the operation completes or the context is cancelled.
	// It returns nil if successful, or the error (timeout/nack/connection loss).
	Wait(ctx context.Context) error

	// Done returns a channel that closes when the operation is complete.
	// This allows the token to be used in select statements.
	Done() <-chan struct{}

	// Error returns the error if finished, mostly for use with Done().
	Error() error

	// Dropped returns true if the message was dropped due to a full internal buffer (QoS 0).
	// This only occurs when Using QoS0LimitPolicyDrop.
	Dropped() bool
}

Token represents an asynchronous operation that can be waited on.

Tokens are returned by Publish, Subscribe, and Unsubscribe operations. They provide both blocking (Wait) and non-blocking (Done + Error) patterns for handling operation completion.

Example (blocking wait):

token := client.Publish("topic", []byte("data"), mq.WithQoS(1))
if err := token.Wait(context.Background()); err != nil {
    log.Printf("Operation failed: %v", err)
}

Example (non-blocking with select):

token := client.Publish("topic", []byte("data"), mq.WithQoS(1))
select {
case <-token.Done():
    if err := token.Error(); err != nil {
        log.Printf("Failed: %v", err)
    }
case <-time.After(5 * time.Second):
    log.Println("Timeout")
}

Example (with context timeout):

token := client.Subscribe("topic", 1, handler)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := token.Wait(ctx); err != nil {
    log.Printf("Subscribe failed or timed out: %v", err)
}

type UnsubscribeOption added in v0.9.1

type UnsubscribeOption func(*UnsubscribeOptions)

UnsubscribeOption is a functional option for configuring an unsubscription.

func WithUnsubscribeUserProperty added in v0.9.1

func WithUnsubscribeUserProperty(key, value string) UnsubscribeOption

WithUnsubscribeUserProperty (MQTT v5.0) adds a user property to the unsubscribe packet. User properties are key-value pairs that can be used to send metadata to the server.

This option is ignored when using MQTT v3.1.1.

type UnsubscribeOptions added in v0.9.1

type UnsubscribeOptions struct {
	UserProperties map[string]string // MQTT v5.0: User properties
}

UnsubscribeOptions holds configuration for an unsubscription.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL