mqtt

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2025 License: MIT Imports: 13 Imported by: 0

README

Gokit MQTT

License Go Doc Go Report Card Build Status

The mqtt package provides a robust and idiomatic implementation for working with MQTT brokers in Go. It includes features for publishing, subscribing, and managing MQTT connections, while adhering to Go best practices.

Features

  • Client Management: Establish and manage connections to MQTT brokers.
  • Publishing: Publish messages to topics with or without context deadlines.
  • Subscription Management: Register and consume messages from topics.
  • Error Handling: Comprehensive error handling for common MQTT operations.
  • Tracing: Integrated with OpenTelemetry for distributed tracing.

Installation

To use the mqtt package, add it to your project:

go get github.com/ralvescosta/gokit/mqtt

Usage

Creating an MQTT Client
import (
	"github.com/ralvescosta/gokit/mqtt"
    configsBuilder "github.com/ralvescosta/gokit/configs_builder"
)

func main() {
	cfgs, err := configsBuilder.
		NewConfigsBuilder().
		MQTT().
		Build()
	if err != nil {
		cfgs.Logger.Fatal(err.Error())
	}

	client := mqtt.NewMQTTClient(cfgs)

	if err := client.Connect(); err != nil {
		panic(err)
	}

	defer client.Client().Disconnect(250)
}
Publishing Messages
With Context Deadline
import (
	"context"
	"github.com/ralvescosta/gokit/mqtt"
)

func publishWithContext(publisher mqtt.Publisher) {
	ctx := context.Background()
	err := publisher.PubCtx(ctx, "example/topic", mqtt.AtLeastOnce, "Hello, MQTT!")
	if err != nil {
		panic(err)
	}
}
Without Context Deadline
func publishWithoutContext(publisher mqtt.Publisher) {
	err := publisher.Pub("example/topic", mqtt.AtLeastOnce, "Hello, MQTT!")
	if err != nil {
		panic(err)
	}
}
Subscribing to Topics
import (
	"os"
	"github.com/ralvescosta/gokit/mqtt"
)

func subscribe(dispatcher mqtt.Dispatcher) {
	handler := func(ctx context.Context, topic string, qos mqtt.QoS, payload []byte) error {
		fmt.Printf("Received message on topic %s: %s\n", topic, string(payload))
		return nil
	}

	err := dispatcher.Register("example/topic", mqtt.AtLeastOnce, handler)
	if err != nil {
		panic(err)
	}

	stop := make(chan os.Signal, 1)
	dispatcher.ConsumeBlocking(stop)
}

Error Handling

The mqtt package provides predefined errors for common issues:

  • ConnectionFailureError: Indicates a failure to connect to the MQTT broker.
  • EmptyTopicError: Indicates that the topic for a subscription cannot be empty.
  • NillHandlerError: Indicates that the handler for a subscription cannot be nil.
  • NillPayloadError: Indicates that the payload for a publish operation cannot be nil.
  • InvalidQoSError: Indicates that the provided QoS value is invalid.

Tracing

The mqtt package integrates with OpenTelemetry for distributed tracing. Each message handler creates a new span with the topic name as the span name.

License

This package is licensed under the MIT License. See the LICENSE file for details.

Contributing

Contributions are welcome! Please read the contributing guidelines before submitting a pull request.

Support

For support, please open an issue in the repository or contact the maintainers.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ConnectionFailureError indicates a failure to connect to the MQTT broker.
	ConnectionFailureError = NewError("connection failure")
	// EmptyTopicError indicates that the topic for a subscription cannot be an empty string.
	EmptyTopicError = NewError("subscribe topic cannot be an empty string")
	// NillHandlerError indicates that the handler for a subscription cannot be nil.
	NillHandlerError = NewError("subscribe handler cannot be nil")
	// NillPayloadError indicates that the payload for a publish operation cannot be nil.
	NillPayloadError = NewError("publish payload cannot be nil")
	// InvalidQoSError indicates that the provided QoS value is invalid.
	InvalidQoSError = NewError("qos must be one of: byte(0), byte(1), or byte(2)")
)

Functions

func LogMessage

func LogMessage(msg ...string) string

LogMessage formats and returns a log message with a consistent prefix for MQTT operations.

func NewError

func NewError(msg string) error

NewError creates a new instance of the custom Error type with the provided message.

func NewPublisher

func NewPublisher(configs *configs.Configs, client myQTT.Client) messaging.Publisher

NewPublisher creates a new instance of mqttPublisher.

func ValidateQoS

func ValidateQoS(qos QoS) bool

ValidateQoS checks if the provided QoS value is valid. It ensures the QoS is one of the defined levels: AtMostOnce, AtLeastOnce, or ExactlyOnce.

Types

type Dispatcher

type Dispatcher interface {
	// Register adds a new subscription to the dispatcher with the specified topic, QoS, and handler.
	// Returns an error if the topic is empty, the handler is nil, or the QoS is invalid.
	Register(topic string, qos QoS, handler Handler) error

	// ConsumeBlocking starts consuming messages for all registered subscriptions.
	// Blocks until a signal is received on the provided channel, at which point it unsubscribes from all topics.
	ConsumeBlocking()
}

Dispatcher is an interface for managing MQTT subscriptions and consuming messages.

func NewDispatcher

func NewDispatcher(logger logging.Logger, client myQTT.Client) Dispatcher

NewDispatcher initializes a new mqttDispatcher with the provided logger and MQTT client.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error represents a custom error type for the MQTT package.

func (*Error) Error

func (e *Error) Error() string

Error returns the error message as a string.

type Handler

type Handler = func(ctx context.Context, topic string, qos QoS, payload []byte) error

Updated Handler type to include context.Context as the first argument.

type MQTTClient

type MQTTClient interface {
	// Connect establishes a connection to the MQTT broker.
	Connect() error
	// Client returns the underlying MQTT client instance.
	Client() myQTT.Client
}

MQTTClient defines the interface for an MQTT client.

func NewMQTTClient

func NewMQTTClient(cfgs *configs.Configs) MQTTClient

NewMQTTClient creates a new instance of mqttClient.

type QoS

type QoS byte

QoS represents the Quality of Service levels for MQTT messages. It defines the reliability of message delivery between the client and broker. - AtMostOnce: Messages are delivered at most once (0). - AtLeastOnce: Messages are delivered at least once (1). - ExactlyOnce: Messages are delivered exactly once (2).

const (
	// AtMostOnce represents QoS level 0.
	AtMostOnce QoS = 0
	// AtLeastOnce represents QoS level 1.
	AtLeastOnce QoS = 1
	// ExactlyOnce represents QoS level 2.
	ExactlyOnce QoS = 2
)

func QoSFromBytes

func QoSFromBytes(qos byte) QoS

QoSFromBytes converts a byte value to a QoS type. It maps the byte to one of the defined QoS levels.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL