gomqtt

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2025 License: MIT Imports: 10 Imported by: 0

README

GoMQTT - 高性能MQTT客户端库

GoMQTT是一个基于Go语言开发的高性能MQTT客户端库,提供了连接池管理、异步发布、批量发布、智能订阅等高级功能,适用于物联网、消息队列等场景。

✨ 特性

  • 🔄 连接池管理 - 支持多连接复用,提高并发性能
  • 异步发布 - 非阻塞消息发布,提升吞吐量
  • 📦 批量发布 - 批量消息发送,减少网络开销
  • 🔔 智能订阅 - 自动重试和重连订阅机制
  • 🔒 SSL/TLS支持 - 安全的加密通信
  • ⏱️ 超时控制 - 完善的超时和错误处理机制
  • 📊 性能优化 - 连接配置优化,支持高并发场景

🚀 快速开始

安装
go get gitee.com/xcode-zone/gomqtt
基本使用示例
package main

import (
	"fmt"
	"log"
	"time"

	"gitee.com/xcode-zone/gomqtt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// 配置MQTT连接
	config := &gomqtt.Config{
		Broker:   "tcp://127.0.0.1:1883",
		Username: "user",
		Password: "pass",
		ClientID: "test_client",
	}

	// 获取客户端选项
	opts, err := gomqtt.GetClientOptions(config)
	if err != nil {
		log.Fatal(err)
	}

	// 初始化客户端
	err = gomqtt.Init(opts)
	if err != nil {
		log.Fatal(err)
	}

	// 订阅消息
	gomqtt.Subscribe(gomqtt.SubscribeType{
		Topic: "test/topic",
		Qos:   1,
		Callback: func(client mqtt.Client, msg mqtt.Message) {
			fmt.Printf("收到消息: %s\n", string(msg.Payload()))
		},
	})

	// 发布消息
	err = gomqtt.Publish("test/topic", "Hello MQTT", 1, false)
	if err != nil {
		log.Fatal(err)
	}

	// 等待消息处理
	time.Sleep(2 * time.Second)
}

📚 API文档

配置结构体
Config
type Config struct {
	Broker     string // Broker地址,例如 tcp://127.0.0.1:1883 或 ssl://127.0.0.1:8883
	Username   string // 用户名,可选
	Password   string // 密码,可选
	CACert     string // CA证书路径,SSL连接时使用
	ClientCert string // 客户端证书路径,双向认证时使用
	ClientKey  string // 客户端私钥路径,双向认证时使用
	ClientID   string // 客户端ID,可选(自动生成)
}
核心函数
GetClientOptions
func GetClientOptions(conf *Config) (*mqtt.ClientOptions, error)

根据配置创建MQTT客户端选项,包含性能优化设置。

Init
func Init(opts *mqtt.ClientOptions) error

初始化MQTT客户端连接。

发布消息接口
Publish
func Publish(topic string, payload interface{}, qos byte, retained bool) error

同步发布消息,默认5秒超时。

PublishWithTimeout
func PublishWithTimeout(topic string, payload interface{}, qos byte, retained bool, timeout time.Duration) error

带自定义超时的同步发布消息。

PublishAsync
func PublishAsync(topic string, payload interface{}, qos byte, retained bool)

异步发布消息,不等待确认。

批量发布器
BatchPublisher
type BatchPublisher struct {
	// 内部消息队列
}

func NewBatchPublisher() *BatchPublisher
func (bp *BatchPublisher) AddMessage(topic string, payload interface{}, qos byte, retained bool)
func (bp *BatchPublisher) PublishBatch() error
func (bp *BatchPublisher) PublishBatchAsync()

使用示例:

bp := gomqtt.NewBatchPublisher()
bp.AddMessage("topic1", "message1", 1, false)
bp.AddMessage("topic2", "message2", 1, false)
err := bp.PublishBatch() // 同步批量发布
// 或 bp.PublishBatchAsync() // 异步批量发布
订阅消息
SubscribeType
type SubscribeType struct {
	Topic      string
	Qos        byte
	Callback   mqtt.MessageHandler
	RetryTimes int // 重试次数,0表示无限重试
}
Subscribe
func Subscribe(item SubscribeType)

注册消息订阅。

ResubscribeAll
func ResubscribeAll()

重新订阅所有已注册的主题。

连接管理
GetClient
func GetClient(opts *mqtt.ClientOptions) (mqtt.Client, error)

获取MQTT客户端(支持连接池)。

CloseClient
func CloseClient(clientID string)

关闭指定客户端连接。

CloseAllClients
func CloseAllClients()

关闭所有客户端连接。

🔧 高级用法

SSL/TLS连接示例
config := &gomqtt.Config{
	Broker:     "ssl://mqtt.example.com:8883",
	CACert:     "/path/to/ca.crt",
	ClientCert: "/path/to/client.crt", // 可选,双向认证
	ClientKey:  "/path/to/client.key", // 可选,双向认证
}
异步发布示例
// 高性能场景使用异步发布
for i := 0; i < 1000; i++ {
	gomqtt.PublishAsync("sensor/data", fmt.Sprintf("data-%d", i), 0, false)
}
批量发布示例
bp := gomqtt.NewBatchPublisher()

// 收集一批消息
for i := 0; i < 100; i++ {
	bp.AddMessage("batch/topic", fmt.Sprintf("message-%d", i), 1, false)
}

// 一次性批量发布
err := bp.PublishBatch()
if err != nil {
	log.Printf("批量发布失败: %v", err)
}
连接池使用示例
// 多个goroutine共享连接池
for i := 0; i < 10; i++ {
	go func(id int) {
		config := &gomqtt.Config{
			Broker:   "tcp://127.0.0.1:1883",
			ClientID: fmt.Sprintf("client_%d", id),
		}
		
		opts, _ := gomqtt.GetClientOptions(config)
		client, _ := gomqtt.GetClient(opts) // 复用连接
		
		// 使用client进行发布/订阅操作
	}(i)
}

🚨 错误处理

超时错误
err := gomqtt.PublishWithTimeout("topic", "data", 1, false, 10*time.Second)
if err != nil {
	if errors.Is(err, gomqtt.ErrTimeout) {
		log.Println("发布超时")
	} else {
		log.Printf("发布失败: %v", err)
	}
}
连接错误处理
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
	log.Printf("连接丢失: %v", err)
	// 自动重连逻辑
})

📊 性能基准测试

发布性能测试
func BenchmarkPublish(b *testing.B) {
	// 初始化客户端...
	
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		gomqtt.PublishAsync("benchmark", "test", 0, false)
	}
}
批量发布性能测试
func BenchmarkBatchPublish(b *testing.B) {
	// 初始化客户端...
	
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bp := gomqtt.NewBatchPublisher()
		for j := 0; j < 100; j++ {
			bp.AddMessage("batch", "test", 0, false)
		}
		bp.PublishBatchAsync()
	}
}

🔒 安全最佳实践

  1. 使用SSL/TLS加密:生产环境务必使用加密连接
  2. 认证信息保护:避免硬编码用户名密码
  3. 客户端ID管理:使用有意义的客户端ID便于监控
  4. 连接超时设置:合理设置连接和发布超时时间
  5. 错误日志记录:完善的错误处理和日志记录

🤝 贡献

欢迎提交Issue和Pull Request来改进这个项目。

📄 许可证

本项目基于MIT许可证开源,详见LICENSE文件。

📞 支持

如有问题请提交Issue或联系维护者。

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTimeout = errors.New("operation timeout")

自定义错误

Functions

func CloseAllClients added in v1.2.0

func CloseAllClients()

CloseAllClients 关闭所有连接

func CloseClient added in v1.2.0

func CloseClient(clientID string)

CloseClient 关闭指定连接

func GetClient

func GetClient(opts *mqtt.ClientOptions) (client mqtt.Client, err error)

GetClient 获取MQTT连接(支持连接池)

func GetClientOptions

func GetClientOptions(conf *Config) (*mqtt.ClientOptions, error)

GetClientOptions 获取MQTT连接配置项

func Init

func Init(opts *mqtt.ClientOptions) (err error)

Init 初始化监听器

func OnConnectHandler added in v1.2.0

func OnConnectHandler(handler mqtt.OnConnectHandler) mqtt.OnConnectHandler

OnConnectHandler 连接上服务器的操作

func Publish

func Publish(topic string, payload any, qos byte, retained bool) (err error)

Publish 通用发布消息接口(异步优化)

func PublishAsync added in v1.2.0

func PublishAsync(topic string, payload any, qos byte, retained bool)

PublishAsync 异步发布消息(不等待确认)

func PublishWithTimeout added in v1.2.0

func PublishWithTimeout(topic string, payload any, qos byte, retained bool, timeout time.Duration) (err error)

PublishWithTimeout 带超时的发布消息接口

func ResubscribeAll added in v1.2.0

func ResubscribeAll()

ResubscribeAll 重连时重新订阅所有主题(导出函数)

func Subscribe

func Subscribe(item SubscribeType)

Subscribe 注册订阅消息

Types

type BatchPublisher added in v1.2.0

type BatchPublisher struct {
	// contains filtered or unexported fields
}

BatchPublisher 批量发布器

func NewBatchPublisher added in v1.2.0

func NewBatchPublisher() *BatchPublisher

NewBatchPublisher 创建批量发布器

func (*BatchPublisher) AddMessage added in v1.2.0

func (bp *BatchPublisher) AddMessage(topic string, payload any, qos byte, retained bool)

AddMessage 添加消息到批量发布

func (*BatchPublisher) PublishBatch added in v1.2.0

func (bp *BatchPublisher) PublishBatch() error

PublishBatch 批量发布消息

func (*BatchPublisher) PublishBatchAsync added in v1.2.0

func (bp *BatchPublisher) PublishBatchAsync()

PublishBatchAsync 异步批量发布(不等待确认)

type ClientPool added in v1.2.0

type ClientPool struct {
	// contains filtered or unexported fields
}

type Config

type Config struct {
	Broker     string // Broker地址,例如tcp://127.0.0.1:1883或ssl://127.0.0.1:8883. 如果配置ssl,则必须配置CACert
	Username   string // 用户名,可选
	Password   string // 密码,可选
	CACert     string // CA证书,单向认证只需要配置此文件即可,无需ClientCert和ClientKey,可选
	ClientCert string // ClientCert,可选
	ClientKey  string // ClientKey,可选
	ClientID   string // ClientID,可选
}

Config MQTT的配置信息格式

type Message added in v1.2.0

type Message struct {
	Topic    string
	Payload  any
	Qos      byte
	Retained bool
}

Message 自定义消息结构

type SubscribeType

type SubscribeType struct {
	Topic      string
	Qos        byte
	Callback   mqtt.MessageHandler
	RetryTimes int // 为0表示无限重试
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL