mqttv5

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

mqttv5

Go Reference Go Report Card CI License

A fast, ergonomic MQTT v5 client for Go. Single package, stdlib-only core, zero allocations on the receive path, Go-native subscribe surface (<-chan *Message / Queue[*Message] / callback), and the supervisor (reconnect + replay + resubscribe) baked into every Client.

go get github.com/ashtonian/mqttv5

Example

package main

import (
    "context"
    "fmt"

    "github.com/ashtonian/mqttv5"
    jsoncodec "github.com/ashtonian/mqttv5/codec/json"
    "github.com/ashtonian/mqttv5/wire"
)

type Event struct {
    Device string  `json:"device"`
    Temp   float64 `json:"temp"`
}

func main() {
    ctx := context.Background()

    client, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
    _ = client.Connect(ctx)
    defer client.Disconnect(ctx)

    // Generic typed pub/sub via Codec[T] (JSON ships in a sibling
    // submodule). Supervisor handles reconnect + auto-resubscribe +
    // QoS 1/2 replay underneath — you just write the consumer loop.
    events := mqttv5.NewTyped(client, jsoncodec.Codec[Event]{})

    msgs, _, _ := events.Subscribe(ctx, []mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
    go func() {
        for m := range msgs {
            fmt.Printf("%s: %+v\n", m.Topic, m.Value) // m.Value already decoded
            _ = m.Ack() // PUBACK held for QoS 1 until you ack
        }
    }()

    _ = events.Publish(ctx, wire.PublishOpts{Topic: "events/hello", QoS: 1}, Event{Device: "a1", Temp: 22.5})
}

See examples/ for full demos — TLS, multi-broker failover, publisher pool, durable queue, raw-bytes subscribe, WebSocket, OAuth rotation, and lifecycle observability.

Why this over eclipse/paho.golang + autopaho

  • Go-idiomatic top to bottom. Channels (<-chan *Message) and queues for delivery, not just global OnPublishReceived callbacks. context.Context on every operation. Sentinel errors with errors.Is. Functional options instead of a 40-field ClientOptions struct.
  • One client. Supervisor baked in. No paho / autopaho split — reconnect, replay-in-flight, and auto-resubscribe are always on.
  • 20–60× faster decode, zero steady-state allocs on the receive path. Topic and payload are zero-copy slices into a pooled frame; properties decode lazily. ~30× less GC pressure at sustained load (~29 MB/s of garbage vs ~880 MB/s for autopaho at 100k msg/s) — smaller pauses, less p99 jitter.
  • Multi-broker, kept distinct and composable. Failover (WithBrokers), fan-out across N independent brokers (ClientGroup), publish-only pool against one broker (WithPublisherPool) — three real patterns, each its own API. Compose them: WithBrokers inside a GroupMember for HA-per-region, then WithPublisherPool on top for throughput.
  • Publisher pool that actually scales. paho serialises every write behind one mutex; mqttv5 funnels into MPSC + one writer goroutine, so N publish-only conns parallelise across cores. 2.5× faster under 8-goroutine fan-in.
  • Backpressure as a first-class concept. Per-subscription DropNewest / DropOldest, with the dropped message auto-ack'd so the broker stops retransmitting.
  • Generic typed payloads. Codec[T] boundary; JSON and msgpack codecs ship in separate submodules so the core stays stdlib-only.
  • Durable outbound queue (QueuePublisher + file-backed queue/file/) — enqueue while disconnected, drain on reconnect, survive process restart.
  • Conformance. Full MQTT v5: shared subscriptions, topic aliases (in + out), session expiry, retained messages, will + will properties, enhanced authentication (CONNECT + mid-session §4.12), CONNACK capability flags honoured — Subscribe* errors before the wire when the broker has disabled the feature.
  • WebSocket as a sibling moduletransport/ws brings ws/wss via WithDialFunc(ws.DialFunc(opts)) (see examples/ws). Zero impact on the core's stdlib-only promise.

Runnable examples

In examples/ — one go.mod, run any of them with MQTT_BROKER=mqtt://127.0.0.1:1883:

Path Shows
examples/basic Connect, channel subscribe, publish
examples/typed Typed[T] + JSON codec
examples/reconnect Full lifecycle callback set (Up / Down / ConnectError / ReconnectAttempt) surviving a broker restart
examples/group ClientGroup multi-broker fan-out / fan-in
examples/ws WebSocket transport — WithDialFunc(ws.DialFunc(opts))
examples/stats Client.Stats() snapshot — bridge into Prometheus / OTel / expvar
examples/oauth WithConnectPacketBuilder rotating an OAuth bearer per CONNECT
examples/disconnect DisconnectWith carrying ReasonCode + ReasonString + SessionExpiry override
docker run -d -p 1883:1883 eclipse-mosquitto
go -C examples run ./basic

Install / submodules

The core is stdlib-only. Opt-in submodules each have their own go.mod so importing them doesn't add a runtime dep to the core.

Submodule Import Purpose
core github.com/ashtonian/mqttv5 Client, supervisor, options, in-memory queue
JSON codec github.com/ashtonian/mqttv5/codec/json Codec[T] for Typed[T] (stdlib only)
msgpack codec github.com/ashtonian/mqttv5/codec/msgpack Codec[T] via vmihailenco/msgpack/v5
File session store github.com/ashtonian/mqttv5/store/file Crash-safe in-flight QoS 1/2 state
File publish queue github.com/ashtonian/mqttv5/queue/file Durable outbound publish queue (WAL)
WebSocket transport github.com/ashtonian/mqttv5/transport/ws ws:// and wss:// — WithDialFunc(ws.DialFunc(ws.DialOpts{...}))

Three multi-broker patterns

Three distinct shapes, each its own API:

Goal API Connections
Failover — one logical client across interchangeable brokers (same data) WithBrokers(urls...) 1 at a time, supervisor rotates on drop
Parallel sessions to N independent brokers NewClientGroup(members, opts...) N (one per broker), all live
Publish throughput — saturate one broker WithPublisherPool(N) N publish-only to the same broker

These compose. Use WithBrokers inside a GroupMember.Opts to get HA-per-region fan-out; use WithPublisherPool alongside WithBrokers to get throughput against an HA pair.

Client.SetBrokers(urls...) swaps the failover list at runtime — typical use is inside WithOnServerDisconnect when the broker sends ReasonServerMoved with a ServerReference:

mqttv5.WithOnServerDisconnect(func(d *wire.Disconnect) {
    if ref, ok := d.Properties.String(wire.PropServerReference); ok {
        _ = cli.SetBrokers(ref)
    }
})
ClientGroup policies

ClientGroup is for N parallel sessions to N brokers, each treated as itself — bridges between independent brokers, multi-tenant SaaS with per-broker credentials, or a clustered broker fleet you want N parallel sessions into. If your brokers are interchangeable for the same data, use WithBrokers on a single Client; ClientGroup does not failover between members.

Construction takes a GroupMember list plus group-level options:

g, _ := mqttv5.NewClientGroup(
    []mqttv5.GroupMember{
        {
            Broker: "mqtts://emea.example.com:8883",
            Name:   "emea",
            Opts:   []mqttv5.Option{mqttv5.WithCredentials("emea-svc", []byte(token1))},
        },
        {
            Broker: "mqtts://apac.example.com:8883",
            Name:   "apac",
            Opts:   []mqttv5.Option{mqttv5.WithCredentials("apac-svc", []byte(token2))},
        },
    },
    mqttv5.WithGroupSharedOpts(
        mqttv5.WithClientID("fleet"),
        mqttv5.WithKeepAlive(30),
    ),
    mqttv5.WithGroupPublishPolicy(mqttv5.GroupPublishBroadcast),
)

GroupMember.Opts applies after WithGroupSharedOpts, so per-member auth / TLS / ClientID / callbacks win. Member names default to member-N (1-based) when unset.

Publish policy Behaviour Use case
GroupPublishBroadcast (default) Every member receives. Succeeds if any did. Bridge / mirror — members carry different data
GroupPublishRoundRobin Next healthy member per call. Distribute publishes across a clustered broker fleet
GroupPublishHashByTopic FNV-1a(topic) → member. Per-topic ordering. Fleet throughput with per-topic affinity

Subscribe is always "all members + merge into one channel/queue". The returned token map is keyed by member name — pass to UnsubscribeAll or selectively to Unsubscribe(name, token).

ch, tokens, _ := g.Subscribe(ctx, []mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
// tokens["emea"], tokens["apac"]
defer g.UnsubscribeAll(ctx, tokens)

Connect / Disconnect / Subscribe run in parallel across members by default — pass WithGroupSequentialLifecycle if you need deterministic ordering. Use g.Members() or g.Member(name) for direct per-member access (per-member Stats(), etc.).

Subscribe shapes

All take []TopicFilter so multi-filter SUBSCRIBE is a single packet.

Channel — manual ack, ordered flush
msgs, token, err := cli.Subscribe(ctx,
    []mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
    mqttv5.SubBuffer(256),
)
for m := range msgs {
    handle(m)
    _ = m.Ack() // PUBACK released in §4.6 arrival order
}
_ = cli.Unsubscribe(ctx, token) // closes msgs

If the buffer fills, the incoming message is auto-ack'd and dropped so the broker stops retrying. Observe drops via SubOnDrop(...).

Queue — unbounded, optional DropOldest
q, _, _ := cli.SubscribeQueue(ctx,
    []mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
    mqttv5.SubMaxQueueSize(10_000),
    mqttv5.SubDropPolicy(mqttv5.DropOldest), // keeps freshest 10k
)
for {
    m, ok := q.Dequeue(ctx)
    if !ok { break }
    handle(m)
    _ = m.Ack()
}

DropOldest evicts the queue head and acks it before enqueueing — only the queue variant supports it (channels can't peek-and-pop without racing the consumer).

Callback — sync, auto-ack
cli.SubscribeCallback(ctx,
    []mqttv5.TopicFilter{{Topic: "ctrl/+", QoS: 0}},
    func(m *mqttv5.Message) {
        // Runs on the read goroutine — MUST be non-blocking.
        process(m)
        // Ack auto-fires after return.
    },
)

Typed publish / subscribe

import jsoncodec "github.com/ashtonian/mqttv5/codec/json"

type Reading struct { Device string; Temp float64 }

typed := mqttv5.NewTyped[Reading](cli, jsoncodec.Codec[Reading]{})

_ = typed.Publish(ctx, wire.PublishOpts{Topic: "sensors/a1", QoS: 1},
    Reading{Device: "a1", Temp: 22.5})

ch, _, _ := typed.Subscribe(ctx,
    []mqttv5.TopicFilter{{Topic: "sensors/#", QoS: 1}})
for m := range ch {
    fmt.Println(m.Topic, m.Value.Temp)
    _ = m.Ack()
}

Implement mqttv5.Codec[T] for protobuf, Cap'n Proto, FlatBuffers, custom binary — the core has no codec dependency.

Durable QueuePublisher

QueuePublisher decouples the caller from broker availability: Publish returns as soon as the entry is durably stored. A drain goroutine handles the broker round-trip whenever the client is connected.

import qfile "github.com/ashtonian/mqttv5/queue/file"

q, _ := qfile.Open("/var/lib/myapp/outbound")
pub := mqttv5.NewQueuePublisher(cli, q,
    mqttv5.WithQueueBatchSize(32),
    mqttv5.WithQueueMaxSize(1_000_000),
    mqttv5.WithQueueTTL(24*time.Hour),
    mqttv5.WithDeadLetter(func(e mqttv5.QueueEntry, err error) {
        log.Printf("dropped %s: %v", e.Publish.Topic, err)
    }),
)
defer pub.Close(ctx)

_ = pub.Publish(ctx, wire.PublishOpts{Topic: "logs", Payload: data, QoS: 1})

QoS 0 is rejected (ErrQoS0NotQueueable) — durable enqueue is meaningless when the broker has no obligation to deliver. Use mqttv5.NewMemoryPublisherQueue() for in-process buffering without crash safety.

WebSocket

import (
    "github.com/ashtonian/mqttv5"
    "github.com/ashtonian/mqttv5/transport/ws"
)

cli, _ := mqttv5.New(
    mqttv5.WithBroker("wss://broker.example.com/mqtt"),
    mqttv5.WithDialFunc(ws.DialFunc(ws.DialOpts{TLSConfig: tlsCfg})),
)

wss:// requires a non-nil TLSConfigws.DialFunc returns ErrMissingTLSConfig at the first Connect attempt if you pass nil. No silent downgrade.

Options reference

Client construction
Option Default Effect
WithBroker(url) / WithBrokers(urls...) (required) Broker URL(s). mqtt/tcp/mqtts/tls/ssl schemes; default ports filled in. ws/wss via WithDialFunc(ws.DialFunc(opts)).
WithDialFunc(fn) Replaces the built-in TCP/TLS dial. Takes precedence over WithDialer/WithTLSConfig. Nil rejected.
WithClientID(s) broker-assigned MQTT ClientID. Empty = ask broker to assign one via AssignedClientIdentifier; cli.ClientID() then returns that assigned value after CONNACK.
WithCredentials(user, pass) CONNECT username + password. Static. For per-attempt rotation use WithConnectPacketBuilder.
WithKeepAlive(seconds) 30 Keepalive interval. 0 rejected — use WithoutKeepAlive.
WithoutKeepAlive() Disable PINGREQ entirely. Rarely correct in production.
WithCleanStart(b) true CleanStart on the initial CONNECT.
WithCleanStartOnReconnect(b) false CleanStart on every reconnect. False preserves QoS 1/2 session for resume.
WithSessionExpiry(seconds) 300 (5 min) Session Expiry Interval (§3.1.2.11.2). Default holds the broker session long enough for QoS 1/2 resume across a typical reconnect blip. Pass 0 to end the session with the connection.
WithReceiveMaximum(n) unset (broker default 65535) Cap on concurrent inbound QoS 1/2.
WithMaximumPacketSize(n) 0 (no advertised limit) CONNECT property §3.1.2.11.4 — caps the largest packet the broker may send. Note: with the default, a buggy / hostile broker can send arbitrarily large PUBLISHes; set explicitly when broker trust is limited.
WithInboundTopicAliasMaximum(n) 0 (no inbound aliases) CONNECT property §3.1.2.11.5 — opt into wire compression on inbound PUBLISHes.
WithRequestResponseInformation(b) false CONNECT property §3.1.2.11.6 — broker returns ResponseInformation in CONNACK.
WithRequestProblemInformation(b) true CONNECT property §3.1.2.11.7 — broker returns ReasonString / UserProperties on errors. On by default; pass false to opt out.
WithConnectUserProperty(k, v) / WithConnectUserProperties(p) CONNECT user properties; append-style or bulk replace.
WithConnectTimeout(d) 10 s Dial + CONNECT/CONNACK budget.
WithPingTimeout(d) 10 s PINGRESP budget. Total dead-connection detection = KeepAlive + PingTimeout (40 s with defaults), inside the broker's 1.5×KeepAlive cutoff so the client drives reconnect.
WithDisconnectFlushTimeout(d) 500 ms Flush budget for the DISCONNECT write on graceful shutdown.
WithWriteQueueSize(n) 256 Internal MPSC write buffer.
WithWriteBatch(n) 0 (off) Coalesce up to n pre-encoded packets per writev syscall. Wins under sustained concurrent publishers; measure before enabling.
WithWriteOverflowPolicy(p) WriteBlock QoS 0 only. WriteBlock waits for queue room / ctx; WriteDropNewest returns ErrWriteQueueFull immediately when the writer queue is full. Use for telemetry where head-of-line latency on the producer is worse than occasional loss. QoS 1/2 always block on the broker ack regardless.
WithWill(opts) Will message + properties.
WithReconnectBackoff(b) ExponentialBackoff(1s, 30s, 200ms) ConstantBackoff(d) also shipped.
WithTLSConfig(*tls.Config) TLS for mqtts://.
WithDialer(*net.Dialer) default Override transport net.Dialer.
WithStore(s) in-memory session.Store impl (use store/file for crash safety).
WithLogger(*slog.Logger) slog.Default() Structured logging.
WithStats() Enable in-memory counters for Client.Stats() (off by default to keep the hot path branch-predictor friendly).
WithPublishMode(mode) PublishFireAndForget PublishWaitForFlush makes QoS 0 wait for conn.Write.
WithPublisherPool(N) 0 (off) N dedicated publish-only conns.
WithPublisherPoolRouting(p) PoolRoutingRoundRobin PoolRoutingHashByTopic preserves per-topic ordering.
WithPublisherPoolClientIDFn(fn) "%s-pub-%d" Customise per-member ClientIDs. Required when the parent ClientID is empty (broker-assigned).
WithMaxSubscribeQueueSize(n) 0 (unbounded) Default per-sub queue cap.
WithDropPolicy(p) DropNewest Default drop policy for full sub buffers.
WithOnConnectionUp(fn) func(*wire.Connack). Fires after every successful CONNACK; receives a detached clone safe to retain. Must not block.
WithOnConnectionDown(fn) func() bool. Fires on unexpected disconnect (not user Disconnect). Return false to terminate the supervisor. Must not block.
WithOnConnectError(fn) Fires per failed CONNECT attempt (dial err, CONNACK refusal, AUTH-loop err). Observability only.
WithOnReconnectAttempt(fn) Fires immediately before each reconnect dial. Receives (attempt, brokerURL).
WithOnServerDisconnect(fn) Fires on broker-initiated DISCONNECT with detached *wire.Disconnect. May call Client.SetBrokers(...) to redirect.
WithConnectPacketBuilder(fn) func(ctx, *wire.ConnectOpts) error. Mutate CONNECT immediately before serialisation; canonical OAuth-token-rotation hook.
WithAuthenticator(a) MQTT v5 enhanced auth (CONNECT + mid-session §4.12).
Per-subscribe

SubscribeOption applies to Subscribe, SubscribeQueue, SubscribeCallback, plus the Typed[T] and ClientGroup variants.

Option Effect
SubBuffer(n) Channel buffer size (Subscribe only). Default DefaultSubscribeBuffer (64).
SubMaxQueueSize(n) Queue cap (SubscribeQueue only). 0 = unbounded.
SubDropPolicy(p) DropNewest / DropOldest. SubscribeQueue honours both; chan-based Subscribe returns ErrChanDropOldestUnsupported when DropOldest is set explicitly.
SubOnDrop(fn) Metrics hook fired when a message is dropped + acked.
SubAutoAck() Opt-in: dispatcher acks each delivery before handing it to the consumer; the received *Message is a detached copy (Topic/Payload/Properties cloned, safe to retain) and m.Ack() is a no-op. Trade: 2 allocs/msg + breaks at-least-once semantics (consumer crash between delivery and processing has nothing to replay). Reach for it on QoS 0 / observational consumers. Ignored by SubscribeCallback.
Per-QueuePublisher
Option Effect
WithQueueBatchSize(n) Drain batch ceiling. Default 16.
WithQueueMaxSize(n) Bound the queue; Publish returns ErrQueueFull when at cap (DropNewest) or evicts the head (DropOldest).
WithQueueDropPolicy(p) DropNewest (default) or DropOldest. DropOldest calls PublisherQueue.EvictHead; backends that can't evict return ErrEvictionNotSupported at construct.
WithQueueIdleInterval(d) Drain-loop wakeup tick when no Enqueue signal arrives. Default 500 ms.
WithQueuePublishTimeout(d) Per-message broker handshake cap inside the drain loop. Default 30 s.
WithQueueTTL(d) Drop entries older than d at drain time; mirrors into MessageExpiryInterval so the broker also enforces.
WithDeadLetter(fn) Terminal-failure callback (TTL expiry, DropOldest eviction).

Observability — Client.Stats()

Client.Stats() returns a snapshot of in-memory counters. Opt in via WithStats() — when off, the hot path skips every atomic increment and Stats() returns the zero value.

cli, _ := mqttv5.New(
    mqttv5.WithBroker(broker),
    mqttv5.WithStats(),
)
// ...
s := cli.Stats()
fmt.Printf("sent=%d acked=%d inflight=%d connects=%d failures=%d\n",
    s.PublishesSent, s.PublishesAcked, s.PublishesInflight,
    s.Connects, s.ConnectFailures)

Counters cover connect/disconnect/publish/subscribe lifecycle, inbound drops, pool fallbacks, and ping timeouts. Bridge each field into your own metrics surface (Prometheus / OpenTelemetry / expvar) — the lib intentionally has no metrics-library dependency. Full field list in the Stats godoc. See examples/stats.

Graceful disconnect

Disconnect(ctx) sends ReasonNormalDisconnection with no properties. Use DisconnectWith(ctx, opts) to override:

expiry := uint32(0)
_ = cli.DisconnectWith(ctx, wire.DisconnectOpts{
    ReasonCode:            wire.ReasonAdministrativeAction,
    ReasonString:          "planned shutdown",
    SessionExpiryInterval: &expiry, // override to drop the session immediately
})

The OnConnectionDown callback is not invoked on a user-initiated disconnect — the call site itself is the "going down" signal. See examples/disconnect.

Per-attempt credential rotation

WithConnectPacketBuilder(fn) runs immediately before each CONNECT is serialised. Use it to refresh an OAuth token, fetch a SigV4-signed CONNECT credential, or rotate any other per-attempt secret. The context is bounded by ConnectTimeout.

mqttv5.WithConnectPacketBuilder(func(ctx context.Context, opts *wire.ConnectOpts) error {
    tok, err := oauth.FetchToken(ctx)
    if err != nil {
        return err // fails this attempt; supervisor retries after backoff
    }
    opts.Username = "service-account"
    opts.Password = []byte(tok)
    return nil
}),

Pair with WithOnConnectError for observability — every refusal / network failure fires the callback with the per-attempt error. See examples/oauth.

Sentinel errors

Branch with errors.Is(err, ...); stable across versions.

Error Source Meaning
ErrNotConnected Publish, Subscribe*, Unsubscribe No live connection. Retry / wait for reconnect.
ErrAlreadyConnected Connect Connect called twice.
ErrClosed any after Disconnect Client torn down.
ErrConnectRefused Connect Broker non-success CONNACK reason.
ErrUnexpectedPacket various Broker sent an unexpected packet for the current state. Treat as protocol bug.
ErrMissingBroker New No URLs supplied.
ErrInvalidBrokerURL New, SetBrokers URL failed to parse or has unsupported scheme. WithDialFunc relaxes scheme validation.
ErrChanDropOldestUnsupported Subscribe (chan) Explicit SubDropPolicy(DropOldest) on the channel-based Subscribe. Use SubscribeQueue for DropOldest.
ErrWriteQueueFull Publish (QoS 0) Writer queue at capacity AND client configured with WithWriteOverflowPolicy(WriteDropNewest). The publish never reached the wire.
ErrNilHandler SubscribeCallback Handler argument was nil.
ErrSharedSubsUnsupported Subscribe* Broker disabled $share/... in CONNACK.
ErrWildcardSubsUnsupported Subscribe* Broker disabled + / # in CONNACK.
ErrSubscriptionIDsUnsupported Subscribe* Broker disabled SubscriptionIdentifier property.
ErrNoHealthyPublishers publisher pool internal All pool members down — falls back to main conn.
ErrQueueClosed QueuePublisher.Publish After Close.
ErrQueueFull QueuePublisher.Publish WithQueueMaxSize cap reached (DropNewest).
ErrQoS0NotQueueable QueuePublisher.Publish QoS 0 + durable enqueue is meaningless.
ErrEvictionNotSupported NewQueuePublisher DropOldest requested on a backend that can't evict.
ws.ErrMissingTLSConfig transport/ws.Dial / DialFunc wss:// URL without a TLS config. No silent downgrade.

Performance vs autopaho

Apple M2 Pro, Go 1.26, eclipse-mosquitto on loopback. Full output: benchmarks/e2e_results.txt. WithStats is off in the published numbers; the in-memory counters compile to a nil-check on the hot path when disabled, so enabling them is negligible — re-run the suite if you want exact numbers under your load.

Codec micro (wire vs paho.golang/packets)
Decode 256 B, no props autopaho mqttv5 speedup
ns/op 1,326 50 27×
allocs/op 22 0
B/op 5,187 0
Decode 256 B + 5 user properties (lazy) autopaho mqttv5 speedup
ns/op 5,732 54 106×
allocs/op 93 0

Decode allocation is constant in payload sizeTopic and Payload are zero-copy slices into a pooled frame.

End-to-end vs real broker
Workload, 256 B payload autopaho mqttv5 mqttv5 wins
Publish QoS 0 single goroutine 5,500 ns 4,900 ns 10 % faster, 3.75× fewer allocs
Publish QoS 1 (waits for PUBACK) 200 µs 184 µs 10 % faster, 5× fewer allocs
Publish QoS 1 × 8 goroutines, 1 KiB 32.1 µs 13.0 µs 2.5× faster under fan-in
RoundTrip (pub → broker → sub) 267 µs 256 µs 4× fewer allocs

The single-goroutine QoS-0 case where autopaho's mutex-around-Write edges mqttv5 by ~2 µs is the only loss. That mutex is exactly what blocks autopaho from scaling — the trade-off is 2 µs per call against the 2.5× win under fan-in.

Reliability semantics

Behaviour Detail
Connect Blocks until CONNACK (or ctx). Supervisor handles all subsequent reconnects in the background.
Reconnect ExponentialBackoff(1s, 30s, 200ms) default. With WithBrokers, URLs rotate per attempt; successful connect sticks.
Publish QoS 1/2 across drop Serialised once at register time; replayed on every reconnect with DUP=1 (§3.3.1.1). Caller stays blocked on session's Done and resumes on the eventual ack.
Subscribe across drop Every active subscription re-issued on every reconnect. Subs the broker refused (SUBACK reason ≥ 0x80) drop from the resubscribe set.
Mid-session re-auth (§4.12) Broker AUTH post-CONNACK routes to Authenticator.Continue. No Authenticator configured = client emits DISCONNECT 0x87 and reconnects via fresh CONNECT.
CONNACK capability flags Shared / Wildcard / SubscriptionIdentifier availability honoured — Subscribe* errors before the wire if the broker disabled the feature.
Server-initiated DISCONNECT WithOnServerDisconnect(fn) fires with detached *wire.Disconnect before the generic OnConnectionDown. Callback may call Client.SetBrokers(...) to redirect; new list takes effect on next reconnect.
PINGRESP liveness No PINGRESP within PingTimeout → connection treated as dead → supervisor redials.
Manual ack ordering QoS 1 PUBACK held until m.Ack(), flushed in §4.6 arrival order. QoS 2 PUBREC held until m.Ack(); PUBCOMP fires automatically when PUBREL arrives.
Multi-handler dispatch A PUBLISH matching multiple overlapping filters delivers the same *Message to every handler. m.Ack() is refcounted — only the final call releases the frame.
Topic / payload lifetime Zero-copy slices into a pooled frame; valid until Ack() returns. Use m.CloneTopic() / m.ClonePayload() to retain past Ack().
Topic alias outbound Auto-allocated on QoS 0 publishes when broker advertises TopicAliasMaximum > 0. Skipped for QoS 1/2 so replay carries the full topic.
Disconnect Best-effort graceful DISCONNECT (bounded by ctx + cs.dying), tears down per-conn goroutines, closes consumer channels/queues. Idempotent.

Architecture

One goroutine per connection drives read → decode → trie match → handlers (sync). A dedicated writer goroutine drains an MPSC channel of outbound frames — no mutex-around-Write contention. Packets and frame buffers come from per-type pools; topic and payload are zero-copy slices into the frame, valid until refcounted Message.Ack(). Properties decode lazily. A supervisor handles reconnect with configurable backoff, replays in-flight QoS 1/2 publishes with DUP=1, and re-issues every tracked subscription.

Build / test / bench

# Core — no broker required.
go test ./...
go test -race ./...

# Codec micro benchmarks (no broker).
go -C benchmarks test -bench=. -run=^$ -benchmem -count=3 -benchtime=2s

# End-to-end vs autopaho (needs mosquitto).
docker compose -f conformance/docker-compose.yml up -d mosquitto
go -C benchmarks test -tags e2e -bench='^BenchmarkE2E_' -benchmem -benchtime=2s -count=2

# Conformance suite (mosquitto + emqx).
docker compose -f conformance/docker-compose.yml up -d
go -C conformance test -tags conformance -race -v

Stability

  • Wire protocol: MQTT v5 OASIS, stable.
  • Public Client / Config / option / Stats surface is stable; any breaking change is called out in release notes with a mapping.
  • Sentinel errors above are stable; branch on them with errors.Is.
  • Submodules version independently — each has its own go.mod.
  • wire/ codec internals are mutable — treat as private.

Independence

Independent, clean-room implementation written from the MQTT v5.0 OASIS specification. Not a fork of any existing Go MQTT client. The benchmarks/ submodule imports eclipse/paho.golang for head-to-head comparison only — it is not redistributed.

License

Apache 2.0 — see LICENSE for the full text and NOTICE for the attribution notice. Per-file headers carry SPDX-License-Identifier: Apache-2.0.

Documentation

Overview

Package mqttv5 is a fast, ergonomic MQTT v5 client for Go. The supervisor (reconnect, replay-in-flight, auto-resubscribe) is baked into every Client; there is no separate "auto-reconnect" wrapper.

Why this package

One package, one Client. No paho / autopaho split — reconnect, replay, and resubscribe are always on. Channel-native subscribe: <-chan *Message (Client.Subscribe), Queue of *Message (Client.SubscribeQueue), or a sync callback (Client.SubscribeCallback). Backpressure is a first-class concept — per-subscription DropNewest / DropOldest auto-ack the dropped message so the broker stops retransmitting.

Three multi-broker patterns are kept distinct:

Typed publish / subscribe goes through the generic Codec interface; JSON and msgpack ship in separate submodules so the core stays stdlib-only. The durable outbound QueuePublisher lets you enqueue publishes while disconnected, drain on reconnect, and survive process restart (when combined with the queue/file submodule).

Operator surface:

Full MQTT v5 conformance: shared subscriptions, topic aliases (in + out), session expiry, retained messages, will + will properties, enhanced authentication (CONNECT and mid-session §4.12), and CONNACK capability flags honoured before any matching SUBSCRIBE traffic goes on the wire.

Quick start

cli, err := mqttv5.New(
    mqttv5.WithBroker("mqtt://localhost:1883"),
    mqttv5.WithClientID("ingest-svc-1"),
)
if err != nil { panic(err) }

ctx := context.Background()
if err := cli.Connect(ctx); err != nil { panic(err) }
defer cli.Disconnect(ctx)

// Channel subscribe — manual ack, §4.6-ordered PUBACK flush.
msgs, _, err := cli.Subscribe(ctx,
    []mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
if err != nil { panic(err) }
go func() {
    for m := range msgs {
        fmt.Printf("%s: %s\n", m.Topic, m.Payload)
        _ = m.Ack()
    }
}()

// QoS 1 publish — supervisor replays with DUP=1 across drops.
_ = cli.Publish(ctx, wire.PublishOpts{
    Topic:   "events/example",
    Payload: []byte("hello"),
    QoS:     1,
})

See the Example functions below for end-to-end snippets covering each subscribe shape, typed payloads, multi-broker fan-out, and the durable queue publisher.

Options naming

Option helpers split by scope: Option (passed to New) configures the Client and is named With…; SubscribeOption (passed inline to Subscribe / SubscribeQueue) configures one subscription and is named Sub…; ClientGroupOption (passed to NewClientGroup) configures the group and is named WithGroup….

Submodules

Each opt-in submodule has its own go.mod so importing it does not add a runtime dependency to the core:

  • codec/json — JSON Codec implementation, wired into Typed.
  • codec/msgpack — MessagePack Codec via vmihailenco/msgpack/v5.
  • queue/file — Durable outbound publish queue (filesystem WAL).
  • store/file — Crash-safe session store for in-flight QoS 1/2.
  • transport/ws — WebSocket transport via gobwas/ws; wire it in with WithDialFunc.

Architecture

One goroutine per connection drives the read path (read -> decode -> trie match -> handler) with handlers running synchronously on the reader. A second goroutine drains a many-producer-single-consumer write channel — no mutex around net.Conn.Write, so concurrent publishers scale across cores.

Packets and frame buffers come from per-type sync.Pools. Inbound Message aliases the pooled frame for zero-copy Topic and Payload; the frame returns to the pool only when refcounted Message.Ack reaches the last handler. A supervisor goroutine reconnects with configurable backoff, replays in-flight QoS 1/2 publishes with DUP=1, and re-issues every tracked subscription.

Benchmarks

See benchmarks/ for end-to-end and codec micro-benchmarks against eclipse/paho.golang.

Package mqttv5 is a high-performance MQTT v5 client.

The client speaks raw bytes by default. Callers that want typed payloads can wrap with Typed[T] via a Codec[T] from one of the sibling codec submodules. See the package doc in doc.go for an architecture overview.

Example

Example shows the minimal connect → subscribe → publish loop. The supervisor (reconnect, replay, resubscribe) is implicit — once Connect returns, every drop is recovered automatically until Disconnect.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

func main() {
	cli, err := mqttv5.New(
		mqttv5.WithBroker("mqtt://localhost:1883"),
		mqttv5.WithClientID("example"),
	)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	if err := cli.Connect(ctx); err != nil {
		panic(err)
	}
	defer cli.Disconnect(ctx)

	msgs, _, err := cli.Subscribe(ctx, []mqttv5.TopicFilter{{Topic: "demo/#", QoS: 1}})
	if err != nil {
		panic(err)
	}
	go func() {
		for m := range msgs {
			fmt.Printf("recv %s %s\n", m.Topic, m.Payload)
			_ = m.Ack()
		}
	}()

	_ = cli.Publish(ctx, wire.PublishOpts{
		Topic:   "demo/hello",
		Payload: []byte("world"),
		QoS:     1,
	})
}

Index

Examples

Constants

View Source
const (
	// DefaultKeepAlive is the keepalive interval used when
	// WithKeepAlive is not called. MQTT v5 spec range is 1..65535
	// seconds.
	DefaultKeepAlive uint16 = 30

	// DefaultConnectTimeout bounds the CONNECT handshake (dial +
	// CONNECT/CONNACK) when WithConnectTimeout is not called.
	DefaultConnectTimeout = 10 * time.Second

	// DefaultPingTimeout is the budget for PINGRESP after a PINGREQ
	// before the connection is declared dead. Chosen flat (not a
	// multiple of KeepAlive) because PINGRESP is a tiny packet whose
	// RTT does not scale with how often PINGREQ is sent. With
	// DefaultKeepAlive (30 s) the total dead-detect time is 40 s —
	// comfortably inside every common broker's 1.5×KeepAlive (~45 s)
	// disconnect window, so the client drives the reconnect rather
	// than getting cut.
	DefaultPingTimeout = 10 * time.Second

	// DefaultSessionExpiry is the Session Expiry Interval (§3.1.2.11.2)
	// advertised on every CONNECT. 5 minutes is long enough to ride out
	// the typical reconnect blip / container redeploy window so QoS 1/2
	// resume actually works out of the box; short enough that the
	// broker isn't asked to hold idle sessions indefinitely.
	DefaultSessionExpiry uint32 = 300

	// DefaultWriteQueueSize sizes the internal MPSC write channel.
	DefaultWriteQueueSize = 256

	// DefaultDisconnectFlushTimeout caps how long writeDisconnectAndClose
	// will wait for the DISCONNECT packet to flush before the
	// connection is forcibly torn down.
	DefaultDisconnectFlushTimeout = 500 * time.Millisecond
)

Default values used by Config.defaults. Exported so callers can reference them when overriding selectively.

View Source
const (
	// DefaultQueueBatchSize caps the per-PeekBatch entry count.
	DefaultQueueBatchSize = 16

	// DefaultQueueIdleInterval is the drain-loop wakeup tick when no
	// Enqueue signals have arrived. Lower values reduce the worst-
	// case latency to pick up restored entries; higher values reduce
	// wakeup cost.
	DefaultQueueIdleInterval = 500 * time.Millisecond

	// DefaultQueuePublishTimeout caps each per-message broker
	// handshake (ack wait) inside the drain loop.
	DefaultQueuePublishTimeout = 30 * time.Second
)

Default values used by NewQueuePublisher when QueueOptions leave a field unset.

View Source
const DefaultPublisherPoolClientIDFormat = "%s-pub-%d"

DefaultPublisherPoolClientIDFormat is the format string used by the default pool-member ClientID function: parent ClientID, then "-pub-", then the 1-based member index.

View Source
const DefaultSubscribeBuffer = 64

DefaultSubscribeBuffer is the channel buffer size used by Subscribe when SubBuffer is not called.

Variables

View Source
var (
	ErrNotConnected     = errors.New("mqttv5: client not connected")
	ErrAlreadyConnected = errors.New("mqttv5: client already connected")
	ErrClosed           = errors.New("mqttv5: client closed")
	ErrConnectRefused   = errors.New("mqttv5: broker refused CONNECT")
	ErrUnexpectedPacket = errors.New("mqttv5: unexpected packet from broker")

	// ErrWriteQueueFull is returned by QoS 0 Publish when the client
	// is configured with WriteDropNewest and the writer queue has no
	// room. The publish never reaches the wire.
	ErrWriteQueueFull = errors.New("mqttv5: write queue full")

	// Subscribe-time errors raised when the broker advertised the
	// feature as unsupported in CONNACK (MQTT v5 §3.2.2.3.{11,12,13}).
	// Returned before any SUBSCRIBE traffic goes on the wire.
	ErrSharedSubsUnsupported      = errors.New("mqttv5: broker does not support shared subscriptions")
	ErrWildcardSubsUnsupported    = errors.New("mqttv5: broker does not support wildcard subscriptions")
	ErrSubscriptionIDsUnsupported = errors.New("mqttv5: broker does not support subscription identifiers")
)

Errors returned by the client.

View Source
var (
	// ErrQueueClosed is returned by Enqueue / Publish after Close.
	ErrQueueClosed = errors.New("mqttv5: queue is closed")

	// ErrQueueFull is returned by [QueuePublisher.Publish] when the
	// queue is at its [WithQueueMaxSize] cap and DropNewest is in
	// effect.
	ErrQueueFull = errors.New("mqttv5: queue is full")

	// ErrQoS0NotQueueable is returned when a QoS 0 publish reaches
	// [QueuePublisher.Publish] — durable enqueue is meaningless when
	// the broker has no delivery obligation.
	ErrQoS0NotQueueable = errors.New("mqttv5: QueuePublisher requires QoS >= 1")

	// ErrEvictionNotSupported is returned by
	// [PublisherQueue.EvictHead] for backends that cannot evict
	// (append-only disk queues, etc.). [NewQueuePublisher] surfaces
	// this at construct time when [DropOldest] is requested.
	ErrEvictionNotSupported = errors.New("mqttv5: queue backend does not support eviction")
)

Errors returned by the QueuePublisher / PublisherQueue surface.

View Source
var DefaultReconnectBackoff = ExponentialBackoff(
	time.Second, 30*time.Second, 200*time.Millisecond,
)

DefaultReconnectBackoff is used when WithReconnectBackoff is not called: 1s -> 2s -> 4s -> ... up to 30s with 200ms jitter.

View Source
var ErrChanDropOldestUnsupported = errors.New(
	"mqttv5: Subscribe (chan) does not support DropOldest; use SubscribeQueue",
)

ErrChanDropOldestUnsupported is returned by Client.Subscribe when called with SubDropPolicy(DropOldest); use Client.SubscribeQueue for DropOldest semantics.

View Source
var ErrInvalidBrokerURL = errors.New("mqttv5: invalid broker URL")

ErrInvalidBrokerURL is returned when a broker URL fails to parse or has an unsupported scheme. WithDialFunc relaxes scheme validation.

View Source
var ErrMissingBroker = errors.New("mqttv5: missing broker URL (use WithBroker)")

ErrMissingBroker is returned by New when no broker URL was provided.

View Source
var ErrNilHandler = errors.New("mqttv5: subscribe handler must not be nil")

ErrNilHandler is returned by Client.SubscribeCallback when the handler argument is nil.

View Source
var ErrNoHealthyPublishers = errors.New("mqttv5: no healthy publishers in pool")

ErrNoHealthyPublishers is returned by publisherPool.publish when every pool member is disconnected. The Client falls back to the main connection in this case (and increments PoolFallbacks).

Functions

This section is empty.

Types

type Authenticator

type Authenticator interface {
	Method() string
	Begin() (data []byte, err error)
	Continue(brokerData []byte) (response []byte, done bool, err error)
}

Authenticator drives the client side of MQTT v5 enhanced authentication — the challenge-response handshake that runs during CONNECT (SCRAM, Kerberos, OAuth challenge, etc.).

Method() supplies the AuthenticationMethod property; Begin() the initial AuthenticationData. For each AUTH packet from the broker before CONNACK, Continue(brokerData) returns the next response. done=true is informational — the broker decides when to send CONNACK. A non-nil error from Continue aborts the connection.

Implementations need only be safe for concurrent use when shared across multiple Clients (e.g. with WithPublisherPool).

type Backoff

type Backoff func(attempt int) time.Duration

Backoff returns the delay before retry attempt N. The first retry after a failed connect is attempt 0 (the initial connect itself runs immediately, no backoff). Implementations must be safe for concurrent use.

func ConstantBackoff

func ConstantBackoff(d time.Duration) Backoff

ConstantBackoff returns the same delay for every attempt. Predictable but vulnerable to thundering-herd reconnect storms; ExponentialBackoff is usually preferable.

func ExponentialBackoff

func ExponentialBackoff(min, max, jitter time.Duration) Backoff

ExponentialBackoff doubles min each attempt up to max with ±jitter added (range [computed-jitter, computed+jitter]).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an MQTT v5 client. After New(opts...), call Connect to establish the first connection and start the supervisor that reconnects on drop.

func New

func New(opts ...Option) (*Client, error)

New constructs a Client. Apply WithBroker (required) plus any other options. The Client is not connected — call Connect.

func (*Client) ClientID

func (c *Client) ClientID() string

ClientID returns the configured ClientID. ClientID returns the ClientID currently in effect. When the broker assigned an identifier via the AssignedClientIdentifier CONNACK property (typical when WithClientID("") was used), the broker-side value is returned; otherwise the value configured via WithClientID is returned.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect performs the initial CONNECT/CONNACK handshake (bounded by ctx) and starts the supervisor. Reconnect attempts run on internal background contexts until Client.Disconnect. Returns ErrAlreadyConnected when called twice without an intervening Disconnect.

func (*Client) Connected

func (c *Client) Connected() bool

Connected reports whether the client currently has a live connection.

func (*Client) Disconnect

func (c *Client) Disconnect(ctx context.Context) error

Disconnect sends a graceful DISCONNECT (reason Normal Disconnection), stops the supervisor, and tears down the current connection. For a custom reason / properties use Client.DisconnectWith. Does not fire OnConnectionDown. Idempotent.

func (*Client) DisconnectWith

func (c *Client) DisconnectWith(ctx context.Context, opts wire.DisconnectOpts) error

DisconnectWith sends a DISCONNECT with opts, stops the supervisor, and tears down the current connection. Pool-member disconnect errors are joined into the returned error. Idempotent.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, opts wire.PublishOpts) error

Publish sends a PUBLISH packet.

QoS 1/2 packets are stashed on the session entry so the supervisor replays them with DUP=1 across reconnect; callers stay blocked on the ack across the drop. With WithPublisherPool the call first tries the pool per PoolRoutingPolicy and falls back to the main connection if every pool member is unhealthy.

Example

ExampleClient_Publish sends a QoS 1 publish. Publish returns after PUBACK arrives. If the connection drops mid-flight the session entry persists and the supervisor replays the packet with DUP=1 on reconnect; the caller stays blocked across the drop.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	err := cli.Publish(ctx, wire.PublishOpts{
		Topic:   "events/example",
		Payload: []byte(`{"value":42}`),
		QoS:     1,
	})
	if err != nil {
		fmt.Println("publish:", err)
	}
}

func (*Client) SetBrokers

func (c *Client) SetBrokers(urls ...string) error

SetBrokers replaces the broker URL list at runtime; the next reconnect uses it. Typical caller: WithOnServerDisconnect on a ServerMoved / UseAnotherServer redirect. URLs are validated as in New.

Example

ExampleClient_SetBrokers shows multi-broker failover. The supervisor rotates through mqttv5.Config.BrokerURLs on every reconnect attempt; mqttv5.Client.SetBrokers replaces the list at runtime — the canonical hook is a ServerMoved DISCONNECT.

package main

import (
	"crypto/tls"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

func main() {
	var cli *mqttv5.Client
	cli, _ = mqttv5.New(
		mqttv5.WithBrokers(
			"mqtts://broker-a.example.com:8883",
			"mqtts://broker-b.example.com:8883",
		),
		mqttv5.WithTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}),
		mqttv5.WithOnServerDisconnect(func(d *wire.Disconnect) {
			if ref, ok := d.Properties.String(wire.PropServerReference); ok {
				_ = cli.SetBrokers(ref)
			}
		}),
	)
	_ = cli
}

func (*Client) Stats

func (c *Client) Stats() Stats

Stats returns a snapshot of the Client's counters. Cheap to call — the returned value is detached. Returns the zero value when stats are disabled (WithStats not set on the Config).

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, SubscriptionToken, error)

Subscribe sends one SUBSCRIBE for filters and returns a buffered channel of matching inbound messages. Callers MUST call Message.Ack on each. Returns once SUBACK arrives; the channel closes on Client.Unsubscribe or Client.Disconnect. Full-buffer messages are dropped + acked (DropNewest); observe drops via SubOnDrop. Returns ErrChanDropOldestUnsupported when called with explicit SubDropPolicy of DropOldest.

Example

ExampleClient_Subscribe receives matched messages on a buffered channel. The caller MUST call mqttv5.Message.Ack on each one — for QoS 1/2 the broker-side ack is held until Ack is called, in §4.6 arrival order across overlapping filters.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
)

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	msgs, token, err := cli.Subscribe(ctx,
		[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
		mqttv5.SubBuffer(256),
		mqttv5.SubOnDrop(func(m *mqttv5.Message) {
			// Dropped messages have already been ack'd; this is for
			// observability only. Must not call Ack and must not retain
			// Topic/Payload past return.
			fmt.Println("dropped", m.Topic)
		}),
	)
	if err != nil {
		return
	}
	for m := range msgs {
		fmt.Println(m.Topic, string(m.Payload))
		_ = m.Ack()
	}
	_ = cli.Unsubscribe(ctx, token)
}

func (*Client) SubscribeCallback

func (c *Client) SubscribeCallback(ctx context.Context, filters []TopicFilter, h HandlerFunc) (SubscriptionToken, error)

SubscribeCallback registers a subscription that invokes h for every matching inbound PUBLISH.

h runs synchronously on the read goroutine and MUST NOT block — a slow handler stalls PINGRESP and drops the connection. The runtime auto-acks via Message.Ack after h returns; use Client.Subscribe or Client.SubscribeQueue when ack must be deferred past h. Returns ErrNilHandler when h is nil.

Example

ExampleClient_SubscribeCallback registers a synchronous handler invoked on the read goroutine. The handler must not block — a slow callback stalls PINGRESP processing and drops the connection. Ack fires automatically after the callback returns.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
)

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	_, err := cli.SubscribeCallback(ctx,
		[]mqttv5.TopicFilter{{Topic: "ctrl/+", QoS: 0}},
		func(m *mqttv5.Message) {
			fmt.Println(m.Topic, string(m.Payload))
		},
	)
	if err != nil {
		return
	}
}

func (*Client) SubscribeQueue

func (c *Client) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], SubscriptionToken, error)

SubscribeQueue sends one SUBSCRIBE and returns a Queue of matching messages. Unbounded by default; cap with SubMaxQueueSize. On overflow, SubDropPolicy decides: DropNewest acks + drops the inbound; DropOldest dequeues + acks the head and admits the new one. Closes on Client.Unsubscribe or Client.Disconnect.

Example

ExampleClient_SubscribeQueue stores matched messages in an unbounded queue. mqttv5.DropOldest evicts the queue head when SubMaxQueueSize is reached so the freshest N messages survive.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
)

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	q, _, err := cli.SubscribeQueue(ctx,
		[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
		mqttv5.SubMaxQueueSize(10_000),
		mqttv5.SubDropPolicy(mqttv5.DropOldest),
	)
	if err != nil {
		return
	}
	for {
		m, ok := q.Dequeue(ctx)
		if !ok {
			break
		}
		fmt.Println(m.Topic, string(m.Payload))
		_ = m.Ack()
	}
}

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ctx context.Context, token SubscriptionToken) error

Unsubscribe sends an UNSUBSCRIBE for the topics covered by token, removes the handlers from the matcher, and drops them from the resubscribe set so reconnects don't re-request them.

type ClientGroup

type ClientGroup struct {
	// contains filtered or unexported fields
}

ClientGroup manages N parallel sessions to N independent brokers. Each member is a full Client with its own session and supervisor; the group adds dispatch policy on top. Use for bridges across independent brokers, multi-tenant SaaS with per-broker credentials, or a clustered broker fleet you want N parallel sessions into. For HA failover use WithBrokers on a single Client instead — ClientGroup does not failover between members.

func NewClientGroup

func NewClientGroup(members []GroupMember, opts ...ClientGroupOption) (*ClientGroup, error)

NewClientGroup constructs a ClientGroup over the given members. Each GroupMember becomes a full *Client; the group applies the configured publish policy on top.

Member ClientIDs are deduped: if two members resolve to the same ClientID, the second gets a "-N" suffix appended. Set GroupMember.Opts with WithClientID for explicit per-member IDs.

Defaults: GroupPublishBroadcast, parallel Connect/Disconnect.

Example

ExampleNewClientGroup creates one mqttv5.Client per broker and dispatches via the configured GroupPublishPolicy. Subscribe merges inbound messages from every member into one channel.

For failover within one logical client (interchangeable brokers, same data), use mqttv5.WithBrokers on a single Client instead — see [Example_ClientSetBrokers].

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
)

func main() {
	group, err := mqttv5.NewClientGroup(
		[]mqttv5.GroupMember{
			{
				Broker: "mqtts://emea.example.com:8883",
				Name:   "emea",
				Opts:   []mqttv5.Option{mqttv5.WithCredentials("emea-svc", []byte("token-1"))},
			},
			{
				Broker: "mqtts://apac.example.com:8883",
				Name:   "apac",
				Opts:   []mqttv5.Option{mqttv5.WithCredentials("apac-svc", []byte("token-2"))},
			},
		},
		mqttv5.WithGroupSharedOpts(
			mqttv5.WithClientID("fanout"),
			mqttv5.WithKeepAlive(30),
		),
		mqttv5.WithGroupPublishPolicy(mqttv5.GroupPublishBroadcast),
	)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	if err := group.Connect(ctx); err != nil {
		panic(err)
	}
	defer group.Disconnect(ctx)

	msgs, _, err := group.Subscribe(ctx,
		[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
	if err != nil {
		return
	}
	for m := range msgs {
		fmt.Println(m.Topic)
		_ = m.Ack()
	}
}

func (*ClientGroup) Connect

func (g *ClientGroup) Connect(ctx context.Context) error

Connect dials every member. The default is parallel; use WithGroupSequentialLifecycle to serialise. The returned error is errors.Join of every member that failed — nil on full success. If at least one member succeeded the group is still usable for the healthy subset.

func (*ClientGroup) Disconnect

func (g *ClientGroup) Disconnect(ctx context.Context) error

Disconnect tears down every member. Bridge goroutines for outstanding Subscribe / SubscribeQueue calls drain first (bounded by ctx). The default is parallel; use WithGroupSequentialLifecycle to serialise. Returned error is errors.Join of every member's Disconnect error.

func (*ClientGroup) Member

func (g *ClientGroup) Member(name string) *Client

Member returns the Client for the named member, or nil when no such member exists.

func (*ClientGroup) Members

func (g *ClientGroup) Members() []*Client

Members returns the underlying Clients in member order. Use for per-member operations the group API doesn't expose directly (Stats per member, manual Publish to a specific member, etc.).

func (*ClientGroup) Names

func (g *ClientGroup) Names() []string

Names returns the member names in order.

func (*ClientGroup) Publish

func (g *ClientGroup) Publish(ctx context.Context, opts wire.PublishOpts) error

Publish dispatches opts across members per the configured GroupPublishPolicy. Returns nil on success (one member acked for RoundRobin/HashByTopic; at least one acked for Broadcast); the joined per-member errors otherwise.

func (*ClientGroup) Subscribe

func (g *ClientGroup) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, map[string]SubscriptionToken, error)

Subscribe subscribes on every member and merges all inbound messages into one channel. The caller MUST call msg.Ack() on each received message; Ack dispatches back to the member that delivered it.

The returned token map is keyed by member name — pass an entry to Unsubscribe to tear down that member's subscription, or use UnsubscribeAll to tear down everything.

Behaviour mirrors Client.Subscribe — Subscribe is "subscribe on all" only; for failover across interchangeable brokers use WithBrokers on a single Client.

func (*ClientGroup) SubscribeQueue

func (g *ClientGroup) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], map[string]SubscriptionToken, error)

SubscribeQueue is the queue-merged variant of Subscribe.

func (*ClientGroup) Unsubscribe

func (g *ClientGroup) Unsubscribe(ctx context.Context, name string, token SubscriptionToken) error

Unsubscribe tears down the named member's subscription.

func (*ClientGroup) UnsubscribeAll

func (g *ClientGroup) UnsubscribeAll(ctx context.Context, tokens map[string]SubscriptionToken) error

UnsubscribeAll tears down every token in the supplied map. Errors are joined.

type ClientGroupOption

type ClientGroupOption func(*clientGroupConfig)

ClientGroupOption configures a ClientGroup at construct time.

func WithGroupPublishPolicy

func WithGroupPublishPolicy(p GroupPublishPolicy) ClientGroupOption

WithGroupPublishPolicy selects the ClientGroup.Publish dispatch policy. See GroupPublishPolicy.

func WithGroupSequentialLifecycle

func WithGroupSequentialLifecycle() ClientGroupOption

WithGroupSequentialLifecycle disables the default parallel Connect / Disconnect / Subscribe across members. Useful in tests where deterministic ordering matters more than wall time.

func WithGroupSharedOpts

func WithGroupSharedOpts(opts ...Option) ClientGroupOption

WithGroupSharedOpts applies opts to every member before that member's own [GroupMember.Opts]. Use for shared settings (KeepAlive, Logger, ReconnectBackoff). Per-member Opts override.

type Codec

type Codec[T any] interface {
	Encode(v T) ([]byte, error)
	Decode(b []byte) (T, error)
}

Codec encodes and decodes payloads of type T. Implementations live in separate submodules so the core stays codec-agnostic.

type Config

type Config struct {
	// BrokerURLs is the ordered list of broker URLs. The supervisor
	// rotates through it on reconnect; [Client.SetBrokers] replaces
	// it at runtime.
	BrokerURLs []string
	ClientID   string
	Username   string
	Password   []byte
	KeepAlive  uint16

	// CleanStart sets the CleanStart flag on the initial CONNECT
	// (§3.1.2.4). Default true.
	CleanStart bool

	// CleanStartOnReconnect sets CleanStart on every CONNECT after
	// the first. Default false (preserve broker session for QoS 1/2
	// resume).
	CleanStartOnReconnect bool

	SessionExpiry  uint32
	ReceiveMaximum uint16

	// MaximumPacketSize caps the largest packet the broker may send
	// to this client (§3.1.2.11.4). Zero advertises no limit.
	MaximumPacketSize uint32

	// InboundTopicAliasMaximum is the inbound TopicAliasMaximum
	// (§3.1.2.11.5). Zero (default) tells the broker not to alias
	// inbound PUBLISHes. The outbound budget is taken automatically
	// from the broker's CONNACK.
	InboundTopicAliasMaximum uint16

	// RequestResponseInformation asks the broker for
	// ResponseInformation in CONNACK (§3.1.2.11.6).
	RequestResponseInformation bool

	// RequestProblemInformation asks the broker for ReasonString /
	// UserProperties on error responses (§3.1.2.11.7).
	RequestProblemInformation bool

	// ConnectUserProperties are sent verbatim as CONNECT user
	// properties.
	ConnectUserProperties []wire.UserProperty

	ConnectTimeout time.Duration
	PingTimeout    time.Duration

	// DisconnectFlushTimeout bounds the wait for a graceful
	// DISCONNECT write before the socket is forcibly closed.
	// Default 500ms.
	DisconnectFlushTimeout time.Duration

	WriteQueueSize int

	// WriteOverflowPolicy controls what QoS 0 Publish does when the
	// writer queue is full. WriteBlock (default) waits for room or
	// ctx; WriteDropNewest returns ErrWriteQueueFull immediately.
	// QoS 1/2 unaffected.
	WriteOverflowPolicy WriteOverflowPolicy

	// WriteBatchMax caps how many pre-encoded packets the writer
	// goroutine coalesces into one writev. 0 disables batching.
	// Worth enabling only for sustained concurrent publishers; see
	// [WithWriteBatch].
	WriteBatchMax int

	WillMessage *wire.WillOpts

	// MaxSubscribeQueueSize caps the per-subscription buffer.
	// Zero = unbounded.
	MaxSubscribeQueueSize int

	// DropPolicy sets the default policy for full subscription
	// buffers. [Client.Subscribe] (chan) supports DropNewest only;
	// [Client.SubscribeQueue] honors both.
	DropPolicy DropPolicy

	// PublisherPoolSize > 1 enables a publish-only connection pool
	// that falls back to the main connection if every member is
	// unhealthy.
	PublisherPoolSize int

	// PublisherPoolRouting selects the pool member-picking strategy.
	PublisherPoolRouting PoolRoutingPolicy

	// PublisherPoolClientIDFn derives each pool member's ClientID
	// from the parent ClientID and a 1-based index. Default is
	// fmt.Sprintf("%s-pub-%d", parent, idx).
	PublisherPoolClientIDFn func(parent string, idx int) string

	// PublishMode selects QoS 0 write-completion semantics. QoS 1/2
	// always wait for the broker's ack.
	PublishMode PublishMode

	// OnConnectionUp fires on each successful CONNECT/CONNACK after
	// pending replays and resubscribes. The supplied *wire.Connack
	// is a detached clone safe to retain. Must not block.
	OnConnectionUp func(*wire.Connack)

	// OnConnectionDown fires on unexpected connection loss (not on
	// user-initiated Disconnect). Return false to stop the
	// supervisor; a subsequent Connect restarts it. Must not block.
	OnConnectionDown func() bool

	// OnConnectError fires per failed CONNECT attempt (dial failure,
	// CONNACK refusal, AUTH-loop error). Observability only; the
	// supervisor retries regardless. Must not block.
	OnConnectError func(err error)

	// OnReconnectAttempt fires immediately before each reconnect
	// dial (not the initial Connect). attempt starts at 1. Must not
	// block.
	OnReconnectAttempt func(attempt int, brokerURL string)

	// StatsEnabled toggles the [Client.Stats] counter set. Default
	// false — when disabled the hot path skips every atomic
	// increment and Stats returns the zero value.
	StatsEnabled bool

	// OnServerDisconnect fires when the broker sends a DISCONNECT
	// (§3.14) — the supplied *wire.Disconnect is a detached clone.
	// Fires after the connection is marked down and before
	// OnConnectionDown. May call [Client.SetBrokers] to honour a
	// ServerMoved / UseAnotherServer redirect on the next attempt.
	OnServerDisconnect func(*wire.Disconnect)

	// Authenticator drives MQTT v5 enhanced authentication when set.
	Authenticator Authenticator

	// ConnectPacketBuilder is invoked immediately before each CONNECT
	// is serialised. The callback may mutate any field of opts —
	// typical use is rotating Username/Password per attempt for
	// OAuth refresh. ctx is the per-attempt context bounded by
	// ConnectTimeout. A non-nil error fails the attempt; the
	// supervisor retries after backoff and fires OnConnectError.
	ConnectPacketBuilder func(ctx context.Context, opts *wire.ConnectOpts) error

	// ReconnectBackoff returns the delay before retry attempt N.
	// Defaults to [DefaultReconnectBackoff].
	ReconnectBackoff Backoff

	Logger *slog.Logger
	Store  session.Store

	TLSConfig *tls.Config
	Dialer    *net.Dialer

	// DialFunc replaces the built-in TCP/TLS dial path. Takes
	// precedence over TLSConfig and Dialer. Use for SOCKS / mTLS /
	// WebSocket (see transport/ws) / in-memory test transports.
	// When set, broker URL scheme validation is skipped.
	DialFunc transport.DialFunc
	// contains filtered or unexported fields
}

Config carries the resolved configuration for a Client. Construct via New.

type DropPolicy

type DropPolicy uint8

DropPolicy controls behaviour when a bounded subscription buffer fills.

const (
	// DropNewest discards the inbound message + acks it so the
	// broker doesn't retry.
	DropNewest DropPolicy = iota
	// DropOldest evicts the buffer head to admit the inbound
	// message. SubscribeQueue only — chan-based [Client.Subscribe]
	// rejects this with [ErrChanDropOldestUnsupported].
	DropOldest
)

func (DropPolicy) String

func (p DropPolicy) String() string

String returns a stable debug name for the policy.

type GroupMember

type GroupMember struct {
	// Broker is the URL this member dials. Required.
	Broker string

	// Name is an optional identifier surfaced in logs, returned in
	// the Subscribe token map, and accepted by ClientGroup.Member.
	// Defaults to "member-N" (1-based) when empty.
	Name string

	// Opts are applied AFTER any WithGroupSharedOpts shared options.
	// Use to override or extend the shared opts per-broker (auth,
	// TLS, ClientID, per-member callbacks, etc.).
	Opts []Option
}

GroupMember describes one member of a ClientGroup. Each member becomes a full Client with its own session, supervisor, and CONNECT identity.

Use Opts for per-member settings — WithCredentials, WithTLSConfig, WithAuthenticator, WithClientID, WithConnectPacketBuilder, a per-member WithOnConnectionUp closing over Name for identity, etc. Opts are applied AFTER WithGroupSharedOpts so per-member settings win.

type GroupPublishPolicy

type GroupPublishPolicy uint8

GroupPublishPolicy selects how ClientGroup.Publish dispatches across members. For HA failover across interchangeable brokers use WithBrokers on a single Client instead — ClientGroup keeps N parallel sessions where every member is treated as itself.

const (
	// GroupPublishBroadcast (default) hits every member with the
	// same publish. Use when members carry different data (bridge /
	// mirror semantic). Returns nil if any member's Publish
	// succeeded; otherwise the joined error from every member.
	GroupPublishBroadcast GroupPublishPolicy = iota

	// GroupPublishRoundRobin distributes publishes across the group
	// for throughput against an interchangeable broker fleet. The
	// per-call starting cursor advances by one; unhealthy starting
	// members fall through to the next member regardless of policy.
	// Use when members are equivalent for publish purposes (e.g. a
	// clustered broker fleet) and you want N parallel sessions.
	GroupPublishRoundRobin

	// GroupPublishHashByTopic picks the starting member by FNV-1a
	// of opts.Topic, preserving per-topic ordering across the group
	// while every member is healthy. Recovery (hashed member down)
	// briefly reorders that topic as the probe walks the ring.
	// Empty Topic collapses to member 0.
	GroupPublishHashByTopic
)

func (GroupPublishPolicy) String

func (p GroupPublishPolicy) String() string

String returns a stable debug name for the policy.

type HandlerFunc

type HandlerFunc func(*Message)

HandlerFunc runs synchronously on the read goroutine for each matching inbound PUBLISH. Handlers MUST be non-blocking — a slow handler stalls the read loop, blocks PINGRESP processing, and drops the connection. Use Subscribe / SubscribeQueue for async.

type MemoryPublisherQueue

type MemoryPublisherQueue struct {
	// contains filtered or unexported fields
}

MemoryPublisherQueue is an in-memory PublisherQueue. It is the default when no durable store is required. Crash semantics: all queued entries are lost. For at-least-once across process restarts, use the queue/file/ submodule instead.

func NewMemoryPublisherQueue

func NewMemoryPublisherQueue() *MemoryPublisherQueue

NewMemoryPublisherQueue constructs an empty in-memory queue.

func (*MemoryPublisherQueue) Ack

Ack implements PublisherQueue. Removes the entry identified by tok. Acking an unknown token is a no-op.

func (*MemoryPublisherQueue) Close

func (q *MemoryPublisherQueue) Close() error

Close implements PublisherQueue.

func (*MemoryPublisherQueue) Enqueue

func (q *MemoryPublisherQueue) Enqueue(_ context.Context, entry QueueEntry) error

Enqueue implements PublisherQueue.

func (*MemoryPublisherQueue) EvictHead

EvictHead implements PublisherQueue by popping the oldest in-memory item.

func (*MemoryPublisherQueue) Len

Len implements PublisherQueue.

func (*MemoryPublisherQueue) PeekBatch

func (q *MemoryPublisherQueue) PeekBatch(_ context.Context, n int) ([]QueueEntry, []QueueToken, error)

PeekBatch implements PublisherQueue.

type Message

type Message struct {
	*wire.Publish
	// contains filtered or unexported fields
}

Message is the handler-facing view of an inbound PUBLISH. The embedded *wire.Publish exposes Topic, Payload, Properties, QoS, Retain, Dup, PacketID — those fields alias the pooled frame and are valid only until Ack returns. Use CloneTopic / ClonePayload to retain past Ack.

SubscribeCallback auto-acks after the handler returns. Subscribe (chan) and SubscribeQueue require manual Ack — for QoS 1 the PUBACK is held until Ack (flushed in §4.6 arrival order); for QoS 2 the PUBREC is held until Ack, PUBCOMP fires automatically on PUBREL.

When SubAutoAck is set on Subscribe / SubscribeQueue the library delivers a detached *Message instead: Topic / Payload / Properties are heap copies safe to retain indefinitely, and Ack is a no-op (the original frame is already released).

A PUBLISH matching multiple overlapping filters delivers the same *Message to every matched handler. Ack is refcounted — only the final call releases the frame.

func (*Message) Ack

func (m *Message) Ack() error

Ack acknowledges the message. Idempotent — calls beyond the matched-handler count are silent no-ops. After the final Ack the embedded Publish fields (Topic, Payload, Properties) are invalid; use Message.CloneTopic / Message.ClonePayload for retention.

On a detached message (delivered when SubAutoAck is set) Ack is always a no-op — the broker ack and frame release already happened inside the dispatcher.

func (*Message) ClonePayload

func (m *Message) ClonePayload() []byte

ClonePayload returns a heap-allocated copy of Payload safe to retain past Message.Ack.

func (*Message) CloneTopic

func (m *Message) CloneTopic() string

CloneTopic returns a heap-allocated copy of Topic safe to retain past Message.Ack. One allocation; prefer reading m.Topic inside the handler when retention isn't needed.

type Option

type Option func(*Config) error

Option mutates a Config during New; a non-nil return aborts construction.

func WithAuthenticator

func WithAuthenticator(a Authenticator) Option

WithAuthenticator enables MQTT v5 enhanced authentication. When combined with WithPublisherPool, the same Authenticator runs concurrently across the parent and every pool member; implementations used that way must be safe for concurrent use.

func WithBroker

func WithBroker(url string) Option

WithBroker sets a single broker URL. Supported schemes: mqtt://, tcp://, mqtts://, tls://, ssl://; default ports 1883 / 8883 fill in when missing. For ws:// or wss:// pair with WithDialFunc.

func WithBrokers

func WithBrokers(urls ...string) Option

WithBrokers sets the ordered broker URL list. The supervisor rotates through it across reconnect attempts; Client.SetBrokers replaces it at runtime.

func WithCleanStart

func WithCleanStart(b bool) Option

WithCleanStart sets the CleanStart flag on the initial CONNECT. Default true. Reconnects use WithCleanStartOnReconnect.

func WithCleanStartOnReconnect

func WithCleanStartOnReconnect(b bool) Option

WithCleanStartOnReconnect sets CleanStart on every CONNECT after the first. Default false — preserve broker session for QoS 1/2 resume.

func WithClientID

func WithClientID(id string) Option

WithClientID sets the MQTT ClientID. Empty string asks the broker to assign one via the AssignedClientIdentifier CONNACK property.

func WithConnectPacketBuilder

func WithConnectPacketBuilder(fn func(ctx context.Context, opts *wire.ConnectOpts) error) Option

WithConnectPacketBuilder mutates *wire.ConnectOpts immediately before each CONNECT is serialised — canonical OAuth token-refresh hook. ctx is the per-attempt context bounded by ConnectTimeout. A non-nil error fails the attempt; the supervisor retries after backoff and fires OnConnectError.

func WithConnectTimeout

func WithConnectTimeout(d time.Duration) Option

WithConnectTimeout caps the CONNECT handshake (dial + CONNECT/CONNACK).

func WithConnectUserProperties

func WithConnectUserProperties(p []wire.UserProperty) Option

WithConnectUserProperties replaces the CONNECT user-property list. The slice is referenced as-is; do not mutate after passing.

func WithConnectUserProperty

func WithConnectUserProperty(key, value string) Option

WithConnectUserProperty appends one CONNECT user property. May be called multiple times; for bulk replacement see WithConnectUserProperties.

func WithCredentials

func WithCredentials(user string, pass []byte) Option

WithCredentials sets username and password sent in the CONNECT packet.

func WithDialFunc

func WithDialFunc(fn transport.DialFunc) Option

WithDialFunc replaces the built-in TCP/TLS dial path. Takes precedence over WithDialer / WithTLSConfig. Use for SOCKS / HTTP proxies, per-dial mTLS, WebSocket (transport/ws), or in-memory test transports. ctx is bounded by ConnectTimeout. When set, broker URL scheme validation is skipped. Nil rejected.

The transport/ws sibling submodule exposes a [ws.DialFunc] helper that bakes in DialOpts (TLS config, custom headers, etc.) and returns a value directly usable here:

mqttv5.WithDialFunc(ws.DialFunc(ws.DialOpts{TLSConfig: tlsCfg}))

func WithDialer

func WithDialer(d *net.Dialer) Option

WithDialer overrides the default *net.Dialer.

func WithDisconnectFlushTimeout

func WithDisconnectFlushTimeout(d time.Duration) Option

WithDisconnectFlushTimeout caps the wait for a graceful DISCONNECT write before the socket is forcibly closed. Default DefaultDisconnectFlushTimeout.

func WithDropPolicy

func WithDropPolicy(p DropPolicy) Option

WithDropPolicy sets the default per-subscription drop policy. Per-call override via SubDropPolicy; see DropPolicy.

func WithInboundTopicAliasMaximum

func WithInboundTopicAliasMaximum(n uint16) Option

WithInboundTopicAliasMaximum advertises how many inbound topic aliases this client accepts (§3.1.2.11.5). Zero (default) disables inbound aliasing.

func WithKeepAlive

func WithKeepAlive(s uint16) Option

WithKeepAlive sets the keepalive interval (seconds, range 1..65535). Pass 0 to WithoutKeepAlive instead.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets the *slog.Logger. Default slog.Default.

func WithMaxSubscribeQueueSize

func WithMaxSubscribeQueueSize(n int) Option

WithMaxSubscribeQueueSize caps each subscription's buffer. 0 = unbounded. Per-call override via SubMaxQueueSize.

func WithMaximumPacketSize

func WithMaximumPacketSize(n uint32) Option

WithMaximumPacketSize caps the largest packet the broker may send to this client (§3.1.2.11.4). Zero advertises no limit.

func WithOnConnectError

func WithOnConnectError(fn func(error)) Option

WithOnConnectError fires per failed CONNECT attempt (dial err, CONNACK refusal, AUTH-loop err). Observability only — the supervisor retries regardless. Must not block.

func WithOnConnectionDown

func WithOnConnectionDown(fn func() bool) Option

WithOnConnectionDown registers a callback fired when the connection is lost. Does NOT fire on user-initiated Disconnect. Return false to terminate the supervisor — no further reconnect attempts. A subsequent Connect on the same Client re-starts the lifecycle. Must not block.

func WithOnConnectionUp

func WithOnConnectionUp(fn func(*wire.Connack)) Option

WithOnConnectionUp fires on each successful CONNECT/CONNACK after pending replays and resubscribes. fn receives a detached *wire.Connack clone safe to retain (assigned ClientID, server keepalive, MaximumQoS, ResponseInformation, ...). Must not block.

func WithOnReconnectAttempt

func WithOnReconnectAttempt(fn func(attempt int, brokerURL string)) Option

WithOnReconnectAttempt fires immediately before each reconnect dial (not the initial Client.Connect); attempt starts at 1. Must not block.

func WithOnServerDisconnect

func WithOnServerDisconnect(fn func(*wire.Disconnect)) Option

WithOnServerDisconnect fires when the broker sends a DISCONNECT (§3.14). fn receives a detached *wire.Disconnect clone safe to retain. Fires after the connection is marked down and before OnConnectionDown; may call Client.SetBrokers to honour a ServerMoved / UseAnotherServer redirect on the next attempt. Not fired for socket-level errors or client-initiated Disconnect. Must not block.

func WithPingTimeout

func WithPingTimeout(d time.Duration) Option

WithPingTimeout sets the budget for a PINGRESP after a PINGREQ. Exceeding it forces a reconnect. Default DefaultPingTimeout (10 s). Override only on links where PINGRESP round-trip exceeds a few seconds (LEO satellite, severely congested cellular).

func WithPublishMode

func WithPublishMode(m PublishMode) Option

WithPublishMode selects QoS 0 Client.Publish semantics. See PublishMode. QoS 1/2 unaffected.

func WithPublisherPool

func WithPublisherPool(size int) Option

WithPublisherPool enables N publish-only connections to the configured broker. 0/1 disables. Each member is a full Client with its own session; QoS 1/2 acks land on the emitting member. Members force CleanStart=true + SessionExpiry=0 — a mid-flight drop becomes at-least-once with possible duplicates. Use the main connection for ledger-style work.

Example

ExampleWithPublisherPool dedicates N publish-only connections to the same broker, lifting publish throughput above what one keep-alive socket can absorb. Pair with mqttv5.PoolRoutingHashByTopic to preserve per-topic ordering.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

func main() {
	cli, err := mqttv5.New(
		mqttv5.WithBroker("mqtt://localhost:1883"),
		mqttv5.WithPublisherPool(4),
		mqttv5.WithPublisherPoolRouting(mqttv5.PoolRoutingHashByTopic),
	)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	// Round-robin / hashed across the 4 pool members under the hood.
	for i := range 100 {
		_ = cli.Publish(ctx, wire.PublishOpts{
			Topic:   fmt.Sprintf("events/tick/%d", i%4),
			Payload: fmt.Appendf(nil, `{"n":%d}`, i),
			QoS:     1,
		})
	}
}

func WithPublisherPoolClientIDFn

func WithPublisherPoolClientIDFn(fn func(parent string, idx int) string) Option

WithPublisherPoolClientIDFn derives each pool member's ClientID from the parent ClientID and a 1-based index. Override when the parent ClientID is empty — the default ("-pub-N") collides across processes.

func WithPublisherPoolRouting

func WithPublisherPoolRouting(p PoolRoutingPolicy) Option

WithPublisherPoolRouting selects the publisher-pool member-picking strategy. See PoolRoutingPolicy. No-op when PublisherPoolSize ≤ 1.

func WithReceiveMaximum

func WithReceiveMaximum(n uint16) Option

WithReceiveMaximum bounds concurrent inbound QoS 1/2 publishes (§3.1.2.11.3). Also sizes the packet-ID pool.

func WithReconnectBackoff

func WithReconnectBackoff(b Backoff) Option

WithReconnectBackoff sets the supervisor's reconnect backoff. See ConstantBackoff and ExponentialBackoff; default DefaultReconnectBackoff.

func WithRequestProblemInformation

func WithRequestProblemInformation(b bool) Option

WithRequestProblemInformation asks the broker for ReasonString / UserProperties on error responses (§3.1.2.11.7). Default true — debugging a CONNECT or PUBLISH refusal without these is "why was this rejected?" with no answer; the wire cost is a handful of bytes on the error path only. Pass false to opt out.

func WithRequestResponseInformation

func WithRequestResponseInformation(b bool) Option

WithRequestResponseInformation asks the broker for ResponseInformation in CONNACK (§3.1.2.11.6) — used by the request/response pattern (§4.10).

func WithSessionExpiry

func WithSessionExpiry(s uint32) Option

WithSessionExpiry sets the Session Expiry Interval (§3.1.2.11.2) in seconds. Default DefaultSessionExpiry (300 s / 5 min) — enough for QoS 1/2 resume across a typical reconnect blip. Zero ends the session with the network connection (no broker-side retention).

func WithStats

func WithStats() Option

WithStats enables the Client.Stats counter set. When disabled the hot path skips every atomic increment and Stats returns the zero value. Off by default; enable to scrape counters into Prometheus / OpenTelemetry / expvar.

func WithStore

func WithStore(s session.Store) Option

WithStore overrides the session.Store. Default is in-memory; use the store/file submodule for crash safety.

func WithTLSConfig

func WithTLSConfig(t *tls.Config) Option

WithTLSConfig supplies a *tls.Config for mqtts:// / tls:// dials. Ignored for plain TCP schemes.

func WithWill

func WithWill(w *wire.WillOpts) Option

WithWill attaches a will message to be published by the broker if this client disconnects ungracefully.

func WithWriteBatch

func WithWriteBatch(n int) Option

WithWriteBatch coalesces up to n pre-encoded packets per writev syscall. Off by default (n=0 or 1). Wins under sustained concurrent Client.Publish (~40% at 256B with n=16 on loopback); regresses for single-publisher or large-payload workloads. Measure first.

func WithWriteOverflowPolicy

func WithWriteOverflowPolicy(p WriteOverflowPolicy) Option

WithWriteOverflowPolicy selects the QoS 0 writer-queue overflow policy. Only meaningful for PublishFireAndForget — QoS 1/2 and PublishWaitForFlush always wait for either the broker or the writer to be ready.

func WithWriteQueueSize

func WithWriteQueueSize(n int) Option

WithWriteQueueSize sets the internal MPSC write-channel buffer. Larger absorbs publish bursts; costs memory and queue latency.

func WithoutKeepAlive

func WithoutKeepAlive() Option

WithoutKeepAlive disables MQTT keepalive entirely (no PINGREQ goroutine). Rarely correct in production — without PINGREQ the broker cannot detect a half-open connection from this side.

type PoolRoutingPolicy

type PoolRoutingPolicy uint8

PoolRoutingPolicy selects how the publisher pool picks the starting member per Client.Publish. Unhealthy starting members fall through to the next member regardless of policy.

const (
	// PoolRoutingRoundRobin (default) cycles through members; same
	// topic may arrive at the broker out of order.
	PoolRoutingRoundRobin PoolRoutingPolicy = iota

	// PoolRoutingHashByTopic picks the starting member by FNV-1a of
	// opts.Topic, preserving per-topic ordering while the pool is
	// healthy. Empty Topic collapses to member 0.
	PoolRoutingHashByTopic
)

func (PoolRoutingPolicy) String

func (p PoolRoutingPolicy) String() string

String returns a stable debug name for the policy.

type PublishMode

type PublishMode uint8

PublishMode controls QoS 0 Client.Publish write-completion semantics. QoS 1/2 always wait for the broker's ack.

const (
	// PublishFireAndForget (default): Publish returns once the
	// message is queued for the writer goroutine. Transport errors
	// are not surfaced.
	PublishFireAndForget PublishMode = iota

	// PublishWaitForFlush makes Publish block until conn.Write
	// returns and surfaces transport errors. Useful as a
	// connection-health probe.
	PublishWaitForFlush
)

func (PublishMode) String

func (m PublishMode) String() string

String returns a stable name for the mode.

type PublisherQueue

type PublisherQueue interface {
	// Enqueue durably stores entry. Returns ErrQueueClosed after Close.
	Enqueue(ctx context.Context, entry QueueEntry) error

	// PeekBatch returns up to n oldest entries without removing them.
	// The tokens slice is parallel to entries; the caller passes each
	// token to Ack to mark the entry delivered. An empty result with
	// nil error means "queue is empty right now".
	PeekBatch(ctx context.Context, n int) (entries []QueueEntry, tokens []QueueToken, err error)

	// Ack removes the entry identified by token. Acking an unknown
	// token is a no-op (an entry already removed via another path).
	Ack(ctx context.Context, token QueueToken) error

	// EvictHead removes the oldest entry and returns it. Returns
	// (zero, false, nil) when the queue is empty. Implementations
	// that cannot evict the head out-of-band return
	// ErrEvictionNotSupported and the caller must use DropNewest.
	EvictHead(ctx context.Context) (QueueEntry, bool, error)

	// Len returns the current entry count.
	Len(ctx context.Context) (int, error)

	// Close releases any underlying resources. After Close, subsequent
	// Enqueue calls must return ErrQueueClosed.
	Close() error
}

PublisherQueue is the durable backing store for QueuePublisher. Implementations may be in-memory or disk-backed. Every method must be safe for concurrent use.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is an unbounded MPSC FIFO of T. Enqueue is non-blocking; Dequeue blocks until an item arrives, the queue is closed, or ctx cancels. After Close, queued items are still returned by Dequeue; once drained, Dequeue returns (zero, false).

func NewQueue

func NewQueue[T any]() *Queue[T]

NewQueue returns a ready-to-use Queue.

Example

ExampleNewQueue shows the lock-protected unbounded MPSC queue returned by mqttv5.Client.SubscribeQueue. It is exported so callers can build their own internal pipelines on top of the same type.

package main

import (
	"context"
	"fmt"

	"github.com/ashtonian/mqttv5"
)

func main() {
	q := mqttv5.NewQueue[string]()

	go func() {
		defer q.Close()
		q.Enqueue("one")
		q.Enqueue("two")
	}()

	ctx := context.Background()
	for {
		v, ok := q.Dequeue(ctx)
		if !ok {
			break
		}
		fmt.Println(v)
	}
}
Output:
one
two

func (*Queue[T]) Close

func (q *Queue[T]) Close()

Close signals waiting Dequeue calls to drain. Items already queued remain available.

func (*Queue[T]) Dequeue

func (q *Queue[T]) Dequeue(ctx context.Context) (T, bool)

Dequeue blocks until an item is available, the queue is closed, or ctx cancels. ok is false when no item could be retrieved.

func (*Queue[T]) Enqueue

func (q *Queue[T]) Enqueue(item T) bool

Enqueue appends an item. Returns false if the queue is closed.

func (*Queue[T]) Len

func (q *Queue[T]) Len() int

Len returns the current item count. Mostly for tests and metrics.

func (*Queue[T]) TryDequeue

func (q *Queue[T]) TryDequeue() (T, bool)

TryDequeue removes and returns the head without blocking.

type QueueEntry

type QueueEntry struct {
	Publish    wire.PublishOpts
	EnqueuedAt time.Time
}

QueueEntry is one durable publish in flight between Publish (enqueue) and broker PUBACK (Ack). The drain loop sets PacketID and Dup per attempt; values in Publish are ignored.

type QueueOption

type QueueOption func(*queueConfig)

QueueOption customises a QueuePublisher.

func WithDeadLetter

func WithDeadLetter(fn func(QueueEntry, error)) QueueOption

WithDeadLetter registers a callback for terminally failed entries (e.g. TTL expiry). Runs on the drain goroutine — must not block.

func WithQueueBatchSize

func WithQueueBatchSize(n int) QueueOption

WithQueueBatchSize caps the number of entries the drain goroutine pulls in one PeekBatch. Default 16.

func WithQueueDropPolicy

func WithQueueDropPolicy(p DropPolicy) QueueOption

WithQueueDropPolicy controls full-queue behaviour. DropNewest (default) returns ErrQueueFull from Publish when the queue is at capacity. DropOldest evicts the oldest entry via PublisherQueue.EvictHead and dead-letters it (if a callback is registered) before enqueuing the new one. Backends that cannot evict (return ErrEvictionNotSupported) cause NewQueuePublisher to fail rather than silently downgrade.

func WithQueueIdleInterval

func WithQueueIdleInterval(d time.Duration) QueueOption

WithQueueIdleInterval overrides the drain loop's idle-tick wakeup. Default DefaultQueueIdleInterval.

func WithQueueMaxSize

func WithQueueMaxSize(n int) QueueOption

WithQueueMaxSize bounds the queue. At capacity, Publish either returns ErrQueueFull (DropNewest) or evicts the oldest entry via EvictHead (DropOldest — see WithQueueDropPolicy).

func WithQueuePublishTimeout

func WithQueuePublishTimeout(d time.Duration) QueueOption

WithQueuePublishTimeout caps each per-message broker handshake inside the drain loop. Default DefaultQueuePublishTimeout. Set short to keep retries snappy at the cost of more wasted work against a slow broker; set long for occasional huge payloads.

func WithQueueTTL

func WithQueueTTL(d time.Duration) QueueOption

WithQueueTTL drops entries older than d at drain time and fires the dead-letter callback. When d > 0 the value also mirrors into each entry's MessageExpiryInterval so the broker enforces TTL after hand-off.

type QueuePublisher

type QueuePublisher struct {
	// contains filtered or unexported fields
}

QueuePublisher durably enqueues publishes and drains them to the broker in order from a background goroutine. QueuePublisher.Publish returns once the entry is stored, before the broker round-trip. For crash safety, pair the queue/file submodule with the store/file submodule (in-flight ack state) — independent layers.

func NewQueuePublisher

func NewQueuePublisher(client *Client, queue PublisherQueue, opts ...QueueOption) (*QueuePublisher, error)

NewQueuePublisher wires a QueuePublisher over client + queue. The drain goroutine starts immediately and runs until QueuePublisher.Close. When DropOldest is requested, queue.EvictHead is probed; ErrEvictionNotSupported aborts construction rather than silently downgrading.

Example

ExampleNewQueuePublisher durably enqueues publishes and drains them to the broker in order from a background goroutine. Publish returns once the queue has stored the entry, before the broker round-trip.

For at-least-once across process restart, swap mqttv5.NewMemoryPublisherQueue for the queue/file submodule.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	pub, err := mqttv5.NewQueuePublisher(cli, mqttv5.NewMemoryPublisherQueue(),
		mqttv5.WithQueueBatchSize(32),
		mqttv5.WithQueueTTL(24*time.Hour),
		mqttv5.WithDeadLetter(func(e mqttv5.QueueEntry, err error) {
			fmt.Println("dead letter", e.Publish.Topic, err)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer pub.Close(context.Background())

	_ = pub.Publish(ctx, wire.PublishOpts{
		Topic:   "events/durable",
		Payload: []byte("at-least-once"),
		QoS:     1,
	})
}

func (*QueuePublisher) Close

func (p *QueuePublisher) Close(ctx context.Context) error

Close stops the drain goroutine and closes the backing queue. ctx bounds how long Close waits for the drain to settle.

func (*QueuePublisher) Publish

func (p *QueuePublisher) Publish(ctx context.Context, opts wire.PublishOpts) error

Publish durably enqueues opts and returns once the queue has stored it. The broker handshake happens asynchronously on the drain goroutine. Rejects QoS 0 with ErrQoS0NotQueueable.

type QueueToken

type QueueToken any

QueueToken identifies one entry. Opaque to QueuePublisher — implementations choose whatever value lets Ack remove the exact entry PeekBatch returned (sequence number, byte offset, etc.).

type Stats

type Stats struct {
	Connects          uint64 // successful CONNECT/CONNACK handshakes
	ConnectFailures   uint64 // CONNECT attempts that failed (any cause)
	Disconnects       uint64 // connection drops (any cause)
	ServerDisconnects uint64 // broker-sent DISCONNECT packets
	PingTimeouts      uint64 // PINGRESP timeouts that forced a reconnect

	PublishesSent     uint64 // QoS 0/1/2 publishes handed to the writer
	PublishesAcked    uint64 // QoS 1/2 acks received from broker
	PublishesInflight uint64 // QoS 1/2 outbound entries awaiting ack
	PublishesReplayed uint64 // QoS 1/2 replays issued after reconnect

	InboundPublishes uint64 // PUBLISHes received from the broker
	InboundDropped   uint64 // inbound PUBLISHes dropped due to full subscriber buffer

	SubscribesSent      uint64 // SUBSCRIBE packets emitted
	UnsubscribesSent    uint64 // UNSUBSCRIBE packets emitted
	SubscriptionsActive uint64 // count of currently registered subscriptions

	PoolFallbacks uint64 // publishes that fell back from pool to main connection
}

Stats is the snapshot of Client counters returned by Client.Stats.

Two flavours of field:

  • Monotonic counters (Connects, ConnectFailures, Disconnects, ServerDisconnects, PingTimeouts, PublishesSent, PublishesAcked, PublishesReplayed, InboundPublishes, InboundDropped, SubscribesSent, UnsubscribesSent, PoolFallbacks). Sampled atomically — no locking on the read side.
  • Point-in-time gauges (PublishesInflight, SubscriptionsActive) read live state under a mutex (Outbound tracker and the subscriptions map respectively). The locks are cheap but a scraper hitting Stats() at very high frequency does contend with Subscribe / Unsubscribe and the ack path. Default to sampling at most a few times per second.

The zero value is returned when WithStats was not set on the Config — when stats are disabled the hot path skips every atomic increment, so leave WithStats off when you have no scraper.

type SubscribeOption

type SubscribeOption func(*subscribeConfig)

SubscribeOption configures client-side delivery for one Subscribe call. Protocol flags (QoS, NoLocal, etc.) live on TopicFilter itself — these options control what happens after delivery.

func SubAutoAck

func SubAutoAck() SubscribeOption

SubAutoAck makes Client.Subscribe / Client.SubscribeQueue ack each delivery on the dispatcher goroutine, before handing the message to the consumer. The consumer receives a detached *Message whose Topic / Payload / Properties are heap copies safe to retain indefinitely; Message.Ack becomes a no-op.

Trade-off: convenience for two allocations per delivery (Topic clone + Payload clone) on top of the *Message + *wire.Publish header allocations — the default manual-ack path is zero-alloc on the receive hot path. More importantly, auto-ack breaks at-least-once semantics: a consumer that crashes between delivery and processing has nothing to replay, since the broker considers the message delivered. Reach for this on QoS 0 / observational consumers; keep manual ack for ledger-style workloads.

Ignored by Client.SubscribeCallback (already auto-acks).

func SubBuffer

func SubBuffer(n int) SubscribeOption

SubBuffer sets Client.Subscribe's channel buffer size. Default DefaultSubscribeBuffer. Ignored by Client.SubscribeQueue and Client.SubscribeCallback.

func SubDropPolicy

func SubDropPolicy(p DropPolicy) SubscribeOption

SubDropPolicy chooses behaviour when the channel buffer or queue cap fills.

  • Subscribe (chan): only DropNewest is supported. Passing DropOldest here causes Subscribe to return ErrChanDropOldestUnsupported — peek-and-pop on a consumer-owned channel would race the receiver. Use SubscribeQueue for DropOldest semantics.
  • SubscribeQueue: both policies are honored.
  • SubscribeCallback: ignored (callback never drops; slow handler stalls the connection instead).

Default is the client-level WithDropPolicy (DropNewest).

func SubMaxQueueSize

func SubMaxQueueSize(n int) SubscribeOption

SubMaxQueueSize caps Client.SubscribeQueue's length. 0 = unbounded. Default from WithMaxSubscribeQueueSize (0).

func SubOnDrop

func SubOnDrop(fn func(*Message)) SubscribeOption

SubOnDrop fires when a message is dropped by Client.Subscribe (chan full) or Client.SubscribeQueue (cap reached). The dropped message is already ack'd; fn is for metrics / logging only — must NOT call Message.Ack, must NOT retain Topic / Payload past return (both alias the frame buffer).

type SubscriptionToken

type SubscriptionToken struct {
	// contains filtered or unexported fields
}

SubscriptionToken identifies an active subscription so Unsubscribe can find and remove it.

type TopicFilter

type TopicFilter = wire.SubscribeFilter

TopicFilter is one entry in a SUBSCRIBE packet: a topic pattern plus the per-filter MQTT v5 §3.8.3.1 flags. Only Topic is required; the rest default to MQTT-spec zero values.

{Topic: "events/#"}                        // QoS 0
{Topic: "events/#", QoS: 1}                // at-least-once
{Topic: "events/#", QoS: 1, NoLocal: true} // don't echo own publishes back

type Typed

type Typed[T any] struct {
	// contains filtered or unexported fields
}

Typed pairs a *Client with a Codec[T] for typed publish/subscribe.

func NewTyped

func NewTyped[T any](c *Client, codec Codec[T]) *Typed[T]

NewTyped wraps c with codec.

Example

ExampleNewTyped wraps a mqttv5.Client with a mqttv5.Codec so Publish and Subscribe carry decoded values. Decode failures on the receive path are logged + acked + dropped — wrap the codec for stricter behaviour.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/ashtonian/mqttv5"
	"github.com/ashtonian/mqttv5/wire"
)

// readingCodec is a tiny inline [mqttv5.Codec] used by
// [ExampleNewTyped]. In real code prefer the codec/json submodule or
// any other implementation of mqttv5.Codec[T].
type readingCodec struct{}

type reading struct {
	Device string  `json:"device"`
	Temp   float64 `json:"temp"`
}

func (readingCodec) Encode(v reading) ([]byte, error) { return json.Marshal(v) }
func (readingCodec) Decode(b []byte) (reading, error) {
	var v reading
	err := json.Unmarshal(b, &v)
	return v, err
}

func main() {
	cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
	ctx := context.Background()
	_ = cli.Connect(ctx)
	defer cli.Disconnect(ctx)

	typed := mqttv5.NewTyped[reading](cli, readingCodec{})

	_ = typed.Publish(ctx,
		wire.PublishOpts{Topic: "sensors/a1", QoS: 1},
		reading{Device: "a1", Temp: 22.5},
	)

	ch, _, err := typed.Subscribe(ctx,
		[]mqttv5.TopicFilter{{Topic: "sensors/+", QoS: 1}})
	if err != nil {
		return
	}
	for m := range ch {
		fmt.Println(m.Topic, m.Value.Temp)
		_ = m.Ack()
	}
}

func (*Typed[T]) Publish

func (t *Typed[T]) Publish(ctx context.Context, opts wire.PublishOpts, v T) error

Publish encodes v and sends it. opts.Payload, if set, is overwritten.

func (*Typed[T]) Subscribe

func (t *Typed[T]) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *TypedMessage[T], SubscriptionToken, error)

Subscribe wraps Client.Subscribe with a decode stage. Decode failures are logged + acked + dropped — for stricter behaviour wrap the codec or use SubscribeCallback. Caller MUST call Ack on each TypedMessage.

func (*Typed[T]) SubscribeQueue

func (t *Typed[T]) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*TypedMessage[T]], SubscriptionToken, error)

SubscribeQueue is the queue-backed variant of Subscribe.

type TypedMessage

type TypedMessage[T any] struct {
	*Message
	Topic string
	Value T
}

TypedMessage wraps Message with an already-decoded Value and a detached Topic. Both Value and Topic are safe to retain past Ack — Value via the codec's allocation, Topic via an explicit strings.Clone at construction. The embedded *Message still aliases the frame for Payload / Properties access; use Message.ClonePayload or read those fields before calling Ack if you need them.

type WriteOverflowPolicy

type WriteOverflowPolicy uint8

WriteOverflowPolicy controls what QoS 0 Client.Publish does when the internal writer queue (sized by WithWriteQueueSize) is full. QoS 1/2 are unaffected — they always block on the broker ack.

const (
	// WriteBlock (default) blocks Publish until the writer queue has
	// room, the call's ctx fires, or the connection goes down. The
	// caller surfaces overload as latency.
	WriteBlock WriteOverflowPolicy = iota

	// WriteDropNewest returns [ErrWriteQueueFull] immediately when
	// the queue has no room. Use for high-rate fire-and-forget
	// telemetry where the producer goroutine must not block.
	WriteDropNewest
)

func (WriteOverflowPolicy) String

func (p WriteOverflowPolicy) String() string

String returns a stable name for the policy.

Directories

Path Synopsis
codec
json module
msgpack module
internal
trie
Package trie implements an MQTT v5 topic filter matcher used by the client to dispatch inbound PUBLISH packets to subscribed handlers.
Package trie implements an MQTT v5 topic filter matcher used by the client to dispatch inbound PUBLISH packets to subscribed handlers.
queue
file module
Package session manages MQTT v5 session state: packet ID allocation, in-flight QoS 1/2 tracking (both outbound and inbound), and a Store interface so the state can survive a reconnect.
Package session manages MQTT v5 session state: packet ID allocation, in-flight QoS 1/2 tracking (both outbound and inbound), and a Store interface so the state can survive a reconnect.
store
file module
Package transport abstracts the network layer underneath the MQTT v5 client.
Package transport abstracts the network layer underneath the MQTT v5 client.
ws module
Package wire implements the MQTT v5 wire-format codec.
Package wire implements the MQTT v5 wire-format codec.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL