Documentation
¶
Overview ¶
Package mqttv5 is a fast, ergonomic MQTT v5 client for Go. The supervisor (reconnect, replay-in-flight, auto-resubscribe) is baked into every Client; there is no separate "auto-reconnect" wrapper.
Why this package ¶
One package, one Client. No paho / autopaho split — reconnect, replay, and resubscribe are always on. Channel-native subscribe: <-chan *Message (Client.Subscribe), Queue of *Message (Client.SubscribeQueue), or a sync callback (Client.SubscribeCallback). Backpressure is a first-class concept — per-subscription DropNewest / DropOldest auto-ack the dropped message so the broker stops retransmitting.
Three multi-broker patterns are kept distinct:
- Failover within one logical client (interchangeable brokers, one session identity): WithBrokers.
- N parallel sessions to N independent brokers, each treated as itself: ClientGroup. Configurable per-broker auth / TLS / ClientID via [GroupMember.Opts]; pluggable publish policy (GroupPublishBroadcast, GroupPublishRoundRobin, GroupPublishHashByTopic).
- Publisher pool against one broker: WithPublisherPool.
Typed publish / subscribe goes through the generic Codec interface; JSON and msgpack ship in separate submodules so the core stays stdlib-only. The durable outbound QueuePublisher lets you enqueue publishes while disconnected, drain on reconnect, and survive process restart (when combined with the queue/file submodule).
Operator surface:
- Client.Stats returns a snapshot of in-memory counters (connects, publishes sent/acked, inbound dropped, pool fallbacks, ping timeouts, ...). Opt in via WithStats — when off the hot path compiles to a single predicted nil-check.
- Client.DisconnectWith sends a custom DISCONNECT (reason code, ReasonString, SessionExpiry override).
- WithConnectPacketBuilder mutates the CONNECT immediately before each attempt — the canonical OAuth-token-rotation hook.
- WithOnConnectError / WithOnReconnectAttempt feed metrics and alerting per attempt; WithOnConnectionDown returns false to terminate the supervisor.
Full MQTT v5 conformance: shared subscriptions, topic aliases (in + out), session expiry, retained messages, will + will properties, enhanced authentication (CONNECT and mid-session §4.12), and CONNACK capability flags honoured before any matching SUBSCRIBE traffic goes on the wire.
Quick start ¶
cli, err := mqttv5.New(
mqttv5.WithBroker("mqtt://localhost:1883"),
mqttv5.WithClientID("ingest-svc-1"),
)
if err != nil { panic(err) }
ctx := context.Background()
if err := cli.Connect(ctx); err != nil { panic(err) }
defer cli.Disconnect(ctx)
// Channel subscribe — manual ack, §4.6-ordered PUBACK flush.
msgs, _, err := cli.Subscribe(ctx,
[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
if err != nil { panic(err) }
go func() {
for m := range msgs {
fmt.Printf("%s: %s\n", m.Topic, m.Payload)
_ = m.Ack()
}
}()
// QoS 1 publish — supervisor replays with DUP=1 across drops.
_ = cli.Publish(ctx, wire.PublishOpts{
Topic: "events/example",
Payload: []byte("hello"),
QoS: 1,
})
See the Example functions below for end-to-end snippets covering each subscribe shape, typed payloads, multi-broker fan-out, and the durable queue publisher.
Options naming ¶
Option helpers split by scope: Option (passed to New) configures the Client and is named With…; SubscribeOption (passed inline to Subscribe / SubscribeQueue) configures one subscription and is named Sub…; ClientGroupOption (passed to NewClientGroup) configures the group and is named WithGroup….
Submodules ¶
Each opt-in submodule has its own go.mod so importing it does not add a runtime dependency to the core:
- codec/json — JSON Codec implementation, wired into Typed.
- codec/msgpack — MessagePack Codec via vmihailenco/msgpack/v5.
- queue/file — Durable outbound publish queue (filesystem WAL).
- store/file — Crash-safe session store for in-flight QoS 1/2.
- transport/ws — WebSocket transport via gobwas/ws; wire it in with WithDialFunc.
Architecture ¶
One goroutine per connection drives the read path (read -> decode -> trie match -> handler) with handlers running synchronously on the reader. A second goroutine drains a many-producer-single-consumer write channel — no mutex around net.Conn.Write, so concurrent publishers scale across cores.
Packets and frame buffers come from per-type sync.Pools. Inbound Message aliases the pooled frame for zero-copy Topic and Payload; the frame returns to the pool only when refcounted Message.Ack reaches the last handler. A supervisor goroutine reconnects with configurable backoff, replays in-flight QoS 1/2 publishes with DUP=1, and re-issues every tracked subscription.
Benchmarks ¶
See benchmarks/ for end-to-end and codec micro-benchmarks against eclipse/paho.golang.
Package mqttv5 is a high-performance MQTT v5 client.
The client speaks raw bytes by default. Callers that want typed payloads can wrap with Typed[T] via a Codec[T] from one of the sibling codec submodules. See the package doc in doc.go for an architecture overview.
Example ¶
Example shows the minimal connect → subscribe → publish loop. The supervisor (reconnect, replay, resubscribe) is implicit — once Connect returns, every drop is recovered automatically until Disconnect.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
func main() {
cli, err := mqttv5.New(
mqttv5.WithBroker("mqtt://localhost:1883"),
mqttv5.WithClientID("example"),
)
if err != nil {
panic(err)
}
ctx := context.Background()
if err := cli.Connect(ctx); err != nil {
panic(err)
}
defer cli.Disconnect(ctx)
msgs, _, err := cli.Subscribe(ctx, []mqttv5.TopicFilter{{Topic: "demo/#", QoS: 1}})
if err != nil {
panic(err)
}
go func() {
for m := range msgs {
fmt.Printf("recv %s %s\n", m.Topic, m.Payload)
_ = m.Ack()
}
}()
_ = cli.Publish(ctx, wire.PublishOpts{
Topic: "demo/hello",
Payload: []byte("world"),
QoS: 1,
})
}
Output:
Index ¶
- Constants
- Variables
- type Authenticator
- type Backoff
- type Client
- func (c *Client) ClientID() string
- func (c *Client) Connect(ctx context.Context) error
- func (c *Client) Connected() bool
- func (c *Client) Disconnect(ctx context.Context) error
- func (c *Client) DisconnectWith(ctx context.Context, opts wire.DisconnectOpts) error
- func (c *Client) Publish(ctx context.Context, opts wire.PublishOpts) error
- func (c *Client) SetBrokers(urls ...string) error
- func (c *Client) Stats() Stats
- func (c *Client) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, SubscriptionToken, error)
- func (c *Client) SubscribeCallback(ctx context.Context, filters []TopicFilter, h HandlerFunc) (SubscriptionToken, error)
- func (c *Client) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], SubscriptionToken, error)
- func (c *Client) Unsubscribe(ctx context.Context, token SubscriptionToken) error
- type ClientGroup
- func (g *ClientGroup) Connect(ctx context.Context) error
- func (g *ClientGroup) Disconnect(ctx context.Context) error
- func (g *ClientGroup) Member(name string) *Client
- func (g *ClientGroup) Members() []*Client
- func (g *ClientGroup) Names() []string
- func (g *ClientGroup) Publish(ctx context.Context, opts wire.PublishOpts) error
- func (g *ClientGroup) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, map[string]SubscriptionToken, error)
- func (g *ClientGroup) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], map[string]SubscriptionToken, error)
- func (g *ClientGroup) Unsubscribe(ctx context.Context, name string, token SubscriptionToken) error
- func (g *ClientGroup) UnsubscribeAll(ctx context.Context, tokens map[string]SubscriptionToken) error
- type ClientGroupOption
- type Codec
- type Config
- type DropPolicy
- type GroupMember
- type GroupPublishPolicy
- type HandlerFunc
- type MemoryPublisherQueue
- func (q *MemoryPublisherQueue) Ack(_ context.Context, tok QueueToken) error
- func (q *MemoryPublisherQueue) Close() error
- func (q *MemoryPublisherQueue) Enqueue(_ context.Context, entry QueueEntry) error
- func (q *MemoryPublisherQueue) EvictHead(_ context.Context) (QueueEntry, bool, error)
- func (q *MemoryPublisherQueue) Len(_ context.Context) (int, error)
- func (q *MemoryPublisherQueue) PeekBatch(_ context.Context, n int) ([]QueueEntry, []QueueToken, error)
- type Message
- type Option
- func WithAuthenticator(a Authenticator) Option
- func WithBroker(url string) Option
- func WithBrokers(urls ...string) Option
- func WithCleanStart(b bool) Option
- func WithCleanStartOnReconnect(b bool) Option
- func WithClientID(id string) Option
- func WithConnectPacketBuilder(fn func(ctx context.Context, opts *wire.ConnectOpts) error) Option
- func WithConnectTimeout(d time.Duration) Option
- func WithConnectUserProperties(p []wire.UserProperty) Option
- func WithConnectUserProperty(key, value string) Option
- func WithCredentials(user string, pass []byte) Option
- func WithDialFunc(fn transport.DialFunc) Option
- func WithDialer(d *net.Dialer) Option
- func WithDisconnectFlushTimeout(d time.Duration) Option
- func WithDropPolicy(p DropPolicy) Option
- func WithInboundTopicAliasMaximum(n uint16) Option
- func WithKeepAlive(s uint16) Option
- func WithLogger(l *slog.Logger) Option
- func WithMaxSubscribeQueueSize(n int) Option
- func WithMaximumPacketSize(n uint32) Option
- func WithOnConnectError(fn func(error)) Option
- func WithOnConnectionDown(fn func() bool) Option
- func WithOnConnectionUp(fn func(*wire.Connack)) Option
- func WithOnReconnectAttempt(fn func(attempt int, brokerURL string)) Option
- func WithOnServerDisconnect(fn func(*wire.Disconnect)) Option
- func WithPingTimeout(d time.Duration) Option
- func WithPublishMode(m PublishMode) Option
- func WithPublisherPool(size int) Option
- func WithPublisherPoolClientIDFn(fn func(parent string, idx int) string) Option
- func WithPublisherPoolRouting(p PoolRoutingPolicy) Option
- func WithReceiveMaximum(n uint16) Option
- func WithReconnectBackoff(b Backoff) Option
- func WithRequestProblemInformation(b bool) Option
- func WithRequestResponseInformation(b bool) Option
- func WithSessionExpiry(s uint32) Option
- func WithStats() Option
- func WithStore(s session.Store) Option
- func WithTLSConfig(t *tls.Config) Option
- func WithWill(w *wire.WillOpts) Option
- func WithWriteBatch(n int) Option
- func WithWriteOverflowPolicy(p WriteOverflowPolicy) Option
- func WithWriteQueueSize(n int) Option
- func WithoutKeepAlive() Option
- type PoolRoutingPolicy
- type PublishMode
- type PublisherQueue
- type Queue
- type QueueEntry
- type QueueOption
- func WithDeadLetter(fn func(QueueEntry, error)) QueueOption
- func WithQueueBatchSize(n int) QueueOption
- func WithQueueDropPolicy(p DropPolicy) QueueOption
- func WithQueueIdleInterval(d time.Duration) QueueOption
- func WithQueueMaxSize(n int) QueueOption
- func WithQueuePublishTimeout(d time.Duration) QueueOption
- func WithQueueTTL(d time.Duration) QueueOption
- type QueuePublisher
- type QueueToken
- type Stats
- type SubscribeOption
- type SubscriptionToken
- type TopicFilter
- type Typed
- func (t *Typed[T]) Publish(ctx context.Context, opts wire.PublishOpts, v T) error
- func (t *Typed[T]) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *TypedMessage[T], SubscriptionToken, error)
- func (t *Typed[T]) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*TypedMessage[T]], SubscriptionToken, error)
- type TypedMessage
- type WriteOverflowPolicy
Examples ¶
Constants ¶
const ( // DefaultKeepAlive is the keepalive interval used when // WithKeepAlive is not called. MQTT v5 spec range is 1..65535 // seconds. DefaultKeepAlive uint16 = 30 // DefaultConnectTimeout bounds the CONNECT handshake (dial + // CONNECT/CONNACK) when WithConnectTimeout is not called. DefaultConnectTimeout = 10 * time.Second // DefaultPingTimeout is the budget for PINGRESP after a PINGREQ // before the connection is declared dead. Chosen flat (not a // multiple of KeepAlive) because PINGRESP is a tiny packet whose // RTT does not scale with how often PINGREQ is sent. With // DefaultKeepAlive (30 s) the total dead-detect time is 40 s — // comfortably inside every common broker's 1.5×KeepAlive (~45 s) // disconnect window, so the client drives the reconnect rather // than getting cut. DefaultPingTimeout = 10 * time.Second // DefaultSessionExpiry is the Session Expiry Interval (§3.1.2.11.2) // advertised on every CONNECT. 5 minutes is long enough to ride out // the typical reconnect blip / container redeploy window so QoS 1/2 // resume actually works out of the box; short enough that the // broker isn't asked to hold idle sessions indefinitely. DefaultSessionExpiry uint32 = 300 // DefaultWriteQueueSize sizes the internal MPSC write channel. DefaultWriteQueueSize = 256 // DefaultDisconnectFlushTimeout caps how long writeDisconnectAndClose // will wait for the DISCONNECT packet to flush before the // connection is forcibly torn down. DefaultDisconnectFlushTimeout = 500 * time.Millisecond )
Default values used by Config.defaults. Exported so callers can reference them when overriding selectively.
const ( // DefaultQueueBatchSize caps the per-PeekBatch entry count. DefaultQueueBatchSize = 16 // DefaultQueueIdleInterval is the drain-loop wakeup tick when no // Enqueue signals have arrived. Lower values reduce the worst- // case latency to pick up restored entries; higher values reduce // wakeup cost. DefaultQueueIdleInterval = 500 * time.Millisecond // DefaultQueuePublishTimeout caps each per-message broker // handshake (ack wait) inside the drain loop. DefaultQueuePublishTimeout = 30 * time.Second )
Default values used by NewQueuePublisher when QueueOptions leave a field unset.
const DefaultPublisherPoolClientIDFormat = "%s-pub-%d"
DefaultPublisherPoolClientIDFormat is the format string used by the default pool-member ClientID function: parent ClientID, then "-pub-", then the 1-based member index.
const DefaultSubscribeBuffer = 64
DefaultSubscribeBuffer is the channel buffer size used by Subscribe when SubBuffer is not called.
Variables ¶
var ( ErrNotConnected = errors.New("mqttv5: client not connected") ErrAlreadyConnected = errors.New("mqttv5: client already connected") ErrClosed = errors.New("mqttv5: client closed") ErrConnectRefused = errors.New("mqttv5: broker refused CONNECT") ErrUnexpectedPacket = errors.New("mqttv5: unexpected packet from broker") // ErrWriteQueueFull is returned by QoS 0 Publish when the client // is configured with WriteDropNewest and the writer queue has no // room. The publish never reaches the wire. ErrWriteQueueFull = errors.New("mqttv5: write queue full") // Subscribe-time errors raised when the broker advertised the // feature as unsupported in CONNACK (MQTT v5 §3.2.2.3.{11,12,13}). // Returned before any SUBSCRIBE traffic goes on the wire. ErrWildcardSubsUnsupported = errors.New("mqttv5: broker does not support wildcard subscriptions") ErrSubscriptionIDsUnsupported = errors.New("mqttv5: broker does not support subscription identifiers") )
Errors returned by the client.
var ( // ErrQueueClosed is returned by Enqueue / Publish after Close. ErrQueueClosed = errors.New("mqttv5: queue is closed") // ErrQueueFull is returned by [QueuePublisher.Publish] when the // queue is at its [WithQueueMaxSize] cap and DropNewest is in // effect. ErrQueueFull = errors.New("mqttv5: queue is full") // ErrQoS0NotQueueable is returned when a QoS 0 publish reaches // [QueuePublisher.Publish] — durable enqueue is meaningless when // the broker has no delivery obligation. ErrQoS0NotQueueable = errors.New("mqttv5: QueuePublisher requires QoS >= 1") // ErrEvictionNotSupported is returned by // [PublisherQueue.EvictHead] for backends that cannot evict // (append-only disk queues, etc.). [NewQueuePublisher] surfaces // this at construct time when [DropOldest] is requested. ErrEvictionNotSupported = errors.New("mqttv5: queue backend does not support eviction") )
Errors returned by the QueuePublisher / PublisherQueue surface.
var DefaultReconnectBackoff = ExponentialBackoff( time.Second, 30*time.Second, 200*time.Millisecond, )
DefaultReconnectBackoff is used when WithReconnectBackoff is not called: 1s -> 2s -> 4s -> ... up to 30s with 200ms jitter.
var ErrChanDropOldestUnsupported = errors.New(
"mqttv5: Subscribe (chan) does not support DropOldest; use SubscribeQueue",
)
ErrChanDropOldestUnsupported is returned by Client.Subscribe when called with SubDropPolicy(DropOldest); use Client.SubscribeQueue for DropOldest semantics.
var ErrInvalidBrokerURL = errors.New("mqttv5: invalid broker URL")
ErrInvalidBrokerURL is returned when a broker URL fails to parse or has an unsupported scheme. WithDialFunc relaxes scheme validation.
var ErrMissingBroker = errors.New("mqttv5: missing broker URL (use WithBroker)")
ErrMissingBroker is returned by New when no broker URL was provided.
var ErrNilHandler = errors.New("mqttv5: subscribe handler must not be nil")
ErrNilHandler is returned by Client.SubscribeCallback when the handler argument is nil.
var ErrNoHealthyPublishers = errors.New("mqttv5: no healthy publishers in pool")
ErrNoHealthyPublishers is returned by publisherPool.publish when every pool member is disconnected. The Client falls back to the main connection in this case (and increments PoolFallbacks).
Functions ¶
This section is empty.
Types ¶
type Authenticator ¶
type Authenticator interface {
Method() string
Begin() (data []byte, err error)
Continue(brokerData []byte) (response []byte, done bool, err error)
}
Authenticator drives the client side of MQTT v5 enhanced authentication — the challenge-response handshake that runs during CONNECT (SCRAM, Kerberos, OAuth challenge, etc.).
Method() supplies the AuthenticationMethod property; Begin() the initial AuthenticationData. For each AUTH packet from the broker before CONNACK, Continue(brokerData) returns the next response. done=true is informational — the broker decides when to send CONNACK. A non-nil error from Continue aborts the connection.
Implementations need only be safe for concurrent use when shared across multiple Clients (e.g. with WithPublisherPool).
type Backoff ¶
Backoff returns the delay before retry attempt N. The first retry after a failed connect is attempt 0 (the initial connect itself runs immediately, no backoff). Implementations must be safe for concurrent use.
func ConstantBackoff ¶
ConstantBackoff returns the same delay for every attempt. Predictable but vulnerable to thundering-herd reconnect storms; ExponentialBackoff is usually preferable.
func ExponentialBackoff ¶
ExponentialBackoff doubles min each attempt up to max with ±jitter added (range [computed-jitter, computed+jitter]).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an MQTT v5 client. After New(opts...), call Connect to establish the first connection and start the supervisor that reconnects on drop.
func New ¶
New constructs a Client. Apply WithBroker (required) plus any other options. The Client is not connected — call Connect.
func (*Client) ClientID ¶
ClientID returns the configured ClientID. ClientID returns the ClientID currently in effect. When the broker assigned an identifier via the AssignedClientIdentifier CONNACK property (typical when WithClientID("") was used), the broker-side value is returned; otherwise the value configured via WithClientID is returned.
func (*Client) Connect ¶
Connect performs the initial CONNECT/CONNACK handshake (bounded by ctx) and starts the supervisor. Reconnect attempts run on internal background contexts until Client.Disconnect. Returns ErrAlreadyConnected when called twice without an intervening Disconnect.
func (*Client) Disconnect ¶
Disconnect sends a graceful DISCONNECT (reason Normal Disconnection), stops the supervisor, and tears down the current connection. For a custom reason / properties use Client.DisconnectWith. Does not fire OnConnectionDown. Idempotent.
func (*Client) DisconnectWith ¶
DisconnectWith sends a DISCONNECT with opts, stops the supervisor, and tears down the current connection. Pool-member disconnect errors are joined into the returned error. Idempotent.
func (*Client) Publish ¶
Publish sends a PUBLISH packet.
- QoS 0: returns per PublishMode (PublishFireAndForget or PublishWaitForFlush).
- QoS 1: returns after PUBACK.
- QoS 2: returns after PUBCOMP.
QoS 1/2 packets are stashed on the session entry so the supervisor replays them with DUP=1 across reconnect; callers stay blocked on the ack across the drop. With WithPublisherPool the call first tries the pool per PoolRoutingPolicy and falls back to the main connection if every pool member is unhealthy.
Example ¶
ExampleClient_Publish sends a QoS 1 publish. Publish returns after PUBACK arrives. If the connection drops mid-flight the session entry persists and the supervisor replays the packet with DUP=1 on reconnect; the caller stays blocked across the drop.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
err := cli.Publish(ctx, wire.PublishOpts{
Topic: "events/example",
Payload: []byte(`{"value":42}`),
QoS: 1,
})
if err != nil {
fmt.Println("publish:", err)
}
}
Output:
func (*Client) SetBrokers ¶
SetBrokers replaces the broker URL list at runtime; the next reconnect uses it. Typical caller: WithOnServerDisconnect on a ServerMoved / UseAnotherServer redirect. URLs are validated as in New.
Example ¶
ExampleClient_SetBrokers shows multi-broker failover. The supervisor rotates through mqttv5.Config.BrokerURLs on every reconnect attempt; mqttv5.Client.SetBrokers replaces the list at runtime — the canonical hook is a ServerMoved DISCONNECT.
package main
import (
"crypto/tls"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
func main() {
var cli *mqttv5.Client
cli, _ = mqttv5.New(
mqttv5.WithBrokers(
"mqtts://broker-a.example.com:8883",
"mqtts://broker-b.example.com:8883",
),
mqttv5.WithTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}),
mqttv5.WithOnServerDisconnect(func(d *wire.Disconnect) {
if ref, ok := d.Properties.String(wire.PropServerReference); ok {
_ = cli.SetBrokers(ref)
}
}),
)
_ = cli
}
Output:
func (*Client) Stats ¶
Stats returns a snapshot of the Client's counters. Cheap to call — the returned value is detached. Returns the zero value when stats are disabled (WithStats not set on the Config).
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, SubscriptionToken, error)
Subscribe sends one SUBSCRIBE for filters and returns a buffered channel of matching inbound messages. Callers MUST call Message.Ack on each. Returns once SUBACK arrives; the channel closes on Client.Unsubscribe or Client.Disconnect. Full-buffer messages are dropped + acked (DropNewest); observe drops via SubOnDrop. Returns ErrChanDropOldestUnsupported when called with explicit SubDropPolicy of DropOldest.
Example ¶
ExampleClient_Subscribe receives matched messages on a buffered channel. The caller MUST call mqttv5.Message.Ack on each one — for QoS 1/2 the broker-side ack is held until Ack is called, in §4.6 arrival order across overlapping filters.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
)
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
msgs, token, err := cli.Subscribe(ctx,
[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
mqttv5.SubBuffer(256),
mqttv5.SubOnDrop(func(m *mqttv5.Message) {
// Dropped messages have already been ack'd; this is for
// observability only. Must not call Ack and must not retain
// Topic/Payload past return.
fmt.Println("dropped", m.Topic)
}),
)
if err != nil {
return
}
for m := range msgs {
fmt.Println(m.Topic, string(m.Payload))
_ = m.Ack()
}
_ = cli.Unsubscribe(ctx, token)
}
Output:
func (*Client) SubscribeCallback ¶
func (c *Client) SubscribeCallback(ctx context.Context, filters []TopicFilter, h HandlerFunc) (SubscriptionToken, error)
SubscribeCallback registers a subscription that invokes h for every matching inbound PUBLISH.
h runs synchronously on the read goroutine and MUST NOT block — a slow handler stalls PINGRESP and drops the connection. The runtime auto-acks via Message.Ack after h returns; use Client.Subscribe or Client.SubscribeQueue when ack must be deferred past h. Returns ErrNilHandler when h is nil.
Example ¶
ExampleClient_SubscribeCallback registers a synchronous handler invoked on the read goroutine. The handler must not block — a slow callback stalls PINGRESP processing and drops the connection. Ack fires automatically after the callback returns.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
)
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
_, err := cli.SubscribeCallback(ctx,
[]mqttv5.TopicFilter{{Topic: "ctrl/+", QoS: 0}},
func(m *mqttv5.Message) {
fmt.Println(m.Topic, string(m.Payload))
},
)
if err != nil {
return
}
}
Output:
func (*Client) SubscribeQueue ¶
func (c *Client) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], SubscriptionToken, error)
SubscribeQueue sends one SUBSCRIBE and returns a Queue of matching messages. Unbounded by default; cap with SubMaxQueueSize. On overflow, SubDropPolicy decides: DropNewest acks + drops the inbound; DropOldest dequeues + acks the head and admits the new one. Closes on Client.Unsubscribe or Client.Disconnect.
Example ¶
ExampleClient_SubscribeQueue stores matched messages in an unbounded queue. mqttv5.DropOldest evicts the queue head when SubMaxQueueSize is reached so the freshest N messages survive.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
)
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
q, _, err := cli.SubscribeQueue(ctx,
[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}},
mqttv5.SubMaxQueueSize(10_000),
mqttv5.SubDropPolicy(mqttv5.DropOldest),
)
if err != nil {
return
}
for {
m, ok := q.Dequeue(ctx)
if !ok {
break
}
fmt.Println(m.Topic, string(m.Payload))
_ = m.Ack()
}
}
Output:
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(ctx context.Context, token SubscriptionToken) error
Unsubscribe sends an UNSUBSCRIBE for the topics covered by token, removes the handlers from the matcher, and drops them from the resubscribe set so reconnects don't re-request them.
type ClientGroup ¶
type ClientGroup struct {
// contains filtered or unexported fields
}
ClientGroup manages N parallel sessions to N independent brokers. Each member is a full Client with its own session and supervisor; the group adds dispatch policy on top. Use for bridges across independent brokers, multi-tenant SaaS with per-broker credentials, or a clustered broker fleet you want N parallel sessions into. For HA failover use WithBrokers on a single Client instead — ClientGroup does not failover between members.
func NewClientGroup ¶
func NewClientGroup(members []GroupMember, opts ...ClientGroupOption) (*ClientGroup, error)
NewClientGroup constructs a ClientGroup over the given members. Each GroupMember becomes a full *Client; the group applies the configured publish policy on top.
Member ClientIDs are deduped: if two members resolve to the same ClientID, the second gets a "-N" suffix appended. Set GroupMember.Opts with WithClientID for explicit per-member IDs.
Defaults: GroupPublishBroadcast, parallel Connect/Disconnect.
Example ¶
ExampleNewClientGroup creates one mqttv5.Client per broker and dispatches via the configured GroupPublishPolicy. Subscribe merges inbound messages from every member into one channel.
For failover within one logical client (interchangeable brokers, same data), use mqttv5.WithBrokers on a single Client instead — see [Example_ClientSetBrokers].
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
)
func main() {
group, err := mqttv5.NewClientGroup(
[]mqttv5.GroupMember{
{
Broker: "mqtts://emea.example.com:8883",
Name: "emea",
Opts: []mqttv5.Option{mqttv5.WithCredentials("emea-svc", []byte("token-1"))},
},
{
Broker: "mqtts://apac.example.com:8883",
Name: "apac",
Opts: []mqttv5.Option{mqttv5.WithCredentials("apac-svc", []byte("token-2"))},
},
},
mqttv5.WithGroupSharedOpts(
mqttv5.WithClientID("fanout"),
mqttv5.WithKeepAlive(30),
),
mqttv5.WithGroupPublishPolicy(mqttv5.GroupPublishBroadcast),
)
if err != nil {
panic(err)
}
ctx := context.Background()
if err := group.Connect(ctx); err != nil {
panic(err)
}
defer group.Disconnect(ctx)
msgs, _, err := group.Subscribe(ctx,
[]mqttv5.TopicFilter{{Topic: "events/#", QoS: 1}})
if err != nil {
return
}
for m := range msgs {
fmt.Println(m.Topic)
_ = m.Ack()
}
}
Output:
func (*ClientGroup) Connect ¶
func (g *ClientGroup) Connect(ctx context.Context) error
Connect dials every member. The default is parallel; use WithGroupSequentialLifecycle to serialise. The returned error is errors.Join of every member that failed — nil on full success. If at least one member succeeded the group is still usable for the healthy subset.
func (*ClientGroup) Disconnect ¶
func (g *ClientGroup) Disconnect(ctx context.Context) error
Disconnect tears down every member. Bridge goroutines for outstanding Subscribe / SubscribeQueue calls drain first (bounded by ctx). The default is parallel; use WithGroupSequentialLifecycle to serialise. Returned error is errors.Join of every member's Disconnect error.
func (*ClientGroup) Member ¶
func (g *ClientGroup) Member(name string) *Client
Member returns the Client for the named member, or nil when no such member exists.
func (*ClientGroup) Members ¶
func (g *ClientGroup) Members() []*Client
Members returns the underlying Clients in member order. Use for per-member operations the group API doesn't expose directly (Stats per member, manual Publish to a specific member, etc.).
func (*ClientGroup) Names ¶
func (g *ClientGroup) Names() []string
Names returns the member names in order.
func (*ClientGroup) Publish ¶
func (g *ClientGroup) Publish(ctx context.Context, opts wire.PublishOpts) error
Publish dispatches opts across members per the configured GroupPublishPolicy. Returns nil on success (one member acked for RoundRobin/HashByTopic; at least one acked for Broadcast); the joined per-member errors otherwise.
func (*ClientGroup) Subscribe ¶
func (g *ClientGroup) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, map[string]SubscriptionToken, error)
Subscribe subscribes on every member and merges all inbound messages into one channel. The caller MUST call msg.Ack() on each received message; Ack dispatches back to the member that delivered it.
The returned token map is keyed by member name — pass an entry to Unsubscribe to tear down that member's subscription, or use UnsubscribeAll to tear down everything.
Behaviour mirrors Client.Subscribe — Subscribe is "subscribe on all" only; for failover across interchangeable brokers use WithBrokers on a single Client.
func (*ClientGroup) SubscribeQueue ¶
func (g *ClientGroup) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], map[string]SubscriptionToken, error)
SubscribeQueue is the queue-merged variant of Subscribe.
func (*ClientGroup) Unsubscribe ¶
func (g *ClientGroup) Unsubscribe(ctx context.Context, name string, token SubscriptionToken) error
Unsubscribe tears down the named member's subscription.
func (*ClientGroup) UnsubscribeAll ¶
func (g *ClientGroup) UnsubscribeAll(ctx context.Context, tokens map[string]SubscriptionToken) error
UnsubscribeAll tears down every token in the supplied map. Errors are joined.
type ClientGroupOption ¶
type ClientGroupOption func(*clientGroupConfig)
ClientGroupOption configures a ClientGroup at construct time.
func WithGroupPublishPolicy ¶
func WithGroupPublishPolicy(p GroupPublishPolicy) ClientGroupOption
WithGroupPublishPolicy selects the ClientGroup.Publish dispatch policy. See GroupPublishPolicy.
func WithGroupSequentialLifecycle ¶
func WithGroupSequentialLifecycle() ClientGroupOption
WithGroupSequentialLifecycle disables the default parallel Connect / Disconnect / Subscribe across members. Useful in tests where deterministic ordering matters more than wall time.
func WithGroupSharedOpts ¶
func WithGroupSharedOpts(opts ...Option) ClientGroupOption
WithGroupSharedOpts applies opts to every member before that member's own [GroupMember.Opts]. Use for shared settings (KeepAlive, Logger, ReconnectBackoff). Per-member Opts override.
type Codec ¶
Codec encodes and decodes payloads of type T. Implementations live in separate submodules so the core stays codec-agnostic.
type Config ¶
type Config struct {
// BrokerURLs is the ordered list of broker URLs. The supervisor
// rotates through it on reconnect; [Client.SetBrokers] replaces
// it at runtime.
BrokerURLs []string
ClientID string
Username string
Password []byte
KeepAlive uint16
// CleanStart sets the CleanStart flag on the initial CONNECT
// (§3.1.2.4). Default true.
CleanStart bool
// CleanStartOnReconnect sets CleanStart on every CONNECT after
// the first. Default false (preserve broker session for QoS 1/2
// resume).
CleanStartOnReconnect bool
SessionExpiry uint32
ReceiveMaximum uint16
// MaximumPacketSize caps the largest packet the broker may send
// to this client (§3.1.2.11.4). Zero advertises no limit.
MaximumPacketSize uint32
// InboundTopicAliasMaximum is the inbound TopicAliasMaximum
// (§3.1.2.11.5). Zero (default) tells the broker not to alias
// inbound PUBLISHes. The outbound budget is taken automatically
// from the broker's CONNACK.
InboundTopicAliasMaximum uint16
// RequestResponseInformation asks the broker for
// ResponseInformation in CONNACK (§3.1.2.11.6).
RequestResponseInformation bool
// RequestProblemInformation asks the broker for ReasonString /
// UserProperties on error responses (§3.1.2.11.7).
RequestProblemInformation bool
// ConnectUserProperties are sent verbatim as CONNECT user
// properties.
ConnectUserProperties []wire.UserProperty
ConnectTimeout time.Duration
PingTimeout time.Duration
// DisconnectFlushTimeout bounds the wait for a graceful
// DISCONNECT write before the socket is forcibly closed.
// Default 500ms.
DisconnectFlushTimeout time.Duration
WriteQueueSize int
// WriteOverflowPolicy controls what QoS 0 Publish does when the
// writer queue is full. WriteBlock (default) waits for room or
// ctx; WriteDropNewest returns ErrWriteQueueFull immediately.
// QoS 1/2 unaffected.
WriteOverflowPolicy WriteOverflowPolicy
// WriteBatchMax caps how many pre-encoded packets the writer
// goroutine coalesces into one writev. 0 disables batching.
// Worth enabling only for sustained concurrent publishers; see
// [WithWriteBatch].
WriteBatchMax int
WillMessage *wire.WillOpts
// MaxSubscribeQueueSize caps the per-subscription buffer.
// Zero = unbounded.
MaxSubscribeQueueSize int
// DropPolicy sets the default policy for full subscription
// buffers. [Client.Subscribe] (chan) supports DropNewest only;
// [Client.SubscribeQueue] honors both.
DropPolicy DropPolicy
// PublisherPoolSize > 1 enables a publish-only connection pool
// that falls back to the main connection if every member is
// unhealthy.
PublisherPoolSize int
// PublisherPoolRouting selects the pool member-picking strategy.
PublisherPoolRouting PoolRoutingPolicy
// PublisherPoolClientIDFn derives each pool member's ClientID
// from the parent ClientID and a 1-based index. Default is
// fmt.Sprintf("%s-pub-%d", parent, idx).
PublisherPoolClientIDFn func(parent string, idx int) string
// PublishMode selects QoS 0 write-completion semantics. QoS 1/2
// always wait for the broker's ack.
PublishMode PublishMode
// OnConnectionUp fires on each successful CONNECT/CONNACK after
// pending replays and resubscribes. The supplied *wire.Connack
// is a detached clone safe to retain. Must not block.
OnConnectionUp func(*wire.Connack)
// OnConnectionDown fires on unexpected connection loss (not on
// user-initiated Disconnect). Return false to stop the
// supervisor; a subsequent Connect restarts it. Must not block.
OnConnectionDown func() bool
// OnConnectError fires per failed CONNECT attempt (dial failure,
// CONNACK refusal, AUTH-loop error). Observability only; the
// supervisor retries regardless. Must not block.
OnConnectError func(err error)
// OnReconnectAttempt fires immediately before each reconnect
// dial (not the initial Connect). attempt starts at 1. Must not
// block.
OnReconnectAttempt func(attempt int, brokerURL string)
// StatsEnabled toggles the [Client.Stats] counter set. Default
// false — when disabled the hot path skips every atomic
// increment and Stats returns the zero value.
StatsEnabled bool
// OnServerDisconnect fires when the broker sends a DISCONNECT
// (§3.14) — the supplied *wire.Disconnect is a detached clone.
// Fires after the connection is marked down and before
// OnConnectionDown. May call [Client.SetBrokers] to honour a
// ServerMoved / UseAnotherServer redirect on the next attempt.
OnServerDisconnect func(*wire.Disconnect)
// Authenticator drives MQTT v5 enhanced authentication when set.
Authenticator Authenticator
// ConnectPacketBuilder is invoked immediately before each CONNECT
// is serialised. The callback may mutate any field of opts —
// typical use is rotating Username/Password per attempt for
// OAuth refresh. ctx is the per-attempt context bounded by
// ConnectTimeout. A non-nil error fails the attempt; the
// supervisor retries after backoff and fires OnConnectError.
ConnectPacketBuilder func(ctx context.Context, opts *wire.ConnectOpts) error
// ReconnectBackoff returns the delay before retry attempt N.
// Defaults to [DefaultReconnectBackoff].
ReconnectBackoff Backoff
Logger *slog.Logger
Store session.Store
TLSConfig *tls.Config
Dialer *net.Dialer
// DialFunc replaces the built-in TCP/TLS dial path. Takes
// precedence over TLSConfig and Dialer. Use for SOCKS / mTLS /
// WebSocket (see transport/ws) / in-memory test transports.
// When set, broker URL scheme validation is skipped.
DialFunc transport.DialFunc
// contains filtered or unexported fields
}
Config carries the resolved configuration for a Client. Construct via New.
type DropPolicy ¶
type DropPolicy uint8
DropPolicy controls behaviour when a bounded subscription buffer fills.
const ( // DropNewest discards the inbound message + acks it so the // broker doesn't retry. DropNewest DropPolicy = iota // DropOldest evicts the buffer head to admit the inbound // message. SubscribeQueue only — chan-based [Client.Subscribe] // rejects this with [ErrChanDropOldestUnsupported]. DropOldest )
func (DropPolicy) String ¶
func (p DropPolicy) String() string
String returns a stable debug name for the policy.
type GroupMember ¶
type GroupMember struct {
// Broker is the URL this member dials. Required.
Broker string
// Name is an optional identifier surfaced in logs, returned in
// the Subscribe token map, and accepted by ClientGroup.Member.
// Defaults to "member-N" (1-based) when empty.
Name string
// Opts are applied AFTER any WithGroupSharedOpts shared options.
// Use to override or extend the shared opts per-broker (auth,
// TLS, ClientID, per-member callbacks, etc.).
Opts []Option
}
GroupMember describes one member of a ClientGroup. Each member becomes a full Client with its own session, supervisor, and CONNECT identity.
Use Opts for per-member settings — WithCredentials, WithTLSConfig, WithAuthenticator, WithClientID, WithConnectPacketBuilder, a per-member WithOnConnectionUp closing over Name for identity, etc. Opts are applied AFTER WithGroupSharedOpts so per-member settings win.
type GroupPublishPolicy ¶
type GroupPublishPolicy uint8
GroupPublishPolicy selects how ClientGroup.Publish dispatches across members. For HA failover across interchangeable brokers use WithBrokers on a single Client instead — ClientGroup keeps N parallel sessions where every member is treated as itself.
const ( // GroupPublishBroadcast (default) hits every member with the // same publish. Use when members carry different data (bridge / // mirror semantic). Returns nil if any member's Publish // succeeded; otherwise the joined error from every member. GroupPublishBroadcast GroupPublishPolicy = iota // GroupPublishRoundRobin distributes publishes across the group // for throughput against an interchangeable broker fleet. The // per-call starting cursor advances by one; unhealthy starting // members fall through to the next member regardless of policy. // Use when members are equivalent for publish purposes (e.g. a // clustered broker fleet) and you want N parallel sessions. GroupPublishRoundRobin // GroupPublishHashByTopic picks the starting member by FNV-1a // of opts.Topic, preserving per-topic ordering across the group // while every member is healthy. Recovery (hashed member down) // briefly reorders that topic as the probe walks the ring. // Empty Topic collapses to member 0. GroupPublishHashByTopic )
func (GroupPublishPolicy) String ¶
func (p GroupPublishPolicy) String() string
String returns a stable debug name for the policy.
type HandlerFunc ¶
type HandlerFunc func(*Message)
HandlerFunc runs synchronously on the read goroutine for each matching inbound PUBLISH. Handlers MUST be non-blocking — a slow handler stalls the read loop, blocks PINGRESP processing, and drops the connection. Use Subscribe / SubscribeQueue for async.
type MemoryPublisherQueue ¶
type MemoryPublisherQueue struct {
// contains filtered or unexported fields
}
MemoryPublisherQueue is an in-memory PublisherQueue. It is the default when no durable store is required. Crash semantics: all queued entries are lost. For at-least-once across process restarts, use the queue/file/ submodule instead.
func NewMemoryPublisherQueue ¶
func NewMemoryPublisherQueue() *MemoryPublisherQueue
NewMemoryPublisherQueue constructs an empty in-memory queue.
func (*MemoryPublisherQueue) Ack ¶
func (q *MemoryPublisherQueue) Ack(_ context.Context, tok QueueToken) error
Ack implements PublisherQueue. Removes the entry identified by tok. Acking an unknown token is a no-op.
func (*MemoryPublisherQueue) Close ¶
func (q *MemoryPublisherQueue) Close() error
Close implements PublisherQueue.
func (*MemoryPublisherQueue) Enqueue ¶
func (q *MemoryPublisherQueue) Enqueue(_ context.Context, entry QueueEntry) error
Enqueue implements PublisherQueue.
func (*MemoryPublisherQueue) EvictHead ¶
func (q *MemoryPublisherQueue) EvictHead(_ context.Context) (QueueEntry, bool, error)
EvictHead implements PublisherQueue by popping the oldest in-memory item.
func (*MemoryPublisherQueue) Len ¶
func (q *MemoryPublisherQueue) Len(_ context.Context) (int, error)
Len implements PublisherQueue.
func (*MemoryPublisherQueue) PeekBatch ¶
func (q *MemoryPublisherQueue) PeekBatch(_ context.Context, n int) ([]QueueEntry, []QueueToken, error)
PeekBatch implements PublisherQueue.
type Message ¶
Message is the handler-facing view of an inbound PUBLISH. The embedded *wire.Publish exposes Topic, Payload, Properties, QoS, Retain, Dup, PacketID — those fields alias the pooled frame and are valid only until Ack returns. Use CloneTopic / ClonePayload to retain past Ack.
SubscribeCallback auto-acks after the handler returns. Subscribe (chan) and SubscribeQueue require manual Ack — for QoS 1 the PUBACK is held until Ack (flushed in §4.6 arrival order); for QoS 2 the PUBREC is held until Ack, PUBCOMP fires automatically on PUBREL.
When SubAutoAck is set on Subscribe / SubscribeQueue the library delivers a detached *Message instead: Topic / Payload / Properties are heap copies safe to retain indefinitely, and Ack is a no-op (the original frame is already released).
A PUBLISH matching multiple overlapping filters delivers the same *Message to every matched handler. Ack is refcounted — only the final call releases the frame.
func (*Message) Ack ¶
Ack acknowledges the message. Idempotent — calls beyond the matched-handler count are silent no-ops. After the final Ack the embedded Publish fields (Topic, Payload, Properties) are invalid; use Message.CloneTopic / Message.ClonePayload for retention.
On a detached message (delivered when SubAutoAck is set) Ack is always a no-op — the broker ack and frame release already happened inside the dispatcher.
func (*Message) ClonePayload ¶
ClonePayload returns a heap-allocated copy of Payload safe to retain past Message.Ack.
func (*Message) CloneTopic ¶
CloneTopic returns a heap-allocated copy of Topic safe to retain past Message.Ack. One allocation; prefer reading m.Topic inside the handler when retention isn't needed.
type Option ¶
Option mutates a Config during New; a non-nil return aborts construction.
func WithAuthenticator ¶
func WithAuthenticator(a Authenticator) Option
WithAuthenticator enables MQTT v5 enhanced authentication. When combined with WithPublisherPool, the same Authenticator runs concurrently across the parent and every pool member; implementations used that way must be safe for concurrent use.
func WithBroker ¶
WithBroker sets a single broker URL. Supported schemes: mqtt://, tcp://, mqtts://, tls://, ssl://; default ports 1883 / 8883 fill in when missing. For ws:// or wss:// pair with WithDialFunc.
func WithBrokers ¶
WithBrokers sets the ordered broker URL list. The supervisor rotates through it across reconnect attempts; Client.SetBrokers replaces it at runtime.
func WithCleanStart ¶
WithCleanStart sets the CleanStart flag on the initial CONNECT. Default true. Reconnects use WithCleanStartOnReconnect.
func WithCleanStartOnReconnect ¶
WithCleanStartOnReconnect sets CleanStart on every CONNECT after the first. Default false — preserve broker session for QoS 1/2 resume.
func WithClientID ¶
WithClientID sets the MQTT ClientID. Empty string asks the broker to assign one via the AssignedClientIdentifier CONNACK property.
func WithConnectPacketBuilder ¶
WithConnectPacketBuilder mutates *wire.ConnectOpts immediately before each CONNECT is serialised — canonical OAuth token-refresh hook. ctx is the per-attempt context bounded by ConnectTimeout. A non-nil error fails the attempt; the supervisor retries after backoff and fires OnConnectError.
func WithConnectTimeout ¶
WithConnectTimeout caps the CONNECT handshake (dial + CONNECT/CONNACK).
func WithConnectUserProperties ¶
func WithConnectUserProperties(p []wire.UserProperty) Option
WithConnectUserProperties replaces the CONNECT user-property list. The slice is referenced as-is; do not mutate after passing.
func WithConnectUserProperty ¶
WithConnectUserProperty appends one CONNECT user property. May be called multiple times; for bulk replacement see WithConnectUserProperties.
func WithCredentials ¶
WithCredentials sets username and password sent in the CONNECT packet.
func WithDialFunc ¶
WithDialFunc replaces the built-in TCP/TLS dial path. Takes precedence over WithDialer / WithTLSConfig. Use for SOCKS / HTTP proxies, per-dial mTLS, WebSocket (transport/ws), or in-memory test transports. ctx is bounded by ConnectTimeout. When set, broker URL scheme validation is skipped. Nil rejected.
The transport/ws sibling submodule exposes a [ws.DialFunc] helper that bakes in DialOpts (TLS config, custom headers, etc.) and returns a value directly usable here:
mqttv5.WithDialFunc(ws.DialFunc(ws.DialOpts{TLSConfig: tlsCfg}))
func WithDialer ¶
WithDialer overrides the default *net.Dialer.
func WithDisconnectFlushTimeout ¶
WithDisconnectFlushTimeout caps the wait for a graceful DISCONNECT write before the socket is forcibly closed. Default DefaultDisconnectFlushTimeout.
func WithDropPolicy ¶
func WithDropPolicy(p DropPolicy) Option
WithDropPolicy sets the default per-subscription drop policy. Per-call override via SubDropPolicy; see DropPolicy.
func WithInboundTopicAliasMaximum ¶
WithInboundTopicAliasMaximum advertises how many inbound topic aliases this client accepts (§3.1.2.11.5). Zero (default) disables inbound aliasing.
func WithKeepAlive ¶
WithKeepAlive sets the keepalive interval (seconds, range 1..65535). Pass 0 to WithoutKeepAlive instead.
func WithLogger ¶
WithLogger sets the *slog.Logger. Default slog.Default.
func WithMaxSubscribeQueueSize ¶
WithMaxSubscribeQueueSize caps each subscription's buffer. 0 = unbounded. Per-call override via SubMaxQueueSize.
func WithMaximumPacketSize ¶
WithMaximumPacketSize caps the largest packet the broker may send to this client (§3.1.2.11.4). Zero advertises no limit.
func WithOnConnectError ¶
WithOnConnectError fires per failed CONNECT attempt (dial err, CONNACK refusal, AUTH-loop err). Observability only — the supervisor retries regardless. Must not block.
func WithOnConnectionDown ¶
WithOnConnectionDown registers a callback fired when the connection is lost. Does NOT fire on user-initiated Disconnect. Return false to terminate the supervisor — no further reconnect attempts. A subsequent Connect on the same Client re-starts the lifecycle. Must not block.
func WithOnConnectionUp ¶
WithOnConnectionUp fires on each successful CONNECT/CONNACK after pending replays and resubscribes. fn receives a detached *wire.Connack clone safe to retain (assigned ClientID, server keepalive, MaximumQoS, ResponseInformation, ...). Must not block.
func WithOnReconnectAttempt ¶
WithOnReconnectAttempt fires immediately before each reconnect dial (not the initial Client.Connect); attempt starts at 1. Must not block.
func WithOnServerDisconnect ¶
func WithOnServerDisconnect(fn func(*wire.Disconnect)) Option
WithOnServerDisconnect fires when the broker sends a DISCONNECT (§3.14). fn receives a detached *wire.Disconnect clone safe to retain. Fires after the connection is marked down and before OnConnectionDown; may call Client.SetBrokers to honour a ServerMoved / UseAnotherServer redirect on the next attempt. Not fired for socket-level errors or client-initiated Disconnect. Must not block.
func WithPingTimeout ¶
WithPingTimeout sets the budget for a PINGRESP after a PINGREQ. Exceeding it forces a reconnect. Default DefaultPingTimeout (10 s). Override only on links where PINGRESP round-trip exceeds a few seconds (LEO satellite, severely congested cellular).
func WithPublishMode ¶
func WithPublishMode(m PublishMode) Option
WithPublishMode selects QoS 0 Client.Publish semantics. See PublishMode. QoS 1/2 unaffected.
func WithPublisherPool ¶
WithPublisherPool enables N publish-only connections to the configured broker. 0/1 disables. Each member is a full Client with its own session; QoS 1/2 acks land on the emitting member. Members force CleanStart=true + SessionExpiry=0 — a mid-flight drop becomes at-least-once with possible duplicates. Use the main connection for ledger-style work.
Example ¶
ExampleWithPublisherPool dedicates N publish-only connections to the same broker, lifting publish throughput above what one keep-alive socket can absorb. Pair with mqttv5.PoolRoutingHashByTopic to preserve per-topic ordering.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
func main() {
cli, err := mqttv5.New(
mqttv5.WithBroker("mqtt://localhost:1883"),
mqttv5.WithPublisherPool(4),
mqttv5.WithPublisherPoolRouting(mqttv5.PoolRoutingHashByTopic),
)
if err != nil {
panic(err)
}
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
// Round-robin / hashed across the 4 pool members under the hood.
for i := range 100 {
_ = cli.Publish(ctx, wire.PublishOpts{
Topic: fmt.Sprintf("events/tick/%d", i%4),
Payload: fmt.Appendf(nil, `{"n":%d}`, i),
QoS: 1,
})
}
}
Output:
func WithPublisherPoolClientIDFn ¶
WithPublisherPoolClientIDFn derives each pool member's ClientID from the parent ClientID and a 1-based index. Override when the parent ClientID is empty — the default ("-pub-N") collides across processes.
func WithPublisherPoolRouting ¶
func WithPublisherPoolRouting(p PoolRoutingPolicy) Option
WithPublisherPoolRouting selects the publisher-pool member-picking strategy. See PoolRoutingPolicy. No-op when PublisherPoolSize ≤ 1.
func WithReceiveMaximum ¶
WithReceiveMaximum bounds concurrent inbound QoS 1/2 publishes (§3.1.2.11.3). Also sizes the packet-ID pool.
func WithReconnectBackoff ¶
WithReconnectBackoff sets the supervisor's reconnect backoff. See ConstantBackoff and ExponentialBackoff; default DefaultReconnectBackoff.
func WithRequestProblemInformation ¶
WithRequestProblemInformation asks the broker for ReasonString / UserProperties on error responses (§3.1.2.11.7). Default true — debugging a CONNECT or PUBLISH refusal without these is "why was this rejected?" with no answer; the wire cost is a handful of bytes on the error path only. Pass false to opt out.
func WithRequestResponseInformation ¶
WithRequestResponseInformation asks the broker for ResponseInformation in CONNACK (§3.1.2.11.6) — used by the request/response pattern (§4.10).
func WithSessionExpiry ¶
WithSessionExpiry sets the Session Expiry Interval (§3.1.2.11.2) in seconds. Default DefaultSessionExpiry (300 s / 5 min) — enough for QoS 1/2 resume across a typical reconnect blip. Zero ends the session with the network connection (no broker-side retention).
func WithStats ¶
func WithStats() Option
WithStats enables the Client.Stats counter set. When disabled the hot path skips every atomic increment and Stats returns the zero value. Off by default; enable to scrape counters into Prometheus / OpenTelemetry / expvar.
func WithStore ¶
WithStore overrides the session.Store. Default is in-memory; use the store/file submodule for crash safety.
func WithTLSConfig ¶
WithTLSConfig supplies a *tls.Config for mqtts:// / tls:// dials. Ignored for plain TCP schemes.
func WithWill ¶
WithWill attaches a will message to be published by the broker if this client disconnects ungracefully.
func WithWriteBatch ¶
WithWriteBatch coalesces up to n pre-encoded packets per writev syscall. Off by default (n=0 or 1). Wins under sustained concurrent Client.Publish (~40% at 256B with n=16 on loopback); regresses for single-publisher or large-payload workloads. Measure first.
func WithWriteOverflowPolicy ¶
func WithWriteOverflowPolicy(p WriteOverflowPolicy) Option
WithWriteOverflowPolicy selects the QoS 0 writer-queue overflow policy. Only meaningful for PublishFireAndForget — QoS 1/2 and PublishWaitForFlush always wait for either the broker or the writer to be ready.
func WithWriteQueueSize ¶
WithWriteQueueSize sets the internal MPSC write-channel buffer. Larger absorbs publish bursts; costs memory and queue latency.
func WithoutKeepAlive ¶
func WithoutKeepAlive() Option
WithoutKeepAlive disables MQTT keepalive entirely (no PINGREQ goroutine). Rarely correct in production — without PINGREQ the broker cannot detect a half-open connection from this side.
type PoolRoutingPolicy ¶
type PoolRoutingPolicy uint8
PoolRoutingPolicy selects how the publisher pool picks the starting member per Client.Publish. Unhealthy starting members fall through to the next member regardless of policy.
const ( // PoolRoutingRoundRobin (default) cycles through members; same // topic may arrive at the broker out of order. PoolRoutingRoundRobin PoolRoutingPolicy = iota // PoolRoutingHashByTopic picks the starting member by FNV-1a of // opts.Topic, preserving per-topic ordering while the pool is // healthy. Empty Topic collapses to member 0. PoolRoutingHashByTopic )
func (PoolRoutingPolicy) String ¶
func (p PoolRoutingPolicy) String() string
String returns a stable debug name for the policy.
type PublishMode ¶
type PublishMode uint8
PublishMode controls QoS 0 Client.Publish write-completion semantics. QoS 1/2 always wait for the broker's ack.
const ( // PublishFireAndForget (default): Publish returns once the // message is queued for the writer goroutine. Transport errors // are not surfaced. PublishFireAndForget PublishMode = iota // PublishWaitForFlush makes Publish block until conn.Write // returns and surfaces transport errors. Useful as a // connection-health probe. PublishWaitForFlush )
func (PublishMode) String ¶
func (m PublishMode) String() string
String returns a stable name for the mode.
type PublisherQueue ¶
type PublisherQueue interface {
// Enqueue durably stores entry. Returns ErrQueueClosed after Close.
Enqueue(ctx context.Context, entry QueueEntry) error
// PeekBatch returns up to n oldest entries without removing them.
// The tokens slice is parallel to entries; the caller passes each
// token to Ack to mark the entry delivered. An empty result with
// nil error means "queue is empty right now".
PeekBatch(ctx context.Context, n int) (entries []QueueEntry, tokens []QueueToken, err error)
// Ack removes the entry identified by token. Acking an unknown
// token is a no-op (an entry already removed via another path).
Ack(ctx context.Context, token QueueToken) error
// EvictHead removes the oldest entry and returns it. Returns
// (zero, false, nil) when the queue is empty. Implementations
// that cannot evict the head out-of-band return
// ErrEvictionNotSupported and the caller must use DropNewest.
EvictHead(ctx context.Context) (QueueEntry, bool, error)
// Len returns the current entry count.
Len(ctx context.Context) (int, error)
// Close releases any underlying resources. After Close, subsequent
// Enqueue calls must return ErrQueueClosed.
Close() error
}
PublisherQueue is the durable backing store for QueuePublisher. Implementations may be in-memory or disk-backed. Every method must be safe for concurrent use.
type Queue ¶
type Queue[T any] struct { // contains filtered or unexported fields }
Queue is an unbounded MPSC FIFO of T. Enqueue is non-blocking; Dequeue blocks until an item arrives, the queue is closed, or ctx cancels. After Close, queued items are still returned by Dequeue; once drained, Dequeue returns (zero, false).
func NewQueue ¶
NewQueue returns a ready-to-use Queue.
Example ¶
ExampleNewQueue shows the lock-protected unbounded MPSC queue returned by mqttv5.Client.SubscribeQueue. It is exported so callers can build their own internal pipelines on top of the same type.
package main
import (
"context"
"fmt"
"github.com/ashtonian/mqttv5"
)
func main() {
q := mqttv5.NewQueue[string]()
go func() {
defer q.Close()
q.Enqueue("one")
q.Enqueue("two")
}()
ctx := context.Background()
for {
v, ok := q.Dequeue(ctx)
if !ok {
break
}
fmt.Println(v)
}
}
Output: one two
func (*Queue[T]) Close ¶
func (q *Queue[T]) Close()
Close signals waiting Dequeue calls to drain. Items already queued remain available.
func (*Queue[T]) Dequeue ¶
Dequeue blocks until an item is available, the queue is closed, or ctx cancels. ok is false when no item could be retrieved.
func (*Queue[T]) TryDequeue ¶
TryDequeue removes and returns the head without blocking.
type QueueEntry ¶
type QueueEntry struct {
Publish wire.PublishOpts
EnqueuedAt time.Time
}
QueueEntry is one durable publish in flight between Publish (enqueue) and broker PUBACK (Ack). The drain loop sets PacketID and Dup per attempt; values in Publish are ignored.
type QueueOption ¶
type QueueOption func(*queueConfig)
QueueOption customises a QueuePublisher.
func WithDeadLetter ¶
func WithDeadLetter(fn func(QueueEntry, error)) QueueOption
WithDeadLetter registers a callback for terminally failed entries (e.g. TTL expiry). Runs on the drain goroutine — must not block.
func WithQueueBatchSize ¶
func WithQueueBatchSize(n int) QueueOption
WithQueueBatchSize caps the number of entries the drain goroutine pulls in one PeekBatch. Default 16.
func WithQueueDropPolicy ¶
func WithQueueDropPolicy(p DropPolicy) QueueOption
WithQueueDropPolicy controls full-queue behaviour. DropNewest (default) returns ErrQueueFull from Publish when the queue is at capacity. DropOldest evicts the oldest entry via PublisherQueue.EvictHead and dead-letters it (if a callback is registered) before enqueuing the new one. Backends that cannot evict (return ErrEvictionNotSupported) cause NewQueuePublisher to fail rather than silently downgrade.
func WithQueueIdleInterval ¶
func WithQueueIdleInterval(d time.Duration) QueueOption
WithQueueIdleInterval overrides the drain loop's idle-tick wakeup. Default DefaultQueueIdleInterval.
func WithQueueMaxSize ¶
func WithQueueMaxSize(n int) QueueOption
WithQueueMaxSize bounds the queue. At capacity, Publish either returns ErrQueueFull (DropNewest) or evicts the oldest entry via EvictHead (DropOldest — see WithQueueDropPolicy).
func WithQueuePublishTimeout ¶
func WithQueuePublishTimeout(d time.Duration) QueueOption
WithQueuePublishTimeout caps each per-message broker handshake inside the drain loop. Default DefaultQueuePublishTimeout. Set short to keep retries snappy at the cost of more wasted work against a slow broker; set long for occasional huge payloads.
func WithQueueTTL ¶
func WithQueueTTL(d time.Duration) QueueOption
WithQueueTTL drops entries older than d at drain time and fires the dead-letter callback. When d > 0 the value also mirrors into each entry's MessageExpiryInterval so the broker enforces TTL after hand-off.
type QueuePublisher ¶
type QueuePublisher struct {
// contains filtered or unexported fields
}
QueuePublisher durably enqueues publishes and drains them to the broker in order from a background goroutine. QueuePublisher.Publish returns once the entry is stored, before the broker round-trip. For crash safety, pair the queue/file submodule with the store/file submodule (in-flight ack state) — independent layers.
func NewQueuePublisher ¶
func NewQueuePublisher(client *Client, queue PublisherQueue, opts ...QueueOption) (*QueuePublisher, error)
NewQueuePublisher wires a QueuePublisher over client + queue. The drain goroutine starts immediately and runs until QueuePublisher.Close. When DropOldest is requested, queue.EvictHead is probed; ErrEvictionNotSupported aborts construction rather than silently downgrading.
Example ¶
ExampleNewQueuePublisher durably enqueues publishes and drains them to the broker in order from a background goroutine. Publish returns once the queue has stored the entry, before the broker round-trip.
For at-least-once across process restart, swap mqttv5.NewMemoryPublisherQueue for the queue/file submodule.
package main
import (
"context"
"fmt"
"time"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
pub, err := mqttv5.NewQueuePublisher(cli, mqttv5.NewMemoryPublisherQueue(),
mqttv5.WithQueueBatchSize(32),
mqttv5.WithQueueTTL(24*time.Hour),
mqttv5.WithDeadLetter(func(e mqttv5.QueueEntry, err error) {
fmt.Println("dead letter", e.Publish.Topic, err)
}),
)
if err != nil {
panic(err)
}
defer pub.Close(context.Background())
_ = pub.Publish(ctx, wire.PublishOpts{
Topic: "events/durable",
Payload: []byte("at-least-once"),
QoS: 1,
})
}
Output:
func (*QueuePublisher) Close ¶
func (p *QueuePublisher) Close(ctx context.Context) error
Close stops the drain goroutine and closes the backing queue. ctx bounds how long Close waits for the drain to settle.
func (*QueuePublisher) Publish ¶
func (p *QueuePublisher) Publish(ctx context.Context, opts wire.PublishOpts) error
Publish durably enqueues opts and returns once the queue has stored it. The broker handshake happens asynchronously on the drain goroutine. Rejects QoS 0 with ErrQoS0NotQueueable.
type QueueToken ¶
type QueueToken any
QueueToken identifies one entry. Opaque to QueuePublisher — implementations choose whatever value lets Ack remove the exact entry PeekBatch returned (sequence number, byte offset, etc.).
type Stats ¶
type Stats struct {
Connects uint64 // successful CONNECT/CONNACK handshakes
ConnectFailures uint64 // CONNECT attempts that failed (any cause)
Disconnects uint64 // connection drops (any cause)
ServerDisconnects uint64 // broker-sent DISCONNECT packets
PingTimeouts uint64 // PINGRESP timeouts that forced a reconnect
PublishesSent uint64 // QoS 0/1/2 publishes handed to the writer
PublishesAcked uint64 // QoS 1/2 acks received from broker
PublishesInflight uint64 // QoS 1/2 outbound entries awaiting ack
PublishesReplayed uint64 // QoS 1/2 replays issued after reconnect
InboundPublishes uint64 // PUBLISHes received from the broker
InboundDropped uint64 // inbound PUBLISHes dropped due to full subscriber buffer
SubscribesSent uint64 // SUBSCRIBE packets emitted
UnsubscribesSent uint64 // UNSUBSCRIBE packets emitted
SubscriptionsActive uint64 // count of currently registered subscriptions
PoolFallbacks uint64 // publishes that fell back from pool to main connection
}
Stats is the snapshot of Client counters returned by Client.Stats.
Two flavours of field:
- Monotonic counters (Connects, ConnectFailures, Disconnects, ServerDisconnects, PingTimeouts, PublishesSent, PublishesAcked, PublishesReplayed, InboundPublishes, InboundDropped, SubscribesSent, UnsubscribesSent, PoolFallbacks). Sampled atomically — no locking on the read side.
- Point-in-time gauges (PublishesInflight, SubscriptionsActive) read live state under a mutex (Outbound tracker and the subscriptions map respectively). The locks are cheap but a scraper hitting Stats() at very high frequency does contend with Subscribe / Unsubscribe and the ack path. Default to sampling at most a few times per second.
The zero value is returned when WithStats was not set on the Config — when stats are disabled the hot path skips every atomic increment, so leave WithStats off when you have no scraper.
type SubscribeOption ¶
type SubscribeOption func(*subscribeConfig)
SubscribeOption configures client-side delivery for one Subscribe call. Protocol flags (QoS, NoLocal, etc.) live on TopicFilter itself — these options control what happens after delivery.
func SubAutoAck ¶
func SubAutoAck() SubscribeOption
SubAutoAck makes Client.Subscribe / Client.SubscribeQueue ack each delivery on the dispatcher goroutine, before handing the message to the consumer. The consumer receives a detached *Message whose Topic / Payload / Properties are heap copies safe to retain indefinitely; Message.Ack becomes a no-op.
Trade-off: convenience for two allocations per delivery (Topic clone + Payload clone) on top of the *Message + *wire.Publish header allocations — the default manual-ack path is zero-alloc on the receive hot path. More importantly, auto-ack breaks at-least-once semantics: a consumer that crashes between delivery and processing has nothing to replay, since the broker considers the message delivered. Reach for this on QoS 0 / observational consumers; keep manual ack for ledger-style workloads.
Ignored by Client.SubscribeCallback (already auto-acks).
func SubBuffer ¶
func SubBuffer(n int) SubscribeOption
SubBuffer sets Client.Subscribe's channel buffer size. Default DefaultSubscribeBuffer. Ignored by Client.SubscribeQueue and Client.SubscribeCallback.
func SubDropPolicy ¶
func SubDropPolicy(p DropPolicy) SubscribeOption
SubDropPolicy chooses behaviour when the channel buffer or queue cap fills.
- Subscribe (chan): only DropNewest is supported. Passing DropOldest here causes Subscribe to return ErrChanDropOldestUnsupported — peek-and-pop on a consumer-owned channel would race the receiver. Use SubscribeQueue for DropOldest semantics.
- SubscribeQueue: both policies are honored.
- SubscribeCallback: ignored (callback never drops; slow handler stalls the connection instead).
Default is the client-level WithDropPolicy (DropNewest).
func SubMaxQueueSize ¶
func SubMaxQueueSize(n int) SubscribeOption
SubMaxQueueSize caps Client.SubscribeQueue's length. 0 = unbounded. Default from WithMaxSubscribeQueueSize (0).
func SubOnDrop ¶
func SubOnDrop(fn func(*Message)) SubscribeOption
SubOnDrop fires when a message is dropped by Client.Subscribe (chan full) or Client.SubscribeQueue (cap reached). The dropped message is already ack'd; fn is for metrics / logging only — must NOT call Message.Ack, must NOT retain Topic / Payload past return (both alias the frame buffer).
type SubscriptionToken ¶
type SubscriptionToken struct {
// contains filtered or unexported fields
}
SubscriptionToken identifies an active subscription so Unsubscribe can find and remove it.
type TopicFilter ¶
type TopicFilter = wire.SubscribeFilter
TopicFilter is one entry in a SUBSCRIBE packet: a topic pattern plus the per-filter MQTT v5 §3.8.3.1 flags. Only Topic is required; the rest default to MQTT-spec zero values.
{Topic: "events/#"} // QoS 0
{Topic: "events/#", QoS: 1} // at-least-once
{Topic: "events/#", QoS: 1, NoLocal: true} // don't echo own publishes back
type Typed ¶
type Typed[T any] struct { // contains filtered or unexported fields }
Typed pairs a *Client with a Codec[T] for typed publish/subscribe.
func NewTyped ¶
NewTyped wraps c with codec.
Example ¶
ExampleNewTyped wraps a mqttv5.Client with a mqttv5.Codec so Publish and Subscribe carry decoded values. Decode failures on the receive path are logged + acked + dropped — wrap the codec for stricter behaviour.
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/ashtonian/mqttv5"
"github.com/ashtonian/mqttv5/wire"
)
// readingCodec is a tiny inline [mqttv5.Codec] used by
// [ExampleNewTyped]. In real code prefer the codec/json submodule or
// any other implementation of mqttv5.Codec[T].
type readingCodec struct{}
type reading struct {
Device string `json:"device"`
Temp float64 `json:"temp"`
}
func (readingCodec) Encode(v reading) ([]byte, error) { return json.Marshal(v) }
func (readingCodec) Decode(b []byte) (reading, error) {
var v reading
err := json.Unmarshal(b, &v)
return v, err
}
func main() {
cli, _ := mqttv5.New(mqttv5.WithBroker("mqtt://localhost:1883"))
ctx := context.Background()
_ = cli.Connect(ctx)
defer cli.Disconnect(ctx)
typed := mqttv5.NewTyped[reading](cli, readingCodec{})
_ = typed.Publish(ctx,
wire.PublishOpts{Topic: "sensors/a1", QoS: 1},
reading{Device: "a1", Temp: 22.5},
)
ch, _, err := typed.Subscribe(ctx,
[]mqttv5.TopicFilter{{Topic: "sensors/+", QoS: 1}})
if err != nil {
return
}
for m := range ch {
fmt.Println(m.Topic, m.Value.Temp)
_ = m.Ack()
}
}
Output:
func (*Typed[T]) Subscribe ¶
func (t *Typed[T]) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *TypedMessage[T], SubscriptionToken, error)
Subscribe wraps Client.Subscribe with a decode stage. Decode failures are logged + acked + dropped — for stricter behaviour wrap the codec or use SubscribeCallback. Caller MUST call Ack on each TypedMessage.
func (*Typed[T]) SubscribeQueue ¶
func (t *Typed[T]) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*TypedMessage[T]], SubscriptionToken, error)
SubscribeQueue is the queue-backed variant of Subscribe.
type TypedMessage ¶
TypedMessage wraps Message with an already-decoded Value and a detached Topic. Both Value and Topic are safe to retain past Ack — Value via the codec's allocation, Topic via an explicit strings.Clone at construction. The embedded *Message still aliases the frame for Payload / Properties access; use Message.ClonePayload or read those fields before calling Ack if you need them.
type WriteOverflowPolicy ¶
type WriteOverflowPolicy uint8
WriteOverflowPolicy controls what QoS 0 Client.Publish does when the internal writer queue (sized by WithWriteQueueSize) is full. QoS 1/2 are unaffected — they always block on the broker ack.
const ( // WriteBlock (default) blocks Publish until the writer queue has // room, the call's ctx fires, or the connection goes down. The // caller surfaces overload as latency. WriteBlock WriteOverflowPolicy = iota // WriteDropNewest returns [ErrWriteQueueFull] immediately when // the queue has no room. Use for high-rate fire-and-forget // telemetry where the producer goroutine must not block. WriteDropNewest )
func (WriteOverflowPolicy) String ¶
func (p WriteOverflowPolicy) String() string
String returns a stable name for the policy.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
codec
|
|
|
json
module
|
|
|
msgpack
module
|
|
|
internal
|
|
|
trie
Package trie implements an MQTT v5 topic filter matcher used by the client to dispatch inbound PUBLISH packets to subscribed handlers.
|
Package trie implements an MQTT v5 topic filter matcher used by the client to dispatch inbound PUBLISH packets to subscribed handlers. |
|
queue
|
|
|
file
module
|
|
|
Package session manages MQTT v5 session state: packet ID allocation, in-flight QoS 1/2 tracking (both outbound and inbound), and a Store interface so the state can survive a reconnect.
|
Package session manages MQTT v5 session state: packet ID allocation, in-flight QoS 1/2 tracking (both outbound and inbound), and a Store interface so the state can survive a reconnect. |
|
store
|
|
|
file
module
|
|
|
Package transport abstracts the network layer underneath the MQTT v5 client.
|
Package transport abstracts the network layer underneath the MQTT v5 client. |
|
ws
module
|
|
|
Package wire implements the MQTT v5 wire-format codec.
|
Package wire implements the MQTT v5 wire-format codec. |