Documentation
Overview ¶
Package messanger provides a unified interface for publish-subscribe messaging in the Otto IoT framework. It supports multiple messaging backends including local in-process messaging and MQTT-based distributed messaging.
The package implements a topic-based routing system where messages are published to topics and delivered to all subscribers of those topics. Topics follow the MQTT topic format with hierarchical paths separated by slashes (e.g., "ss/c/station/sensor").
Key Components:
- Messanger interface: Core abstraction for all messaging implementations
- MessangerLocal: In-process messaging with wildcard topic support
- MessangerMQTT: MQTT broker-based distributed messaging
- Msg: Message structure containing topic, payload, and metadata
- Topics: Topic format validation and management
Example Usage:
// Create a local messanger
msg := messanger.NewMessanger("local")
// Subscribe to a topic
msg.Subscribe("ss/c/station/+", func(m *messanger.Msg) error {
	fmt.Printf("Received: %s\n", m.String())
	return nil
})
// Publish a message
msg.Pub("ss/c/station/temp", "25.5")
mqtt_mock.go provides a comprehensive mock implementation of the Eclipse Paho MQTT client interface for testing purposes.
The mock client simulates MQTT broker behavior without requiring an actual network connection or broker. This enables:
- Fast, isolated unit tests
- Deterministic test behavior
- Testing error conditions and edge cases
- CI/CD environments without external dependencies
Key Features:
- Track all published messages for assertions
- Simulate connection, subscription, and publish operations
- Inject errors to test error handling
- Manually trigger message delivery to handlers
- Thread-safe for concurrent test scenarios
Example:
mock := NewMockClient()
mqtt := NewMQTT("test", "mock", "")
mqtt.SetMQTTClient(mock)
// Subscribe and trigger message
mqtt.Subscribe("test/topic", handler)
mock.SimulateMessage("test/topic", []byte("test data"))
// Verify publications
pubs := mock.GetPublications()
assert.Equal(t, 1, len(pubs))
Index ¶
- Variables
- func Bytes(data any) ([]byte, error)
- func StartMQTTBroker(ctx context.Context) (func(context.Context) error, error)
- func StopMQTTBroker(ctx context.Context) error
- func ValidateTopic(topic string) bool
- type Config
- type MQTT
- func (m *MQTT) Close()
- func (m *MQTT) Connect() error
- func (m *MQTT) Error() error
- func (m *MQTT) ID() string
- func (m *MQTT) IsConnected() bool
- func (m *MQTT) Publish(topic string, value any) error
- func (m *MQTT) SetMQTTClient(c gomqtt.Client) *MQTT
- func (m *MQTT) Subscribe(topic string, f MsgHandler) error
- type MessageHandler
- type Messanger
- type MessangerBase
- func (mb *MessangerBase) Close()
- func (mb *MessangerBase) Error() error
- func (mb *MessangerBase) ID() string
- func (mb *MessangerBase) Pub(topic string, data any) error
- func (mb *MessangerBase) PubMsg(msg *Msg) error
- func (m MessangerBase) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (mb *MessangerBase) Subscribe(topic string, handler MsgHandler) error
- type MessangerLocal
- func (m *MessangerLocal) Close()
- func (m *MessangerLocal) Connect() error
- func (m *MessangerLocal) Error() error
- func (m *MessangerLocal) ID() string
- func (m *MessangerLocal) Pub(topic string, value any) error
- func (m *MessangerLocal) PubMsg(msg *Msg) error
- func (m *MessangerLocal) Subscribe(topic string, handler MsgHandler) error
- type MessangerMQTT
- func (m *MessangerMQTT) Close()
- func (m *MessangerMQTT) Connect() error
- func (m *MessangerMQTT) Error() error
- func (m *MessangerMQTT) ID() string
- func (m *MessangerMQTT) Pub(topic string, value any) error
- func (m *MessangerMQTT) PubMsg(msg *Msg) error
- func (m *MessangerMQTT) Subscribe(topic string, handler MsgHandler) error
- type MockClient
- func (m *MockClient) AddRoute(topic string, callback gomqtt.MessageHandler)
- func (m *MockClient) ClearPublications()
- func (m *MockClient) Connect() gomqtt.Token
- func (m *MockClient) Disconnect(quiesce uint)
- func (m *MockClient) GetLastPublication() *Publication
- func (m *MockClient) GetPublications() []Publication
- func (m *MockClient) GetSubscriptions() map[string]Subscription
- func (m *MockClient) HasSubscription(topic string) bool
- func (m *MockClient) ID() string
- func (m *MockClient) IsConnected() bool
- func (m *MockClient) IsConnectionOpen() bool
- func (m *MockClient) IsDisconnectCalled() bool
- func (m *MockClient) OptionsReader() gomqtt.ClientOptionsReader
- func (m *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) gomqtt.Token
- func (m *MockClient) RemoveRoute(topic string)
- func (m *MockClient) Reset()
- func (m *MockClient) SetConnectError(err error)
- func (m *MockClient) SetOrderMatters(matter bool)
- func (m *MockClient) SetPublishError(err error)
- func (m *MockClient) SetSubscribeError(err error)
- func (m *MockClient) SimulateConnectionLost(err error)
- func (m *MockClient) SimulateMessage(topic string, payload []byte) error
- func (m *MockClient) Subscribe(topic string, qos byte, callback gomqtt.MessageHandler) gomqtt.Token
- func (m *MockClient) SubscribeMultiple(filters map[string]byte, callback gomqtt.MessageHandler) gomqtt.Token
- func (m *MockClient) Unsubscribe(topics ...string) gomqtt.Token
- type MockClientOptionsReader
- type MockMessage
- type MockToken
- type Msg
- func (msg *Msg) Bool() bool
- func (msg *Msg) Byte() []byte
- func (msg *Msg) Dump() string
- func (msg *Msg) Float64() float64
- func (msg *Msg) IsJSON() bool
- func (msg *Msg) JSON() ([]byte, error)
- func (msg *Msg) Last() string
- func (msg *Msg) Map() (map[string]interface{}, error)
- func (msg *Msg) Station() string
- func (msg *Msg) String() string
- type MsgHandler
- type MsgPrinter
- type MsgSaver
- type Publication
- type Subscription
- type Topics
Constants ¶
This section is empty.
Variables ¶
var GetMQTT = func() *MQTT {
	if mqtt == nil {
		mqtt = &MQTT{
			id:     "default",
			Broker: "localhost",
		}
	}
	return mqtt
}
GetMQTT returns the singleton MQTT client instance, creating it with default settings if it doesn't exist yet.
Default settings:
- ID: "default"
- Broker: "localhost"
Returns a pointer to the MQTT client.
Note: GetMQTT is declared as a variable holding a function (rather than as a regular function declaration) so that test code can override it if needed.
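The override pattern described in the note can be sketched with a small, self-contained program. The names client, getClient, and withStub are hypothetical stand-ins for illustration, not part of the package; only the lazy-singleton shape and the "default"/"localhost" defaults come from the documentation above.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the MQTT type.
type client struct {
	id     string
	broker string
}

var singleton *client

// getClient mirrors the shape of GetMQTT: a package-level variable
// holding a function, so tests can swap it out.
var getClient = func() *client {
	if singleton == nil {
		singleton = &client{id: "default", broker: "localhost"}
	}
	return singleton
}

// withStub replaces getClient with a function returning c and returns
// a restore function that puts the original back.
func withStub(c *client) (restore func()) {
	orig := getClient
	getClient = func() *client { return c }
	return func() { getClient = orig }
}

func main() {
	fmt.Println(getClient().broker)

	restore := withStub(&client{id: "test", broker: "mock"})
	fmt.Println(getClient().broker)

	restore()
	fmt.Println(getClient().broker)
}
```

In a test, the restore function would typically be deferred so the override cannot leak into other tests. Note that this lazy initialization is not synchronized in the sketch.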
Functions ¶
func Bytes ¶
Bytes converts various data types to a byte slice for use in message payloads. This utility function handles common types used in IoT applications.
Supported types:
- []byte: Returned as-is
- string: Converted to UTF-8 bytes
- int: Formatted as decimal string
- bool: Converted to "true" or "false"
- float64: Formatted with 2 decimal places (e.g., "25.50")
Parameters:
- data: The value to convert to bytes
Returns the byte representation and an error if the type is not supported.
Example:
b, err := Bytes(25.5) // Returns []byte("25.50"), nil
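As a rough illustration of the conversions listed above, the following sketch uses a type switch. The function toBytes is a hypothetical stand-in written for this example; the real Bytes may differ in its error text and in how it handles types not listed here.

```go
package main

import (
	"fmt"
	"strconv"
)

// toBytes is a hypothetical stand-in for Bytes. It covers only the
// conversions listed in the documentation.
func toBytes(data any) ([]byte, error) {
	switch v := data.(type) {
	case []byte:
		return v, nil // returned as-is
	case string:
		return []byte(v), nil
	case int:
		return []byte(strconv.Itoa(v)), nil
	case bool:
		return []byte(strconv.FormatBool(v)), nil
	case float64:
		return []byte(fmt.Sprintf("%.2f", v)), nil // two decimal places
	default:
		return nil, fmt.Errorf("unsupported type %T", data)
	}
}

func main() {
	for _, v := range []any{"on", 42, true, 25.5, []byte("raw"), struct{}{}} {
		b, err := toBytes(v)
		fmt.Printf("%T -> %q, err=%v\n", v, b, err)
	}
}
```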
func StartMQTTBroker ¶ added in v0.0.10
func StopMQTTBroker ¶ added in v0.0.11
func ValidateTopic ¶
ValidateTopic checks if a topic string follows Otto's topic format conventions. A valid Otto topic must have:
- At least 4 segments separated by '/'
- First segment must be "ss" (namespace)
- Second segment must be "c" (control) or "d" (data)
- Third segment (station ID) must not be empty
- Fourth segment (sensor/command) must not be empty
Parameters:
- topic: The topic string to validate (e.g., "ss/c/station1/temp")
Returns true if the topic is valid, false otherwise.
Example:
valid := ValidateTopic("ss/c/station1/temp") // Returns true
valid = ValidateTopic("invalid/topic") // Returns false
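The rules above translate fairly directly into code. This is a minimal sketch under the stated rules only; validTopic is a hypothetical name and the real ValidateTopic may apply additional checks.

```go
package main

import (
	"fmt"
	"strings"
)

// validTopic is a hypothetical re-statement of the documented rules.
func validTopic(topic string) bool {
	parts := strings.Split(topic, "/")
	if len(parts) < 4 {
		return false // need at least 4 segments
	}
	if parts[0] != "ss" {
		return false // namespace must be "ss"
	}
	if parts[1] != "c" && parts[1] != "d" {
		return false // must be control ("c") or data ("d")
	}
	// station ID and sensor/command must be non-empty
	return parts[2] != "" && parts[3] != ""
}

func main() {
	for _, t := range []string{
		"ss/c/station1/temp",
		"ss/d/station1/temp",
		"invalid/topic",
		"ss/x/station1/temp",
		"ss/c//temp",
	} {
		fmt.Printf("%-22s %v\n", t, validTopic(t))
	}
}
```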
Types ¶
type Config ¶ added in v0.0.11
type Config struct {
	// Broker is the address of the MQTT broker (hostname or IP)
	Broker   string `json:"broker"`
	Username string `json:"username"`
	Password string `json:"password"`
}
Config holds configuration parameters for messanger initialization: the broker address plus the username and password used by MQTT-based messangers.
type MQTT ¶
type MQTT struct {
	Broker   string // Broker hostname or IP (no protocol/port)
	Username string // MQTT authentication username
	Password string // MQTT authentication password
	Debug    bool   // Enable Paho MQTT client debug logging

	gomqtt.Client // Embedded Paho MQTT client
	// contains filtered or unexported fields
}
MQTT wraps the Eclipse Paho MQTT Go client with Otto-specific configuration and error handling. It provides a simplified interface for connecting to MQTT brokers and publishing/subscribing to topics.
The wrapper handles:
- Connection management with automatic reconnection
- Authentication (username/password)
- Debug logging when enabled
- Error tracking
- Mock client injection for testing
Default Configuration:
- Port: 1883 (standard MQTT)
- Protocol: TCP
- Username: "otto"
- Password: "otto123"
- Clean Session: true
Example:
client := NewMQTT("sensor1", "mqtt.example.com", "")
err := client.Connect()
if err != nil {
	log.Fatal(err)
}
client.Publish("sensors/temp", "25.5")
func NewMQTT ¶
NewMQTT creates a new MQTT client instance with default credentials. The client is not connected until Connect() is called.
Parameters:
- id: Unique client identifier (used for MQTT client ID)
- broker: Broker hostname or IP address (without tcp:// prefix or port)
- topics: Deprecated/unused parameter, kept for compatibility
Returns a pointer to the initialized MQTT client.
Example:
client := NewMQTT("sensor1", "localhost", "")
err := client.Connect()
func (*MQTT) Close ¶
func (m *MQTT) Close()
Close gracefully disconnects from the MQTT broker. It waits up to 1000ms for in-flight messages to complete before disconnecting.
After Close() is called, the client should not be used for further operations without reconnecting via Connect().
Example:
defer client.Close() // Ensure cleanup on exit
func (*MQTT) Connect ¶
Connect establishes a connection to the configured MQTT broker. It configures the MQTT client with the appropriate options and attempts to connect, waiting for the operation to complete.
Configuration precedence (highest priority first):
- Explicitly set values on the MQTT struct
- MQTT_BROKER environment variable (for broker address)
- Default values ("localhost", "otto", "otto123")
Connection parameters:
- URL format: tcp://broker:1883
- Clean Session: true (start fresh on each connection)
- QoS: 0 (fire and forget)
If Debug is enabled, MQTT client debug and error logs are written to the default logger.
Returns an error if the connection fails, nil on success.
Example:
client := NewMQTT("sensor1", "mqtt.example.com", "")
err := client.Connect()
if err != nil {
	log.Fatalf("Failed to connect: %v", err)
}
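The precedence rules and URL format described for Connect can be sketched as follows. The helpers resolveBroker and brokerURL are hypothetical names for this example; only the MQTT_BROKER variable, the "localhost" default, and the tcp://broker:1883 format come from the documentation. The environment lookup is passed in as a function so the logic can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// resolveBroker picks the broker host: an explicit value wins, then the
// MQTT_BROKER environment variable, then the "localhost" default.
func resolveBroker(explicit string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	if env := getenv("MQTT_BROKER"); env != "" {
		return env
	}
	return "localhost"
}

// brokerURL builds the connection URL in the documented format.
func brokerURL(host string) string {
	return fmt.Sprintf("tcp://%s:1883", host)
}

func main() {
	// Explicit value set on the struct.
	fmt.Println(brokerURL(resolveBroker("mqtt.example.com", os.Getenv)))
	// No explicit value: falls back to MQTT_BROKER or "localhost".
	fmt.Println(brokerURL(resolveBroker("", os.Getenv)))
}
```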
func (*MQTT) Error ¶
Error returns the last error encountered by the MQTT client. Returns nil if no error has occurred.
func (*MQTT) IsConnected ¶
IsConnected checks whether the MQTT client is currently connected to the broker.
Returns true if connected, false otherwise.
Example:
if !client.IsConnected() {
	if err := client.Connect(); err != nil {
		log.Printf("Reconnect failed: %v", err)
	}
}
func (*MQTT) Publish ¶
Publish sends a message to the specified MQTT topic. The message is sent with QoS 0 (at most once) and is not retained by the broker.
Parameters:
- topic: The MQTT topic to publish to (must not be empty)
- value: The message payload (any type accepted by the MQTT client)
Returns an error if:
- The topic is empty
- The client is not connected
- The publish operation fails
Example:
err := client.Publish("sensors/temp", "25.5")
if err != nil {
	log.Printf("Publish failed: %v", err)
}
func (*MQTT) SetMQTTClient ¶ added in v0.0.10
SetMQTTClient injects a custom or mock MQTT client for testing purposes. This allows unit tests to use a mock client instead of connecting to a real broker.
Parameters:
- c: The client to use (typically a MockClient for testing)
Returns the MQTT instance for method chaining.
Example:
mockClient := NewMockClient()
mqtt.SetMQTTClient(mockClient)
func (*MQTT) Subscribe ¶
func (m *MQTT) Subscribe(topic string, f MsgHandler) error
Subscribe registers a message handler for the specified MQTT topic pattern. When messages matching the topic are received, they are converted to Otto's Msg format and passed to the handler function.
The subscription uses QoS 0 (at most once delivery).
MQTT Wildcard Support:
- '+' matches exactly one topic level (e.g., "sensors/+/temp")
- '#' matches zero or more levels (e.g., "sensors/#")
Parameters:
- topic: The MQTT topic pattern to subscribe to
- f: The handler function to invoke for each received message
Returns an error if:
- The client is not connected
- The subscription operation fails
Example:
client.Subscribe("sensors/+/temp", func(msg *Msg) error {
	fmt.Printf("Temp: %s\n", msg.String())
	return nil
})
TODO: Add automatic re-subscription when reconnecting to the broker.
type MessageHandler ¶
MessageHandler is an interface for types that can handle messages. This provides an alternative to the MsgHandler function type for implementing message handling logic as methods on types.
type Messanger ¶
type Messanger interface {
	// ID returns the unique identifier for this messanger instance
	ID() string

	// Connect establishes connection to the underlying messaging backend.
	// For local messangers this is a no-op. For MQTT it connects to the broker.
	Connect() error

	// Subscribe registers a handler function to receive messages on the given topic.
	// The topic can include MQTT wildcards: '+' for single level, '#' for multi-level.
	// Returns an error if subscription fails.
	Subscribe(topic string, handler MsgHandler) error

	// Pub publishes data to the specified topic. The data can be any type that
	// can be converted to []byte (string, []byte, int, bool, float64).
	// Returns an error if publishing fails.
	Pub(topic string, msg any) error

	// PubMsg publishes a pre-constructed Msg to its embedded topic.
	// This is useful when you need more control over message metadata.
	// Returns an error if publishing fails.
	PubMsg(msg *Msg) error

	// Error returns the last error encountered by the messanger, if any
	Error() error

	// Close cleanly shuts down the messanger, closing connections and
	// cleaning up resources
	Close()
}
Messanger is the core interface that all messaging implementations must satisfy. It provides methods for connecting to a messaging backend, subscribing to topics, publishing messages, and managing the messanger lifecycle.
Implementations include:
- MessangerLocal: In-process messaging with wildcard topic routing
- MessangerMQTT: MQTT broker-based distributed messaging
Thread Safety: Implementations should be safe for concurrent use.
func GetMessanger ¶
func GetMessanger() Messanger
GetMessanger returns the singleton Messanger instance created by NewMessanger. This function is thread-safe and can be called from multiple goroutines. Returns nil if no messanger has been created yet.
Example:
msg := messanger.GetMessanger()
if msg != nil {
	msg.Pub("ss/c/station/status", "online")
}
func NewMessanger ¶
The created messanger becomes the global singleton accessible via GetMessanger(). If an invalid ID is provided, NewMessanger logs an error and returns nil.
Example:
msg := messanger.NewMessanger("local")
if msg == nil {
	log.Fatal("Failed to create messanger")
}
type MessangerBase ¶
type MessangerBase struct {
Published int // Count of messages published through this messanger
// contains filtered or unexported fields
}
MessangerBase provides a base implementation of the Messanger interface with common functionality that can be embedded in specific messanger types. It handles subscription tracking, published message counting, and basic error management.
This type is typically embedded in concrete implementations like MessangerLocal and MessangerMQTT rather than used directly.
func NewMessangerBase ¶
func NewMessangerBase(id string) *MessangerBase
NewMessangerBase creates a new MessangerBase instance with the given ID. It initializes the subscription map and sets up the base structure. This is typically called by concrete messanger implementations.
Parameters:
- id: Unique identifier for the messanger instance
Returns a pointer to the initialized MessangerBase.
func (*MessangerBase) Close ¶
func (mb *MessangerBase) Close()
Close cleanly shuts down the messanger. This base implementation is a no-op that just logs the close operation.
Concrete messanger implementations should override this method to perform actual cleanup (e.g., closing network connections, unsubscribing, etc.).
func (*MessangerBase) Error ¶
func (mb *MessangerBase) Error() error
Error returns the last error encountered by this messanger, if any. Returns nil if no error has occurred.
func (*MessangerBase) ID ¶
func (mb *MessangerBase) ID() string
ID returns the unique identifier of this MessangerBase instance.
func (*MessangerBase) Pub ¶ added in v0.0.11
func (mb *MessangerBase) Pub(topic string, data any) error
func (*MessangerBase) PubMsg ¶
func (mb *MessangerBase) PubMsg(msg *Msg) error
PubMsg publishes a pre-constructed Msg structure. This base implementation only increments the Published counter for metrics.
Concrete messanger implementations should override this method to perform actual message publishing (e.g., sending via MQTT or local routing).
Parameters:
- msg: The message to publish (must not be nil)
Returns an error if msg is nil, otherwise returns nil.
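The relationship between the base PubMsg and a concrete override can be sketched with struct embedding. All names here (message, baseMessanger, localMessanger) are hypothetical, and whether the real concrete types delegate to the base method in this way is an assumption; the sketch only illustrates the documented behavior of counting publications and rejecting a nil message.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for Msg.
type message struct {
	Topic string
	Data  []byte
}

// baseMessanger only validates and counts, like the documented base PubMsg.
type baseMessanger struct {
	Published int
}

func (b *baseMessanger) PubMsg(m *message) error {
	if m == nil {
		return errors.New("nil message")
	}
	b.Published++
	return nil
}

// localMessanger embeds the base and overrides PubMsg to add delivery.
type localMessanger struct {
	*baseMessanger
	delivered []string // topics delivered so far (illustration only)
}

func (l *localMessanger) PubMsg(m *message) error {
	// Assumption: reuse the base bookkeeping before doing the real work.
	if err := l.baseMessanger.PubMsg(m); err != nil {
		return err
	}
	l.delivered = append(l.delivered, m.Topic)
	return nil
}

func main() {
	l := &localMessanger{baseMessanger: &baseMessanger{}}
	fmt.Println(l.PubMsg(nil))
	fmt.Println(l.PubMsg(&message{Topic: "ss/c/station1/led", Data: []byte("on")}))
	fmt.Println(l.Published, l.delivered)
}
```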
func (MessangerBase) ServeHTTP ¶
func (m MessangerBase) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler to provide a REST API endpoint for inspecting messanger state. It returns a JSON response containing the messanger ID, list of subscribed topics, and count of published messages.
Response format:
{
	"ID": "messanger-id",
	"Subs": ["topic1", "topic2"],
	"Published": 42
}
This is useful for debugging and monitoring the messanger's state.
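A handler producing the response format above might look like the following sketch. The types status and inspector and the helper fetch are hypothetical; only the three JSON keys come from the documentation. The sketch sorts the topic list for stable output, which the real implementation may or may not do.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sort"
)

// status mirrors the documented JSON keys.
type status struct {
	ID        string
	Subs      []string
	Published int
}

// inspector is a hypothetical stand-in for MessangerBase.
type inspector struct {
	id        string
	subs      map[string]func([]byte)
	published int
}

func (i inspector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	topics := make([]string, 0, len(i.subs))
	for topic := range i.subs {
		topics = append(topics, topic)
	}
	sort.Strings(topics) // map order is random; sort for stable output

	body, err := json.Marshal(status{ID: i.id, Subs: topics, Published: i.published})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}

// fetch runs the handler against an in-memory request and returns the body.
func fetch(h http.Handler) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Body.String()
}

func main() {
	i := inspector{
		id:        "messanger-id",
		subs:      map[string]func([]byte){"topic1": nil, "topic2": nil},
		published: 42,
	}
	fmt.Println(fetch(i))
}
```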
func (*MessangerBase) Subscribe ¶
func (mb *MessangerBase) Subscribe(topic string, handler MsgHandler) error
Subscribe stores a topic subscription locally in the base implementation. It maps the topic string to the handler function for later retrieval.
This base implementation only stores the subscription locally. Concrete messanger implementations should override this method to perform actual subscription logic (e.g., subscribing to an MQTT broker).
Parameters:
- topic: The topic pattern to subscribe to (may include wildcards)
- handler: The callback function to invoke when messages arrive on this topic
Returns nil on success, or an error if subscription fails.
type MessangerLocal ¶
type MessangerLocal struct {
*MessangerBase
sync.Mutex `json:"-"`
}
MessangerLocal provides an in-process messaging implementation that routes messages through a local topic tree without requiring an external broker. It supports MQTT-style wildcard subscriptions using a tree-based routing algorithm.
This implementation is useful for:
- Testing message handlers without needing a broker
- Single-process applications where distributed messaging isn't needed
- Development and debugging
- Embedded systems with limited resources
Wildcard Support:
- '+' matches exactly one topic level (e.g., "ss/c/+/temp")
- '#' matches zero or more levels (e.g., "ss/c/#")
Thread Safety: MessangerLocal is safe for concurrent use via an embedded mutex.
Example:
msg := NewMessangerLocal("local")
msg.Subscribe("ss/c/+/temp", func(m *Msg) error {
	fmt.Printf("Temperature: %s\n", m.String())
	return nil
})
msg.Pub("ss/c/station1/temp", "25.5")
func NewMessangerLocal ¶
func NewMessangerLocal(id string) *MessangerLocal
NewMessangerLocal creates and initializes a new local in-process messanger. The messanger is immediately ready for use without requiring a Connect() call.
Parameters:
- id: Unique identifier for this messanger instance
Returns a pointer to the initialized MessangerLocal.
Example:
msg := NewMessangerLocal("test-local")
func (*MessangerLocal) Close ¶
func (m *MessangerLocal) Close()
Close cleanly shuts down the local messanger by removing all subscriptions from the routing tree and clearing local state.
This method is thread-safe via the embedded mutex.
After Close() is called, the messanger should not be used for publishing or subscribing.
func (*MessangerLocal) Connect ¶ added in v0.0.10
func (m *MessangerLocal) Connect() error
Connect is a no-op for local messangers since no external connection is needed. It exists to satisfy the Messanger interface.
Always returns nil (success).
func (*MessangerLocal) Error ¶
func (m *MessangerLocal) Error() error
Error returns the last error encountered by this messanger, if any. Returns nil if no error has occurred.
func (*MessangerLocal) ID ¶
func (m *MessangerLocal) ID() string
ID returns the unique identifier for this local messanger instance.
func (*MessangerLocal) Pub ¶
func (m *MessangerLocal) Pub(topic string, value any) error
Pub publishes a value to the specified topic through the local routing tree. The value is converted to bytes and wrapped in a Msg structure before delivery.
Supported value types: []byte, string, int, bool, float64
Parameters:
- topic: The complete topic path to publish to (e.g., "ss/c/station/led")
- value: The data to publish (will be converted to []byte)
Returns an error if:
- topic is empty
- no subscribers exist
- value cannot be converted to bytes
- no matching handlers are found in the routing tree
Example:
msg := NewMessangerLocal("local")
msg.Subscribe("ss/c/+/led", ledHandler)
msg.Pub("ss/c/station1/led", "on") // Delivered to ledHandler
func (*MessangerLocal) PubMsg ¶
func (m *MessangerLocal) PubMsg(msg *Msg) error
PubMsg publishes a pre-constructed message through the local routing tree. The message is looked up in the routing tree and delivered to all matching handlers.
Parameters:
- msg: The message to publish (must not be nil)
Returns an error if:
- msg is nil
- no handlers match the message topic
Example:
msg := NewMsg("ss/c/station1/led", []byte("on"), "controller")
messanger.PubMsg(msg)
func (*MessangerLocal) Subscribe ¶
func (m *MessangerLocal) Subscribe(topic string, handler MsgHandler) error
Subscribe registers a message handler for the given topic pattern in the local routing tree. The topic can include MQTT-style wildcards for flexible matching.
The handler is inserted into the topic routing tree and stored in the base subscription map for tracking.
Wildcard Examples:
- "ss/c/+/temp" matches "ss/c/station1/temp", "ss/c/station2/temp", etc.
- "ss/c/#" matches all topics starting with "ss/c/"
- "ss/c/station/+" matches any single-level child of "ss/c/station/"
Parameters:
- topic: The topic pattern to subscribe to (with optional wildcards)
- handler: The callback function to invoke when matching messages arrive
Returns nil on success, or an error if subscription fails.
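The wildcard semantics above can be illustrated with a simple segment-by-segment comparison. This is not the tree-based routing the package describes; it is a linear matcher, written under the assumption that '#' appears only as the final segment of a pattern, and the name matches is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether topic satisfies the MQTT-style pattern.
// '+' matches exactly one level; '#' matches zero or more trailing levels.
func matches(pattern, topic string) bool {
	p := strings.Split(pattern, "/")
	t := strings.Split(topic, "/")
	for i, seg := range p {
		if seg == "#" {
			return true // everything from here on matches, including nothing
		}
		if i >= len(t) {
			return false // pattern is longer than the topic
		}
		if seg != "+" && seg != t[i] {
			return false // literal segment mismatch
		}
	}
	return len(p) == len(t) // no unmatched trailing topic levels
}

func main() {
	cases := [][2]string{
		{"ss/c/+/temp", "ss/c/station1/temp"},
		{"ss/c/+/temp", "ss/c/station1/humidity"},
		{"ss/c/#", "ss/c/station1/temp"},
		{"ss/c/station/+", "ss/c/station/led"},
		{"ss/c/station/+", "ss/c/station/led/state"},
	}
	for _, c := range cases {
		fmt.Printf("%-16s %-24s %v\n", c[0], c[1], matches(c[0], c[1]))
	}
}
```

A tree-based router reaches the same answers but avoids checking every subscription on each publish, which matters once there are many subscribers.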
type MessangerMQTT ¶
type MessangerMQTT struct {
*MessangerBase
*MQTT
sync.Mutex `json:"-"`
}
MessangerMQTT provides distributed messaging through an MQTT broker. It implements the Messanger interface by wrapping an MQTT client and providing publish-subscribe capabilities across a network.
This implementation is suitable for:
- Distributed IoT systems with multiple devices/stations
- Cloud-connected applications
- Systems requiring reliable message delivery
- Multi-node deployments
The messanger connects to an MQTT broker (like Mosquitto, EMQX, or the embedded broker) and handles topic subscriptions, message publishing, and connection management.
Thread Safety: MessangerMQTT is safe for concurrent use via an embedded mutex.
Example:
msg := NewMessangerMQTT("mqtt-client", "localhost")
err := msg.Connect()
if err != nil {
	log.Fatal(err)
}
msg.Subscribe("ss/c/+/temp", tempHandler)
msg.Pub("ss/d/station1/temp", "25.5")
func NewMessangerMQTT ¶
func NewMessangerMQTT(id string, broker string) *MessangerMQTT
NewMessangerMQTT creates a new MQTT messanger instance that connects to the specified broker without custom authentication credentials.
The messanger uses default credentials (otto/otto123) which can be overridden by using NewMessangerMQTTWithAuth() instead.
Parameters:
- id: Unique identifier for this messanger instance (used as MQTT client ID)
- broker: Address of the MQTT broker (hostname or IP, without protocol or port)
Returns a pointer to the initialized MessangerMQTT. Call Connect() to establish the broker connection.
Example:
msg := NewMessangerMQTT("sensor1", "localhost")
err := msg.Connect()
func NewMessangerMQTTWithAuth ¶ added in v0.0.10
func NewMessangerMQTTWithAuth(id string, broker string, username string, password string) *MessangerMQTT
NewMessangerMQTTWithAuth creates a new MQTT messanger instance with custom authentication credentials for connecting to a secured MQTT broker.
Special broker values:
- "mock": Uses a mock MQTT client for testing (no actual network connection)
Parameters:
- id: Unique identifier for this messanger (used as MQTT client ID)
- broker: Address of the MQTT broker (hostname or IP)
- username: MQTT authentication username (empty for default "otto")
- password: MQTT authentication password (empty for default "otto123")
Returns a pointer to the initialized MessangerMQTT. Call Connect() to establish the broker connection.
Example:
msg := NewMessangerMQTTWithAuth("sensor1", "mqtt.example.com", "user", "pass")
err := msg.Connect()
func (*MessangerMQTT) Close ¶
func (m *MessangerMQTT) Close()
Close cleanly shuts down the MQTT messanger by:
- Closing the MQTT broker connection
- Clearing local subscription tracking
This method is thread-safe via the embedded mutex. After Close() is called, the messanger should not be used.
Example:
defer msg.Close() // Ensure cleanup on exit
func (*MessangerMQTT) Connect ¶ added in v0.0.10
func (m *MessangerMQTT) Connect() error
Connect establishes a connection to the MQTT broker and subscribes to any topics that were registered before connection was established.
This method:
- Connects to the configured MQTT broker
- Subscribes to all topics in the local subscription map
- Logs errors for any failed subscriptions but continues with others
Returns an error if the initial broker connection fails. Individual subscription failures are logged but don't cause the method to fail.
Example:
msg := NewMessangerMQTT("client", "localhost")
msg.Subscribe("ss/c/+/led", ledHandler) // Registered but not active yet
err := msg.Connect() // Now connected and subscribed to ss/c/+/led
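The deferred-subscription behavior described above can be sketched without a real broker. Everything here (handler, fakeBroker, deferredMessanger, newDeferred) is hypothetical and exists only to show the flow: subscriptions made before Connect are stored, Connect activates them, and a failed subscription is logged without failing Connect.

```go
package main

import (
	"errors"
	"fmt"
)

type handler func(topic string, payload []byte) error

// fakeBroker records activated topics; failTopic simulates a broker error.
type fakeBroker struct {
	connected bool
	active    []string
	failTopic string
}

func (b *fakeBroker) subscribe(topic string) error {
	if topic == b.failTopic {
		return errors.New("subscribe failed: " + topic)
	}
	b.active = append(b.active, topic)
	return nil
}

type deferredMessanger struct {
	broker *fakeBroker
	subs   map[string]handler
	order  []string // registration order, for deterministic replay
}

func newDeferred(b *fakeBroker) *deferredMessanger {
	return &deferredMessanger{broker: b, subs: make(map[string]handler)}
}

// Subscribe always stores the handler; it only reaches the broker when connected.
func (m *deferredMessanger) Subscribe(topic string, h handler) error {
	if _, seen := m.subs[topic]; !seen {
		m.order = append(m.order, topic)
	}
	m.subs[topic] = h
	if m.broker.connected {
		return m.broker.subscribe(topic)
	}
	return nil
}

// Connect activates every stored subscription; failures are logged, not returned.
func (m *deferredMessanger) Connect() error {
	m.broker.connected = true
	for _, topic := range m.order {
		if err := m.broker.subscribe(topic); err != nil {
			fmt.Println("subscription error:", err)
		}
	}
	return nil
}

func main() {
	b := &fakeBroker{}
	m := newDeferred(b)
	m.Subscribe("ss/c/+/led", nil) // stored, not yet active
	fmt.Println("active before connect:", b.active)
	m.Connect()
	fmt.Println("active after connect:", b.active)
}
```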
func (*MessangerMQTT) Error ¶
func (m *MessangerMQTT) Error() error
Error returns the last error from the underlying MQTT client, if any. Returns nil if no MQTT client exists or no error has occurred.
func (*MessangerMQTT) ID ¶
func (m *MessangerMQTT) ID() string
ID returns the unique identifier for this MQTT messanger instance.
func (*MessangerMQTT) Pub ¶
func (m *MessangerMQTT) Pub(topic string, value any) error
Pub publishes data to the specified topic via MQTT. The data is sent to all subscribers of that topic across the network.
Parameters:
- topic: The MQTT topic to publish to (e.g., "ss/d/station1/temp")
- value: The data to publish (any type accepted by MQTT client)
Returns nil on success. Error handling is currently limited; check logs for issues.
Example:
msg := NewMessangerMQTT("client", "localhost")
msg.Connect()
msg.Pub("ss/d/station1/temp", "25.5")
func (*MessangerMQTT) PubMsg ¶
func (m *MessangerMQTT) PubMsg(msg *Msg) error
PubMsg publishes a pre-constructed Msg structure via MQTT. The message's topic and data fields are used for the MQTT publication.
Parameters:
- msg: The message to publish (must not be nil)
Returns an error if msg is nil, otherwise returns nil.
Example:
msg := NewMsg("ss/d/station1/temp", []byte("25.5"), "sensor")
messanger.PubMsg(msg)
func (*MessangerMQTT) Subscribe ¶
func (m *MessangerMQTT) Subscribe(topic string, handler MsgHandler) error
Subscribe registers a message handler for the given topic pattern on the MQTT broker. The topic pattern can include MQTT wildcards ('+' for single level, '#' for multi-level).
The subscription is stored locally and, if already connected to the broker, is immediately activated. If not yet connected, it will be activated when Connect() is called.
Parameters:
- topic: The MQTT topic pattern to subscribe to (e.g., "ss/c/+/temp")
- handler: The callback function to invoke when matching messages arrive
Returns an error if the broker subscription fails (when connected).
Example:
msg := NewMessangerMQTT("client", "localhost")
msg.Connect()
msg.Subscribe("ss/c/station1/#", func(m *Msg) error {
	fmt.Printf("Received: %s\n", m.String())
	return nil
})
type MockClient ¶
type MockClient struct {
LastTopic string // Last topic published to (deprecated)
LastMessage string // Last message published (deprecated)
// contains filtered or unexported fields
}
MockClient implements the complete gomqtt.Client interface for testing. It simulates an MQTT client without requiring a real broker connection.
The mock tracks all operations (connect, publish, subscribe) and allows tests to:
- Verify publications via GetPublications()
- Verify subscriptions via GetSubscriptions()
- Inject errors via SetConnectError(), SetPublishError(), etc.
- Simulate incoming messages via SimulateMessage()
- Control connection state
Thread Safety: All methods are protected by an RWMutex for concurrent access.
Example:
mock := NewMockClient()
mock.Connect() // Always succeeds unless SetConnectError() was called
mock.Publish("test/topic", 0, false, "payload")
pubs := mock.GetPublications()
assert.Equal(t, 1, len(pubs))
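The core idea behind a recording mock with error injection can be shown in a few lines. The sketch below is not MockClient; recordingMock, publication, and errInjected are hypothetical names, and the choice to skip recording when an error is injected is an assumption made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errInjected is a sample error used to exercise the failure path.
var errInjected = errors.New("injected publish failure")

type publication struct {
	Topic   string
	Payload string
}

// recordingMock records publishes and can be told to fail on demand.
type recordingMock struct {
	mu         sync.RWMutex
	pubs       []publication
	publishErr error
}

func (m *recordingMock) SetPublishError(err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.publishErr = err
}

func (m *recordingMock) Publish(topic, payload string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.publishErr != nil {
		return m.publishErr // assumption: failed publishes are not recorded
	}
	m.pubs = append(m.pubs, publication{Topic: topic, Payload: payload})
	return nil
}

// Publications returns a copy so callers cannot mutate the mock's history.
func (m *recordingMock) Publications() []publication {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return append([]publication(nil), m.pubs...)
}

func main() {
	m := &recordingMock{}
	_ = m.Publish("test/topic", "payload")
	m.SetPublishError(errInjected)
	err := m.Publish("test/topic", "dropped")
	fmt.Println(len(m.Publications()), err)
}
```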
func NewMockClient ¶
func NewMockClient() *MockClient
NewMockClient creates and initializes a new mock MQTT client. The client starts in a disconnected state with no subscriptions.
Returns a pointer to the mock client.
Example:
mock := NewMockClient()
mqtt := NewMQTT("test", "mock", "")
mqtt.SetMQTTClient(mock)
func (*MockClient) AddRoute ¶
func (m *MockClient) AddRoute(topic string, callback gomqtt.MessageHandler)
AddRoute is one of the mock's routing methods.
func (*MockClient) ClearPublications ¶
func (m *MockClient) ClearPublications()
ClearPublications clears the publication history.
func (*MockClient) Connect ¶
func (m *MockClient) Connect() gomqtt.Token
func (*MockClient) Disconnect ¶
func (m *MockClient) Disconnect(quiesce uint)
func (*MockClient) GetLastPublication ¶
func (m *MockClient) GetLastPublication() *Publication
GetLastPublication returns the most recent publication.
func (*MockClient) GetPublications ¶
func (m *MockClient) GetPublications() []Publication
GetPublications returns all publications made through this mock client.
func (*MockClient) GetSubscriptions ¶
func (m *MockClient) GetSubscriptions() map[string]Subscription
GetSubscriptions returns all active subscriptions.
func (*MockClient) HasSubscription ¶
func (m *MockClient) HasSubscription(topic string) bool
HasSubscription checks if a topic is subscribed.
func (*MockClient) ID ¶ added in v0.0.10
func (m *MockClient) ID() string
ID returns the identifier for this mock client. Always returns "mock" to distinguish it from real clients.
func (*MockClient) IsConnectionOpen ¶
func (m *MockClient) IsConnectionOpen() bool
func (*MockClient) IsDisconnectCalled ¶
func (m *MockClient) IsDisconnectCalled() bool
IsDisconnectCalled returns true if Disconnect was called.
func (*MockClient) OptionsReader ¶
func (m *MockClient) OptionsReader() gomqtt.ClientOptionsReader
OptionsReader is one of the mock's configuration methods.
func (*MockClient) Publish ¶
func (m *MockClient) Publish(topic string, qos byte, retained bool, payload interface{}) gomqtt.Token
Publish is one of the mock's publishing methods.
func (*MockClient) RemoveRoute ¶
func (m *MockClient) RemoveRoute(topic string)
func (*MockClient) SetConnectError ¶
func (m *MockClient) SetConnectError(err error)
SetConnectError configures the mock to return an error on Connect().
func (*MockClient) SetOrderMatters ¶
func (m *MockClient) SetOrderMatters(matter bool)
func (*MockClient) SetPublishError ¶
func (m *MockClient) SetPublishError(err error)
SetPublishError configures the mock to return an error on Publish().
func (*MockClient) SetSubscribeError ¶
func (m *MockClient) SetSubscribeError(err error)
SetSubscribeError configures the mock to return an error on Subscribe().
func (*MockClient) SimulateConnectionLost ¶
func (m *MockClient) SimulateConnectionLost(err error)
SimulateConnectionLost simulates a connection loss
func (*MockClient) SimulateMessage ¶
func (m *MockClient) SimulateMessage(topic string, payload []byte) error
SimulateMessage triggers delivery of a message to the handlers subscribed to the topic. This allows message handling logic to be tested without a broker.
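The idea behind SimulateMessage can be sketched without any MQTT dependency: the mock keeps a table of handlers and calls the matching one directly. The names fakeClient, handler, subscribe, and simulate below are hypothetical stand-ins, not this package's API. The exact-match lookup and the error for an unsubscribed topic are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical stand-in for gomqtt.MessageHandler.
type handler func(topic string, payload []byte)

// fakeClient is a hypothetical stand-in for MockClient.
type fakeClient struct {
	subs map[string]handler
}

func newFakeClient() *fakeClient {
	return &fakeClient{subs: make(map[string]handler)}
}

// subscribe registers a handler for an exact topic.
func (c *fakeClient) subscribe(topic string, h handler) {
	c.subs[topic] = h
}

// simulate calls the handler registered for topic directly.
// Returning an error for an unknown topic is an assumption of this sketch.
func (c *fakeClient) simulate(topic string, payload []byte) error {
	h, ok := c.subs[topic]
	if !ok {
		return errors.New("no subscription for topic " + topic)
	}
	h(topic, payload)
	return nil
}

func main() {
	c := newFakeClient()
	c.subscribe("test/topic", func(topic string, payload []byte) {
		fmt.Printf("%s: %s\n", topic, payload)
	})
	if err := c.simulate("test/topic", []byte("test data")); err != nil {
		fmt.Println("error:", err)
	}
}
```

Because delivery is a plain function call, a test can assert on the handler's side effects immediately after simulate returns.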
func (*MockClient) Subscribe ¶
func (m *MockClient) Subscribe(topic string, qos byte, callback gomqtt.MessageHandler) gomqtt.Token
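Subscribe is one of the mock's subscription methods. It takes a topic, QoS level, and callback, and returns a gomqtt.Token.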
func (*MockClient) SubscribeMultiple ¶
func (m *MockClient) SubscribeMultiple(filters map[string]byte, callback gomqtt.MessageHandler) gomqtt.Token
func (*MockClient) Unsubscribe ¶
func (m *MockClient) Unsubscribe(topics ...string) gomqtt.Token
type MockClientOptionsReader ¶
type MockClientOptionsReader struct {
// contains filtered or unexported fields
}
MockClientOptionsReader is a simple stub implementation of the gomqtt.ClientOptionsReader interface for testing.
func (*MockClientOptionsReader) ClientID ¶
func (m *MockClientOptionsReader) ClientID() string
ClientID returns the client identifier.
type MockMessage ¶
type MockMessage struct {
// contains filtered or unexported fields
}
MockMessage implements the gomqtt.Message interface for testing. It represents an MQTT message received by a subscriber.
func NewMockMessage ¶
func NewMockMessage(topic string, payload []byte) *MockMessage
NewMockMessage creates a new mock MQTT message for testing. The message is created with QoS 0 and not retained.
Parameters:
- topic: The topic the message is published to
- payload: The message payload as bytes
Returns a pointer to the mock message.
func (*MockMessage) Duplicate ¶
func (m *MockMessage) Duplicate() bool
Duplicate returns whether this is a duplicate message delivery.
func (*MockMessage) MessageID ¶
func (m *MockMessage) MessageID() uint16
MessageID returns the unique identifier for this message.
func (*MockMessage) Payload ¶
func (m *MockMessage) Payload() []byte
Payload returns the message payload as bytes.
func (*MockMessage) Qos ¶
func (m *MockMessage) Qos() byte
Qos returns the Quality of Service level for this message.
func (*MockMessage) Retained ¶
func (m *MockMessage) Retained() bool
Retained returns whether this message was retained by the broker.
func (*MockMessage) Topic ¶
func (m *MockMessage) Topic() string
Topic returns the topic this message was published to.
type MockToken ¶
type MockToken struct {
// contains filtered or unexported fields
}
MockToken implements the gomqtt.Token interface for testing. It represents the result of an asynchronous MQTT operation (connect, publish, subscribe). The token tracks whether the operation completed and any error that occurred.
func NewMockToken ¶
func NewMockToken(err error) *MockToken
NewMockToken creates a new mock token with the specified error state. Pass nil for a successful operation token.
Parameters:
- err: The error to return, or nil for success
Returns a pointer to the mock token.
func (*MockToken) Done ¶
func (t *MockToken) Done() <-chan struct{}
Done returns a channel that is closed when the operation completes. For the mock, the channel is already closed (operation is instant).
func (*MockToken) Error ¶
func (t *MockToken) Error() error
Error returns the error from the operation, if any. Returns nil if the operation succeeded.
func (*MockToken) Wait ¶
func (t *MockToken) Wait() bool
Wait blocks until the operation completes (immediately for mock). Returns true if the operation succeeded, false if it failed.
func (*MockToken) WaitTimeout ¶
func (t *MockToken) WaitTimeout(timeout time.Duration) bool
WaitTimeout waits for the operation to complete with a timeout. For the mock, this behaves the same as Wait() and returns immediately.
Parameters:
- timeout: Maximum time to wait (ignored in mock)
Returns true if the operation succeeded, false if it failed.
type Msg ¶
type Msg struct {
ID int64 `json:"id"`
Topic string `json:"topic"`
Path []string `json:"path"`
Args []string `json:"args"`
Data []byte `json:"msg"`
Source string `json:"source"`
Valid bool `json:"valid"`
Timestamp time.Duration `json:"timestamp"`
}
Msg represents a message in the Otto messaging system. It contains the message payload along with routing information and metadata for tracking and debugging.
Each message has a globally unique ID and timestamp for tracing message flow through the system. The topic is parsed into a path array for efficient routing.
Fields:
- ID: Unique monotonically increasing identifier for this message
- Topic: Full topic path (e.g., "ss/c/station/temp")
- Path: Topic split into segments for routing (e.g., ["ss", "c", "station", "temp"])
- Args: Optional arguments extracted from the topic path
- Data: The message payload as a byte array
- Source: Identifier of the component that created this message
- Valid: Whether the topic follows Otto's topic format conventions
- Timestamp: When the message was created
Messages should be treated as immutable once created, and should be created using NewMsg().
func NewMsg ¶
func NewMsg(topic string, data []byte, source string) *Msg
NewMsg creates a new message with the given topic, payload data, and source identifier. The topic is automatically parsed into path segments for routing, and the message is validated against Otto's topic format.
If message saving is enabled via GetMsgSaver().StartSaving(), the message will be recorded for debugging purposes.
Parameters:
- topic: The full topic path (e.g., "ss/c/station/temp")
- data: The message payload as bytes
- source: Identifier of the component creating this message (e.g., "mqtt-sub", "local")
Returns a pointer to the newly created message.
Example:
msg := NewMsg("ss/c/station1/temp", []byte("25.5"), "sensor")
fmt.Printf("Message ID: %d, Topic: %s\n", msg.ID, msg.Topic)
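The two steps described for NewMsg, assigning an increasing ID and splitting the topic into path segments, can be sketched as follows. The message type, the lastID counter, and newMessage are hypothetical and reduced for illustration. Using an atomic counter is an assumption about how uniqueness might be kept under concurrency.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// message is a hypothetical, reduced stand-in for Msg.
type message struct {
	ID    int64
	Topic string
	Path  []string
	Data  []byte
}

// lastID is a shared counter; atomic updates keep IDs unique across goroutines.
var lastID int64

// newMessage assigns the next ID and splits the topic on "/".
func newMessage(topic string, data []byte) *message {
	return &message{
		ID:    atomic.AddInt64(&lastID, 1),
		Topic: topic,
		Path:  strings.Split(topic, "/"),
		Data:  data,
	}
}

func main() {
	a := newMessage("ss/c/station1/temp", []byte("25.5"))
	b := newMessage("ss/d/station1/humidity", []byte("60"))
	fmt.Println(a.ID < b.ID, a.Path, len(b.Path))
}
```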
func (*Msg) Bool ¶ added in v0.0.10
Bool interprets the message payload as a boolean value. It recognizes common boolean string representations:
- true: "true", "1", "on", "yes"
- false: everything else
Returns true if the payload matches a truthy value, false otherwise.
Example:
msg := NewMsg("ss/c/station/led", []byte("on"), "controller")
if msg.Bool() {
	fmt.Println("LED is on")
}
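The truthy-string rule above can be sketched as a small standalone function. payloadBool is a hypothetical name, and trimming whitespace and ignoring case are assumptions of this sketch. The description above does not say whether Bool does either.

```go
package main

import (
	"fmt"
	"strings"
)

// payloadBool reports whether payload is one of the truthy strings
// "true", "1", "on", or "yes". Everything else is false.
// Trimming and lower-casing are assumptions made for this sketch.
func payloadBool(payload []byte) bool {
	switch strings.ToLower(strings.TrimSpace(string(payload))) {
	case "true", "1", "on", "yes":
		return true
	}
	return false
}

func main() {
	for _, s := range []string{"on", "yes", "1", "off", "", "maybe"} {
		fmt.Printf("%q -> %v\n", s, payloadBool([]byte(s)))
	}
}
```

Note that an empty or unrecognized payload is indistinguishable from an explicit "false" under this rule.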
func (*Msg) Byte ¶
Byte returns the raw payload of the message as a byte slice. It is equivalent to reading the Data field directly.
func (*Msg) Dump ¶
Dump returns a human-readable multi-line string representation of the message including all fields. This is useful for debugging and logging.
Returns a formatted string with ID, Path, Args, Source, Timestamp, and Data.
Example output:
ID: 123
Path: ["ss" "c" "station1" "temp"]
Args: []
Src: sensor
Time: 1234567890
Data: 25.5
func (*Msg) Float64 ¶
Float64 parses the message payload as a floating-point number. The payload should be a string representation of a number.
Returns the parsed float64 value. If parsing fails, returns 0.0.
Example:
msg := NewMsg("ss/d/station/temp", []byte("25.5"), "sensor")
temp := msg.Float64() // Returns 25.5
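A minimal sketch of the parse-or-zero behavior, using strconv.ParseFloat from the standard library. payloadFloat is a hypothetical name, and trimming surrounding whitespace is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// payloadFloat parses payload as a float64 and returns 0 when parsing fails.
// Trimming whitespace first is an assumption made for this sketch.
func payloadFloat(payload []byte) float64 {
	f, err := strconv.ParseFloat(strings.TrimSpace(string(payload)), 64)
	if err != nil {
		return 0
	}
	return f
}

func main() {
	fmt.Println(payloadFloat([]byte("25.5")))
	fmt.Println(payloadFloat([]byte("not a number")))
}
```

With this contract a reading of exactly 0 cannot be told apart from a parse failure, so callers that care may want to validate the payload separately.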
func (*Msg) IsJSON ¶
IsJSON checks whether the message payload is valid JSON. This is useful for determining if the payload needs JSON parsing or should be treated as a plain string/binary data.
Returns true if the payload is valid JSON, false otherwise.
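One way to perform such a check is json.Valid from the standard library. Whether IsJSON uses it is an assumption. The sketch below also shows a stricter, hypothetical isJSONObject helper, because a bare number such as 25.5 is itself valid JSON.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// isJSON reports whether payload is any valid JSON value.
func isJSON(payload []byte) bool {
	return json.Valid(payload)
}

// isJSONObject reports whether payload is valid JSON whose top-level
// value is an object, judged by its first non-space byte.
func isJSONObject(payload []byte) bool {
	trimmed := bytes.TrimSpace(payload)
	return len(trimmed) > 0 && trimmed[0] == '{' && json.Valid(trimmed)
}

func main() {
	for _, p := range []string{`{"temp":25.5}`, `25.5`, `{broken`} {
		fmt.Printf("%-15s json=%v object=%v\n", p, isJSON([]byte(p)), isJSONObject([]byte(p)))
	}
}
```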
func (*Msg) JSON ¶
JSON encodes the entire message (not just the payload) as JSON. This serializes the Msg struct including all fields (ID, Topic, Data, etc.).
Returns the JSON byte array and any encoding error.
Note: To decode just the payload as JSON, use Map() instead.
func (*Msg) Last ¶
Last returns the last segment of the message topic path. This is typically the sensor or command name.
Returns the last path element, or empty string if the path is empty.
Example:
msg := NewMsg("ss/c/station1/temp", []byte("25"), "sensor")
last := msg.Last() // Returns "temp"
func (*Msg) Map ¶
Map decodes the message payload as a JSON object into a map. The payload must be a valid JSON object.
Returns a map with string keys and interface{} values, or an error if the payload is not valid JSON or not an object.
Example:
msg := NewMsg("ss/d/station/sensor", []byte(`{"temp":25.5,"humidity":60}`), "sensor")
data, err := msg.Map()
if err == nil {
	temp := data["temp"].(float64)
	fmt.Println("temp:", temp)
}
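A sketch of decoding an object payload with encoding/json and reading a value defensively. payloadMap is a hypothetical name. The comma-ok type assertion avoids the panic that a plain assertion would cause when the key is missing or holds another type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payloadMap decodes payload as a JSON object. Non-object JSON, such as
// an array, produces an error from json.Unmarshal.
func payloadMap(payload []byte) (map[string]interface{}, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(payload, &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	data, err := payloadMap([]byte(`{"temp":25.5,"humidity":60}`))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// JSON numbers decode to float64 when the target is interface{}.
	if temp, ok := data["temp"].(float64); ok {
		fmt.Println("temp:", temp)
	}
	if _, ok := data["pressure"].(float64); !ok {
		fmt.Println("pressure: not present")
	}
}
```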
func (*Msg) Station ¶
Station extracts the station identifier from the message topic. Otto topics follow the format "ss/[c|d]/station/sensor", where the third element (index 2) is the station ID.
Returns the station ID string, or an empty string if the topic has fewer than 3 elements.
Example:
msg := NewMsg("ss/c/station1/temp", []byte("25"), "sensor")
station := msg.Station() // Returns "station1"
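The index-based extraction used by Station, and the similar rule for Last, can be sketched on a plain topic string. The functions station and last are hypothetical helpers that split the topic on each call. The package works from the pre-split Path field instead.

```go
package main

import (
	"fmt"
	"strings"
)

// station returns the third topic segment (index 2), or "" when the
// topic has fewer than three segments.
func station(topic string) string {
	path := strings.Split(topic, "/")
	if len(path) < 3 {
		return ""
	}
	return path[2]
}

// last returns the final topic segment. strings.Split always yields at
// least one element, so an empty topic gives "".
func last(topic string) string {
	path := strings.Split(topic, "/")
	return path[len(path)-1]
}

func main() {
	fmt.Println(station("ss/c/station1/temp"), last("ss/c/station1/temp"))
	fmt.Printf("%q\n", station("ss/c"))
}
```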
type MsgHandler ¶
type MsgHandler func(msg *Msg) error
MsgHandler is a callback function type for handling incoming messages. Subscribers provide a MsgHandler function that will be invoked when a message is received on a subscribed topic. The handler receives a pointer to the Msg and should return an error if message processing fails.
type MsgPrinter ¶
type MsgPrinter struct{}
MsgPrinter is a simple utility type for debugging that prints received messages. It implements the MessageHandler interface.
TODO: Add functionality to filter and print messages by ID.
func (*MsgPrinter) MsgHandler ¶
func (m *MsgPrinter) MsgHandler(msg *Msg) error
MsgHandler prints the received message to stdout in a structured format. This is useful for debugging message flow.
Parameters:
- msg: The message to print
Returns nil (always succeeds).
Example:
printer := &MsgPrinter{}
msg.Subscribe("ss/c/+/temp", printer.MsgHandler)
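The subscription above uses the MQTT single-level wildcard "+". A sketch of standard MQTT filter matching, covering "+" and the multi-level "#", is shown below. topicMatches is a hypothetical helper, not this package's routing code, which may differ in details.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether topic matches an MQTT-style filter.
// "+" matches exactly one segment. "#" matches the rest of the topic
// and is only honored as the last filter segment.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(topicMatches("ss/c/+/temp", "ss/c/station1/temp"))
	fmt.Println(topicMatches("ss/c/+/temp", "ss/c/station1/led"))
	fmt.Println(topicMatches("ss/#", "ss/d/station1/temp"))
}
```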
type MsgSaver ¶
type MsgSaver struct {
Messages []*Msg `json:"saved-messages"` // All recorded messages
Saving bool `json:"saving"` // Whether message recording is active
}
MsgSaver records messages for debugging, testing, and audit purposes. When saving is enabled, all messages created via NewMsg() are automatically appended to the Messages slice.
This is useful for:
- Debugging message flow through the system
- Testing message handlers
- Auditing message history
- Replaying messages for testing
Note: In production, be mindful of memory usage as messages accumulate. Consider periodically clearing or persisting messages to external storage.
TODO: Add file persistence and protocol-based message delivery.
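One way to address the memory concern in the note above is to cap the number of retained messages. The sketch below is a hypothetical design, not how MsgSaver works: boundedSaver, record, and the max limit are invented names, and the mutex is an assumption about concurrent use.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical, reduced stand-in for a saved message.
type record struct {
	Topic string
	Data  []byte
}

// boundedSaver keeps at most max records, dropping the oldest first.
type boundedSaver struct {
	mu      sync.Mutex
	saving  bool
	max     int
	records []record
}

func (s *boundedSaver) start() { s.mu.Lock(); s.saving = true; s.mu.Unlock() }
func (s *boundedSaver) stop()  { s.mu.Lock(); s.saving = false; s.mu.Unlock() }

// save appends r while saving is enabled and evicts the oldest record
// once the limit is exceeded.
func (s *boundedSaver) save(r record) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.saving {
		return
	}
	s.records = append(s.records, r)
	if len(s.records) > s.max {
		copy(s.records, s.records[1:])
		s.records = s.records[:len(s.records)-1]
	}
}

func main() {
	s := &boundedSaver{max: 2}
	s.start()
	for _, topic := range []string{"ss/d/a/temp", "ss/d/b/temp", "ss/d/c/temp"} {
		s.save(record{Topic: topic})
	}
	s.stop()
	s.save(record{Topic: "ss/d/d/temp"}) // ignored: saving is off
	for _, r := range s.records {
		fmt.Println(r.Topic)
	}
}
```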
func GetMsgSaver ¶
func GetMsgSaver() *MsgSaver
GetMsgSaver returns the singleton MsgSaver instance, creating it if needed. This provides a global message recording facility that can be enabled or disabled at runtime.
Returns a pointer to the MsgSaver instance.
Example:
saver := GetMsgSaver()
saver.StartSaving()
// ... messages are now being recorded ...
saver.StopSaving()
func (*MsgSaver) Dump ¶
func (ms *MsgSaver) Dump()
Dump prints all saved messages to stdout in a human-readable format. Each message is separated by a line of dashes for readability.
This is useful for debugging and inspecting the message history.
func (*MsgSaver) ServeHTTP ¶
func (ms *MsgSaver) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler to provide a REST API endpoint for accessing saved messages. Returns the MsgSaver state as JSON, including all saved messages and the current saving status.
Response format:
{
	"saved-messages": [{"id": 1, "topic": "...", ...}, ...],
	"saving": true
}
This endpoint is useful for debugging and monitoring message flow.
func (*MsgSaver) StartSaving ¶
func (ms *MsgSaver) StartSaving()
StartSaving enables message recording. After calling this, all messages created via NewMsg() will be stored in the Messages slice.
Example:
saver := GetMsgSaver()
saver.StartSaving()
func (*MsgSaver) StopSaving ¶
func (ms *MsgSaver) StopSaving()
StopSaving disables message recording. Messages created after this call will not be saved. Previously saved messages remain in the Messages slice.
Example:
saver := GetMsgSaver()
saver.StopSaving()
type Publication ¶
type Publication struct {
Topic string // The topic published to
Payload interface{} // The message payload
QoS byte // Quality of Service level
Retained bool // Whether the message should be retained
}
Publication represents a message that was published through the mock client. Tests can inspect publications to verify that code published the right messages.
type Subscription ¶
type Subscription struct {
Topic string // The topic pattern subscribed to
QoS byte // Quality of Service level
Handler gomqtt.MessageHandler // The callback function for messages
}
Subscription represents a topic subscription made through the mock client. Tests can inspect subscriptions to verify that code subscribed correctly.
type Topics ¶
type Topics struct {
TopicFmt string // Format string for topic generation (e.g., "ss/%s/%s/%s")
Topicmap map[string]int // Map of topic to usage count
}
Topics manages topic formatting and usage tracking for the Otto messaging system. It provides helper methods for creating properly formatted topics and tracks how many times each topic has been used.
Otto uses a standardized topic format, "ss/[c|d]/station/sensor", where:
- "ss" is the namespace prefix (Smart Station)
- "c" indicates control topics (commands to devices)
- "d" indicates data topics (sensor readings, telemetry)
- station is the station/device identifier
- sensor is the sensor/actuator/command name
Example topics:
- "ss/c/station1/led" - Control topic to turn LED on/off
- "ss/d/station1/temp" - Data topic for temperature readings
func GetTopics ¶
func GetTopics() *Topics
GetTopics returns the singleton Topics instance for the application. This provides access to topic formatting and usage tracking.
Returns a pointer to the Topics instance.
Example:
topics := GetTopics()
controlTopic := topics.Control("led")
func (*Topics) Control ¶
Control generates a control topic for the current station. Control topics are used to send commands to devices (e.g., turn on LED, set speed). The format is: "ss/c/{station}/{topic}"
This method also increments the usage counter for the generated topic.
Parameters:
- topic: The command or actuator name (e.g., "led", "motor", "relay")
Returns the fully formatted control topic string.
Example:
topics := GetTopics()
ledTopic := topics.Control("led") // Returns "ss/c/mystation/led"
func (*Topics) Data ¶
Data generates a data topic for the current station. Data topics are used to publish sensor readings and telemetry. The format is: "ss/d/{station}/{topic}"
This method also increments the usage counter for the generated topic.
Parameters:
- topic: The sensor or data stream name (e.g., "temp", "humidity", "motion")
Returns the fully formatted data topic string.
Example:
topics := GetTopics()
tempTopic := topics.Data("temp") // Returns "ss/d/mystation/temp"
func (Topics) ServeHTTP ¶
func (t Topics) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler to provide a REST API endpoint for inspecting topic usage. Returns JSON containing the topic format and a map of all topics used by this station with their usage counts.
Response format:
{
	"TopicFmt": "ss/%s/%s/%s",
	"Topicmap": {
		"ss/c/station1/led": 5,
		"ss/d/station1/temp": 120
	}
}
This is useful for monitoring which topics are being used and how frequently.