Documentation
¶
Overview ¶
Package mqtt provides an MQTT v3.1 client library.
Index ¶
- Constants
- Variables
- func DefaultErrorHandler(client *MqttClient, reason error)
- type ClientOptions
- func (o *ClientOptions) AddBroker(server string) *ClientOptions
- func (opts *ClientOptions) SetBinaryWill(topic string, payload []byte, qos QoS, retained bool) *ClientOptions
- func (opts *ClientOptions) SetCleanSession(clean bool) *ClientOptions
- func (opts *ClientOptions) SetClientId(clientid string) *ClientOptions
- func (opts *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions
- func (opts *ClientOptions) SetKeepAlive(keepAlive uint) *ClientOptions
- func (opts *ClientOptions) SetOnConnectionLost(onLost OnConnectionLost) *ClientOptions
- func (opts *ClientOptions) SetOrderMatters(order bool) *ClientOptions
- func (opts *ClientOptions) SetPassword(password string) *ClientOptions
- func (opts *ClientOptions) SetProtocolVersion(v byte) *ClientOptions
- func (opts *ClientOptions) SetStore(store Store) *ClientOptions
- func (opts *ClientOptions) SetTlsConfig(tlsConfig *tls.Config) *ClientOptions
- func (opts *ClientOptions) SetUsername(username string) *ClientOptions
- func (opts *ClientOptions) SetWill(topic string, payload string, qos QoS, retained bool) *ClientOptions
- func (opts *ClientOptions) SetWriteTimeout(t time.Duration)
- func (opts *ClientOptions) UnsetWill() *ClientOptions
- type ConnRC
- type FileStore
- func (store *FileStore) All() []string
- func (store *FileStore) Close()
- func (store *FileStore) Del(key string)
- func (store *FileStore) Get(key string, protocolVersion byte) (m *Message)
- func (store *FileStore) Open()
- func (store *FileStore) Put(key string, m *Message, protocolVersion byte)
- func (store *FileStore) Reset()
- type MId
- type MemoryStore
- func (store *MemoryStore) All() []string
- func (store *MemoryStore) Close()
- func (store *MemoryStore) Del(key string)
- func (store *MemoryStore) Get(key string, protocolVersion byte) *Message
- func (store *MemoryStore) Open()
- func (store *MemoryStore) Put(key string, message *Message, protocolVersion byte)
- func (store *MemoryStore) Reset()
- type Message
- func (m *Message) Bytes(protocolVersion byte) []byte
- func (m *Message) DupFlag() bool
- func (m *Message) MsgId() MId
- func (m *Message) Payload() []byte
- func (m *Message) QoS() QoS
- func (m *Message) RetainedFlag() bool
- func (m *Message) SetQoS(qos QoS)
- func (m *Message) SetRetainedFlag(isRetained bool)
- func (m *Message) Topic() string
- type MessageHandler
- type MqttClient
- func (c *MqttClient) Disconnect(quiesce uint)
- func (c *MqttClient) EndSubscription(topics ...string) (<-chan Receipt, error)
- func (c *MqttClient) ForceDisconnect()
- func (c *MqttClient) GetAlias() <-chan Receipt
- func (c *MqttClient) GetAliasList(topic string) <-chan Receipt
- func (c *MqttClient) GetAliasList2(topic string) <-chan Receipt
- func (c *MqttClient) GetState(alias string) <-chan Receipt
- func (c *MqttClient) GetState2(alias string) <-chan Receipt
- func (c *MqttClient) IsConnected() bool
- func (c *MqttClient) Presence(callback MessageHandler, topic string) (<-chan Receipt, error)
- func (c *MqttClient) Publish(qos QoS, topic string, payload interface{}) <-chan Receipt
- func (c *MqttClient) PublishMessage(topic string, message *Message) <-chan Receipt
- func (c *MqttClient) PublishToAlias(alias string, payload interface{}) <-chan Receipt
- func (c *MqttClient) SetAlias(alias string) <-chan Receipt
- func (c *MqttClient) Start() ([]Receipt, error)
- func (c *MqttClient) StartSubscription(callback MessageHandler, filters ...*TopicFilter) (<-chan Receipt, error)
- func (c *MqttClient) UnPresence(topic string) (<-chan Receipt, error)
- type MsgType
- type OnConnectionLost
- type QoS
- type Receipt
- type Store
- type TopicFilter
- type TopicName
- type YunbaClient
- type YunbaInfo
Constants ¶
const ( NET component = "[net] " PNG component = "[pinger] " CLI component = "[client] " DEC component = "[decode] " MES component = "[message] " STR component = "[store] " MID component = "[msgids] " TST component = "[test] " STA component = "[state] " ERR component = "[error] " )
const ( GETALIAS = 1 GETTOPIC = 3 GETALIASLIST = 5 GETALIASLIST2 = 15 GETSTATE = 9 GETSTATE2 = 19 )
Variables ¶
var ErrBadCredentials = errors.New("Bad user name or password")
var ErrInvalidClientID = errors.New("Identifier rejected")
var ErrInvalidProtocolVersion = errors.New("Unnacceptable protocol version")
* Connect Errors
var ErrInvalidQoS = errors.New("Invalid QoS")
* QoS Errors
var ErrInvalidTopicFilterEmptyString = errors.New("Invalid TopicFilter - may not be empty string")
var ErrInvalidTopicFilterMultilevel = errors.New("Invalid TopicFilter - multi-level wildcard must be last level")
var ErrInvalidTopicNameEmptyString = errors.New("Invalid TopicName - may not be empty string")
* Topic Errors
var ErrInvalidTopicNameWildcard = errors.New("Invalid TopicName - may not contain wild card")
var ErrNotAuthorized = errors.New("Not Authorized")
var ErrNotConnected = errors.New("Not Connected")
var ErrUnknownReason = errors.New("Unknown RC")
Functions ¶
func DefaultErrorHandler ¶
func DefaultErrorHandler(client *MqttClient, reason error)
Types ¶
type ClientOptions ¶
type ClientOptions struct {
// contains filtered or unexported fields
}
ClientOptions contains configurable options for an MqttClient.
func NewClientOptions ¶
func NewClientOptions() *ClientOptions
NewClientClientOptions will create a new ClientClientOptions type with some default values.
Port: 1883 CleanSession: True Timeout: 30 (seconds) Tracefile: os.Stdout
func (*ClientOptions) AddBroker ¶
func (o *ClientOptions) AddBroker(server string) *ClientOptions
AddBroker adds a broker URI to the list of brokers to be used. The format should be scheme://host:port Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname) and "port" is the port on which the broker is accepting connections.
func (*ClientOptions) SetBinaryWill ¶
func (opts *ClientOptions) SetBinaryWill(topic string, payload []byte, qos QoS, retained bool) *ClientOptions
SetBinaryWill accepts a []byte will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.
func (*ClientOptions) SetCleanSession ¶
func (opts *ClientOptions) SetCleanSession(clean bool) *ClientOptions
SetCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before diconnecting previously but didn't will not be sent upon connecting to the broker.
func (*ClientOptions) SetClientId ¶
func (opts *ClientOptions) SetClientId(clientid string) *ClientOptions
SetClientId will set the client id to be used by this client when connecting to the MQTT broker. According to the MQTT v3.1 specification, a client id mus be no longer than 23 characters.
func (*ClientOptions) SetDefaultPublishHandler ¶
func (opts *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions
SetDefaultPublishHandler
func (*ClientOptions) SetKeepAlive ¶
func (opts *ClientOptions) SetKeepAlive(keepAlive uint) *ClientOptions
SetKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.
func (*ClientOptions) SetOnConnectionLost ¶
func (opts *ClientOptions) SetOnConnectionLost(onLost OnConnectionLost) *ClientOptions
SetOnConnectionLost will set the OnConnectionLost callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.
func (*ClientOptions) SetOrderMatters ¶
func (opts *ClientOptions) SetOrderMatters(order bool) *ClientOptions
SetOrderMatters will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false, this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order.
func (*ClientOptions) SetPassword ¶
func (opts *ClientOptions) SetPassword(password string) *ClientOptions
SetPassword will set the password to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext accross the wire.
func (*ClientOptions) SetProtocolVersion ¶
func (opts *ClientOptions) SetProtocolVersion(v byte) *ClientOptions
SetProtocolVersion will set ProtocolVersion
func (*ClientOptions) SetStore ¶
func (opts *ClientOptions) SetStore(store Store) *ClientOptions
SetStore will set the implementation of the Store interface used to provide message persistence in cases where QoS levels QoS_ONE or QoS_TWO are used. If no store is provided, then the client will use MemoryStore by default.
func (*ClientOptions) SetTlsConfig ¶
func (opts *ClientOptions) SetTlsConfig(tlsConfig *tls.Config) *ClientOptions
SetTlsConfig will set an SSL/TLS configuration to be used when connecting to an MQTT broker. Please read the official Go documentation for more information.
func (*ClientOptions) SetUsername ¶
func (opts *ClientOptions) SetUsername(username string) *ClientOptions
SetUsername will set the username to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext accross the wire.
func (*ClientOptions) SetWill ¶
func (opts *ClientOptions) SetWill(topic string, payload string, qos QoS, retained bool) *ClientOptions
SetWill accepts a string will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.
func (*ClientOptions) SetWriteTimeout ¶
func (opts *ClientOptions) SetWriteTimeout(t time.Duration)
SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a timeout error. A duration of 0 never times out.
func (*ClientOptions) UnsetWill ¶
func (opts *ClientOptions) UnsetWill() *ClientOptions
UnsetWill will cause any set will message to be disregarded.
type FileStore ¶
FileStore implements the store interface using the filesystem to provide true persistence, even across client failure. This is designed to use a single directory per running client. If you are running multiple clients on the same filesystem, you will need to be careful to specify unique store directories for each.
func NewFileStore ¶
NewFileStore will create a new FileStore which stores its messages in the directory provided.
func (*FileStore) All ¶
All will provide a list of all of the keys associated with messages currenly residing in the FileStore.
func (*FileStore) Close ¶
func (store *FileStore) Close()
Close will disallow the FileStore from being used.
func (*FileStore) Del ¶
Del will remove the persisted message associated with the provided key from the FileStore.
func (*FileStore) Get ¶
Get will retrieve a message from the store, the one associated with the provided key value.
type MId ¶
type MId uint64
MId is 16 bit message id as specified by the MQTT spec. In general, these values should not be depended upon by the client application.
type MemoryStore ¶
MemoryStore implements the store interface to provide a "persistence" mechanism wholly stored in memory. This is only useful for as long as the client instance exists.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore returns a pointer to a new instance of MemoryStore, the instance is not initialized and ready to use until Open() has been called on it.
func (*MemoryStore) All ¶
func (store *MemoryStore) All() []string
All returns a slice of strings containing all the keys currently in the MemoryStore.
func (*MemoryStore) Close ¶
func (store *MemoryStore) Close()
Close will disallow modifications to the state of the store.
func (*MemoryStore) Del ¶
func (store *MemoryStore) Del(key string)
Del takes a key, searches the MemoryStore and if the key is found deletes the Message pointer associated with it.
func (*MemoryStore) Get ¶
func (store *MemoryStore) Get(key string, protocolVersion byte) *Message
Get takes a key and looks in the store for a matching Message returning either the Message pointer or nil.
func (*MemoryStore) Open ¶
func (store *MemoryStore) Open()
Open initializes a MemoryStore instance.
func (*MemoryStore) Put ¶
func (store *MemoryStore) Put(key string, message *Message, protocolVersion byte)
Put takes a key and a pointer to a Message and stores the message.
func (*MemoryStore) Reset ¶
func (store *MemoryStore) Reset()
Reset eliminates all persisted message data in the store.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
Create a default PUBLISH Message with the specified payload If message == nil, create a zero length message Defaults: QoS=1, Retained=False
func (*Message) Bytes ¶
Bytes operates on a Message pointer and returns a slice of bytes representing the Message ready for transmission over the network
func (*Message) DupFlag ¶
DupFlag returns the boolean value of the duplicate message flag as encoded in the fixed header
func (*Message) RetainedFlag ¶
RetainedFlag returns a boolean value indicating whether this message was a retained message
func (*Message) SetQoS ¶
setQoS takes a QoS value and encodes this value in the fixed header of the Message
func (*Message) SetRetainedFlag ¶
SetRetainedFlag takes a boolean value indicating whether the server should retain the message and encodes it in the fixed header
type MessageHandler ¶
type MessageHandler func(client *MqttClient, message Message)
MessageHandler is a callback type which can be set to be executed upon the arrival of messages published to topics to which the client is subscribed.
type MqttClient ¶
Numerous connection options may be specified by configuring a and then supplying a ClientOptions type.
func NewClient ¶
func NewClient(ops *ClientOptions) *MqttClient
NewClient will create an MQTT v3.1 client with all of the options specified in the provided ClientOptions. The client must have the Start method called on it before it may be used. This is to make sure resources (such as a net connection) are created before the application is actually ready.
func (*MqttClient) Disconnect ¶
func (c *MqttClient) Disconnect(quiesce uint)
Disconnect will end the connection with the server, but not before waiting the specified number of milliseconds to wait for existing work to be completed.
func (*MqttClient) EndSubscription ¶
func (c *MqttClient) EndSubscription(topics ...string) (<-chan Receipt, error)
EndSubscription will end the subscription from each of the topics provided. Messages published to those topics from other clients will no longer be received.
func (*MqttClient) ForceDisconnect ¶
func (c *MqttClient) ForceDisconnect()
ForceDisconnect will end the connection with the mqtt broker immediately.
func (*MqttClient) GetAlias ¶
func (c *MqttClient) GetAlias() <-chan Receipt
getAlias will getAlias of this Client
func (*MqttClient) GetAliasList ¶
func (c *MqttClient) GetAliasList(topic string) <-chan Receipt
GetAliasList of this Client
func (*MqttClient) GetAliasList2 ¶
func (c *MqttClient) GetAliasList2(topic string) <-chan Receipt
GetAliasList2 of this Client
func (*MqttClient) GetState ¶
func (c *MqttClient) GetState(alias string) <-chan Receipt
GetState of this Client
func (*MqttClient) GetState2 ¶
func (c *MqttClient) GetState2(alias string) <-chan Receipt
GetState2 of this Client
func (*MqttClient) IsConnected ¶
func (c *MqttClient) IsConnected() bool
func (*MqttClient) Presence ¶
func (c *MqttClient) Presence(callback MessageHandler, topic string) (<-chan Receipt, error)
YunBa API presence see http://yunba.io/docs2/c/api/#MQTTClient_presence
func (*MqttClient) Publish ¶
func (c *MqttClient) Publish(qos QoS, topic string, payload interface{}) <-chan Receipt
Publish will publish a message with the specified QoS and content to the specified topic. Returns a read only channel used to track the delivery of the message.
func (*MqttClient) PublishMessage ¶
func (c *MqttClient) PublishMessage(topic string, message *Message) <-chan Receipt
PublishMessage will publish a Message to the specified topic. Returns a read only channel used to track the delivery of the message.
func (*MqttClient) PublishToAlias ¶
func (c *MqttClient) PublishToAlias(alias string, payload interface{}) <-chan Receipt
YunBa API publish_to_alias see http://yunba.io/docs2/c/api/#MQTTClient_publish_to_alias
func (*MqttClient) SetAlias ¶
func (c *MqttClient) SetAlias(alias string) <-chan Receipt
YunBa API set_alias see http://yunba.io/docs2/c/api/#MQTTClient_set_alias
func (*MqttClient) Start ¶
func (c *MqttClient) Start() ([]Receipt, error)
Start will create a connection to the message broker If clean session is false, then a slice will be returned containing Receipts for all messages that were in-flight at the last disconnect. If clean session is true, then any existing client state will be removed.
func (*MqttClient) StartSubscription ¶
func (c *MqttClient) StartSubscription(callback MessageHandler, filters ...*TopicFilter) (<-chan Receipt, error)
Start a new subscription. Provide a MessageHandler to be executed when a message is published on one of the topics provided.
func (*MqttClient) UnPresence ¶
func (c *MqttClient) UnPresence(topic string) (<-chan Receipt, error)
YunBa API presence see http://yunba.io/docs2/c/api/#MQTTClient_unpresence
type MsgType ¶
type MsgType byte
const ( /* 0x00 is reserved */ CONNECT MsgType = 0x01 CONNACK MsgType = 0x02 PUBLISH MsgType = 0x03 PUBACK MsgType = 0x04 PUBREC MsgType = 0x05 PUBREL MsgType = 0x06 PUBCOMP MsgType = 0x07 SUBSCRIBE MsgType = 0x08 SUBACK MsgType = 0x09 UNSUBSCRIBE MsgType = 0x0A UNSUBACK MsgType = 0x0B PINGREQ MsgType = 0x0C PINGRESP MsgType = 0x0D DISCONNECT MsgType = 0x0E EXTEND MsgType = 0x0F )
MsgType
func DecodeMsgType ¶
decodeMsgType returns the type of the message
type OnConnectionLost ¶
type OnConnectionLost func(client *MqttClient, reason error)
OnConnectionLost is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause an OnConnectionLost callback to execute.
type Receipt ¶
type Receipt struct { }
Receipt is a sort of token object that you will receive upon delivery of a published message.
type Store ¶
type Store interface { Open() Put(string, *Message, byte) Get(string, byte) *Message All() []string Del(string) Close() Reset() }
Store is an interface which can be used to provide implementations for message persistence. Because we may have to store distinct messages with the same message ID, we need a unique key for each message. This is possible by prepending "i." or "o." to each message id
type TopicFilter ¶
type TopicFilter struct { QoS // contains filtered or unexported fields }
func NewTopicFilter ¶
func NewTopicFilter(topic string, qos byte) (*TopicFilter, error)