messanger

package
v0.0.11
Published: Dec 1, 2025 License: Apache-2.0 Imports: 16 Imported by: 4

Documentation

Overview

Package messanger provides a unified interface for publish-subscribe messaging in the Otto IoT framework. It supports multiple messaging backends including local in-process messaging and MQTT-based distributed messaging.

The package implements a topic-based routing system where messages are published to topics and delivered to all subscribers of those topics. Topics follow the MQTT topic format with hierarchical paths separated by slashes (e.g., "ss/c/station/sensor").

Key Components:

  • Messanger interface: Core abstraction for all messaging implementations
  • MessangerLocal: In-process messaging with wildcard topic support
  • MessangerMQTT: MQTT broker-based distributed messaging
  • Msg: Message structure containing topic, payload, and metadata
  • Topics: Topic format validation and management

Example Usage:

// Create a local messanger
msg := messanger.NewMessanger("local")

// Subscribe to a topic
msg.Subscribe("ss/c/station/+", func(m *messanger.Msg) error {
    fmt.Printf("Received: %s\n", m.String())
    return nil
})

// Publish a message
msg.Pub("ss/c/station/temp", "25.5")

mqtt_mock.go provides a comprehensive mock implementation of the Eclipse Paho MQTT client interface for testing purposes.

The mock client simulates MQTT broker behavior without requiring an actual network connection or broker. This enables:

  • Fast, isolated unit tests
  • Deterministic test behavior
  • Testing error conditions and edge cases
  • CI/CD environments without external dependencies

Key Features:

  • Track all published messages for assertions
  • Simulate connection, subscription, and publish operations
  • Inject errors to test error handling
  • Manually trigger message delivery to handlers
  • Thread-safe for concurrent test scenarios

Example:

mock := NewMockClient()
mqtt := NewMQTT("test", "mock", "")
mqtt.SetMQTTClient(mock)

// Subscribe and trigger message
mqtt.Subscribe("test/topic", handler)
mock.SimulateMessage("test/topic", []byte("test data"))

// Verify publications
pubs := mock.GetPublications()
assert.Equal(t, 1, len(pubs))

Constants

This section is empty.

Variables

var GetMQTT = func() *MQTT {
	if mqtt == nil {
		mqtt = &MQTT{
			id:     "default",
			Broker: "localhost",
		}
	}
	return mqtt
}

GetMQTT returns the singleton MQTT client instance, creating it with default settings if it doesn't exist yet.

Default settings:

  • ID: "default"
  • Broker: "localhost"

Returns a pointer to the MQTT client.

Note: GetMQTT is declared as a function-valued variable rather than an ordinary function so that test code can replace it if needed.
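
The override pattern mentioned in the note can be sketched as follows. This is a minimal, self-contained illustration rather than the package's own code: client, shared, getClient, and withStubClient are hypothetical stand-ins for MQTT, the package-level instance, GetMQTT, and a test helper.

```go
package main

import "fmt"

// client is a stand-in for the package's MQTT type; only the two
// fields named in the documented defaults are modelled here.
type client struct {
	id     string
	broker string
}

// shared plays the role of the package-level singleton instance.
var shared *client

// getClient mirrors the shape of GetMQTT: a package-level variable
// holding a function, so that it can be reassigned.
var getClient = func() *client {
	if shared == nil {
		shared = &client{id: "default", broker: "localhost"}
	}
	return shared
}

// withStubClient swaps getClient for the duration of fn and restores
// the original function afterwards.
func withStubClient(stub *client, fn func()) {
	orig := getClient
	getClient = func() *client { return stub }
	defer func() { getClient = orig }()
	fn()
}

func main() {
	withStubClient(&client{id: "stub", broker: "mock"}, func() {
		fmt.Println(getClient().id) // prints "stub"
	})
	fmt.Println(getClient().id) // prints "default"
}
```

Restoring the original function in a defer keeps one test's stub from leaking into the next.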

Functions

func Bytes

func Bytes(data any) ([]byte, error)

Bytes converts various data types to a byte slice for use in message payloads. This utility function handles common types used in IoT applications.

Supported types:

  • []byte: Returned as-is
  • string: Converted to UTF-8 bytes
  • int: Formatted as decimal string
  • bool: Converted to "true" or "false"
  • float64: Formatted with 2 decimal places (e.g., "25.50")

Parameters:

  • data: The value to convert to bytes

Returns the byte representation and an error if the type is not supported.

Example:

b, err := Bytes(25.5)  // b is []byte("25.50"), err is nil
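
The conversion rules listed above can be sketched in a few lines. The function below, toBytes, is a hypothetical re-implementation written from the documented behavior, not the package's source; the exact error text in particular is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// toBytes follows the documented rules for Bytes. It is an
// illustrative stand-in, not the package's implementation.
func toBytes(data any) ([]byte, error) {
	switch v := data.(type) {
	case []byte:
		return v, nil // returned as-is
	case string:
		return []byte(v), nil
	case int:
		return []byte(strconv.Itoa(v)), nil
	case bool:
		return []byte(strconv.FormatBool(v)), nil
	case float64:
		// Fixed-point notation with two decimal places, e.g. "25.50".
		return []byte(strconv.FormatFloat(v, 'f', 2, 64)), nil
	default:
		return nil, fmt.Errorf("unsupported type %T", data)
	}
}

func main() {
	// int64 is not in the supported list, so the last value yields an error.
	for _, v := range []any{25.5, 42, true, "on", int64(7)} {
		b, err := toBytes(v)
		fmt.Printf("%q %v\n", b, err)
	}
}
```

Note that only the exact types in the list match: under these rules an int64 or float32 value is rejected rather than converted.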

func StartMQTTBroker added in v0.0.10

func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error)

func StopMQTTBroker added in v0.0.11

func StopMQTTBroker(ctx context.Context) error

func ValidateTopic

func ValidateTopic(topic string) bool

ValidateTopic checks if a topic string follows Otto's topic format conventions. A valid Otto topic must have:

  • At least 4 segments separated by '/'
  • First segment must be "ss" (namespace)
  • Second segment must be "c" (control) or "d" (data)
  • Third segment (station ID) must not be empty
  • Fourth segment (sensor/command) must not be empty

Parameters:

  • topic: The topic string to validate (e.g., "ss/c/station1/temp")

Returns true if the topic is valid, false otherwise.

Example:

valid := ValidateTopic("ss/c/station1/temp")  // Returns true
valid = ValidateTopic("invalid/topic")        // Returns false
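
The five rules above translate fairly directly into code. The sketch below uses a hypothetical function, validTopic, written only from the rule list; the real ValidateTopic may apply additional checks.

```go
package main

import (
	"fmt"
	"strings"
)

// validTopic applies the documented Otto topic rules. It is an
// illustrative stand-in for ValidateTopic.
func validTopic(topic string) bool {
	parts := strings.Split(topic, "/")
	if len(parts) < 4 {
		return false // at least 4 segments
	}
	if parts[0] != "ss" {
		return false // namespace segment
	}
	if parts[1] != "c" && parts[1] != "d" {
		return false // control or data
	}
	// Station ID and sensor/command must both be non-empty.
	return parts[2] != "" && parts[3] != ""
}

func main() {
	for _, topic := range []string{
		"ss/c/station1/temp",
		"ss/d/station1/temp/raw",
		"ss/x/station1/temp",
		"ss/c//temp",
		"invalid/topic",
	} {
		fmt.Printf("%-24s %v\n", topic, validTopic(topic))
	}
}
```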

Types

type Config added in v0.0.11

type Config struct {
	// Broker is the address of the MQTT broker (hostname or IP)
	Broker   string `json:"broker"`
	Username string `json:"username"`
	Password string `json:"password"`
}

Config holds configuration parameters for messanger initialization: the MQTT broker address plus the username and password used for authentication.

type MQTT

type MQTT struct {
	Broker   string // Broker hostname or IP (no protocol/port)
	Username string // MQTT authentication username
	Password string // MQTT authentication password
	Debug    bool   // Enable Paho MQTT client debug logging

	gomqtt.Client // Embedded Paho MQTT client
	// contains filtered or unexported fields
}

MQTT wraps the Eclipse Paho MQTT Go client with Otto-specific configuration and error handling. It provides a simplified interface for connecting to MQTT brokers and publishing/subscribing to topics.

The wrapper handles:

  • Connection management with automatic reconnection
  • Authentication (username/password)
  • Debug logging when enabled
  • Error tracking
  • Mock client injection for testing

Default Configuration:

  • Port: 1883 (standard MQTT)
  • Protocol: TCP
  • Username: "otto"
  • Password: "otto123"
  • Clean Session: true

Example:

client := NewMQTT("sensor1", "mqtt.example.com", "")
err := client.Connect()
if err != nil {
    log.Fatal(err)
}
client.Publish("sensors/temp", "25.5")

func NewMQTT

func NewMQTT(id string, broker string, topics string) *MQTT

NewMQTT creates a new MQTT client instance with default credentials. The client is not connected until Connect() is called.

Parameters:

  • id: Unique client identifier (used for MQTT client ID)
  • broker: Broker hostname or IP address (without tcp:// prefix or port)
  • topics: Deprecated/unused parameter, kept for compatibility

Returns a pointer to the initialized MQTT client.

Example:

client := NewMQTT("sensor1", "localhost", "")
err := client.Connect()

func (*MQTT) Close

func (m *MQTT) Close()

Close gracefully disconnects from the MQTT broker. It waits up to 1000ms for in-flight messages to complete before disconnecting.

After Close() is called, the client should not be used for further operations without reconnecting via Connect().

Example:

defer client.Close()  // Ensure cleanup on exit

func (*MQTT) Connect

func (m *MQTT) Connect() error

Connect establishes a connection to the configured MQTT broker. It configures the MQTT client with the appropriate options and attempts to connect, waiting for the operation to complete.

Configuration precedence:

  1. Explicitly set values on the MQTT struct
  2. MQTT_BROKER environment variable (for broker address)
  3. Default values ("localhost", "otto", "otto123")
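
A minimal sketch of the broker part of this precedence, assuming an empty string means "not explicitly set". The helper names resolveBroker and brokerURL are hypothetical; the environment lookup is passed in as a function so the logic can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// resolveBroker picks the broker host: an explicit value wins, then
// the MQTT_BROKER environment variable, then the "localhost" default.
func resolveBroker(explicit string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	if v := getenv("MQTT_BROKER"); v != "" {
		return v
	}
	return "localhost"
}

// brokerURL builds the documented URL format, tcp://broker:1883.
func brokerURL(host string) string {
	return "tcp://" + host + ":1883"
}

func main() {
	host := resolveBroker("", os.Getenv)
	fmt.Println(brokerURL(host))
}
```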

Connection parameters:

  • URL format: tcp://broker:1883
  • Clean Session: true (start fresh on each connection)
  • QoS: 0 (fire and forget)

If Debug is enabled, MQTT client debug and error logs are written to the default logger.

Returns an error if the connection fails, nil on success.

Example:

client := NewMQTT("sensor1", "mqtt.example.com", "")
err := client.Connect()
if err != nil {
    log.Fatalf("Failed to connect: %v", err)
}

func (*MQTT) Error

func (m *MQTT) Error() error

Error returns the last error encountered by the MQTT client. Returns nil if no error has occurred.

func (*MQTT) ID

func (m *MQTT) ID() string

ID returns the unique identifier for this MQTT client.

func (*MQTT) IsConnected

func (m *MQTT) IsConnected() bool

IsConnected checks whether the MQTT client is currently connected to the broker.

Returns true if connected, false otherwise.

Example:

if !client.IsConnected() {
    if err := client.Connect(); err != nil {
        log.Printf("Reconnect failed: %v", err)
    }
}

func (*MQTT) Publish

func (m *MQTT) Publish(topic string, value any) error

Publish sends a message to the specified MQTT topic. The message is sent with QoS 0 (at most once) and is not retained by the broker.

Parameters:

  • topic: The MQTT topic to publish to (must not be empty)
  • value: The message payload (any type accepted by the MQTT client)

Returns an error if:

  • The topic is empty
  • The client is not connected
  • The publish operation fails

Example:

err := client.Publish("sensors/temp", "25.5")
if err != nil {
    log.Printf("Publish failed: %v", err)
}

func (*MQTT) SetMQTTClient added in v0.0.10

func (m *MQTT) SetMQTTClient(c gomqtt.Client) *MQTT

SetMQTTClient injects a custom or mock MQTT client for testing purposes. This allows unit tests to use a mock client instead of connecting to a real broker.

Parameters:

  • c: The client to use (typically a MockClient for testing)

Returns the MQTT instance for method chaining.

Example:

mockClient := NewMockClient()
mqtt.SetMQTTClient(mockClient)

func (*MQTT) Subscribe

func (m *MQTT) Subscribe(topic string, f MsgHandler) error

Subscribe registers a message handler for the specified MQTT topic pattern. When messages matching the topic are received, they are converted to Otto's Msg format and passed to the handler function.

The subscription uses QoS 0 (at most once delivery).

MQTT Wildcard Support:

  • '+' matches exactly one topic level (e.g., "sensors/+/temp")
  • '#' matches zero or more levels (e.g., "sensors/#")

Parameters:

  • topic: The MQTT topic pattern to subscribe to
  • f: The handler function to invoke for each received message

Returns an error if:

  • The client is not connected
  • The subscription operation fails

Example:

client.Subscribe("sensors/+/temp", func(msg *Msg) error {
    fmt.Printf("Temp: %s\n", msg.String())
    return nil
})

TODO: Add automatic re-subscription when reconnecting to the broker.

type MessageHandler

type MessageHandler interface {
	HandleMsg(msg *Msg) error
}

MessageHandler is an interface for types that can handle messages. This provides an alternative to the MsgHandler function type for implementing message handling logic as methods on types.
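
One way to use a method-based handler where a handler function is expected is a Go method value. The sketch below declares local stand-ins for Msg, MsgHandler, and MessageHandler so that it compiles on its own; the MsgHandler signature is inferred from the Subscribe examples on this page, and led and asFunc are hypothetical names.

```go
package main

import "fmt"

// Local stand-ins for the package types, reduced to what the sketch needs.
type Msg struct {
	Topic string
	Data  []byte
}

// MsgHandler is assumed to have this shape, based on the Subscribe examples.
type MsgHandler func(msg *Msg) error

type MessageHandler interface {
	HandleMsg(msg *Msg) error
}

// led is a hypothetical device that keeps state between messages.
type led struct{ on bool }

// HandleMsg makes *led satisfy MessageHandler.
func (l *led) HandleMsg(msg *Msg) error {
	switch string(msg.Data) {
	case "on":
		l.on = true
	case "off":
		l.on = false
	default:
		return fmt.Errorf("unknown led command %q", msg.Data)
	}
	return nil
}

// asFunc adapts any MessageHandler to the function form via a method value.
func asFunc(h MessageHandler) MsgHandler {
	return h.HandleMsg
}

func main() {
	l := &led{}
	handler := asFunc(l)
	err := handler(&Msg{Topic: "ss/c/station1/led", Data: []byte("on")})
	fmt.Println(l.on, err) // prints "true <nil>"
}
```

The interface form is convenient when the handler needs state, as the led does here; a plain function literal is usually enough for stateless handlers.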

type Messanger

type Messanger interface {
	// ID returns the unique identifier for this messanger instance
	ID() string

	// Connect establishes connection to the underlying messaging backend.
	// For local messangers this is a no-op. For MQTT it connects to the broker.
	Connect() error

	// Subscribe registers a handler function to receive messages on the given topic.
	// The topic can include MQTT wildcards: '+' for single level, '#' for multi-level.
	// Returns an error if subscription fails.
	Subscribe(topic string, handler MsgHandler) error

	// Pub publishes data to the specified topic. The data can be any type that
	// can be converted to []byte (string, []byte, int, bool, float64).
	// Returns an error if publishing fails.
	Pub(topic string, msg any) error

	// PubMsg publishes a pre-constructed Msg to its embedded topic.
	// This is useful when you need more control over message metadata.
	// Returns an error if publishing fails.
	PubMsg(msg *Msg) error

	// Error returns the last error encountered by the messanger, if any
	Error() error

	// Close cleanly shuts down the messanger, closing connections and
	// cleaning up resources
	Close()
}

Messanger is the core interface that all messaging implementations must satisfy. It provides methods for connecting to a messaging backend, subscribing to topics, publishing messages, and managing the messanger lifecycle.

Implementations include:

  • MessangerLocal: In-process messaging with wildcard topic routing
  • MessangerMQTT: MQTT broker-based distributed messaging

Thread Safety: Implementations should be safe for concurrent use.

func GetMessanger

func GetMessanger() Messanger

GetMessanger returns the singleton Messanger instance created by NewMessanger. This function is thread-safe and can be called from multiple goroutines. Returns nil if no messanger has been created yet.

Example:

msg := messanger.GetMessanger()
if msg != nil {
    msg.Pub("ss/c/station/status", "online")
}

func NewMessanger

func NewMessanger(broker string) (m Messanger)

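NewMessanger creates a messanger for the given broker argument (the example below passes "local" for an in-process messanger). The created messanger becomes the global singleton accessible via GetMessanger(). If the broker value is invalid, it logs an error and returns nil.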

Example:

msg := messanger.NewMessanger("local")
if msg == nil {
    log.Fatal("Failed to create messanger")
}

type MessangerBase

type MessangerBase struct {
	Published int // Count of messages published through this messanger
	// contains filtered or unexported fields
}

MessangerBase provides a base implementation of the Messanger interface with common functionality that can be embedded in specific messanger types. It handles subscription tracking, published message counting, and basic error management.

This type is typically embedded in concrete implementations like MessangerLocal and MessangerMQTT rather than used directly.

func NewMessangerBase

func NewMessangerBase(id string) *MessangerBase

NewMessangerBase creates a new MessangerBase instance with the given ID. It initializes the subscription map and sets up the base structure. This is typically called by concrete messanger implementations.

Parameters:

  • id: Unique identifier for the messanger instance

Returns a pointer to the initialized MessangerBase.

func (*MessangerBase) Close

func (mb *MessangerBase) Close()

Close cleanly shuts down the messanger. This base implementation is a no-op that just logs the close operation.

Concrete messanger implementations should override this method to perform actual cleanup (e.g., closing network connections, unsubscribing, etc.).

func (*MessangerBase) Error

func (mb *MessangerBase) Error() error

Error returns the last error encountered by this messanger, if any. Returns nil if no error has occurred.

func (*MessangerBase) ID

func (mb *MessangerBase) ID() string

ID returns the unique identifier of this MessangerBase instance.

func (*MessangerBase) Pub added in v0.0.11

func (mb *MessangerBase) Pub(topic string, data any) error

func (*MessangerBase) PubMsg

func (mb *MessangerBase) PubMsg(msg *Msg) error

PubMsg publishes a pre-constructed Msg structure. This base implementation only increments the Published counter for metrics.

Concrete messanger implementations should override this method to perform actual message publishing (e.g., sending via MQTT or local routing).

Parameters:

  • msg: The message to publish (must not be nil)

Returns an error if msg is nil, otherwise returns nil.

func (MessangerBase) ServeHTTP

func (m MessangerBase) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler to provide a REST API endpoint for inspecting messanger state. It returns a JSON response containing the messanger ID, list of subscribed topics, and count of published messages.

Response format:

{
  "ID": "messanger-id",
  "Subs": ["topic1", "topic2"],
  "Published": 42
}

This is useful for debugging and monitoring the messanger's state.
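
A client reading this endpoint could decode the response along these lines. The struct and function names (state, decodeState) are hypothetical; only the three JSON keys come from the response format shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state mirrors the three keys in the documented response format.
type state struct {
	ID        string   `json:"ID"`
	Subs      []string `json:"Subs"`
	Published int      `json:"Published"`
}

// decodeState parses a response body into a state value.
func decodeState(raw []byte) (state, error) {
	var s state
	err := json.Unmarshal(raw, &s)
	return s, err
}

func main() {
	raw := []byte(`{"ID": "messanger-id", "Subs": ["topic1", "topic2"], "Published": 42}`)
	s, err := decodeState(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s: %d subscriptions, %d published\n", s.ID, len(s.Subs), s.Published)
}
```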

func (*MessangerBase) Subscribe

func (mb *MessangerBase) Subscribe(topic string, handler MsgHandler) error

Subscribe stores a topic subscription locally in the base implementation. It maps the topic string to the handler function for later retrieval.

This base implementation only stores the subscription locally. Concrete messanger implementations should override this method to perform actual subscription logic (e.g., subscribing to an MQTT broker).

Parameters:

  • topic: The topic pattern to subscribe to (may include wildcards)
  • handler: The callback function to invoke when messages arrive on this topic

Returns nil on success, or an error if subscription fails.

type MessangerLocal

type MessangerLocal struct {
	*MessangerBase
	sync.Mutex `json:"-"`
}

MessangerLocal provides an in-process messaging implementation that routes messages through a local topic tree without requiring an external broker. It supports MQTT-style wildcard subscriptions using a tree-based routing algorithm.

This implementation is useful for:

  • Testing message handlers without needing a broker
  • Single-process applications where distributed messaging isn't needed
  • Development and debugging
  • Embedded systems with limited resources

Wildcard Support:

  • '+' matches exactly one topic level (e.g., "ss/c/+/temp")
  • '#' matches zero or more levels (e.g., "ss/c/#")

Thread Safety: MessangerLocal is safe for concurrent use via embedded mutex.

Example:

msg := NewMessangerLocal("local")
msg.Subscribe("ss/c/+/temp", func(m *Msg) error {
    fmt.Printf("Temperature: %s\n", m.String())
    return nil
})
msg.Pub("ss/c/station1/temp", "25.5")

func NewMessangerLocal

func NewMessangerLocal(id string) *MessangerLocal

NewMessangerLocal creates and initializes a new local in-process messanger. The messanger is immediately ready for use without requiring a Connect() call.

Parameters:

  • id: Unique identifier for this messanger instance

Returns a pointer to the initialized MessangerLocal.

Example:

msg := NewMessangerLocal("test-local")

func (*MessangerLocal) Close

func (m *MessangerLocal) Close()

Close cleanly shuts down the local messanger by removing all subscriptions from the routing tree and clearing local state.

This method is thread-safe via the embedded mutex.

After Close() is called, the messanger should not be used for publishing or subscribing.

func (*MessangerLocal) Connect added in v0.0.10

func (m *MessangerLocal) Connect() error

Connect is a no-op for local messangers since no external connection is needed. It exists to satisfy the Messanger interface.

Always returns nil (success).

func (*MessangerLocal) Error

func (m *MessangerLocal) Error() error

Error returns the last error encountered by this messanger, if any. Returns nil if no error has occurred.

func (*MessangerLocal) ID

func (m *MessangerLocal) ID() string

ID returns the unique identifier for this local messanger instance.

func (*MessangerLocal) Pub

func (m *MessangerLocal) Pub(topic string, value any) error

Pub publishes a value to the specified topic through the local routing tree. The value is converted to bytes and wrapped in a Msg structure before delivery.

Supported value types: []byte, string, int, bool, float64

Parameters:

  • topic: The complete topic path to publish to (e.g., "ss/c/station/led")
  • value: The data to publish (will be converted to []byte)

Returns an error if:

  • topic is empty
  • no subscribers exist
  • value cannot be converted to bytes
  • no matching handlers are found in the routing tree

Example:

msg := NewMessangerLocal("local")
msg.Subscribe("ss/c/+/led", ledHandler)
msg.Pub("ss/c/station1/led", "on")  // Delivered to ledHandler

func (*MessangerLocal) PubMsg

func (m *MessangerLocal) PubMsg(msg *Msg) error

PubMsg publishes a pre-constructed message through the local routing tree. The message is looked up in the routing tree and delivered to all matching handlers.

Parameters:

  • msg: The message to publish (must not be nil)

Returns an error if:

  • msg is nil
  • no handlers match the message topic

Example:

msg := NewMsg("ss/c/station1/led", []byte("on"), "controller")
err := m.PubMsg(msg)  // m is a *MessangerLocal created earlier

func (*MessangerLocal) Subscribe

func (m *MessangerLocal) Subscribe(topic string, handler MsgHandler) error

Subscribe registers a message handler for the given topic pattern in the local routing tree. The topic can include MQTT-style wildcards for flexible matching.

The handler is inserted into the topic routing tree and stored in the base subscription map for tracking.

Wildcard Examples:

  • "ss/c/+/temp" matches "ss/c/station1/temp", "ss/c/station2/temp", etc.
  • "ss/c/#" matches all topics starting with "ss/c/"
  • "ss/c/station/+" matches any single-level child of "ss/c/station/"
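
The matching rules behind these examples can be illustrated with a simple segment-by-segment comparison. This is a sketch only: topicMatches is a hypothetical helper, and it compares linearly rather than walking a routing tree as MessangerLocal is described as doing.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether topic satisfies filter, where '+'
// matches exactly one level and '#' matches zero or more trailing
// levels. The sketch treats '#' as terminal wherever it appears.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return true // the rest of the topic, including nothing, matches
		}
		if i >= len(t) {
			return false // filter is longer than the topic
		}
		if seg != "+" && seg != t[i] {
			return false // literal segment differs
		}
	}
	// Without '#', filter and topic must have the same depth.
	return len(f) == len(t)
}

func main() {
	fmt.Println(topicMatches("ss/c/+/temp", "ss/c/station1/temp"))    // true
	fmt.Println(topicMatches("ss/c/#", "ss/c/station1/led"))          // true
	fmt.Println(topicMatches("ss/c/station/+", "ss/c/station/a/b"))   // false
}
```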

Parameters:

  • topic: The topic pattern to subscribe to (with optional wildcards)
  • handler: The callback function to invoke when matching messages arrive

Returns nil on success, or an error if subscription fails.

type MessangerMQTT

type MessangerMQTT struct {
	*MessangerBase
	*MQTT
	sync.Mutex `json:"-"`
}

MessangerMQTT provides distributed messaging through an MQTT broker. It implements the Messanger interface by wrapping an MQTT client and providing publish-subscribe capabilities across a network.

This implementation is suitable for:

  • Distributed IoT systems with multiple devices/stations
  • Cloud-connected applications
  • Systems requiring reliable message delivery
  • Multi-node deployments

The messanger connects to an MQTT broker (like Mosquitto, EMQX, or the embedded broker) and handles topic subscriptions, message publishing, and connection management.

Thread Safety: MessangerMQTT is safe for concurrent use via embedded mutex.

Example:

msg := NewMessangerMQTT("mqtt-client", "localhost")
err := msg.Connect()
if err != nil {
    log.Fatal(err)
}
msg.Subscribe("ss/c/+/temp", tempHandler)
msg.Pub("ss/d/station1/temp", "25.5")

func NewMessangerMQTT

func NewMessangerMQTT(id string, broker string) *MessangerMQTT

NewMessangerMQTT creates a new MQTT messanger instance configured for the specified broker, without custom authentication credentials.

The messanger uses the default credentials (otto/otto123). To supply different credentials, use NewMessangerMQTTWithAuth() instead.

Parameters:

  • id: Unique identifier for this messanger instance (used as MQTT client ID)
  • broker: Address of the MQTT broker (hostname or IP, without protocol or port)

Returns a pointer to the initialized MessangerMQTT. Call Connect() to establish the broker connection.

Example:

msg := NewMessangerMQTT("sensor1", "localhost")
err := msg.Connect()

func NewMessangerMQTTWithAuth added in v0.0.10

func NewMessangerMQTTWithAuth(id string, broker string, username string, password string) *MessangerMQTT

NewMessangerMQTTWithAuth creates a new MQTT messanger instance with custom authentication credentials for connecting to a secured MQTT broker.

Special broker values:

  • "mock": Uses a mock MQTT client for testing (no actual network connection)

Parameters:

  • id: Unique identifier for this messanger (used as MQTT client ID)
  • broker: Address of the MQTT broker (hostname or IP)
  • username: MQTT authentication username (empty for default "otto")
  • password: MQTT authentication password (empty for default "otto123")

Returns a pointer to the initialized MessangerMQTT. Call Connect() to establish the broker connection.

Example:

msg := NewMessangerMQTTWithAuth("sensor1", "mqtt.example.com", "user", "pass")
err := msg.Connect()

func (*MessangerMQTT) Close

func (m *MessangerMQTT) Close()

Close cleanly shuts down the MQTT messanger by:

  1. Closing the MQTT broker connection
  2. Clearing local subscription tracking

This method is thread-safe via the embedded mutex. After Close() is called, the messanger should not be used.

Example:

defer msg.Close()  // Ensure cleanup on exit

func (*MessangerMQTT) Connect added in v0.0.10

func (m *MessangerMQTT) Connect() error

Connect establishes a connection to the MQTT broker and subscribes to any topics that were registered before connection was established.

This method:

  1. Connects to the configured MQTT broker
  2. Subscribes to all topics in the local subscription map
  3. Logs errors for any failed subscriptions but continues with others

Returns an error if the initial broker connection fails. Individual subscription failures are logged but don't cause the method to fail.

Example:

msg := NewMessangerMQTT("client", "localhost")
msg.Subscribe("ss/c/+/led", ledHandler)  // Registered but not active yet
err := msg.Connect()  // Now connected and subscribed to ss/c/+/led
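
The store-then-activate behavior described above can be sketched independently of MQTT. Everything in the block is hypothetical (deferredSubs, handler, activate, errRejected); the activate field stands in for whatever call actually registers a subscription with the broker, and connect returns a count instead of an error purely to make the effect visible.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errRejected simulates a broker refusing one subscription.
var errRejected = errors.New("rejected by broker")

type handler func(payload []byte) error

// deferredSubs remembers subscriptions made before connecting.
type deferredSubs struct {
	connected bool
	pending   map[string]handler
	activate  func(topic string, h handler) error // stand-in for the broker call
}

// Subscribe always stores the handler; it only contacts the broker
// when a connection already exists.
func (d *deferredSubs) Subscribe(topic string, h handler) error {
	d.pending[topic] = h
	if !d.connected {
		return nil
	}
	return d.activate(topic, h)
}

// connect activates every stored subscription, logging failures and
// continuing with the rest. It returns how many were activated.
func (d *deferredSubs) connect() int {
	d.connected = true
	ok := 0
	for topic, h := range d.pending {
		if err := d.activate(topic, h); err != nil {
			log.Printf("subscribe %s: %v", topic, err)
			continue
		}
		ok++
	}
	return ok
}

func main() {
	d := &deferredSubs{
		pending: map[string]handler{},
		activate: func(topic string, h handler) error {
			if topic == "bad/topic" {
				return errRejected
			}
			return nil
		},
	}
	noop := func(payload []byte) error { return nil }
	_ = d.Subscribe("ss/c/+/led", noop) // stored, not yet active
	_ = d.Subscribe("bad/topic", noop)  // stored, fails on activation
	fmt.Println("activated:", d.connect()) // prints "activated: 1"
}
```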

func (*MessangerMQTT) Error

func (m *MessangerMQTT) Error() error

Error returns the last error from the underlying MQTT client, if any. Returns nil if no MQTT client exists or no error has occurred.

func (*MessangerMQTT) ID

func (m *MessangerMQTT) ID() string

ID returns the unique identifier for this MQTT messanger instance.

func (*MessangerMQTT) Pub

func (m *MessangerMQTT) Pub(topic string, value any) error

Pub publishes data to the specified topic via MQTT. The data is sent to all subscribers of that topic across the network.

Parameters:

  • topic: The MQTT topic to publish to (e.g., "ss/d/station1/temp")
  • value: The data to publish (any type accepted by MQTT client)

Returns nil on success. Error handling is currently limited; check logs for issues.

Example:

msg := NewMessangerMQTT("client", "localhost")
msg.Connect()
msg.Pub("ss/d/station1/temp", "25.5")

func (*MessangerMQTT) PubMsg

func (m *MessangerMQTT) PubMsg(msg *Msg) error

PubMsg publishes a pre-constructed Msg structure via MQTT. The message's topic and data fields are used for the MQTT publication.

Parameters:

  • msg: The message to publish (must not be nil)

Returns an error if msg is nil, otherwise returns nil.

Example:

msg := NewMsg("ss/d/station1/temp", []byte("25.5"), "sensor")
err := m.PubMsg(msg)  // m is a *MessangerMQTT created earlier

func (*MessangerMQTT) Subscribe

func (m *MessangerMQTT) Subscribe(topic string, handler MsgHandler) error

Subscribe registers a message handler for the given topic pattern on the MQTT broker. The topic pattern can include MQTT wildcards ('+' for single level, '#' for multi-level).

The subscription is stored locally and, if already connected to the broker, is immediately activated. If not yet connected, it will be activated when Connect() is called.

Parameters:

  • topic: The MQTT topic pattern to subscribe to (e.g., "ss/c/+/temp")
  • handler: The callback function to invoke when matching messages arrive

Returns an error if the broker subscription fails (when connected).

Example:

msg := NewMessangerMQTT("client", "localhost")
msg.Connect()
msg.Subscribe("ss/c/station1/#", func(m *Msg) error {
    fmt.Printf("Received: %s\n", m.String())
    return nil
})

type MockClient

type MockClient struct {
	LastTopic   string // Last topic published to (deprecated)
	LastMessage string // Last message published (deprecated)
	// contains filtered or unexported fields
}

MockClient implements the complete gomqtt.Client interface for testing. It simulates an MQTT client without requiring a real broker connection.

The mock tracks all operations (connect, publish, subscribe) and allows tests to:

  • Verify publications via GetPublications()
  • Verify subscriptions via GetSubscriptions()
  • Inject errors via SetConnectError(), SetPublishError(), etc.
  • Simulate incoming messages via SimulateMessage()
  • Control connection state

Thread Safety: All methods are protected by an RWMutex for concurrent access.

Example:

mock := NewMockClient()
mock.Connect()  // Always succeeds unless SetConnectError() was called
mock.Publish("test/topic", 0, false, "payload")
pubs := mock.GetPublications()
assert.Equal(t, 1, len(pubs))
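
The idea of tracking publications behind an RWMutex can be shown with a much smaller type. The recorder below is a hypothetical illustration, not MockClient itself: its publication fields and method names are assumptions, and it records only topic and payload.

```go
package main

import (
	"fmt"
	"sync"
)

// publication is a hypothetical record of one publish call.
type publication struct {
	Topic   string
	Payload string
}

// recorder keeps a history of publications for later assertions.
type recorder struct {
	mu   sync.RWMutex
	pubs []publication
}

// Publish appends to the history under the write lock.
func (r *recorder) Publish(topic, payload string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pubs = append(r.pubs, publication{Topic: topic, Payload: payload})
}

// Publications returns a copy under the read lock, so callers cannot
// modify the recorder's internal slice.
func (r *recorder) Publications() []publication {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return append([]publication(nil), r.pubs...)
}

func main() {
	var r recorder
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			r.Publish("test/topic", fmt.Sprint(n))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.Publications())) // prints 10
}
```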

func NewMockClient

func NewMockClient() *MockClient

NewMockClient creates and initializes a new mock MQTT client. The client starts in a disconnected state with no subscriptions.

Returns a pointer to the mock client.

Example:

mock := NewMockClient()
mqtt := NewMQTT("test", "mock", "")
mqtt.SetMQTTClient(mock)

func (*MockClient) AddRoute

func (m *MockClient) AddRoute(topic string, callback gomqtt.MessageHandler)

Routing methods

func (*MockClient) ClearPublications

func (m *MockClient) ClearPublications()

ClearPublications clears the publication history.

func (*MockClient) Connect

func (m *MockClient) Connect() gomqtt.Token

func (*MockClient) Disconnect

func (m *MockClient) Disconnect(quiesce uint)

func (*MockClient) GetLastPublication

func (m *MockClient) GetLastPublication() *Publication

GetLastPublication returns the most recent publication.

func (*MockClient) GetPublications

func (m *MockClient) GetPublications() []Publication

GetPublications returns all publications made through this mock client.

func (*MockClient) GetSubscriptions

func (m *MockClient) GetSubscriptions() map[string]Subscription

GetSubscriptions returns all active subscriptions.

func (*MockClient) HasSubscription

func (m *MockClient) HasSubscription(topic string) bool

HasSubscription checks whether a topic is subscribed.

func (*MockClient) ID added in v0.0.10

func (m *MockClient) ID() string

ID returns the identifier for this mock client. Always returns "mock" to distinguish it from real clients.

func (*MockClient) IsConnected

func (m *MockClient) IsConnected() bool

Connection methods

func (*MockClient) IsConnectionOpen

func (m *MockClient) IsConnectionOpen() bool

func (*MockClient) IsDisconnectCalled

func (m *MockClient) IsDisconnectCalled() bool

IsDisconnectCalled returns true if Disconnect was called.

func (*MockClient) OptionsReader

func (m *MockClient) OptionsReader() gomqtt.ClientOptionsReader

Configuration methods

func (*MockClient) Publish

func (m *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) gomqtt.Token

Publishing methods

func (*MockClient) RemoveRoute

func (m *MockClient) RemoveRoute(topic string)

func (*MockClient) Reset

func (m *MockClient) Reset()

Reset clears all mock state.

func (*MockClient) SetConnectError

func (m *MockClient) SetConnectError(err error)

SetConnectError configures the mock to return an error on Connect().

func (*MockClient) SetOrderMatters

func (m *MockClient) SetOrderMatters(matter bool)

func (*MockClient) SetPublishError

func (m *MockClient) SetPublishError(err error)

SetPublishError configures the mock to return an error on Publish().

func (*MockClient) SetSubscribeError

func (m *MockClient) SetSubscribeError(err error)

SetSubscribeError configures the mock to return an error on Subscribe().

func (*MockClient) SimulateConnectionLost

func (m *MockClient) SimulateConnectionLost(err error)

SimulateConnectionLost simulates a connection loss.

func (*MockClient) SimulateMessage

func (m *MockClient) SimulateMessage(topic string, payload []byte) error

SimulateMessage triggers a message delivery to subscribed handlers. This allows testing of message handling logic.

func (*MockClient) Subscribe

func (m *MockClient) Subscribe(topic string, qos byte, callback gomqtt.MessageHandler) gomqtt.Token

Subscription methods

func (*MockClient) SubscribeMultiple

func (m *MockClient) SubscribeMultiple(filters map[string]byte, callback gomqtt.MessageHandler) gomqtt.Token

func (*MockClient) Unsubscribe

func (m *MockClient) Unsubscribe(topics ...string) gomqtt.Token

type MockClientOptionsReader

type MockClientOptionsReader struct {
	// contains filtered or unexported fields
}

MockClientOptionsReader is a simple stub implementation of the gomqtt.ClientOptionsReader interface for testing.

func (*MockClientOptionsReader) ClientID

func (m *MockClientOptionsReader) ClientID() string

ClientID returns the client identifier.

type MockMessage

type MockMessage struct {
	// contains filtered or unexported fields
}

MockMessage implements the gomqtt.Message interface for testing. It represents an MQTT message received by a subscriber.

func NewMockMessage

func NewMockMessage(topic string, payload []byte) *MockMessage

NewMockMessage creates a new mock MQTT message for testing. The message is created with QoS 0 and not retained.

Parameters:

  • topic: The topic the message is published to
  • payload: The message payload as bytes

Returns a pointer to the mock message.

func (*MockMessage) Ack

func (m *MockMessage) Ack()

Ack marks the message as acknowledged.

func (*MockMessage) Duplicate

func (m *MockMessage) Duplicate() bool

Duplicate returns whether this is a duplicate message delivery.

func (*MockMessage) MessageID

func (m *MockMessage) MessageID() uint16

MessageID returns the unique identifier for this message.

func (*MockMessage) Payload

func (m *MockMessage) Payload() []byte

Payload returns the message payload as bytes.

func (*MockMessage) Qos

func (m *MockMessage) Qos() byte

Qos returns the Quality of Service level for this message.

func (*MockMessage) Retained

func (m *MockMessage) Retained() bool

Retained returns whether this message was retained by the broker.

func (*MockMessage) Topic

func (m *MockMessage) Topic() string

Topic returns the topic this message was published to.

type MockToken

type MockToken struct {
	// contains filtered or unexported fields
}

MockToken implements the gomqtt.Token interface for testing. It represents the result of an asynchronous MQTT operation (connect, publish, subscribe). The token tracks whether the operation completed and any error that occurred.

func NewMockToken

func NewMockToken(err error) *MockToken

NewMockToken creates a new mock token with the specified error state. Pass nil for a successful operation token.

Parameters:

  • err: The error to return, or nil for success

Returns a pointer to the mock token.

func (*MockToken) Done

func (t *MockToken) Done() <-chan struct{}

Done returns a channel that is closed when the operation completes. For the mock, the channel is already closed (operation is instant).

func (*MockToken) Error

func (t *MockToken) Error() error

Error returns the error from the operation, if any. Returns nil if the operation succeeded.

func (*MockToken) Wait

func (t *MockToken) Wait() bool

Wait blocks until the operation completes (immediately for mock). Returns true if the operation succeeded, false if it failed.

func (*MockToken) WaitTimeout

func (t *MockToken) WaitTimeout(timeout time.Duration) bool

WaitTimeout waits for the operation to complete with a timeout. For the mock, this behaves the same as Wait() and returns immediately.

Parameters:

  • timeout: Maximum time to wait (ignored in mock)

Returns true if the operation succeeded, false if it failed.
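
The token behavior described above (a Done channel that is already closed, Wait returning immediately) can be sketched as follows. This is a minimal illustration, not the package's implementation: instantToken, newInstantToken, and errBroker are hypothetical names introduced here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBroker is a hypothetical error used only to build a failed token.
var errBroker = errors.New("broker unavailable")

// instantToken is a hypothetical stand-in for MockToken, not the package's type.
type instantToken struct {
	err  error
	done chan struct{}
}

// newInstantToken returns a token whose Done channel is already closed,
// so every wait on it returns immediately.
func newInstantToken(err error) *instantToken {
	done := make(chan struct{})
	close(done)
	return &instantToken{err: err, done: done}
}

func (t *instantToken) Done() <-chan struct{} { return t.done }
func (t *instantToken) Error() error          { return t.err }

// Wait reports success, following the rule documented for the mock.
func (t *instantToken) Wait() bool {
	<-t.done
	return t.err == nil
}

// WaitTimeout ignores the timeout because the operation is already complete.
func (t *instantToken) WaitTimeout(time.Duration) bool { return t.Wait() }

func main() {
	ok := newInstantToken(nil)
	failed := newInstantToken(errBroker)
	fmt.Println(ok.Wait(), ok.Error())
	fmt.Println(failed.WaitTimeout(time.Second), failed.Error())
}
```

Closing the channel at construction time means a select on Done() never blocks, which keeps tests deterministic.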

type Msg

type Msg struct {
	ID     int64    `json:"id"`
	Topic  string   `json:"topic"`
	Path   []string `json:"path"`
	Args   []string `json:"args"`
	Data   []byte   `json:"msg"`
	Source string   `json:"source"`
	Valid  bool     `json:"valid"`

	Timestamp time.Duration `json:"timestamp"`
}

Msg represents a message in the Otto messaging system. It contains the message payload along with routing information and metadata for tracking and debugging.

Each message has a globally unique ID and timestamp for tracing message flow through the system. The topic is parsed into a path array for efficient routing.

Fields:

  • ID: Unique monotonically increasing identifier for this message
  • Topic: Full topic path (e.g., "ss/c/station/temp")
  • Path: Topic split into segments for routing (e.g., ["ss", "c", "station", "temp"])
  • Args: Optional arguments extracted from the topic path
  • Data: The message payload as a byte array
  • Source: Identifier of the component that created this message
  • Valid: Whether the topic follows Otto's topic format conventions
  • Timestamp: When the message was created

Messages should be treated as immutable once created, and should be created using NewMsg().

func NewMsg

func NewMsg(topic string, data []byte, source string) *Msg

NewMsg creates a new message with the given topic, payload data, and source identifier. The topic is automatically parsed into path segments for routing, and the message is validated against Otto's topic format.

If message saving is enabled via GetMsgSaver().StartSaving(), the message will be recorded for debugging purposes.

Parameters:

  • topic: The full topic path (e.g., "ss/c/station/temp")
  • data: The message payload as bytes
  • source: Identifier of the component creating this message (e.g., "mqtt-sub", "local")

Returns a pointer to the newly created message.

Example:

msg := NewMsg("ss/c/station1/temp", []byte("25.5"), "sensor")
fmt.Printf("Message ID: %d, Topic: %s\n", msg.ID, msg.Topic)

func (*Msg) Bool added in v0.0.10

func (msg *Msg) Bool() bool

Bool interprets the message payload as a boolean value. It recognizes common boolean string representations:

  • true: "true", "1", "on", "yes"
  • false: everything else

Returns true if the payload matches a truthy value, false otherwise.

Example:

msg := NewMsg("ss/c/station/led", []byte("on"), "controller")
if msg.Bool() {
    fmt.Println("LED is on")
}
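
The truthy-value rule can be sketched on its own. The documentation does not say whether Bool trims whitespace or ignores case, so this sketch assumes an exact match against the four listed strings; payloadBool is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// payloadBool is a hypothetical helper mirroring the documented rule:
// exactly "true", "1", "on" or "yes" is truthy, anything else is false.
// Exact, case-sensitive matching is an assumption of this sketch.
func payloadBool(data []byte) bool {
	switch string(data) {
	case "true", "1", "on", "yes":
		return true
	}
	return false
}

func main() {
	for _, p := range []string{"on", "yes", "off", "ON", ""} {
		fmt.Printf("%q -> %v\n", p, payloadBool([]byte(p)))
	}
}
```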

func (*Msg) Byte

func (msg *Msg) Byte() []byte

Byte returns the raw payload of the message as a byte slice. It is equivalent to reading the Data field directly.

func (*Msg) Dump

func (msg *Msg) Dump() string

Dump returns a human-readable multi-line string representation of the message including all fields. This is useful for debugging and logging.

Returns a formatted string with ID, Path, Args, Source, Timestamp, and Data.

Example output:

  ID: 123
Path: ["ss" "c" "station1" "temp"]
Args: []
 Src: sensor
Time: 1234567890
Data: 25.5

func (*Msg) Float64

func (msg *Msg) Float64() float64

Float64 parses the message payload as a floating-point number. The payload should be a string representation of a number.

Returns the parsed float64 value. If parsing fails, returns 0.0.

Example:

msg := NewMsg("ss/d/station/temp", []byte("25.5"), "sensor")
temp := msg.Float64()  // Returns 25.5
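
Because a failed parse yields 0.0, a caller cannot tell a genuine zero reading from a malformed payload. The sketch below assumes the method behaves like strconv.ParseFloat with the error discarded, and shows a variant that keeps the error; payloadFloat and parseReading are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
)

// payloadFloat is a hypothetical helper mirroring the documented behavior:
// the parsed value, or 0.0 when the payload is not a number.
func payloadFloat(data []byte) float64 {
	f, err := strconv.ParseFloat(string(data), 64)
	if err != nil {
		return 0.0
	}
	return f
}

// parseReading keeps the error so callers can distinguish "0" from garbage.
func parseReading(data []byte) (float64, error) {
	return strconv.ParseFloat(string(data), 64)
}

func main() {
	fmt.Println(payloadFloat([]byte("25.5")))
	fmt.Println(payloadFloat([]byte("n/a")))
	if _, err := parseReading([]byte("n/a")); err != nil {
		fmt.Println("bad reading:", err)
	}
}
```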

func (*Msg) IsJSON

func (msg *Msg) IsJSON() bool

IsJSON checks whether the message payload is valid JSON. This is useful for determining if the payload needs JSON parsing or should be treated as a plain string/binary data.

Returns true if the payload is valid JSON, false otherwise.
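
One consequence worth noting: a bare number such as 25.5 is itself valid JSON, while a bare word such as on is not. The sketch assumes the check is equivalent to json.Valid from the standard library, which the documentation does not confirm; looksLikeJSON is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// looksLikeJSON is a hypothetical stand-in for the IsJSON check, assuming it
// behaves like json.Valid from the standard library.
func looksLikeJSON(data []byte) bool {
	return json.Valid(data)
}

func main() {
	for _, p := range []string{`{"temp":25.5}`, "25.5", `"on"`, "on", ""} {
		fmt.Printf("%q valid JSON: %v\n", p, looksLikeJSON([]byte(p)))
	}
}
```

If only JSON objects are of interest, decoding with Map() and checking the error is the stricter test.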

func (*Msg) JSON

func (msg *Msg) JSON() ([]byte, error)

JSON encodes the entire message (not just the payload) as JSON. This serializes the Msg struct including all fields (ID, Topic, Data, etc.).

Returns the JSON byte array and any encoding error.

Note: To decode just the payload as JSON, use Map() instead.
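
Since Data is a []byte, encoding/json writes it as a base64 string under the "msg" key rather than as readable text. The sketch below uses msgShape, a hypothetical local copy of the struct tags shown for Msg, so it runs without importing the package; encodedPayload is likewise a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// msgShape is a hypothetical local copy of the Msg field layout and JSON tags.
type msgShape struct {
	ID        int64         `json:"id"`
	Topic     string        `json:"topic"`
	Path      []string      `json:"path"`
	Args      []string      `json:"args"`
	Data      []byte        `json:"msg"`
	Source    string        `json:"source"`
	Valid     bool          `json:"valid"`
	Timestamp time.Duration `json:"timestamp"`
}

// encodedPayload marshals a message and returns the "msg" field as it
// appears in the JSON output.
func encodedPayload(data []byte) (string, error) {
	raw, err := json.Marshal(msgShape{Topic: "ss/d/station1/temp", Data: data})
	if err != nil {
		return "", err
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(raw, &fields); err != nil {
		return "", err
	}
	s, _ := fields["msg"].(string)
	return s, nil
}

func main() {
	s, err := encodedPayload([]byte("25.5"))
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // base64 text, not "25.5"
}
```

Consumers of the JSON output therefore need to base64-decode "msg" to recover the original payload.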

func (*Msg) Last

func (msg *Msg) Last() string

Last returns the last segment of the message topic path. This is typically the sensor or command name.

Returns the last path element, or empty string if the path is empty.

Example:

msg := NewMsg("ss/c/station1/temp", []byte("25"), "sensor")
last := msg.Last()  // Returns "temp"

func (*Msg) Map

func (msg *Msg) Map() (map[string]interface{}, error)

Map decodes the message payload as a JSON object into a map. The payload must be a valid JSON object.

Returns a map with string keys and interface{} values, or an error if the payload is not valid JSON or not an object.

Example:

msg := NewMsg("ss/d/station/sensor", []byte(`{"temp":25.5,"humidity":60}`), "sensor")
data, err := msg.Map()
if err == nil {
    temp := data["temp"].(float64)
}
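
An unchecked type assertion on a decoded field panics when the key is missing or holds another type. A more defensive pattern uses the two-value form of the assertion. The sketch decodes with encoding/json directly so it is self-contained; tempFromPayload is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// tempFromPayload is a hypothetical helper: it decodes a JSON object payload
// and extracts "temp" without risking a panic.
func tempFromPayload(data []byte) (float64, error) {
	var fields map[string]interface{}
	if err := json.Unmarshal(data, &fields); err != nil {
		return 0, err
	}
	// JSON numbers decode to float64 when the target is interface{}.
	temp, ok := fields["temp"].(float64)
	if !ok {
		return 0, errors.New(`payload has no numeric "temp" field`)
	}
	return temp, nil
}

func main() {
	fmt.Println(tempFromPayload([]byte(`{"temp":25.5,"humidity":60}`)))
	fmt.Println(tempFromPayload([]byte(`{"temp":"hot"}`)))
	fmt.Println(tempFromPayload([]byte(`[1,2]`)))
}
```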

func (*Msg) Station

func (msg *Msg) Station() string

Station extracts the station identifier from the message topic. Otto topics follow the format "ss/[c|d]/station/sensor", where the third element (index 2) is the station ID.

Returns the station ID string, or an empty string if the topic has fewer than three elements.

Example:

msg := NewMsg("ss/c/station1/temp", []byte("25"), "sensor")
station := msg.Station()  // Returns "station1"
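
The index-based lookup can be sketched with strings.Split. This assumes the path is produced by splitting the topic on "/", which matches the Path field description but is not a copy of the package's code; stationOf and lastOf are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// stationOf is a hypothetical helper: the third path element, or "" when
// the topic has fewer than three elements.
func stationOf(topic string) string {
	path := strings.Split(topic, "/")
	if len(path) < 3 {
		return ""
	}
	return path[2]
}

// lastOf returns the final path element, typically the sensor or command name.
func lastOf(topic string) string {
	path := strings.Split(topic, "/")
	return path[len(path)-1]
}

func main() {
	fmt.Println(stationOf("ss/c/station1/temp"), lastOf("ss/c/station1/temp"))
	fmt.Printf("%q\n", stationOf("ss/c"))
}
```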

func (*Msg) String

func (msg *Msg) String() string

String returns the message payload as a UTF-8 string. This is a convenience method for converting the byte payload to a string.

type MsgHandler

type MsgHandler func(msg *Msg) error

MsgHandler is a callback function type for handling incoming messages. Subscribers provide a MsgHandler function that will be invoked when a message is received on a subscribed topic. The handler receives a pointer to the Msg and should return an error if message processing fails.

type MsgPrinter

type MsgPrinter struct{}

MsgPrinter is a simple utility type for debugging that prints received messages. Its MsgHandler method has the signature of the MsgHandler function type, so it can be passed directly to Subscribe.

TODO: Add functionality to filter and print messages by ID.

func (*MsgPrinter) MsgHandler

func (m *MsgPrinter) MsgHandler(msg *Msg) error

MsgHandler prints the received message to stdout in a structured format. This is useful for debugging message flow.

Parameters:

  • msg: The message to print

Returns nil (always succeeds).

Example:

printer := &MsgPrinter{}
msg.Subscribe("ss/c/+/temp", printer.MsgHandler)
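
Passing printer.MsgHandler works because a Go method value can be used wherever a matching function type is expected. The sketch below shows the same pattern with hypothetical stand-in types (message, handlerFunc, counter) so that it runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// message and handlerFunc are hypothetical stand-ins for Msg and MsgHandler.
type message struct {
	Topic string
	Data  []byte
}

type handlerFunc func(m *message) error

// counter keeps state across calls; its handle method matches handlerFunc.
type counter struct{ seen int }

func (c *counter) handle(m *message) error {
	if len(m.Data) == 0 {
		return errors.New("empty payload on " + m.Topic)
	}
	c.seen++
	return nil
}

// deliver stands in for whatever invokes a subscriber's handler.
func deliver(h handlerFunc, m *message) error { return h(m) }

func main() {
	c := &counter{}
	fmt.Println(deliver(c.handle, &message{Topic: "ss/d/station1/temp", Data: []byte("25.5")}))
	fmt.Println(deliver(c.handle, &message{Topic: "ss/d/station1/temp"}))
	fmt.Println("handled:", c.seen)
}
```

Using a method value lets a handler carry its own state without relying on package-level variables.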

type MsgSaver

type MsgSaver struct {
	Messages []*Msg `json:"saved-messages"` // All recorded messages
	Saving   bool   `json:"saving"`         // Whether message recording is active
}

MsgSaver records messages for debugging, testing, and audit purposes. When saving is enabled, all messages created via NewMsg() are automatically appended to the Messages slice.

This is useful for:

  • Debugging message flow through the system
  • Testing message handlers
  • Auditing message history
  • Replaying messages for testing

Note: In production, be mindful of memory usage as messages accumulate. Consider periodically clearing or persisting messages to external storage.

TODO: Add file persistence and protocol-based message delivery.

func GetMsgSaver

func GetMsgSaver() *MsgSaver

GetMsgSaver returns the singleton MsgSaver instance, creating it if needed. This provides a global message recording facility that can be enabled or disabled at runtime.

Returns a pointer to the MsgSaver instance.

Example:

saver := GetMsgSaver()
saver.StartSaving()
// ... messages are now being recorded ...
saver.StopSaving()
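
A lazily created singleton with a start/stop switch can be sketched as below. Whether the package uses sync.Once or a mutex is not stated, so both are assumptions of this sketch, and saver, getSaver, and record are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// saver is a hypothetical stand-in for MsgSaver that stores topic strings.
type saver struct {
	mu       sync.Mutex
	saving   bool
	messages []string
}

var (
	once     sync.Once
	instance *saver
)

// getSaver creates the shared instance on first use and returns it afterwards.
func getSaver() *saver {
	once.Do(func() { instance = &saver{} })
	return instance
}

func (s *saver) start() { s.mu.Lock(); s.saving = true; s.mu.Unlock() }
func (s *saver) stop()  { s.mu.Lock(); s.saving = false; s.mu.Unlock() }

// record appends only while saving is enabled.
func (s *saver) record(topic string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.saving {
		s.messages = append(s.messages, topic)
	}
}

func (s *saver) count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.messages)
}

func main() {
	s := getSaver()
	s.record("ss/d/station1/temp") // ignored: saving is off
	s.start()
	s.record("ss/d/station1/temp")
	s.stop()
	fmt.Println("saved:", s.count())
}
```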

func (*MsgSaver) Dump

func (ms *MsgSaver) Dump()

Dump prints all saved messages to stdout in a human-readable format. Each message is separated by a line of dashes for readability.

This is useful for debugging and inspecting the message history.

func (*MsgSaver) ServeHTTP

func (ms *MsgSaver) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler to provide a REST API endpoint for accessing saved messages. Returns the MsgSaver state as JSON, including all saved messages and the current saving status.

Response format:

{
  "saved-messages": [{"id": 1, "topic": "...", ...}, ...],
  "saving": true
}

This endpoint is useful for debugging and monitoring message flow.
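
A handler of this shape can be exercised without a network listener by using net/http/httptest. The sketch uses saverState, a hypothetical stand-in that reuses the two JSON keys shown above and simplifies messages to strings; the "/messages" path is also an assumption, since the documentation does not say where the handler is mounted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// saverState is a hypothetical stand-in for MsgSaver with the same JSON keys.
type saverState struct {
	Messages []string `json:"saved-messages"`
	Saving   bool     `json:"saving"`
}

// ServeHTTP writes the state as JSON.
func (s *saverState) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(s); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// fetchState calls the handler in-process and decodes its response.
func fetchState(h http.Handler) (saverState, error) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/messages", nil)
	h.ServeHTTP(rec, req)

	var out saverState
	err := json.NewDecoder(rec.Body).Decode(&out)
	return out, err
}

func main() {
	state, err := fetchState(&saverState{Messages: []string{"ss/d/station1/temp"}, Saving: true})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("saving=%v messages=%d\n", state.Saving, len(state.Messages))
}
```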

func (*MsgSaver) StartSaving

func (ms *MsgSaver) StartSaving()

StartSaving enables message recording. After calling this, all messages created via NewMsg() will be stored in the Messages slice.

Example:

saver := GetMsgSaver()
saver.StartSaving()

func (*MsgSaver) StopSaving

func (ms *MsgSaver) StopSaving()

StopSaving disables message recording. Messages created after this call will not be saved. Previously saved messages remain in the Messages slice.

Example:

saver := GetMsgSaver()
saver.StopSaving()

type Publication

type Publication struct {
	Topic    string      // The topic published to
	Payload  interface{} // The message payload
	QoS      byte        // Quality of Service level
	Retained bool        // Whether the message should be retained
}

Publication represents a message that was published through the mock client. Tests can inspect publications to verify that code published the right messages.

type Subscription

type Subscription struct {
	Topic   string                // The topic pattern subscribed to
	QoS     byte                  // Quality of Service level
	Handler gomqtt.MessageHandler // The callback function for messages
}

Subscription represents a topic subscription made through the mock client. Tests can inspect subscriptions to verify that code subscribed correctly.

type Topics

type Topics struct {
	TopicFmt string         // Format string for topic generation (e.g., "ss/%s/%s/%s")
	Topicmap map[string]int // Map of topic to usage count
}

Topics manages topic formatting and usage tracking for the Otto messaging system. It provides helper methods for creating properly formatted topics and tracks how many times each topic has been used.

Otto uses a standardized topic format, "ss/[c|d]/station/sensor", where:

  • "ss" is the namespace prefix (Smart Station)
  • "c" indicates control topics (commands to devices)
  • "d" indicates data topics (sensor readings, telemetry)
  • station is the station/device identifier
  • sensor is the sensor/actuator/command name

Example topics:

  • "ss/c/station1/led" - Control topic to turn LED on/off
  • "ss/d/station1/temp" - Data topic for temperature readings
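
Topic generation with usage counting can be sketched with fmt.Sprintf and a map. How the real Topics type learns the station name is not shown in this documentation, so the sketch stores it in a field; topicSet and its methods are hypothetical.

```go
package main

import "fmt"

// topicSet is a hypothetical stand-in for Topics.
type topicSet struct {
	format  string         // e.g. "ss/%s/%s/%s": kind, station, name
	station string         // assumption: held as a field in this sketch
	counts  map[string]int // usage count per generated topic
}

// build formats a topic and increments its usage counter.
func (t *topicSet) build(kind, name string) string {
	topic := fmt.Sprintf(t.format, kind, t.station, name)
	t.counts[topic]++
	return topic
}

func (t *topicSet) control(name string) string { return t.build("c", name) }
func (t *topicSet) data(name string) string    { return t.build("d", name) }

func main() {
	ts := &topicSet{format: "ss/%s/%s/%s", station: "station1", counts: map[string]int{}}
	fmt.Println(ts.control("led"))
	fmt.Println(ts.data("temp"))
	ts.data("temp")
	fmt.Println(ts.counts)
}
```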

func GetTopics

func GetTopics() *Topics

GetTopics returns the singleton Topics instance for the application. This provides access to topic formatting and usage tracking.

Returns a pointer to the Topics instance.

Example:

topics := GetTopics()
controlTopic := topics.Control("led")

func (*Topics) Control

func (t *Topics) Control(topic string) string

Control generates a control topic for the current station. Control topics are used to send commands to devices (e.g., turn on LED, set speed). The format is: "ss/c/{station}/{topic}"

This method also increments the usage counter for the generated topic.

Parameters:

  • topic: The command or actuator name (e.g., "led", "motor", "relay")

Returns the fully formatted control topic string.

Example:

topics := GetTopics()
ledTopic := topics.Control("led")  // Returns "ss/c/mystation/led"

func (*Topics) Data

func (t *Topics) Data(topic string) string

Data generates a data topic for the current station. Data topics are used to publish sensor readings and telemetry. The format is: "ss/d/{station}/{topic}"

This method also increments the usage counter for the generated topic.

Parameters:

  • topic: The sensor or data stream name (e.g., "temp", "humidity", "motion")

Returns the fully formatted data topic string.

Example:

topics := GetTopics()
tempTopic := topics.Data("temp")  // Returns "ss/d/mystation/temp"

func (Topics) ServeHTTP

func (t Topics) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler to provide a REST API endpoint for inspecting topic usage. Returns JSON containing the topic format and a map of all topics used by this station with their usage counts.

Response format:

{
  "TopicFmt": "ss/%s/%s/%s",
  "Topicmap": {
    "ss/c/station1/led": 5,
    "ss/d/station1/temp": 120
  }
}

This is useful for monitoring which topics are being used and how frequently.
