Documentation ¶
Index ¶
- Variables
- func RegisterPayloadUnmarshaler(contentType string, unmarshaler PayloadUnmarshaler)
- func UnmarshalPayload(contentType string, body []byte, v any) error
- func UnmarshalToString(body []byte, v any) error
- type Auther
- type AutoReConnector
- type Client
- func (c *Client) Connect(ctx context.Context, pkt *packets.Connect) (*packets.Connack, error)
- func (c *Client) Disconnect() error
- func (c *Client) DisconnectWith(pkt *packets.Disconnect) error
- func (c *Client) IsConnected() bool
- func (c *Client) PublishNR(ctx context.Context, pkt *packets.Publish) error
- func (c *Client) SendPing(ctx context.Context) error
- func (c *Client) StartConnect(ctx context.Context, pkt *packets.Connect) error
- func (c *Client) Subscribe(ctx context.Context, pkt *packets.Subscribe) (*packets.Suback, error)
- func (c *Client) Unsubscribe(ctx context.Context, pkt *packets.Unsubscribe) (*packets.Unsuback, error)
- type ClientConfig
- type ClientListener
- type Conn
- type ConnDialer
- type ConnState
- type ConnStatus
- type Context
- type DefaultPinger
- type DefaultRouter
- type DefaultSession
- func (s *DefaultSession) ConnectionCompleted(conn *packets.Connect, ack *packets.Connack) error
- func (s *DefaultSession) ConnectionLost(dp *packets.Disconnect) error
- func (s *DefaultSession) ResponsePacket(pkt packets.Packet) error
- func (s *DefaultSession) RevokePacket(pkt packets.Packet) error
- func (s *DefaultSession) SubmitPacket(pkt IdentifiablePacket) (<-chan packets.Packet, error)
- type Dialer
- type EmptyLogger
- type HookName
- type IdentifiablePacket
- type Logger
- type MessageHandler
- type PayloadUnmarshaler
- type Pinger
- type ReConnector
- type RequestedPacket
- type Router
- type ServerProperties
- type SessionState
- type SessionStore
- type Trigger
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidArguments = errors.New("invalid argument")
	ErrNotConnected     = errors.New("client not connected")
)
var ErrPacketIdentifiersExhausted = errors.New("packet identifiers exhausted")
var ErrPongTimeout = errors.New("pong timeout")
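The sentinel errors above are typically matched with errors.Is. The following is a minimal sketch, not taken from the package: the error values are redeclared locally so the program is self-contained, and publish and classify are hypothetical helpers. Whether the library wraps these errors before returning them is an assumption; errors.Is works in either case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's exported sentinel errors.
var (
	ErrNotConnected = errors.New("client not connected")
	ErrPongTimeout  = errors.New("pong timeout")
)

// publish is a hypothetical call site that wraps a sentinel error with context.
func publish(connected bool) error {
	if !connected {
		return fmt.Errorf("publish: %w", ErrNotConnected)
	}
	return nil
}

// classify maps an error to a short label by matching sentinels with errors.Is,
// which also sees through wrapping.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotConnected):
		return "not connected"
	case errors.Is(err, ErrPongTimeout):
		return "pong timeout"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(publish(false)))
	fmt.Println(classify(publish(true)))
}
```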
Functions ¶
func RegisterPayloadUnmarshaler ¶
func RegisterPayloadUnmarshaler(contentType string, unmarshaler PayloadUnmarshaler)
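The signatures of RegisterPayloadUnmarshaler and UnmarshalPayload suggest a registry keyed by content type. The sketch below illustrates that idea with local stand-ins. It assumes that PayloadUnmarshaler is a function of the form func(body []byte, v any) error, which this page does not show; the registry, the reading type, and the registerJSON and decodeReading helpers are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// PayloadUnmarshaler is an ASSUMED shape; the real definition is not shown on this page.
type PayloadUnmarshaler func(body []byte, v any) error

var (
	mu       sync.RWMutex
	registry = map[string]PayloadUnmarshaler{}
)

// RegisterPayloadUnmarshaler stores an unmarshaler under a content type.
func RegisterPayloadUnmarshaler(contentType string, u PayloadUnmarshaler) {
	mu.Lock()
	defer mu.Unlock()
	registry[contentType] = u
}

// UnmarshalPayload looks up the unmarshaler for contentType and applies it to body.
func UnmarshalPayload(contentType string, body []byte, v any) error {
	mu.RLock()
	u, ok := registry[contentType]
	mu.RUnlock()
	if !ok {
		return fmt.Errorf("no unmarshaler registered for %q", contentType)
	}
	return u(body, v)
}

// reading is a hypothetical payload type.
type reading struct {
	Temp float64 `json:"temp"`
}

// registerJSON registers encoding/json for the "application/json" content type.
func registerJSON() {
	RegisterPayloadUnmarshaler("application/json", json.Unmarshal)
}

// decodeReading decodes body into a reading using the registered unmarshaler.
func decodeReading(contentType string, body []byte) (reading, error) {
	var r reading
	err := UnmarshalPayload(contentType, body, &r)
	return r, err
}

func main() {
	registerJSON()
	r, err := decodeReading("application/json", []byte(`{"temp":21.5}`))
	fmt.Println(r.Temp, err)
}
```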
func UnmarshalToString ¶
Types ¶
type Auther ¶
type Auther interface { Authenticate(clientId string, auth *packets.Auth) (*packets.Auth, error) Authenticated(clientId string, ack *packets.Connack) error }
Auther is the interface that implements the enhanced authentication flow of the MQTT 5 protocol.
The general flow is as follows. The client carries authentication data in the Connect packet. If the server needs further authentication steps, it replies with an Auth packet carrying the reason code Continue Authentication. The client then performs the next step and sends the result to the server, and this repeats until the server sends Connack. Authenticate may therefore be called several times in succession between the Connect and Connack packets.
An example of authentication based on SCRAM-SHA-256:
import (
	"context"
	"crypto"

	"github.com/xdg-go/scram"
	// The mqtt and packets imports from this module are omitted here.
)

type ScramAuth struct {
	conv   *scram.ClientConversation
	method string
}

// New starts a SCRAM conversation and returns the client-first message.
func (s *ScramAuth) New(username, password string, hash crypto.Hash, methodName string) ([]byte, error) {
	c, err := scram.HashGeneratorFcn(hash.New).NewClient(username, password, "")
	if err != nil {
		return nil, err
	}
	s.conv = c.NewConversation()
	s.method = methodName
	resp, err := s.conv.Step("")
	return []byte(resp), err
}

// Authenticate answers each Auth packet sent by the server with the next SCRAM step.
func (s *ScramAuth) Authenticate(clientId string, auth *packets.Auth) (*packets.Auth, error) {
	data, err := s.conv.Step(string(auth.Properties.AuthData))
	if err != nil {
		return nil, err
	}
	resp := packets.NewAuthWith(packets.ContinueAuthentication, s.method, []byte(data))
	return resp, nil
}

// Authenticated validates the final server message carried in the Connack.
func (s *ScramAuth) Authenticated(clientId string, ack *packets.Connack) error {
	if ack.Properties == nil || ack.Properties.AuthMethod == "" {
		return nil
	}
	_, err := s.conv.Step(string(ack.Properties.AuthData))
	return err
}

func main() {
	user := "user"
	password := "password"
	method := "SCRAM-SHA-256"
	auth := &ScramAuth{}
	data, err := auth.New(user, password, crypto.SHA256, method)
	if err != nil {
		panic(err)
	}
	client := mqtt.NewClient(&mqtt.ConnDialer{
		Address: "tcp://127.0.0.1:1883",
	}, mqtt.ClientConfig{
		Auther: auth,
	})
	pkt := packets.NewConnect("client_id", "", nil)
	pkt.Properties = &packets.ConnProperties{
		AuthMethod: method,
		AuthData:   data,
	}
	// Check the result instead of discarding it.
	if _, err := client.Connect(context.Background(), pkt); err != nil {
		panic(err)
	}
}
Note: It is not recommended to enable reconnection retries and enhanced authentication at the same time, because unexpected problems may occur during multi-step authentication flows such as SCRAM.
type AutoReConnector ¶
type AutoReConnector struct {
	AutoReconnect bool // Whether to reconnect automatically after the connection is lost
	ConnectRetry  bool // Whether to retry after the first connection attempt fails
	MaxRetryDelay time.Duration
	// contains filtered or unexported fields
}
func NewAutoReConnector ¶
func NewAutoReConnector() *AutoReConnector
func (*AutoReConnector) ConnectionFailure ¶
func (*AutoReConnector) ConnectionLost ¶
func (a *AutoReConnector) ConnectionLost(pkt *packets.Disconnect, err error) *time.Timer
func (*AutoReConnector) Reset ¶
func (a *AutoReConnector) Reset()
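The MaxRetryDelay field and the *time.Timer return value suggest a retry delay that grows up to a cap. The page does not document the actual strategy, so the following is only a sketch of one common approach (doubling with a ceiling). The backoff type and its fields are hypothetical and are not the AutoReConnector implementation.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical helper: it doubles the delay on every call to next
// and never exceeds max.
type backoff struct {
	initial time.Duration
	max     time.Duration
	current time.Duration
}

// next returns the delay to wait before the next reconnection attempt.
func (b *backoff) next() time.Duration {
	if b.current == 0 {
		b.current = b.initial
	} else {
		b.current *= 2
	}
	if b.current > b.max {
		b.current = b.max
	}
	return b.current
}

// reset restarts the sequence, for example after a successful connection.
func (b *backoff) reset() { b.current = 0 }

// timer returns a timer that fires after the next delay, mirroring the
// *time.Timer style used by the reconnector methods.
func (b *backoff) timer() *time.Timer { return time.NewTimer(b.next()) }

func main() {
	b := &backoff{initial: time.Second, max: 5 * time.Second}
	for i := 0; i < 5; i++ {
		fmt.Println(b.next())
	}
	b.reset()
	fmt.Println(b.next())
}
```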
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(dialer Dialer, config ClientConfig) *Client
func (*Client) Disconnect ¶
Disconnect sends a Disconnect packet with reason code 0 to the MQTT server. Whether or not the packet is sent successfully, the connection is closed and no reconnection is attempted.
func (*Client) DisconnectWith ¶
func (c *Client) DisconnectWith(pkt *packets.Disconnect) error
DisconnectWith sends the given Disconnect packet to the MQTT server. Whether or not the packet is sent successfully, the connection is closed and no reconnection is attempted.
func (*Client) IsConnected ¶
IsConnected reports whether the client is connected.
func (*Client) PublishNR ¶
PublishNR sends a Publish packet that expects no acknowledgement from the server; its QoS is forced to 0.
func (*Client) StartConnect ¶
StartConnect
func (*Client) Subscribe ¶
Subscribe sends a Subscribe packet to the server and blocks until the server responds with a Suback or a timeout occurs. It returns the server's response (packets.Suback) and any error.
Note: the function does not check the reason codes inside the Suback.
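Because of the note above, callers may want to inspect the Suback themselves. In MQTT, a SUBACK reason code of 0x80 or higher means the corresponding subscription was rejected. The sketch below shows such a check using a local stand-in type. The suback struct and its ReasonCodes field are assumptions made for illustration, since the layout of packets.Suback is not shown on this page.

```go
package main

import "fmt"

// suback is a local stand-in; the field name ReasonCodes is an assumption.
type suback struct {
	ReasonCodes []byte
}

// failedFilters returns the topic filters whose reason code signals failure.
// Reason codes are matched to filters by position; codes of 0x80 and above
// indicate that the server rejected the subscription.
func failedFilters(filters []string, ack suback) []string {
	var failed []string
	for i, code := range ack.ReasonCodes {
		if code >= 0x80 && i < len(filters) {
			failed = append(failed, filters[i])
		}
	}
	return failed
}

func main() {
	filters := []string{"sensors/#", "admin/+"}
	ack := suback{ReasonCodes: []byte{0x01, 0x87}} // granted QoS 1, not authorized
	fmt.Println(failedFilters(filters, ack))
}
```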
func (*Client) Unsubscribe ¶
type ClientConfig ¶
type ClientConfig struct {
	ManualACK bool
	Logger    Logger
	Pinger    Pinger
	Router    Router
	Auther    Auther
	ClientListener
	Session SessionState
	// packet send timeout
	PacketTimeout  time.Duration
	ReConnector    ReConnector
	ConnectTimeout time.Duration
}
type ClientListener ¶
type ClientListener struct {
	OnConnected      func(client *Client, ack *packets.Connack)
	OnConnectionLost func(client *Client, err error)
	// OnConnectFailed is called only when dialing fails or the server refuses the connection
	OnConnectFailed func(client *Client, err error)
	// OnServerDisconnect is called only when a packets.DISCONNECT is received from server
	OnServerDisconnect func(client *Client, pkt *packets.Disconnect)
	OnClientError      func(client *Client, err error)
}
type ConnDialer ¶
type ConnState ¶
type ConnState struct {
// contains filtered or unexported fields
}
func (*ConnState) CompareAndSwap ¶
func (x *ConnState) CompareAndSwap(old, new ConnStatus) (swapped bool)
func (*ConnState) IsConnected ¶
func (*ConnState) Load ¶
func (x *ConnState) Load() ConnStatus
func (*ConnState) Store ¶
func (x *ConnState) Store(val ConnStatus)
func (*ConnState) Swap ¶
func (x *ConnState) Swap(new ConnStatus) (old ConnStatus)
type ConnStatus ¶
type ConnStatus int32
const (
	StatusNone ConnStatus = iota
	StatusConnecting
	StatusConnected
	StatusDisconnecting
)
func (ConnStatus) String ¶
func (c ConnStatus) String() string
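The CompareAndSwap, Load, Store and Swap methods of ConnState mirror the standard atomic API, which makes it possible to guard state transitions without a mutex. The following sketch uses a local connState type built on sync/atomic (Go 1.19 or later). How the real ConnState is implemented is not shown on this page, and beginConnect is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type ConnStatus int32

const (
	StatusNone ConnStatus = iota
	StatusConnecting
	StatusConnected
	StatusDisconnecting
)

// connState is a local stand-in that stores a ConnStatus atomically.
type connState struct {
	v atomic.Int32
}

func (s *connState) CompareAndSwap(old, new ConnStatus) bool {
	return s.v.CompareAndSwap(int32(old), int32(new))
}

func (s *connState) Load() ConnStatus     { return ConnStatus(s.v.Load()) }
func (s *connState) Store(val ConnStatus) { s.v.Store(int32(val)) }

// beginConnect lets exactly one caller move the state from StatusNone to
// StatusConnecting; concurrent or repeated callers get false.
func beginConnect(s *connState) bool {
	return s.CompareAndSwap(StatusNone, StatusConnecting)
}

func main() {
	var s connState
	fmt.Println(beginConnect(&s)) // first caller wins
	fmt.Println(beginConnect(&s)) // state is no longer StatusNone
	s.Store(StatusConnected)
	fmt.Println(s.Load() == StatusConnected)
}
```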
type DefaultPinger ¶
type DefaultPinger struct {
// contains filtered or unexported fields
}
func NewDefaultPinger ¶
func NewDefaultPinger() *DefaultPinger
func (*DefaultPinger) Ping ¶
func (p *DefaultPinger) Ping()
func (*DefaultPinger) Pong ¶
func (p *DefaultPinger) Pong()
type DefaultRouter ¶
type DefaultRouter struct{}
func (*DefaultRouter) Route ¶
func (d *DefaultRouter) Route(ctx Context)
type DefaultSession ¶
type DefaultSession struct {
// contains filtered or unexported fields
}
func NewDefaultSession ¶
func NewDefaultSession() *DefaultSession
func (*DefaultSession) ConnectionCompleted ¶
func (*DefaultSession) ConnectionLost ¶
func (s *DefaultSession) ConnectionLost(dp *packets.Disconnect) error
func (*DefaultSession) ResponsePacket ¶
func (s *DefaultSession) ResponsePacket(pkt packets.Packet) error
func (*DefaultSession) RevokePacket ¶
func (s *DefaultSession) RevokePacket(pkt packets.Packet) error
func (*DefaultSession) SubmitPacket ¶
func (s *DefaultSession) SubmitPacket(pkt IdentifiablePacket) (<-chan packets.Packet, error)
type EmptyLogger ¶
type EmptyLogger struct { }
func (*EmptyLogger) Debug ¶
func (e *EmptyLogger) Debug(format string, args ...any)
func (*EmptyLogger) Error ¶
func (e *EmptyLogger) Error(format string, args ...any)
type IdentifiablePacket ¶
type MessageHandler ¶
type MessageHandler func(ctx Context)
type PayloadUnmarshaler ¶
type ReConnector ¶
type ReConnector interface {
	// ConnectionLost is called when the connection is lost or the server sends a Disconnect packet.
	// It returns a timer that indicates when the client should reinitiate the connection.
	// If the timer is nil, no reconnection will be initiated.
	ConnectionLost(pkt *packets.Disconnect, err error) *time.Timer
	// ConnectionFailure is called when the connection fails to be established.
	ConnectionFailure(dialer Dialer, err error) (Dialer, *time.Timer)
	// Reset is called when the connection is successful or initialization is required.
	Reset()
}
ReConnector is an interface that implements the client disconnection and reconnection flow.
After the client has successfully established a connection, ConnectionLost is called first if that connection is lost, so that a reconnection can be scheduled. If the connection then fails to be re-established, ConnectionFailure is called to retry the flow.
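A custom implementation of this interface might look like the sketch below, which retries with a fixed delay and gives up after a limited number of failed attempts by returning a nil timer. Disconnect and Dialer are local stand-ins for packets.Disconnect and this package's Dialer, and limitedReconnector, with its policy of not reconnecting after a server-sent Disconnect, is a hypothetical design rather than the behaviour of AutoReConnector.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins so the sketch compiles on its own.
type Disconnect struct{}
type Dialer interface{}

type ReConnector interface {
	ConnectionLost(pkt *Disconnect, err error) *time.Timer
	ConnectionFailure(dialer Dialer, err error) (Dialer, *time.Timer)
	Reset()
}

// limitedReconnector is a hypothetical ReConnector with a fixed delay and a
// cap on consecutive failed attempts.
type limitedReconnector struct {
	delay    time.Duration
	maxTries int
	tries    int
}

// ConnectionLost schedules a reconnect unless the server sent a Disconnect.
func (r *limitedReconnector) ConnectionLost(pkt *Disconnect, err error) *time.Timer {
	if pkt != nil {
		return nil // nil timer: do not reconnect
	}
	return time.NewTimer(r.delay)
}

// ConnectionFailure retries with the same dialer until maxTries is exceeded.
func (r *limitedReconnector) ConnectionFailure(d Dialer, err error) (Dialer, *time.Timer) {
	r.tries++
	if r.tries > r.maxTries {
		return d, nil
	}
	return d, time.NewTimer(r.delay)
}

// Reset clears the failure counter after a successful connection.
func (r *limitedReconnector) Reset() { r.tries = 0 }

// Compile-time check that the sketch satisfies the interface.
var _ ReConnector = (*limitedReconnector)(nil)

func main() {
	r := &limitedReconnector{delay: 10 * time.Millisecond, maxTries: 2}
	for i := 1; i <= 3; i++ {
		_, t := r.ConnectionFailure(nil, errors.New("dial failed"))
		fmt.Printf("attempt %d: retry scheduled = %v\n", i, t != nil)
	}
}
```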
type RequestedPacket ¶
type RequestedPacket struct {
	PacketType packets.PacketType
	Response   chan packets.Packet
	Consumed   bool
}
type ServerProperties ¶
type ServerProperties struct {
	MaximumPacketSize    uint32
	ReceiveMaximum       uint16
	TopicAliasMaximum    uint16
	MaximumQoS           byte
	RetainAvailable      bool
	WildcardSubAvailable bool
	SubIDAvailable       bool
}
ServerProperties is a struct that holds server properties
func NewServerProperties ¶
func NewServerProperties() *ServerProperties
func (*ServerProperties) ReconfigureFromResponse ¶
func (s *ServerProperties) ReconfigureFromResponse(resp *packets.Connack)
type SessionState ¶
type SessionState interface {
	ConnectionCompleted(conn *packets.Connect, ack *packets.Connack) error
	ConnectionLost(dp *packets.Disconnect) error
	// SubmitPacket submits a data packet to the session state and assigns an available id to the data packet
	SubmitPacket(pkt IdentifiablePacket) (<-chan packets.Packet, error)
	// RevokePacket revokes a submitted packet from the session state.
	// It only supports the revocation of limited packet types.
	RevokePacket(pkt packets.Packet) error
	ResponsePacket(pkt packets.Packet) error
}
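SubmitPacket is documented as assigning an available id to the packet, and the package exports ErrPacketIdentifiersExhausted. MQTT packet identifiers are non-zero 16-bit values, so at most 65535 can be in flight. The sketch below shows one way such an allocator could work. The idAllocator type is hypothetical and does not describe how DefaultSession is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the package's exported error.
var ErrPacketIdentifiersExhausted = errors.New("packet identifiers exhausted")

// idAllocator hands out packet identifiers in the range 1..65535.
// It is not safe for concurrent use; a real session would need locking.
type idAllocator struct {
	inUse map[uint16]bool
	last  uint16
}

func newIDAllocator() *idAllocator {
	return &idAllocator{inUse: make(map[uint16]bool)}
}

// acquire returns the next free identifier after the last one issued,
// wrapping around and skipping 0, which MQTT does not allow.
func (a *idAllocator) acquire() (uint16, error) {
	if len(a.inUse) == 65535 {
		return 0, ErrPacketIdentifiersExhausted
	}
	id := a.last
	for {
		id++
		if id == 0 { // wrapped past 65535
			id = 1
		}
		if !a.inUse[id] {
			a.inUse[id] = true
			a.last = id
			return id, nil
		}
	}
}

// release frees an identifier once its exchange has completed.
func (a *idAllocator) release(id uint16) { delete(a.inUse, id) }

func main() {
	a := newIDAllocator()
	first, _ := a.acquire()
	second, _ := a.acquire()
	a.release(first)
	third, _ := a.acquire()
	fmt.Println(first, second, third)
}
```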
type SessionStore ¶
type SessionStore interface { }
Directories ¶
Path | Synopsis |
---|---|
examples | |
| Package packets implements the data packet type definition of mqtt v3.1.1 and mqtt v5.0 as well as the reader and writer of data packets |