autopaho

package
v0.12.0
Published: Oct 15, 2023 License: EPL-2.0 Imports: 16 Imported by: 95

README

AutoPaho

AutoPaho has a number of aims:

  • Provide an easy-to-use MQTT v5 client that provides commonly requested functionality (e.g. connection, automatic reconnection).
  • Demonstrate the use of paho.golang/paho.
  • Enable us to smoke test paho.golang/paho features (ensuring they are usable in a real-world situation).

Basic Usage

package main

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"syscall"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/paho"
)

func main() {
	// App will run until cancelled by user (e.g. ctrl-c)
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	u, err := url.Parse("mqtt://test.mosquitto.org:1883")
	if err != nil {
		panic(err)
	}

	cliCfg := autopaho.ClientConfig{
		BrokerUrls: []*url.URL{u},
		KeepAlive:  20, // Keepalive message should be sent every 20 seconds
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: []paho.SubscribeOptions{
					{Topic: "testTopic", QoS: 1},
				},
			}); err != nil {
				fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received!\n", err)
				return // do not report success when the subscription failed
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
		// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
		ClientConfig: paho.ClientConfig{
			ClientID: "TestClient",
			Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
				fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", m.Topic, m.Payload, m.Retain)
			}),
			OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}

	c, err := autopaho.NewConnection(ctx, cliCfg)
	if err != nil {
		panic(err)
	}
	// Wait for the connection to come up
	if err = c.AwaitConnection(ctx); err != nil {
		panic(err)
	}
	// Publish a test message
	if _, err = c.Publish(ctx, &paho.Publish{
		QoS:     1,
		Topic:   "testTopic",
		Payload: []byte("TestMessage"),
	}); err != nil {
		panic(err)
	}

	<-ctx.Done() // Wait for user to trigger exit
	fmt.Println("signal caught - exiting")
}

QOS 1 & 2

paho.golang/paho supports QOS 1 & 2 but does not maintain a "Session State". This means that QOS 1/2 messages will be sent, but no attempt will be made to resend them if delivery fails.

Work in progress

See this issue for more info (currently nearing completion).

Use case: I want to regularly publish messages and leave it to autopaho to ensure they are delivered (regardless of the connection state).

Acceptance tests:

  • Publish a message before connection is established; it should be queued and delivered when connection comes up.
  • Connection drops and messages are published whilst reconnection is in progress. They should be queued and delivered when connection is available.
  • Publish messages at a rate in excess of Receive Maximum; they should be queued and sent, in order, when possible.
  • Application restarts during any of the above - queued messages are sent out when connection comes up.

Desired features:

  • Fire and forget - async publish; we trust the library to deliver the message, so once it's in the store the client can forget about it.
  • Minimal RAM use - the connection may be down for a long time and we may not have much RAM, so messages should be on disk (ideally with no trace of them in RAM).
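
The first three acceptance tests amount to a FIFO queue that accepts messages at any time and drains only while sending succeeds. The following is a minimal in-memory sketch of that idea, not the library's implementation: the names queue, sendFunc and errDown are hypothetical, and because it keeps messages in RAM it meets neither the restart test nor the minimal-RAM goal.

```go
package main

import (
	"errors"
	"fmt"
)

// errDown is a hypothetical stand-in for "the connection is down".
var errDown = errors.New("connection down")

// sendFunc is a hypothetical transport; it returns an error while offline.
type sendFunc func(payload string) error

// queue holds messages in FIFO order until they can be sent.
// It lives in memory only, so it does not survive a restart.
type queue struct {
	pending []string
}

// Publish stores the message; the caller can then forget about it.
func (q *queue) Publish(payload string) {
	q.pending = append(q.pending, payload)
}

// Flush sends queued messages in order and stops at the first failure,
// leaving unsent messages queued for the next attempt.
// It returns the number of messages sent.
func (q *queue) Flush(send sendFunc) int {
	sent := 0
	for len(q.pending) > 0 {
		if err := send(q.pending[0]); err != nil {
			break
		}
		q.pending = q.pending[1:]
		sent++
	}
	return sent
}

func main() {
	q := &queue{}
	q.Publish("a") // published before the connection is up
	q.Publish("b")

	offline := func(string) error { return errDown }
	fmt.Println("sent while offline:", q.Flush(offline))

	var delivered []string
	online := func(p string) error {
		delivered = append(delivered, p)
		return nil
	}
	fmt.Println("sent once online:", q.Flush(online), delivered)
}
```

A store that satisfies the remaining requirements would replace the slice with something persisted to disk.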

Documentation

Index

Constants

This section is empty.

Variables

var ConnectionDownError = errors.New("connection with the MQTT broker is currently down")

ConnectionDownError will be returned when a request is made but the connection to the broker is down. Note: It is possible that the connection will drop between the request being made and a response being received, in which case a different error will be received (this error is only returned if the connection is down at the time the request is made).

Functions

This section is empty.

Types

type ClientConfig

type ClientConfig struct {
	BrokerUrls        []*url.URL       // URL(s) for the broker (schemes supported include 'mqtt' and 'tls')
	TlsCfg            *tls.Config      // Configuration used when connecting using TLS
	KeepAlive         uint16           // Keepalive period in seconds (the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next)
	ConnectRetryDelay time.Duration    // How long to wait between connection attempts (defaults to 10s)
	ConnectTimeout    time.Duration    // How long to wait for the connection process to complete (defaults to 10s)
	WebSocketCfg      *WebSocketConfig // Enables customisation of the websocket connection

	// AttemptConnection, if provided, will be called to establish a network connection.
	// The returned `conn` must support thread safe writing; most wrapped net.Conn implementations like tls.Conn
	// are not thread safe for writing.
	// To fix, use packets.NewThreadSafeConn wrapper or extend the custom net.Conn struct with sync.Locker.
	AttemptConnection func(context.Context, ClientConfig, *url.URL) (net.Conn, error)

	OnConnectionUp func(*ConnectionManager, *paho.Connack) // Called (within a goroutine) when a connection is made (including reconnection). Connection Manager passed to simplify subscriptions.
	OnConnectError func(error)                             // Called (within a goroutine) whenever a connection attempt fails. Will wrap autopaho.ConnackError on server deny.

	Debug      paho.Logger // By default set to NOOPLogger{}, set to a logger for debugging info
	PahoDebug  paho.Logger // debugger passed to the paho package (will default to NOOPLogger{})
	PahoErrors paho.Logger // error logger passed to the paho package (will default to NOOPLogger{})

	// We include the full paho.ClientConfig in order to simplify moving between the two packages.
	// Note that Conn will be ignored.
	paho.ClientConfig
	// contains filtered or unexported fields
}

ClientConfig adds a few values, required to manage the connection, to the standard paho.ClientConfig (note that Conn will be ignored).

func (*ClientConfig) ResetUsernamePassword

func (cfg *ClientConfig) ResetUsernamePassword()

ResetUsernamePassword clears any configured username and password on the client configuration

func (*ClientConfig) SetConnectPacketConfigurator

func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) *paho.Connect) bool

SetConnectPacketConfigurator assigns a callback for modification of the Connect packet, called before the connection is opened, allowing the application to adjust its configuration before establishing a connection. This function should be treated as asynchronous, and expected to have no side effects.

func (*ClientConfig) SetUsernamePassword

func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte)

SetUsernamePassword configures the username and password properties for the Connect packets. These values are staged in the ClientConfig, and preparation of the Connect packet is deferred.

func (*ClientConfig) SetWillMessage

func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool)

SetWillMessage configures the Will topic, payload, QOS and Retain facets of the client connection. These values are staged in the ClientConfig, for later preparation of the Connect packet.

type ConnackError added in v0.12.0

type ConnackError struct {
	ReasonCode byte   // CONNACK reason code
	Reason     string // CONNACK Reason string from properties
	Err        error  // underlying error
}

ConnackError will be passed when the server denies the connection in the CONNACK packet.

func NewConnackError added in v0.12.0

func NewConnackError(err error, connack *paho.Connack) *ConnackError

NewConnackError returns a new ConnackError

func (*ConnackError) Error added in v0.12.0

func (c *ConnackError) Error() string

func (*ConnackError) Unwrap added in v0.12.0

func (c *ConnackError) Unwrap() error

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager manages the connection with the broker and provides the ability to publish messages.

func NewConnection

func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, error)

NewConnection creates a connection manager and begins the connection process (will retry until the context is cancelled)

func (*ConnectionManager) AwaitConnection

func (c *ConnectionManager) AwaitConnection(ctx context.Context) error

AwaitConnection will return when the connection comes up or the context is cancelled (only returns an error if context is cancelled). If you require more complex connection management then consider using the OnConnectionUp callback.

func (*ConnectionManager) Disconnect

func (c *ConnectionManager) Disconnect(ctx context.Context) error

Disconnect closes the connection (if one is up) and shuts down any active processes before returning. Note: We cannot currently tell when the MQTT connection has fully shut down (so it may still be in the process of closing down).

func (*ConnectionManager) Done

func (c *ConnectionManager) Done() <-chan struct{}

Done returns a channel that will be closed when the connection handler has shut down cleanly. Note: We cannot currently tell when the MQTT connection has fully shut down (so it may still be in the process of closing down).

func (*ConnectionManager) Publish

func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho.PublishResponse, error)

Publish is used to send a publication to the MQTT server. It is passed a pre-prepared Publish packet and blocks waiting for the appropriate response, or for the timeout to fire. Any response message is returned from the function, along with any errors.

func (*ConnectionManager) Subscribe

func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)

Subscribe is used to send a Subscription request to the MQTT server. It is passed a pre-prepared Subscribe packet and blocks waiting for a response Suback, or for the timeout to fire. Any response Suback is returned from the function, along with any errors.

func (*ConnectionManager) Unsubscribe

func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)

Unsubscribe is used to send an Unsubscribe request to the MQTT server. It is passed a pre-prepared Unsubscribe packet and blocks waiting for a response Unsuback, or for the timeout to fire. Any response Unsuback is returned from the function, along with any errors.

type DisconnectError

type DisconnectError struct {
	// contains filtered or unexported fields
}

DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)

func (*DisconnectError) Error

func (d *DisconnectError) Error() string

type WebSocketConfig

type WebSocketConfig struct {
	Dialer func(url *url.URL, tlsCfg *tls.Config) *websocket.Dialer // If non-nil this will be called before each websocket connection (allows full configuration of the dialer used)
	Header func(url *url.URL, tlsCfg *tls.Config) http.Header       // If non-nil this will be called before each connection attempt to get headers to include with request
}

WebSocketConfig enables customisation of the websocket connection
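
As an illustration of the Header field, the sketch below builds a function with the documented signature that attaches an Authorization header to each websocket handshake. The bearerHeader helper, the token value and the broker URL are hypothetical; whether a broker expects such a header depends on its configuration.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/url"
)

// bearerHeader returns a function with the signature documented for
// WebSocketConfig.Header. The token and the choice of header are hypothetical.
func bearerHeader(token string) func(u *url.URL, tlsCfg *tls.Config) http.Header {
	return func(u *url.URL, tlsCfg *tls.Config) http.Header {
		h := http.Header{}
		h.Set("Authorization", "Bearer "+token)
		return h
	}
}

func main() {
	// Hypothetical broker address, used only to exercise the callback.
	u, err := url.Parse("wss://broker.example.com/mqtt")
	if err != nil {
		panic(err)
	}
	header := bearerHeader("example-token")
	fmt.Println(header(u, nil).Get("Authorization"))
}
```

Because the function is called before each connection attempt, it is also a convenient place to refresh a short-lived credential.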

Directories

Path Synopsis
cmd/rpc
examples
extensions/rpc
internal
