mqtt

package module
v0.0.0-...-87f6d0e Latest
Published: Sep 19, 2025 License: MIT Imports: 24 Imported by: 0

README

MQTT Server Implementation

Project Overview

This is a complete MQTT server implementation that supports the MQTT v3.1.1 and v5.0 protocols.

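Protocol Support

MQTT v3.1.1 (OASIS Standard, 29 October 2014)
  • Full support for all 14 control packet types
  • Three QoS levels: 0 (at most once), 1 (at least once), 2 (exactly once)
  • Will messages and username/password authentication
  • Session persistence
MQTT v5.0 (OASIS Standard, 7 March 2019)
  • Adds the properties system on top of v3.1.1
  • Adds the AUTH packet for enhanced authentication
  • Supports advanced features such as Session Expiry Interval, Receive Maximum, and Maximum Packet Size
  • Optimizations such as Topic Alias and Subscription Identifier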

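Code Comment Conventions

All code in this project carries detailed comments written against the MQTT specification. They cover:

1. Specification section references
  • Every field is annotated with the corresponding section number of the specification
  • Lets developers quickly look up the original text
  • Example: Reference section: 3.1.2.1 Protocol Name
2. Version differences
  • Differences between v3.1.1 and v5.0 are marked explicitly
  • New features and behavior changes are highlighted
  • Helps developers understand how the protocol evolved
3. Detailed field descriptions
  • Position: where the field sits in the packet
  • Type: data type and encoding
  • Meaning: what the field does
  • Constraints: restrictions imposed by the specification
4. Behavioral differences
  • How behavior differs between protocol versions
  • Changes in error handling
  • Compatibility notes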

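Annotated Core Files

packet/packet.go
  • Common interface for MQTT control packets
  • Packet type dispatch logic
  • Version difference notes
packet/0x0.fixed_header.go
  • Fixed header structure
  • Flag validation rules
  • Remaining Length encoding
packet/0x1.connect.go
  • Complete CONNECT packet implementation
  • Connect flags parsing
  • v5.0 properties support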
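
The Remaining Length encoding mentioned for packet/0x0.fixed_header.go is the Variable Byte Integer scheme from the MQTT specification. The following is a minimal, self-contained sketch of that scheme; the function names are illustrative assumptions and are not taken from this package.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value a four-byte encoding can carry.
const maxRemainingLength = 268435455

// encodeRemainingLength (hypothetical name) encodes n as a Variable Byte
// Integer: seven data bits per byte, with bit 7 set when more bytes follow.
func encodeRemainingLength(n int) ([]byte, error) {
	if n < 0 || n > maxRemainingLength {
		return nil, errors.New("remaining length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeRemainingLength (hypothetical name) decodes a Variable Byte Integer
// from the start of buf and returns the value and the number of bytes used.
func decodeRemainingLength(buf []byte) (value, used int, err error) {
	multiplier := 1
	for i := 0; i < len(buf) && i < 4; i++ {
		value += int(buf[i]&0x7F) * multiplier
		if buf[i]&0x80 == 0 {
			return value, i + 1, nil
		}
		multiplier *= 128
	}
	return 0, 0, errors.New("malformed or truncated remaining length")
}

func main() {
	enc, _ := encodeRemainingLength(321) // 321 = 65 + 2*128
	fmt.Printf("% X\n", enc)
	v, n, _ := decodeRemainingLength(enc)
	fmt.Println(v, n)
}
```

The decoder stops after four bytes because the specification caps the field at that size; a fifth continuation byte is treated as malformed.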

Usage Example

// Build an MQTT v5.0 CONNECT packet
connect := &packet.CONNECT{
    FixedHeader: &packet.FixedHeader{Version: packet.VERSION500},
    ClientID: "test-client",
    KeepAlive: 60,
    Props: &packet.ConnectProperties{
        SessionExpiryInterval: 3600, // session expires after 1 hour
        ReceiveMaximum: 100,         // at most 100 unacknowledged QoS 1/2 messages at a time
    },
}

// Serialize the packet
var buf bytes.Buffer
if err := connect.Pack(&buf); err != nil {
    // handle the serialization error
}
PUBACK packet structure:

graph TD
    A[PUBACK Packet] --> B[Fixed Header]
    A --> C[Variable Header]
    A --> D[Payload]
    
    B --> B1[Packet Type: 0x04]
    B --> B2[Flags: must be 0]
    B --> B3[Remaining Length]
    
    C --> C1[Packet Identifier]
    C --> C2[Reason Code - v5.0]
    C --> C3[Properties - v5.0]
    
    D --> D1[No payload]
    
    C2 --> C2a[0x00: Success]
    C2 --> C2b[0x10: No matching subscribers]
    C2 --> C2c[0x80: Unspecified error]
    C2 --> C2d[0x83: Implementation specific error]
    
    C3 --> C3a[Reason String]
    C3 --> C3b[User Property]
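
The PUBACK layout in the diagram can be turned into bytes roughly as follows. This is a sketch under stated assumptions: encodePUBACK is a hypothetical helper rather than this package's API, and it always writes an empty property list for v5.0.

```go
package main

import "fmt"

// encodePUBACK (hypothetical helper) builds a PUBACK packet.
// Byte 1 is 0x40: packet type 0x4 in bits 7-4 and flags 0 in bits 3-0.
func encodePUBACK(packetID uint16, v5 bool, reason byte) []byte {
	idHi, idLo := byte(packetID>>8), byte(packetID)
	if !v5 || reason == 0x00 {
		// v3.1.1 layout. v5.0 also allows this short form when the
		// Reason Code is 0x00 (Success) and there are no properties.
		return []byte{0x40, 0x02, idHi, idLo}
	}
	// v5.0 layout: Packet Identifier, Reason Code, Property Length 0.
	return []byte{0x40, 0x04, idHi, idLo, reason, 0x00}
}

func main() {
	fmt.Printf("% X\n", encodePUBACK(10, false, 0x00))
	fmt.Printf("% X\n", encodePUBACK(0x1234, true, 0x10)) // no matching subscribers
}
```

Reason String and User Property values would be appended after the Property Length, which would then have to reflect their combined size.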

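Development Guide

Adding new features
  1. Consult the MQTT specification to determine the required behavior
  2. Add detailed specification section comments in the code
  3. Describe the differences between v3.1.1 and v5.0
  4. Document each field's position, type, and meaning
Protocol compatibility
  • v3.1.1 compatibility takes priority
  • v5.0 features are optional extensions
  • Version differences are marked explicitly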

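Protocol References

Contributing

Issues and pull requests are welcome. When contributing code, please:

  1. Follow the existing comment conventions
  2. Add complete specification section references
  3. Describe version differences
  4. Include detailed field descriptions

License

This project is released under the MIT License; see the LICENSE file for details.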

Documentation

Index

Constants

const (
	RESERVED    byte = 0x0
	CONNECT     byte = 0x1
	CONNACK     byte = 0x2
	PUBLISH     byte = 0x3
	PUBACK      byte = 0x4
	PUBREC      byte = 0x5
	PUBREL      byte = 0x6
	PUBCOMP     byte = 0x7
	SUBSCRIBE   byte = 0x8
	SUBACK      byte = 0x9
	UNSUBSCRIBE byte = 0xA
	UNSUBACK    byte = 0xB
	PINGREQ     byte = 0xC
	PINGRESP    byte = 0xD
	DISCONNECT  byte = 0xE
	AUTH        byte = 0xF
)

Control packet types. Position: byte 1, bits 7-4
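
As a small illustration of "byte 1, bits 7-4", the sketch below splits the first byte of a packet into its type and flags. The helper names are assumptions made for this example; the constants are local copies of three of the values listed above so that the snippet runs on its own.

```go
package main

import "fmt"

// Local copies of three of the control packet type constants.
const (
	CONNECT byte = 0x1
	PUBLISH byte = 0x3
	PUBACK  byte = 0x4
)

// packetType returns the control packet type held in bits 7-4 of byte 1.
func packetType(first byte) byte { return first >> 4 }

// packetFlags returns the type-specific flags held in bits 3-0 of byte 1.
func packetFlags(first byte) byte { return first & 0x0F }

// firstByte packs a type and its flags back into byte 1.
func firstByte(kind, flags byte) byte { return kind<<4 | flags&0x0F }

func main() {
	b := byte(0x3B) // PUBLISH with DUP=1, QoS=1, RETAIN=1
	fmt.Println(packetType(b) == PUBLISH, packetFlags(b))
	fmt.Printf("%#02x\n", firstByte(PUBACK, 0))
}
```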

Variables

var CONFIG = &config{
	Auth: map[string]string{
		"":     "",
		"root": "admin",
	},
}
var ErrAbortHandler = errors.New("mqtt: abort Handler")

ErrAbortHandler is a sentinel panic value to abort a handler. While any panic from ServeMQTT aborts the response to the client, panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log.

var ErrServerClosed = errors.New("mqtt: Server closed")

ErrServerClosed is returned by the Server.Serve, [ServeTLS], [ListenAndServe], and [ListenAndServeTLS] methods after a call to Server.Shutdown or [Server.Close].
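
A caller would typically treat ErrServerClosed as a normal shutdown rather than a failure. The sketch below shows that pattern with local stand-ins so it runs without this package: errServerClosed mirrors the sentinel above, and serve is a hypothetical placeholder for a blocking Serve or ListenAndServe call.

```go
package main

import (
	"errors"
	"fmt"
)

// errServerClosed is a local stand-in for mqtt.ErrServerClosed.
var errServerClosed = errors.New("mqtt: Server closed")

// serve is a hypothetical placeholder for a blocking serve call that
// returns once the server has been shut down.
func serve() error {
	return fmt.Errorf("listener stopped: %w", errServerClosed)
}

// isCleanShutdown reports whether err only signals an orderly shutdown.
// errors.Is also matches when the sentinel has been wrapped.
func isCleanShutdown(err error) bool {
	return errors.Is(err, errServerClosed)
}

func main() {
	if err := serve(); !isCleanShutdown(err) {
		fmt.Println("server failed:", err)
		return
	}
	fmt.Println("server stopped cleanly")
}
```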

Functions

func Fedstart

func Fedstart(ctx context.Context, listen string, join string) error

func Httpd

func Httpd() error

func IN

func IN(x string, m ...string) bool

func ServerLog

func ServerLog(ctx context.Context, stat *requests.Stat)

Types

type Client

type Client struct {
	// URL specifies either the URI being requested (for server requests) or the URL to access (for client requests).
	//
	// For server requests, the URL is parsed from the URI supplied on the Request-Line as stored in RequestURI.
	// For most requests, fields other than Path and RawQuery will be empty. (See RFC 7230, Section 5.3)
	//
	// For client requests, the URL's Host specifies the server to
	// connect to, while the Request's Host field optionally
	// specifies the Host header value to send in the MQTT request.
	URL *url.URL

	// DialContext specifies the dial function for creating unencrypted TCP connections.
	// If DialContext is nil (and the deprecated Dial below is also nil), then the transport dials using package net.
	//
	// DialContext runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later DialContext completes.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLSContext specifies an optional dial function for creating TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLSContext is nil (and the deprecated DialTLS below is also nil), DialContext and TLSClientConfig are used.
	//
	// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout are ignored.
	// The returned net.Conn is assumed to already be past the TLS handshake.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	// If non-nil, HTTP/2 support may not be enabled by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// Timeout specifies a time limit for requests made by this Client.
	// The timeout includes connection time, any redirects, and reading the response body.
	// The timer remains running after Get, Head, Post, or Do return and will interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	//
	// The Client cancels requests to the underlying Transport as if the Request's Context ended.
	//
	// For compatibility, the Client will also use the deprecated CancelRequest method on Transport if found.
	// New RoundTripper implementations should use the Request's Context
	// for cancellation instead of implementing CancelRequest.
	Timeout time.Duration
	// contains filtered or unexported fields
}

A Client is an MQTT client. Its zero value ([DefaultClient]) is a usable client that uses [DefaultTransport].

The [Client.Transport] typically has internal state (cached TCP connections), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.

A Client is higher-level than a [RoundTripper] (such as [Transport]) and additionally handles HTTP details such as cookies and redirects.

func New

func New(opts ...Option) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

func (*Client) ConnectAndSubscribe

func (c *Client) ConnectAndSubscribe(ctx context.Context) error

func (*Client) Disconnect

func (c *Client) Disconnect() error

func (*Client) ID

func (c *Client) ID() string

func (*Client) OnMessage

func (c *Client) OnMessage(fn func(*packet.Message))

func (*Client) RoundTrip

func (c *Client) RoundTrip(req packet.Packet) (packet.Packet, error)

RoundTrip implements the [RoundTripper] interface.

For higher-level HTTP client support (such as handling of cookies and redirects), see [Get], [Post], and the Client type.

Like the RoundTripper interface, the error types returned by RoundTrip are unspecified.

func (*Client) ServeMessage

func (c *Client) ServeMessage(ctx context.Context) error

func (*Client) ServeMessageLoop

func (c *Client) ServeMessageLoop(ctx context.Context) error

func (*Client) SubmitMessage

func (c *Client) SubmitMessage(message *packet.Message) error

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context) error

type ConnState

type ConnState int

A ConnState represents the state of a client connection to a server. It's used by the optional [Server.ConnState] hook.

const (
	// StateNew represents a new connection that is expected to
	// send a request immediately. Connections begin at this
	// state and then transition to either StateActive or
	// StateClosed.
	StateNew ConnState = iota

	// StateActive represents a connection that has read 1 or more
	// bytes of a request. The Server.ConnState hook for
	// StateActive fires before the request has entered a handler
	// and doesn't fire again until the request has been
	// handled. After the request is handled, the state
	// transitions to StateClosed, StateHijacked, or StateIdle.
	// For HTTP/2, StateActive fires on the transition from zero
	// to one active request, and only transitions away once all
	// active requests are complete. That means that ConnState
	// cannot be used to do per-request work; ConnState only notes
	// the overall state of the connection.
	StateActive

	// StateIdle represents a connection that has finished
	// handling a request and is in the keep-alive state, waiting
	// for a new request. Connections transition from StateIdle
	// to either StateActive or StateClosed.
	StateIdle

	// StateHijacked represents a hijacked connection.
	// This is a terminal state. It does not transition to StateClosed.
	StateHijacked

	// StateClosed represents a closed connection.
	// This is a terminal state. Hijacked connections do not
	// transition to StateClosed.
	StateClosed
)

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

Endpoint does not provide transactional guarantees; transactions must be handled by a higher-level protocol.

func (*Endpoint) List

func (e *Endpoint) List() map[string]string

func (*Endpoint) Ping

func (e *Endpoint) Ping()

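Ping has a known issue: if a node exits and immediately rejoins the cluster, the entry for the previous node may not have been cleaned up yet. The current workaround is for the node to sleep for 5 seconds before rejoining the cluster.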

func (*Endpoint) Send

func (e *Endpoint) Send(content []byte) error

type Handler

type Handler interface {
	ServeMQTT(ResponseWriter, packet.Packet)
}

A Handler responds to an MQTT request.

type HandlerFunc

type HandlerFunc func(ResponseWriter, packet.Packet)

func (HandlerFunc) ServeMQTT

func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r packet.Packet)
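
HandlerFunc follows the familiar adapter pattern: it lets an ordinary function satisfy Handler. The sketch below reproduces the idea with simplified local stand-ins, so it runs without this package. The Kind method, pingReq, pingResp, and recorder are all hypothetical and are not part of the documented API.

```go
package main

import "fmt"

// Packet and ResponseWriter are simplified stand-ins for packet.Packet
// and mqtt.ResponseWriter. Kind is a hypothetical accessor.
type Packet interface{ Kind() byte }

type ResponseWriter interface{ OnSend(Packet) error }

type Handler interface{ ServeMQTT(ResponseWriter, Packet) }

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(ResponseWriter, Packet)

// ServeMQTT calls f(rw, r).
func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r Packet) { f(rw, r) }

// Hypothetical packets and a writer that records what was sent.
type pingReq struct{}

func (pingReq) Kind() byte { return 0xC }

type pingResp struct{}

func (pingResp) Kind() byte { return 0xD }

type recorder struct{ sent []byte }

func (r *recorder) OnSend(p Packet) error {
	r.sent = append(r.sent, p.Kind())
	return nil
}

func main() {
	// Answer every PINGREQ with a PINGRESP.
	var h Handler = HandlerFunc(func(rw ResponseWriter, r Packet) {
		if r.Kind() == 0xC {
			_ = rw.OnSend(pingResp{})
		}
	})
	rec := &recorder{}
	h.ServeMQTT(rec, pingReq{})
	fmt.Println(rec.sent)
}
```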

type InFight

type InFight struct {
	// contains filtered or unexported fields
}

func (*InFight) Get

func (i *InFight) Get(id uint16) (*packet.PUBLISH, bool)

func (*InFight) Put

func (i *InFight) Put(pkt *packet.PUBLISH) bool

type Listen

type Listen struct {
	URL      string `yaml:"url"`
	CertFile string `yaml:"certFile"`
	KeyFile  string `yaml:"keyFile"`
}

type MemorySubscribed

type MemorySubscribed struct {
	// contains filtered or unexported fields
}

func NewMemorySubscribed

func NewMemorySubscribed(s *Server) *MemorySubscribed

func (*MemorySubscribed) CleanEmptyTopic

func (m *MemorySubscribed) CleanEmptyTopic()

func (*MemorySubscribed) Print

func (m *MemorySubscribed) Print()

func (*MemorySubscribed) Publish

func (m *MemorySubscribed) Publish(message *packet.Message, props *packet.PublishProperties) error

Publish publishes a message. If the topic is new, it additionally builds the topic's subscription list from the existing connections.

func (*MemorySubscribed) Subscribe

func (m *MemorySubscribed) Subscribe(c *conn)

func (*MemorySubscribed) Unsubscribe

func (m *MemorySubscribed) Unsubscribe(c *conn)

type Option

type Option func(*Options)

func Subscription

func Subscription(subscription ...packet.Subscription) Option

func URL

func URL(url string) Option

func Version

func Version[T ~string | ~byte](version T) Option

type Options

type Options struct {
	URL           string // client used
	ClientID      string
	Version       byte
	Subscriptions []packet.Subscription
}
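
Option and Options follow the functional options pattern. The sketch below shows how such options are commonly applied, using local copies of the types. It makes several assumptions: Version is simplified to take a byte instead of the generic parameter above, apply and the default protocol level of 4 (v3.1.1) are hypothetical, and the URL in the usage is only an example value.

```go
package main

import "fmt"

// Options is a trimmed local copy of the struct above.
type Options struct {
	URL      string
	ClientID string
	Version  byte
}

// Option mutates an Options value.
type Option func(*Options)

// URL sets the address to connect to or listen on.
func URL(url string) Option {
	return func(o *Options) { o.URL = url }
}

// Version sets the protocol level (simplified to a plain byte here).
func Version(v byte) Option {
	return func(o *Options) { o.Version = v }
}

// apply (hypothetical) starts from defaults and runs each option in order,
// so later options override earlier ones.
func apply(opts ...Option) *Options {
	o := &Options{Version: 4} // assumed default: protocol level 4 (v3.1.1)
	for _, fn := range opts {
		fn(o)
	}
	return o
}

func main() {
	o := apply(URL("mqtt://127.0.0.1:1883"), Version(5))
	fmt.Println(o.URL, o.Version)
}
```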

type ResponseWriter

type ResponseWriter interface {
	OnSend(request packet.Packet) error
}

type Server

type Server struct {
	Handler          Handler
	WebsocketHandler websocket.Handler

	// TLSConfig optionally provides a TLS configuration for use
	// by ServeTLS and ListenAndServeTLS. Note that this value is
	// cloned by ServeTLS and ListenAndServeTLS, so it's not
	// possible to modify the configuration with methods like
	// tls.Config.SetSessionTicketKeys. To use
	// SetSessionTicketKeys, use Server.Serve with a TLS Listener
	// instead.
	TLSConfig *tls.Config

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// ConnContext optionally specifies a function that modifies
	// the context used for a new connection c. The provided ctx
	// is derived from the base context and has a ServerContextKey
	// value.
	ConnContext func(ctx context.Context, c net.Conn) context.Context
	// contains filtered or unexported fields
}

A Server defines parameters for running an MQTT server. The zero value for Server is a valid configuration.

func NewServer

func NewServer(ctx context.Context) *Server

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(opts ...Option) error

func (*Server) ListenAndServeTLS

func (s *Server) ListenAndServeTLS(certFile, keyFile string, opts ...Option) error

func (*Server) ListenAndServeWebsocket

func (s *Server) ListenAndServeWebsocket(opts ...Option) error

ListenAndServeWebsocket starts the MQTT service over WebSocket (ws).

func (*Server) ListenAndServeWebsocketTLS

func (s *Server) ListenAndServeWebsocketTLS(certFile, keyFile string, opts ...Option) error

ListenAndServeWebsocketTLS starts the WebSocket service over TLS (wss).

func (*Server) Serve

func (s *Server) Serve(l net.Listener) error

Serve accepts incoming connections on the Listener l, creating a new service goroutine for each. The service goroutines read requests and then call srv.Handler to reply to them.

HTTP/2 support is only enabled if the Listener returns *tls.Conn connections and they were configured with "h2" in the TLS Config.NextProtos.

Serve always returns a non-nil error and closes l. After Server.Shutdown or [Server.Close], the returned error is ErrServerClosed.

func (*Server) ServeTLS

func (s *Server) ServeTLS(l net.Listener, certFile, keyFile string) error

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

type Stat

type Stat struct {
	Uptime            prometheus.Counter
	ActiveConnections prometheus.Gauge
	PacketReceived    prometheus.Counter
	ByteReceived      prometheus.Counter
	PacketSent        prometheus.Counter
	ByteSent          prometheus.Counter
}

func (*Stat) RefreshUptime

func (s *Stat) RefreshUptime()

func (*Stat) Register

func (s *Stat) Register()

type TopicSubscribed

type TopicSubscribed struct {
	TopicName string
	// contains filtered or unexported fields
}

TopicSubscribed records which clients are subscribed to the current topic.

func NewTopicSubscribed

func NewTopicSubscribed(topicName string) *TopicSubscribed

func (*TopicSubscribed) Exchange

func (s *TopicSubscribed) Exchange(message *packet.Message, props *packet.PublishProperties) error

func (*TopicSubscribed) Len

func (s *TopicSubscribed) Len() int

func (*TopicSubscribed) Subscribe

func (s *TopicSubscribed) Subscribe(c *conn)

func (*TopicSubscribed) Unsubscribe

func (s *TopicSubscribed) Unsubscribe(c *conn) int

Directories

Path Synopsis
cmd
benchmark command
http-beanch command
mqtt-client command
mqtt-server command
