Versions in this module Expand all Collapse all v0 v0.9.1 May 22, 2026 Changes in this version + const DefaultConnectTimeout + const DefaultDisconnectFlushTimeout + const DefaultKeepAlive + const DefaultPingTimeout + const DefaultPublisherPoolClientIDFormat + const DefaultQueueBatchSize + const DefaultQueueIdleInterval + const DefaultQueuePublishTimeout + const DefaultSessionExpiry + const DefaultSubscribeBuffer + const DefaultWriteQueueSize + var DefaultReconnectBackoff = ExponentialBackoff(time.Second, 30 * time.Second, 200 * time.Millisecond) + var ErrAlreadyConnected = errors.New("mqttv5: client already connected") + var ErrChanDropOldestUnsupported = errors.New("mqttv5: Subscribe (chan) does not support DropOldest; use SubscribeQueue") + var ErrClosed = errors.New("mqttv5: client closed") + var ErrConnectRefused = errors.New("mqttv5: broker refused CONNECT") + var ErrEvictionNotSupported = errors.New("mqttv5: queue backend does not support eviction") + var ErrInvalidBrokerURL = errors.New("mqttv5: invalid broker URL") + var ErrMissingBroker = errors.New("mqttv5: missing broker URL (use WithBroker)") + var ErrNilHandler = errors.New("mqttv5: subscribe handler must not be nil") + var ErrNoHealthyPublishers = errors.New("mqttv5: no healthy publishers in pool") + var ErrNotConnected = errors.New("mqttv5: client not connected") + var ErrQoS0NotQueueable = errors.New("mqttv5: QueuePublisher requires QoS >= 1") + var ErrQueueClosed = errors.New("mqttv5: queue is closed") + var ErrQueueFull = errors.New("mqttv5: queue is full") + var ErrSharedSubsUnsupported = errors.New("mqttv5: broker does not support shared subscriptions") + var ErrSubscriptionIDsUnsupported = errors.New("mqttv5: broker does not support subscription identifiers") + var ErrUnexpectedPacket = errors.New("mqttv5: unexpected packet from broker") + var ErrWildcardSubsUnsupported = errors.New("mqttv5: broker does not support wildcard subscriptions") + var ErrWriteQueueFull = errors.New("mqttv5: write queue full") + type Authenticator interface + Begin func() (data []byte, err error) + Continue func(brokerData []byte) (response []byte, done bool, err error) + Method func() string + type Backoff func(attempt int) time.Duration + func ConstantBackoff(d time.Duration) Backoff + func ExponentialBackoff(min, max, jitter time.Duration) Backoff + type Client struct + func New(opts ...Option) (*Client, error) + func (c *Client) ClientID() string + func (c *Client) Connect(ctx context.Context) error + func (c *Client) Connected() bool + func (c *Client) Disconnect(ctx context.Context) error + func (c *Client) DisconnectWith(ctx context.Context, opts wire.DisconnectOpts) error + func (c *Client) Publish(ctx context.Context, opts wire.PublishOpts) error + func (c *Client) SetBrokers(urls ...string) error + func (c *Client) Stats() Stats + func (c *Client) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, SubscriptionToken, error) + func (c *Client) SubscribeCallback(ctx context.Context, filters []TopicFilter, h HandlerFunc) (SubscriptionToken, error) + func (c *Client) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], SubscriptionToken, error) + func (c *Client) Unsubscribe(ctx context.Context, token SubscriptionToken) error + type ClientGroup struct + func NewClientGroup(members []GroupMember, opts ...ClientGroupOption) (*ClientGroup, error) + func (g *ClientGroup) Connect(ctx context.Context) error + func (g *ClientGroup) Disconnect(ctx context.Context) error + func (g *ClientGroup) Member(name string) *Client + func (g *ClientGroup) Members() []*Client + func (g *ClientGroup) Names() []string + func (g *ClientGroup) Publish(ctx context.Context, opts wire.PublishOpts) error + func (g *ClientGroup) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *Message, map[string]SubscriptionToken, error) + func (g *ClientGroup) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*Message], map[string]SubscriptionToken, error) + func (g *ClientGroup) Unsubscribe(ctx context.Context, name string, token SubscriptionToken) error + func (g *ClientGroup) UnsubscribeAll(ctx context.Context, tokens map[string]SubscriptionToken) error + type ClientGroupOption func(*clientGroupConfig) + func WithGroupPublishPolicy(p GroupPublishPolicy) ClientGroupOption + func WithGroupSequentialLifecycle() ClientGroupOption + func WithGroupSharedOpts(opts ...Option) ClientGroupOption + type Codec interface + Decode func(b []byte) (T, error) + Encode func(v T) ([]byte, error) + type Config struct + Authenticator Authenticator + BrokerURLs []string + CleanStart bool + CleanStartOnReconnect bool + ClientID string + ConnectPacketBuilder func(ctx context.Context, opts *wire.ConnectOpts) error + ConnectTimeout time.Duration + ConnectUserProperties []wire.UserProperty + DialFunc transport.DialFunc + Dialer *net.Dialer + DisconnectFlushTimeout time.Duration + DropPolicy DropPolicy + InboundTopicAliasMaximum uint16 + KeepAlive uint16 + Logger *slog.Logger + MaxSubscribeQueueSize int + MaximumPacketSize uint32 + OnConnectError func(err error) + OnConnectionDown func() bool + OnConnectionUp func(*wire.Connack) + OnReconnectAttempt func(attempt int, brokerURL string) + OnServerDisconnect func(*wire.Disconnect) + Password []byte + PingTimeout time.Duration + PublishMode PublishMode + PublisherPoolClientIDFn func(parent string, idx int) string + PublisherPoolRouting PoolRoutingPolicy + PublisherPoolSize int + ReceiveMaximum uint16 + ReconnectBackoff Backoff + RequestProblemInformation bool + RequestResponseInformation bool + SessionExpiry uint32 + StatsEnabled bool + Store session.Store + TLSConfig *tls.Config + Username string + WillMessage *wire.WillOpts + WriteBatchMax int + WriteOverflowPolicy WriteOverflowPolicy + WriteQueueSize int + type DropPolicy uint8 + const DropNewest + const DropOldest + func (p DropPolicy) String() string + type GroupMember struct + Broker string + Name string + Opts []Option + type GroupPublishPolicy uint8 + const GroupPublishBroadcast + const GroupPublishHashByTopic + const GroupPublishRoundRobin + func (p GroupPublishPolicy) String() string + type HandlerFunc func(*Message) + type MemoryPublisherQueue struct + func NewMemoryPublisherQueue() *MemoryPublisherQueue + func (q *MemoryPublisherQueue) Ack(_ context.Context, tok QueueToken) error + func (q *MemoryPublisherQueue) Close() error + func (q *MemoryPublisherQueue) Enqueue(_ context.Context, entry QueueEntry) error + func (q *MemoryPublisherQueue) EvictHead(_ context.Context) (QueueEntry, bool, error) + func (q *MemoryPublisherQueue) Len(_ context.Context) (int, error) + func (q *MemoryPublisherQueue) PeekBatch(_ context.Context, n int) ([]QueueEntry, []QueueToken, error) + type Message struct + func (m *Message) Ack() error + func (m *Message) ClonePayload() []byte + func (m *Message) CloneTopic() string + type Option func(*Config) error + func WithAuthenticator(a Authenticator) Option + func WithBroker(url string) Option + func WithBrokers(urls ...string) Option + func WithCleanStart(b bool) Option + func WithCleanStartOnReconnect(b bool) Option + func WithClientID(id string) Option + func WithConnectPacketBuilder(fn func(ctx context.Context, opts *wire.ConnectOpts) error) Option + func WithConnectTimeout(d time.Duration) Option + func WithConnectUserProperties(p []wire.UserProperty) Option + func WithConnectUserProperty(key, value string) Option + func WithCredentials(user string, pass []byte) Option + func WithDialFunc(fn transport.DialFunc) Option + func WithDialer(d *net.Dialer) Option + func WithDisconnectFlushTimeout(d time.Duration) Option + func WithDropPolicy(p DropPolicy) Option + func WithInboundTopicAliasMaximum(n uint16) Option + func WithKeepAlive(s uint16) Option + func WithLogger(l *slog.Logger) Option + func WithMaxSubscribeQueueSize(n int) Option + func WithMaximumPacketSize(n uint32) Option + func WithOnConnectError(fn func(error)) Option + func WithOnConnectionDown(fn func() bool) Option + func WithOnConnectionUp(fn func(*wire.Connack)) Option + func WithOnReconnectAttempt(fn func(attempt int, brokerURL string)) Option + func WithOnServerDisconnect(fn func(*wire.Disconnect)) Option + func WithPingTimeout(d time.Duration) Option + func WithPublishMode(m PublishMode) Option + func WithPublisherPool(size int) Option + func WithPublisherPoolClientIDFn(fn func(parent string, idx int) string) Option + func WithPublisherPoolRouting(p PoolRoutingPolicy) Option + func WithReceiveMaximum(n uint16) Option + func WithReconnectBackoff(b Backoff) Option + func WithRequestProblemInformation(b bool) Option + func WithRequestResponseInformation(b bool) Option + func WithSessionExpiry(s uint32) Option + func WithStats() Option + func WithStore(s session.Store) Option + func WithTLSConfig(t *tls.Config) Option + func WithWill(w *wire.WillOpts) Option + func WithWriteBatch(n int) Option + func WithWriteOverflowPolicy(p WriteOverflowPolicy) Option + func WithWriteQueueSize(n int) Option + func WithoutKeepAlive() Option + type PoolRoutingPolicy uint8 + const PoolRoutingHashByTopic + const PoolRoutingRoundRobin + func (p PoolRoutingPolicy) String() string + type PublishMode uint8 + const PublishFireAndForget + const PublishWaitForFlush + func (m PublishMode) String() string + type PublisherQueue interface + Ack func(ctx context.Context, token QueueToken) error + Close func() error + Enqueue func(ctx context.Context, entry QueueEntry) error + EvictHead func(ctx context.Context) (QueueEntry, bool, error) + Len func(ctx context.Context) (int, error) + PeekBatch func(ctx context.Context, n int) (entries []QueueEntry, tokens []QueueToken, err error) + type Queue struct + func NewQueue[T any]() *Queue[T] + func (q *Queue[T]) Close() + func (q *Queue[T]) Dequeue(ctx context.Context) (T, bool) + func (q *Queue[T]) Enqueue(item T) bool + func (q *Queue[T]) Len() int + func (q *Queue[T]) TryDequeue() (T, bool) + type QueueEntry struct + EnqueuedAt time.Time + Publish wire.PublishOpts + type QueueOption func(*queueConfig) + func WithDeadLetter(fn func(QueueEntry, error)) QueueOption + func WithQueueBatchSize(n int) QueueOption + func WithQueueDropPolicy(p DropPolicy) QueueOption + func WithQueueIdleInterval(d time.Duration) QueueOption + func WithQueueMaxSize(n int) QueueOption + func WithQueuePublishTimeout(d time.Duration) QueueOption + func WithQueueTTL(d time.Duration) QueueOption + type QueuePublisher struct + func NewQueuePublisher(client *Client, queue PublisherQueue, opts ...QueueOption) (*QueuePublisher, error) + func (p *QueuePublisher) Close(ctx context.Context) error + func (p *QueuePublisher) Publish(ctx context.Context, opts wire.PublishOpts) error + type QueueToken any + type Stats struct + ConnectFailures uint64 + Connects uint64 + Disconnects uint64 + InboundDropped uint64 + InboundPublishes uint64 + PingTimeouts uint64 + PoolFallbacks uint64 + PublishesAcked uint64 + PublishesInflight uint64 + PublishesReplayed uint64 + PublishesSent uint64 + ServerDisconnects uint64 + SubscribesSent uint64 + SubscriptionsActive uint64 + UnsubscribesSent uint64 + type SubscribeOption func(*subscribeConfig) + func SubAutoAck() SubscribeOption + func SubBuffer(n int) SubscribeOption + func SubDropPolicy(p DropPolicy) SubscribeOption + func SubMaxQueueSize(n int) SubscribeOption + func SubOnDrop(fn func(*Message)) SubscribeOption + type SubscriptionToken struct + type TopicFilter = wire.SubscribeFilter + type Typed struct + func NewTyped[T any](c *Client, codec Codec[T]) *Typed[T] + func (t *Typed[T]) Publish(ctx context.Context, opts wire.PublishOpts, v T) error + func (t *Typed[T]) Subscribe(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (<-chan *TypedMessage[T], SubscriptionToken, error) + func (t *Typed[T]) SubscribeQueue(ctx context.Context, filters []TopicFilter, opts ...SubscribeOption) (*Queue[*TypedMessage[T]], SubscriptionToken, error) + type TypedMessage struct + Topic string + Value T + type WriteOverflowPolicy uint8 + const WriteBlock + const WriteDropNewest + func (p WriteOverflowPolicy) String() string v0.9.0 May 22, 2026