rmqtt

package module
v0.0.0-...-47cb541 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2024 License: GPL-3.0 Imports: 13 Imported by: 0

README

rose-mqtt

Golang Toolkit for MQTT

Support

  • MQTT v3.1.1

Demo


Methods

Config
  • WithXXX
  • SetXXX
Publish
  • Publish()
Subscribe
  • Subscribe()
  • SubscribeMultiple()
  • RegisterConsumer()
  • RegisterConsumers()
  • RegisterMultipleConsumer()
  • RegisterMultipleConsumers()
  • RegisterOnlyTopic()

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AutoRetry

func AutoRetry(callback func() error, maxRetries int, interval time.Duration) (err error)

AutoRetry 失败后重试指定次数

func ConvOrderedArrayToMap

func ConvOrderedArrayToMap(array []TopicQosPair) map[string]QosLevel

ConvOrderedArrayToMap 将有序集合转换为 map

func IsStrEmpty

func IsStrEmpty(str string) bool

func JsonMarshal

func JsonMarshal(v interface{}) string

func JsonUnMarshal

func JsonUnMarshal(s string, v interface{}) error

func ReconnectManualHandler

func ReconnectManualHandler(client mqtt.Client, err error)

ReconnectManualHandler 手动实现自动重连机制

func SliceRemoveOne

func SliceRemoveOne(slice []string, value string) []string

func SliceRmvSubSlice

func SliceRmvSubSlice(source []string, remove []string) []string

func SubCallbackKey

func SubCallbackKey(topic string, qos QosLevel) string

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(options ...Option) *Config

func (*Config) SetCleanSession

func (c *Config) SetCleanSession(clean bool) *Config

func (*Config) SetClientID

func (c *Config) SetClientID(id string) *Config

func (*Config) SetConnHandler

func (c *Config) SetConnHandler(handler mqtt.OnConnectHandler) *Config

func (*Config) SetConnLostHandler

func (c *Config) SetConnLostHandler(handler mqtt.ConnectionLostHandler) *Config

func (*Config) SetDefaultPublishHandler

func (c *Config) SetDefaultPublishHandler(handler mqtt.MessageHandler) *Config

func (*Config) SetKeepAlive

func (c *Config) SetKeepAlive(t time.Duration) *Config

func (*Config) SetKeepAliveSec

func (c *Config) SetKeepAliveSec(sec int64) *Config

func (*Config) SetReconnectType

func (c *Config) SetReconnectType(t ReConnType) *Config

func (*Config) SetTlsCaCertFile

func (c *Config) SetTlsCaCertFile(pem string) *Config

func (*Config) SetTlsClientCertFile

func (c *Config) SetTlsClientCertFile(cert string, key string) *Config

func (*Config) SetTlsConfig

func (c *Config) SetTlsConfig(verify bool) *Config

func (*Config) SetUserAndPwd

func (c *Config) SetUserAndPwd(userName, passWord string) *Config

func (*Config) SetWaitTimeout

func (c *Config) SetWaitTimeout(t time.Duration) *Config

func (*Config) SetWill

func (c *Config) SetWill(topic, payload string, qos QosLevel, retained bool) *Config

SetWill 设置遗嘱消息

func (*Config) SetWillByte

func (c *Config) SetWillByte(topic string, payload []byte, qos QosLevel, retained bool) *Config

SetWillByte 设置遗嘱消息

type Consumer

type Consumer struct {
	Topic    string
	QosType  QosLevel
	CallBack mqtt.MessageHandler
}

type MQTTClient

type MQTTClient struct {
	Ops *mqtt.ClientOptions
	// contains filtered or unexported fields
}

func NewMQTTClient

func NewMQTTClient(brokerURI string, cfg *Config) *MQTTClient

func (*MQTTClient) Close

func (c *MQTTClient) Close()

func (*MQTTClient) Connect

func (c *MQTTClient) Connect() (err error)

func (*MQTTClient) DefaultOnConnect

func (c *MQTTClient) DefaultOnConnect(cli mqtt.Client)

DefaultOnConnect 重连后自动注册订阅

func (*MQTTClient) Publish

func (c *MQTTClient) Publish(topic string, qos QosLevel, retained bool, payload interface{}) error

Publish 发布消息

func (*MQTTClient) PublishNoRt

func (c *MQTTClient) PublishNoRt(topic string, qos QosLevel, payload interface{}) error

PublishNoRt 发布非保留消息

func (*MQTTClient) PublishRt

func (c *MQTTClient) PublishRt(topic string, qos QosLevel, payload interface{}) error

PublishRt 发布保留消息

func (*MQTTClient) RegisterConsumer

func (c *MQTTClient) RegisterConsumer(consumer *Consumer)

func (*MQTTClient) RegisterConsumers

func (c *MQTTClient) RegisterConsumers(consumers []*Consumer)

RegisterConsumers 批量注册消费者

func (*MQTTClient) RegisterMultipleConsumer

func (c *MQTTClient) RegisterMultipleConsumer(mConsumer *MultipleConsumer)

func (*MQTTClient) RegisterMultipleConsumers

func (c *MQTTClient) RegisterMultipleConsumers(mConsumers []*MultipleConsumer)

func (*MQTTClient) RegisterOnlyTopic

func (c *MQTTClient) RegisterOnlyTopic(topic string, qos QosLevel)

func (*MQTTClient) Subscribe

func (c *MQTTClient) Subscribe(topic string, qos QosLevel, callback mqtt.MessageHandler) error

func (*MQTTClient) SubscribeMultiple

func (c *MQTTClient) SubscribeMultiple(topics map[string]QosLevel, callback mqtt.MessageHandler) error

func (*MQTTClient) UnSubscribe

func (c *MQTTClient) UnSubscribe(topics ...string) error

type MultipleConsumer

type MultipleConsumer struct {
	Topics   map[string]QosLevel
	CallBack mqtt.MessageHandler
}

type Option

type Option func(c *Config)

func WithCleanSession

func WithCleanSession(clean bool) Option

func WithClientID

func WithClientID(id string) Option

func WithDebug

func WithDebug(debug bool) Option

func WithUserAndPwd

func WithUserAndPwd(userName, passWord string) Option

func WithWaitTimeout

func WithWaitTimeout(t time.Duration) Option

func WithWaitTimeoutSec

func WithWaitTimeoutSec(sec int64) Option

type QosLevel

type QosLevel byte
const (
	Qos0 QosLevel = 0
	Qos1 QosLevel = 1
	Qos2 QosLevel = 2
)

type ReConnType

type ReConnType string

重试方式

const (
	ReConnTypeDefault   ReConnType = ""       // 默认
	ReConnTypeAutomatic ReConnType = "auto"   // 自动
	ReConnTypeManual    ReConnType = "manual" // 手动
)

type TopicQosPair

type TopicQosPair struct {
	Topic string
	Qos   QosLevel
}

func ConvMapToOrderedArray

func ConvMapToOrderedArray(m map[string]QosLevel) ([]string, []TopicQosPair)

ConvMapToOrderedArray 将 map 转换为有序集合

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL