mqtt

Version: v0.0.1
Published: May 12, 2024 | License: Apache-2.0 | Imports: 16 | Imported by: 0

README

go-mqtt

go-mqtt is an MQTT client for Go that implements the MQTT v3.1.1 and MQTT v5.0 protocols.

Done

  • basic client
  • packet reading and writing
  • publish qos 0
  • subscribe and unsubscribe
  • re-connector

TODO

  • mqttv3 check
  • qos 1 and 2
  • prefix tree based message router
  • event hook

MQTT-v5.0 oasis doc

MQTT-v3.1.1 oasis doc

How to use

Get the package:

go get github.com/lynnplus/go-mqtt

Connect to an MQTT broker:

package main

import (
	"context"

	"github.com/lynnplus/go-mqtt"
	"github.com/lynnplus/go-mqtt/packets"
)

func main() {
	client := mqtt.NewClient(&mqtt.ConnDialer{
		Address: "tcp://127.0.0.1:1883",
	}, mqtt.ClientConfig{})

	pkt := packets.NewConnect("client_id", "username", []byte("password"))
	ack, err := client.Connect(context.Background(), pkt)
	if err != nil {
		panic(err)
	}
	if ack.ReasonCode != 0 {
		panic(packets.NewReasonCodeError(ack.ReasonCode, ""))
	}
	//do something
	_ = client.Disconnect()
}

The package provides two ways to connect: synchronous (Connect) and asynchronous (StartConnect). With the asynchronous method, the result of the connection attempt is only available in a callback such as OnConnected.

Asynchronous connection:

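package main

import (
	"context"
	"fmt"
	"github.com/lynnplus/go-mqtt"
	"github.com/lynnplus/go-mqtt/packets"
)

func main() {
	client := mqtt.NewClient(&mqtt.ConnDialer{
		Address: "tcp://127.0.0.1:1883",
	}, mqtt.ClientConfig{
		// ClientListener is an embedded field, so it must be set by name in a struct literal.
		ClientListener: mqtt.ClientListener{
			OnConnected: func(c *mqtt.Client, ack *packets.Connack) {
				fmt.Println(ack.ReasonCode)
			},
		},
	})
	pkt := packets.NewConnect("client_id", "username", []byte("password"))
	if err := client.StartConnect(context.Background(), pkt); err != nil {
		panic(err)
	}
}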

For a basic chat example, see https://github.com/lynnplus/go-mqtt/blob/master/examples/chat/main.go

Features

Dialer

By default, the package provides a dialer (ConnDialer) that supports TCP and TLS. To use another transport, such as WebSocket, implement the Dialer interface provided by the package.

MQTTv5 Enhanced authentication

Re-connect

...

Router

...

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidArguments = errors.New("invalid argument")
	ErrNotConnected     = errors.New("client not connected")
)
var ErrPacketIdentifiersExhausted = errors.New("packet identifiers exhausted")
var ErrPongTimeout = errors.New("pong timeout")

Functions

func RegisterPayloadUnmarshaler

func RegisterPayloadUnmarshaler(contentType string, unmarshaler PayloadUnmarshaler)

func UnmarshalPayload

func UnmarshalPayload(contentType string, body []byte, v any) error

func UnmarshalToString

func UnmarshalToString(body []byte, v any) error

Types

type Auther

type Auther interface {
	Authenticate(clientId string, auth *packets.Auth) (*packets.Auth, error)
	Authenticated(clientId string, ack *packets.Connack) error
}

Auther is the interface for implementing the enhanced authentication flow of the MQTT v5.0 protocol.

In the general flow, the client carries the initial authentication data in the Connect packet. If the server needs further steps, it replies with an Auth packet whose reason code is Continue Authentication. The client then computes the next step and sends it back, and this exchange repeats until the server sends a Connack. Authenticate may therefore be called several times in succession between the Connect and Connack packets, while Authenticated is called once the Connack arrives.

An example of authentication based on SCRAM-SHA-256:

package main

import (
	"context"
	"crypto"

	"github.com/lynnplus/go-mqtt"
	"github.com/lynnplus/go-mqtt/packets"
	"github.com/xdg-go/scram"
)

// ScramAuth implements mqtt.Auther on top of a SCRAM client conversation.
type ScramAuth struct {
	conv   *scram.ClientConversation
	method string
}

// New starts the SCRAM conversation and returns the client-first message,
// which is sent to the server as the AuthData of the Connect packet.
func (s *ScramAuth) New(username, password string, hash crypto.Hash, methodName string) ([]byte, error) {
	c, err := scram.HashGeneratorFcn(hash.New).NewClient(username, password, "")
	if err != nil {
		return nil, err
	}
	s.conv = c.NewConversation()
	s.method = methodName
	resp, err := s.conv.Step("")
	return []byte(resp), err
}

// Authenticate answers an Auth packet from the server with the next SCRAM step.
func (s *ScramAuth) Authenticate(clientId string, auth *packets.Auth) (*packets.Auth, error) {
	data, err := s.conv.Step(string(auth.Properties.AuthData))
	if err != nil {
		return nil, err
	}
	resp := packets.NewAuthWith(packets.ContinueAuthentication, s.method, []byte(data))
	return resp, nil
}

// Authenticated validates the final server message carried in the Connack.
func (s *ScramAuth) Authenticated(clientId string, ack *packets.Connack) error {
	if ack.Properties == nil || ack.Properties.AuthMethod == "" {
		return nil
	}
	_, err := s.conv.Step(string(ack.Properties.AuthData))
	return err
}

func main() {
	user := "user"
	password := "password"
	method := "SCRAM-SHA-256"
	auth := &ScramAuth{}
	data, err := auth.New(user, password, crypto.SHA256, method)
	if err != nil {
		panic(err)
	}
	client := mqtt.NewClient(&mqtt.ConnDialer{
		Address: "tcp://127.0.0.1:1883",
	}, mqtt.ClientConfig{Auther: auth})
	pkt := packets.NewConnect("client_id", "", nil)
	pkt.Properties = &packets.ConnProperties{
		AuthMethod: method,
		AuthData:   data,
	}
	ack, err := client.Connect(context.Background(), pkt)
	if err != nil {
		panic(err)
	}
	if ack.ReasonCode != 0 {
		panic(packets.NewReasonCodeError(ack.ReasonCode, ""))
	}
}

Note: Enabling automatic reconnection together with enhanced authentication is not recommended, because unexpected problems may occur during multi-step authentication flows such as SCRAM.

type AutoReConnector

type AutoReConnector struct {
	AutoReconnect bool //Whether to automatically reconnect after the connection is lost
	ConnectRetry  bool //Whether to retry after the first connection failure
	MaxRetryDelay time.Duration
	// contains filtered or unexported fields
}

func NewAutoReConnector

func NewAutoReConnector() *AutoReConnector

func (*AutoReConnector) ConnectionFailure

func (a *AutoReConnector) ConnectionFailure(dialer Dialer, err error) (Dialer, *time.Timer)

func (*AutoReConnector) ConnectionLost

func (a *AutoReConnector) ConnectionLost(pkt *packets.Disconnect, err error) *time.Timer

func (*AutoReConnector) Reset

func (a *AutoReConnector) Reset()

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(dialer Dialer, config ClientConfig) *Client

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, pkt *packets.Connect) (*packets.Connack, error)

func (*Client) Disconnect

func (c *Client) Disconnect() error

Disconnect sends a Disconnect packet with reason code 0 to the MQTT server. Whether or not the packet is sent successfully, the connection is closed and no reconnection attempt is made.

func (*Client) DisconnectWith

func (c *Client) DisconnectWith(pkt *packets.Disconnect) error

DisconnectWith sends the given Disconnect packet to the MQTT server. Whether or not the packet is sent successfully, the connection is closed and no reconnection attempt is made.

func (*Client) IsConnected

func (c *Client) IsConnected() bool

IsConnected reports whether the client is connected.

func (*Client) PublishNR

func (c *Client) PublishNR(ctx context.Context, pkt *packets.Publish) error

PublishNR sends a Publish packet that expects no acknowledgement from the server; its QoS is forced to 0.

func (*Client) SendPing

func (c *Client) SendPing(ctx context.Context) error

func (*Client) StartConnect

func (c *Client) StartConnect(ctx context.Context, pkt *packets.Connect) error

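StartConnect starts the connection asynchronously. As described in the README, the result of the attempt is only available in a callback such as OnConnected.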

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, pkt *packets.Subscribe) (*packets.Suback, error)

Subscribe sends a Subscribe packet to the server and blocks until the server responds with a Suback or a timeout occurs. It returns the server's response (packets.Suback) and any error.

Note that the function does not check the reason codes inside the Suback.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ctx context.Context, pkt *packets.Unsubscribe) (*packets.Unsuback, error)

type ClientConfig

type ClientConfig struct {
	ManualACK bool
	Logger    Logger
	Pinger    Pinger
	Router    Router
	Auther    Auther
	ClientListener
	Session SessionState
	//packet send timeout
	PacketTimeout  time.Duration
	ReConnector    ReConnector
	ConnectTimeout time.Duration
}

type ClientListener

type ClientListener struct {
	OnConnected      func(client *Client, ack *packets.Connack)
	OnConnectionLost func(client *Client, err error)
	// OnConnectFailed is called only when dialing fails or the server refuses the connection
	OnConnectFailed func(client *Client, err error)
	// OnServerDisconnect is called only when a packets.DISCONNECT is received from server
	OnServerDisconnect func(client *Client, pkt *packets.Disconnect)
	OnClientError      func(client *Client, err error)
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

type ConnDialer

type ConnDialer struct {
	Address   string
	TLSConfig *tls.Config
}

func (*ConnDialer) Dial

func (c *ConnDialer) Dial(ctx context.Context, timeout time.Duration) (net.Conn, error)

type ConnState

type ConnState struct {
	// contains filtered or unexported fields
}

func (*ConnState) CompareAndSwap

func (x *ConnState) CompareAndSwap(old, new ConnStatus) (swapped bool)

func (*ConnState) IsConnected

func (x *ConnState) IsConnected() bool

func (*ConnState) Load

func (x *ConnState) Load() ConnStatus

func (*ConnState) Store

func (x *ConnState) Store(val ConnStatus)

func (*ConnState) String

func (x *ConnState) String() string

func (*ConnState) Swap

func (x *ConnState) Swap(new ConnStatus) (old ConnStatus)

type ConnStatus

type ConnStatus int32
const (
	StatusNone ConnStatus = iota
	StatusConnecting
	StatusConnected
	StatusDisconnecting
)

func (ConnStatus) String

func (c ConnStatus) String() string

type Context

type Context interface {
	Topic() string
	PacketID() packets.PacketID
	SubscriptionID() int
}

type DefaultPinger

type DefaultPinger struct {
	// contains filtered or unexported fields
}

func NewDefaultPinger

func NewDefaultPinger() *DefaultPinger

func (*DefaultPinger) Ping

func (p *DefaultPinger) Ping()

func (*DefaultPinger) Pong

func (p *DefaultPinger) Pong()

func (*DefaultPinger) Run

func (p *DefaultPinger) Run(ctx context.Context, keepAlive time.Duration, c *Client) error

type DefaultRouter

type DefaultRouter struct{}

func (*DefaultRouter) Route

func (d *DefaultRouter) Route(ctx Context)

type DefaultSession

type DefaultSession struct {
	// contains filtered or unexported fields
}

func NewDefaultSession

func NewDefaultSession() *DefaultSession

func (*DefaultSession) ConnectionCompleted

func (s *DefaultSession) ConnectionCompleted(conn *packets.Connect, ack *packets.Connack) error

func (*DefaultSession) ConnectionLost

func (s *DefaultSession) ConnectionLost(dp *packets.Disconnect) error

func (*DefaultSession) ResponsePacket

func (s *DefaultSession) ResponsePacket(pkt packets.Packet) error

func (*DefaultSession) RevokePacket

func (s *DefaultSession) RevokePacket(pkt packets.Packet) error

func (*DefaultSession) SubmitPacket

func (s *DefaultSession) SubmitPacket(pkt IdentifiablePacket) (<-chan packets.Packet, error)

type Dialer

type Dialer interface {
	Dial(ctx context.Context, timeout time.Duration) (net.Conn, error)
}

type EmptyLogger

type EmptyLogger struct {
}

func (*EmptyLogger) Debug

func (e *EmptyLogger) Debug(format string, args ...any)

func (*EmptyLogger) Error

func (e *EmptyLogger) Error(format string, args ...any)

type HookName

type HookName string
const (
	HookConnect     HookName = "connect"
	HookPublish     HookName = "publish"
	HookSubscribe   HookName = "subscribe"
	HookUnsubscribe HookName = "unsubscribe"
	HookDisconnect  HookName = "disconnect"
)

type IdentifiablePacket

type IdentifiablePacket interface {
	packets.Packet
	SetID(id packets.PacketID)
}

type Logger

type Logger interface {
	Debug(format string, args ...any)
	Error(format string, args ...any)
}

type MessageHandler

type MessageHandler func(ctx Context)

type PayloadUnmarshaler

type PayloadUnmarshaler func(body []byte, v any) error

type Pinger

type Pinger interface {
	Run(ctx context.Context, keepAlive time.Duration, c *Client) error
	// Ping is called when a packet is sent to the server.
	// If the packet is packets.PINGREQ, it will not be called
	Ping()
	// Pong is called when a packets.PINGRESP packet is received from the server
	Pong()
}

type ReConnector

type ReConnector interface {
	// ConnectionLost is called when the connection is lost or the server sends a disconnection packet.
	// It returns a timer to indicate when the client should reinitiate the connection.
	// If the timer is empty, it means that no reconnection will be initiated.
	ConnectionLost(pkt *packets.Disconnect, err error) *time.Timer
	// ConnectionFailure is called when the connection fails to be established.
	ConnectionFailure(dialer Dialer, err error) (Dialer, *time.Timer)
	// Reset is called when the connection is successful or initialization is required.
	Reset()
}

ReConnector is an interface that implements the client disconnection and reconnection flow.

After the client has successfully established a connection, ConnectionLost is called first if that connection is lost, to decide whether and when to reconnect. If a connection attempt fails during reconnection, ConnectionFailure is called to drive the retry flow.

type RequestedPacket

type RequestedPacket struct {
	PacketType packets.PacketType
	Response   chan packets.Packet
	Consumed   bool
}

type Router

type Router interface {
	Route(ctx Context)
}

type ServerProperties

type ServerProperties struct {
	MaximumPacketSize    uint32
	ReceiveMaximum       uint16
	TopicAliasMaximum    uint16
	MaximumQoS           byte
	RetainAvailable      bool
	WildcardSubAvailable bool
	SubIDAvailable       bool
	SharedSubAvailable   bool
}

ServerProperties holds the limits and capabilities reported by the server.

func NewServerProperties

func NewServerProperties() *ServerProperties

func (*ServerProperties) ReconfigureFromResponse

func (s *ServerProperties) ReconfigureFromResponse(resp *packets.Connack)

type SessionState

type SessionState interface {
	ConnectionCompleted(conn *packets.Connect, ack *packets.Connack) error
	ConnectionLost(dp *packets.Disconnect) error

	// SubmitPacket submits a data packet to the session state and assigns an available id to the data packet
	SubmitPacket(pkt IdentifiablePacket) (<-chan packets.Packet, error)
	// RevokePacket revokes a submitted packet from the session state.
	// It only supports the revocation of limited packet types.
	RevokePacket(pkt packets.Packet) error
	ResponsePacket(pkt packets.Packet) error
}

type SessionStore

type SessionStore interface {
}

type Trigger

type Trigger struct{}

Directories

Path      Synopsis
examples
packets   Package packets implements the data packet type definitions of MQTT v3.1.1 and MQTT v5.0, as well as the reader and writer for data packets.
