mqtt

v0.0.0-...-c3fc0b7 Latest
This package is not in the latest version of its module.

Published: Jul 17, 2018 License: MIT Imports: 8 Imported by: 3

README

Golang MQTT-Server

This repo is an implementation of the MQTT protocol for the Go programming language.

What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely
simple and lightweight messaging protocol, designed for constrained devices
and low-bandwidth, high-latency or unreliable networks. The design principles
are to minimise network bandwidth and device resource requirements whilst also
attempting to ensure reliability and some degree of assurance of delivery.
These principles also turn out to make the protocol ideal for the emerging
“machine-to-machine” (M2M) or “Internet of Things” world of connected devices,
and for mobile applications where bandwidth and battery power are at a premium.

→ See MQTT FAQ

Install Go

Go is an open source programming language that makes it easy to build simple,
reliable, and efficient software.

You can download Go at golang.org/dl. There are executables for Microsoft Windows, Apple macOS and Linux, as well as the Go source code. There are also Golang releases for Docker.

Run the MQTT-Server

First of all, grab this repository by running the following line. The go get command will clone this repo into the $GOPATH directory.

go get github.com/j-forster/mqtt

Now build and install the server with:

go install github.com/j-forster/mqtt/server

This will compile the server code and move the executable to $GOPATH/bin/.

To start the server:

$GOPATH/bin/server

The default MQTT (TCP) port is :1883. You can now connect with any MQTT client.
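
To check that the server answers without installing a client library, one option is to send a hand-built CONNECT packet and read the CONNACK reply. The following is a minimal sketch, assuming MQTT 3.1 framing (protocol name MQIsdp and version 3, as the package's error messages suggest), a server on localhost:1883 and a made-up client ID "demo". buildConnect is a hypothetical helper and not part of this package.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// buildConnect assembles an MQTT 3.1 CONNECT packet by hand.
// Hypothetical helper: it assumes the packet body stays below 128 bytes,
// so the remaining length fits into a single byte.
func buildConnect(clientID string, keepAliveSec uint16) []byte {
	body := []byte{
		0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', // length-prefixed protocol name
		0x03, // protocol version
		0x02, // connect flags: clean session only
		byte(keepAliveSec >> 8), byte(keepAliveSec), // keep-alive in seconds
		byte(len(clientID) >> 8), byte(len(clientID)), // client ID length
	}
	body = append(body, clientID...)
	return append([]byte{0x10, byte(len(body))}, body...)
}

func main() {
	// Assumes the server is listening on the default port on this machine.
	conn, err := net.DialTimeout("tcp", "localhost:1883", 2*time.Second)
	if err != nil {
		fmt.Println("server not reachable:", err)
		return
	}
	defer conn.Close()

	if _, err := conn.Write(buildConnect("demo", 60)); err != nil {
		fmt.Println("write failed:", err)
		return
	}

	// A CONNACK packet is 4 bytes long; the last byte is the return code.
	ack := make([]byte, 4)
	conn.SetReadDeadline(time.Now().Add(2 * time.Second))
	if _, err := io.ReadFull(conn, ack); err != nil {
		fmt.Println("no CONNACK received:", err)
		return
	}
	fmt.Println("CONNACK return code:", ack[3])
}
```

A return code of 0 means the connection was accepted. For real applications a full MQTT client is the better choice.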

Benchmark

To run the benchmark tests, use:

npm run test -- -g bench -p 1883 -c 100 -l 10

Parameter  Description
-g         Tests to run. bench means 'benchmark tests'
-p         Server port. Default: 1883
-c         Total number of packages to send. Default: 100
-l         Number of packages to send concurrently. Default: 10

Results

Benchmark results depend on your system and configuration!

The following results were recorded on a Windows 10 x64 machine.

Go MQTT-Server:

[Bench] Network: single publisher -> single subscriber
[Bench] Sending 10000 packages, 10 concurrent ..
[Bench] Time delta: 642.22588 ms
[Bench] Msg/Sec: 15570.845572277469 Msg/s

Eclipse Mosquitto MQTT Broker:

[Bench] Network: single publisher -> single subscriber
[Bench] Sending 10000 packages, 10 concurrent ..
[Bench] Time delta: 645.351073 ms
[Bench] Msg/Sec: 15495.441812025932 Msg/s
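
The Msg/Sec figure appears to be the number of messages divided by the time delta in seconds. A small sketch that recomputes both values from the logs above; msgPerSec is an illustrative name, not part of the benchmark tool.

```go
package main

import "fmt"

// msgPerSec converts a message count and an elapsed time in milliseconds
// into a throughput in messages per second.
func msgPerSec(count int, deltaMs float64) float64 {
	return float64(count) / (deltaMs / 1000)
}

func main() {
	fmt.Printf("Go MQTT-Server: %.2f Msg/s\n", msgPerSec(10000, 642.22588))
	fmt.Printf("Mosquitto:      %.2f Msg/s\n", msgPerSec(10000, 645.351073))
}
```

The two runs differ by well under one percent, so on this machine both brokers perform about the same for a single publisher and a single subscriber.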

Documentation

Constants

const (
	CONNECTING = 0
	CONNECTED  = 1
	CLOSING    = 3
	CLOSED     = 4
)
const (
	ACCEPTED            = 0
	UNACCEPTABLE_PROTOV = 1
	IDENTIFIER_REJ      = 2
	SERVER_UNAVAIL      = 3
	BAD_USER_OR_PASS    = 4
	NOT_AUTHORIZED      = 5
)

CONNACK return codes
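
In MQTT 3.1 the return code is the last byte of the 4-byte CONNACK packet. The sketch below shows that layout and a readable text for each code. The constants are local copies of the values listed above, and connAck and describe are hypothetical helpers; the package's own Context.ConnAck may build the packet differently.

```go
package main

import "fmt"

// Local copies of the CONNACK return codes, so that the sketch
// compiles without importing the package.
const (
	ACCEPTED            = 0
	UNACCEPTABLE_PROTOV = 1
	IDENTIFIER_REJ      = 2
	SERVER_UNAVAIL      = 3
	BAD_USER_OR_PASS    = 4
	NOT_AUTHORIZED      = 5
)

// connAck builds an MQTT 3.1 CONNACK packet: message type 2 in the high
// nibble, remaining length 2, one reserved byte and the return code.
func connAck(code byte) []byte {
	return []byte{0x20, 0x02, 0x00, code}
}

// describe returns a human-readable text for a return code.
func describe(code byte) string {
	switch code {
	case ACCEPTED:
		return "connection accepted"
	case UNACCEPTABLE_PROTOV:
		return "unacceptable protocol version"
	case IDENTIFIER_REJ:
		return "identifier rejected"
	case SERVER_UNAVAIL:
		return "server unavailable"
	case BAD_USER_OR_PASS:
		return "bad user name or password"
	case NOT_AUTHORIZED:
		return "not authorized"
	default:
		return "reserved"
	}
}

func main() {
	for code := byte(0); code <= 5; code++ {
		fmt.Printf("% x  %s\n", connAck(code), describe(code))
	}
}
```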

const (
	CONNECT     = 1
	CONNACK     = 2
	PUBLISH     = 3
	PUBACK      = 4
	PUBREC      = 5
	PUBREL      = 6
	PUBCOMP     = 7
	SUBSCRIBE   = 8
	SUBACK      = 9
	UNSUBSCRIBE = 10
	UNSUBACK    = 11
	PINGREQ     = 12
	PINGRESP    = 13
	DISCONNECT  = 14
)

message types

const (
	CREATE = 1
	REMOVE = 2
)

Variables

var (
	InclompleteHeader       = errors.New("incomplete header")
	MaxMessageLength        = errors.New("message length exceeds server maximum")
	MessageLengthInvalid    = errors.New("message length exceeds maximum")
	IncompleteMessage       = errors.New("incomplete message")
	UnknownMessageType      = errors.New("unknown mqtt message type")
	ReservedMessageType     = errors.New("reserved message type")
	ConnectMsgLacksProtocol = errors.New("connect message has no protocol field")
	ConnectProtocolUnexp    = errors.New("connect message protocol is not 'MQIsdp'")
	TooLongClientID         = errors.New("connect client id is too long")
	UnknownMessageID        = errors.New("unknown message id")
)

errors

Functions

func Head

func Head(b0 byte, length int, total int) ([]byte, []byte)

something like Body() or New(), because sometimes a head with a body is needed and sometimes only the head
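
The exact meaning of the length and total parameters is not documented here. As background, an MQTT fixed header is the first byte followed by the remaining length in a variable-length encoding (7 bits per byte, high bit set while more bytes follow). A sketch of that encoding under the MQTT 3.1 rules; encodeFixedHeader is a hypothetical helper and not the package's Head function.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value the 4-byte encoding can hold.
const maxRemainingLength = 268435455

// encodeFixedHeader returns the first byte followed by the
// variable-length encoding of the remaining length.
func encodeFixedHeader(b0 byte, length int) ([]byte, error) {
	if length < 0 || length > maxRemainingLength {
		return nil, errors.New("message length exceeds maximum")
	}
	head := []byte{b0}
	for {
		digit := byte(length % 128)
		length /= 128
		if length > 0 {
			digit |= 0x80 // more length bytes follow
		}
		head = append(head, digit)
		if length == 0 {
			return head, nil
		}
	}
}

func main() {
	for _, n := range []int{0, 127, 128, 321, 16384} {
		head, _ := encodeFixedHeader(0x30, n)
		fmt.Printf("%d -> % x\n", n, head)
	}
}
```

Lengths up to 127 need one length byte, lengths up to 16383 need two, and so on up to four bytes.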

func Join

func Join(conn net.Conn, server *Server)

func ListenAndServe

func ListenAndServe(addr string, handler Handler) error

Types

type Context

type Context struct {

	//	server   *Server
	ClientID string

	Will *Message
	// contains filtered or unexported fields
}

func NewContext

func NewContext(w io.Writer, c io.Closer, server *Server) *Context

func (*Context) Alive

func (ctx *Context) Alive() bool

func (*Context) Close

func (ctx *Context) Close() error

func (*Context) ConnAck

func (ctx *Context) ConnAck(code byte)

func (*Context) Fail

func (ctx *Context) Fail(err error) error

func (*Context) Failf

func (ctx *Context) Failf(format string, a ...interface{}) error

func (*Context) Get

func (ctx *Context) Get(key string) interface{}

func (*Context) PingResp

func (ctx *Context) PingResp()

func (*Context) Publish

func (ctx *Context) Publish(sub *Subscription, msg *Message)

func (*Context) Read

func (ctx *Context) Read(reader io.Reader)

read a new MQTT message from a reader (input stream)

func (*Context) ReadConnectMessage

func (ctx *Context) ReadConnectMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse CONNECT messages

func (*Context) ReadPubcompMessage

func (ctx *Context) ReadPubcompMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBCOMP message (a response to a PUBREL from a client to this server)

func (*Context) ReadPublishMessage

func (ctx *Context) ReadPublishMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBLISH message and tell the server about it

func (*Context) ReadPubrecMessage

func (ctx *Context) ReadPubrecMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBREC message (a response to a PUBLISH from this server to a client at QoS 2)

func (*Context) ReadPubrelMessage

func (ctx *Context) ReadPubrelMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a PUBREL message (a response to a PUBREC at QoS 2); the message has already been stored at the previous PUBREC message
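
The receiving side of the QoS 2 handshake can be pictured as a small store keyed by message ID: PUBLISH stores the message and is answered with PUBREC, PUBREL releases it and is answered with PUBCOMP. The sketch below illustrates that flow with hypothetical types. It is not the package's implementation, and the point at which this server actually forwards the message to subscribers is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two reply types used in the handshake.
const (
	PUBREC  = 5
	PUBCOMP = 7
)

var errUnknownMessageID = errors.New("unknown message id")

// receiver keeps QoS 2 messages that have arrived but are not yet released.
type receiver struct {
	pending   map[uint16]string // message ID -> payload, stored until PUBREL
	delivered []string          // payloads released so far
}

func newReceiver() *receiver {
	return &receiver{pending: make(map[uint16]string)}
}

// onPublish stores the payload and returns the reply type PUBREC.
func (r *receiver) onPublish(id uint16, payload string) byte {
	r.pending[id] = payload
	return PUBREC
}

// onPubrel releases the stored payload and returns the reply type PUBCOMP.
// A PUBREL for an ID that was never stored is reported as an error.
func (r *receiver) onPubrel(id uint16) (byte, error) {
	payload, ok := r.pending[id]
	if !ok {
		return 0, errUnknownMessageID
	}
	delete(r.pending, id)
	r.delivered = append(r.delivered, payload)
	return PUBCOMP, nil
}

func main() {
	r := newReceiver()
	fmt.Println(r.onPublish(7, "hello")) // reply type for the PUBLISH
	fmt.Println(r.onPubrel(7))           // reply type for the PUBREL
	fmt.Println(r.onPubrel(7))           // same ID again: no longer pending
}
```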

func (*Context) ReadSubscribeMessage

func (ctx *Context) ReadSubscribeMessage(reader io.Reader, fh *FixedHeader, buf []byte)

parse a SUBSCRIBE message and send SUBACK
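
For orientation, the body of an MQTT 3.1 SUBSCRIBE packet is a 2-byte message ID followed by pairs of length-prefixed topic and requested QoS; the SUBACK reply repeats the message ID and lists one granted QoS per topic. A sketch of both steps, with hypothetical helper names and without the package's Context or FixedHeader types:

```go
package main

import (
	"errors"
	"fmt"
)

var errIncompleteMessage = errors.New("incomplete message")

// parseSubscribe reads the message ID and the (topic, QoS) pairs from the
// body of a SUBSCRIBE packet, i.e. everything after the fixed header.
func parseSubscribe(body []byte) (id uint16, topics []string, qos []byte, err error) {
	if len(body) < 2 {
		return 0, nil, nil, errIncompleteMessage
	}
	id = uint16(body[0])<<8 | uint16(body[1])
	rest := body[2:]
	for len(rest) > 0 {
		if len(rest) < 2 {
			return 0, nil, nil, errIncompleteMessage
		}
		n := int(rest[0])<<8 | int(rest[1])
		if len(rest) < 2+n+1 {
			return 0, nil, nil, errIncompleteMessage
		}
		topics = append(topics, string(rest[2:2+n]))
		qos = append(qos, rest[2+n]&0x03)
		rest = rest[2+n+1:]
	}
	return id, topics, qos, nil
}

// subAck builds the SUBACK reply: type 9, message ID, one granted QoS per
// topic. It assumes fewer than 126 topics, so the remaining length fits
// into a single byte.
func subAck(id uint16, granted []byte) []byte {
	out := []byte{0x90, byte(2 + len(granted)), byte(id >> 8), byte(id)}
	return append(out, granted...)
}

func main() {
	body := []byte{0x00, 0x0A, 0x00, 0x03, 'a', '/', 'b', 0x01}
	id, topics, qos, err := parseSubscribe(body)
	fmt.Println(id, topics, qos, err)
	fmt.Printf("% x\n", subAck(id, qos))
}
```

Here the requested QoS is granted unchanged; a server may also grant a lower level.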

func (*Context) Set

func (ctx *Context) Set(key string, value interface{})

func (*Context) Subscribe

func (ctx *Context) Subscribe(topic string, qos byte) byte

func (*Context) Unsubscribe

func (ctx *Context) Unsubscribe(topic string)

func (*Context) Write

func (ctx *Context) Write(data []byte) (n int, err error)

type FixedHeader

type FixedHeader struct {
	// contains filtered or unexported fields
}

func (*FixedHeader) Read

func (fh *FixedHeader) Read(reader io.Reader) error
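
The fields of FixedHeader are unexported, so the following is only a sketch of what reading a fixed header involves under MQTT 3.1: one byte with type and flags, then up to four bytes of variable-length remaining length. The fixedHeader struct, its field names and the two error values are stand-ins chosen for this example.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

var (
	errIncompleteHeader = errors.New("incomplete header")
	errLengthInvalid    = errors.New("message length exceeds maximum")
)

// fixedHeader is a stand-in for the package's FixedHeader type.
type fixedHeader struct {
	msgType byte // upper four bits of the first byte
	flags   byte // lower four bits of the first byte
	length  int  // remaining length of the packet
}

// readFixedHeader reads the first byte and decodes the remaining length.
func readFixedHeader(r io.Reader) (fixedHeader, error) {
	var fh fixedHeader
	b := make([]byte, 1)
	if _, err := io.ReadFull(r, b); err != nil {
		return fh, errIncompleteHeader
	}
	fh.msgType = b[0] >> 4
	fh.flags = b[0] & 0x0F

	multiplier := 1
	for i := 0; i < 4; i++ {
		if _, err := io.ReadFull(r, b); err != nil {
			return fh, errIncompleteHeader
		}
		fh.length += int(b[0]&0x7F) * multiplier
		if b[0]&0x80 == 0 {
			return fh, nil // high bit clear: this was the last length byte
		}
		multiplier *= 128
	}
	// A fifth length byte would be needed, which the protocol does not allow.
	return fh, errLengthInvalid
}

// parseFixedHeader is a convenience wrapper for in-memory data.
func parseFixedHeader(raw []byte) (fixedHeader, error) {
	return readFixedHeader(bytes.NewReader(raw))
}

func main() {
	fh, err := parseFixedHeader([]byte{0x32, 0xC1, 0x02})
	fmt.Println(fh.msgType, fh.flags, fh.length, err) // prints: 3 2 321 <nil>
}
```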

type Handler

type Handler interface {
	Connect(ctx *Context, username, password string) error
	Disconnect(ctx *Context)
	Publish(ctx *Context, msg *Message) error
	Subscribe(ctx *Context, topic string, qos byte) error
}
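
A Handler lets an application decide about connects, publishes and subscriptions. The sketch below shows one possible implementation that checks a user table and rejects publishes to a made-up "$SYS/" prefix. To stay self-contained it declares local stand-ins for Context, Message and Handler with the shapes documented on this page. How the server maps a returned error to a CONNACK code is not documented here, so that part is left open.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins that mirror the documented exported shapes.
type Context struct{ ClientID string }

type Message struct {
	Topic string
	Buf   []byte
	QoS   byte
}

type Handler interface {
	Connect(ctx *Context, username, password string) error
	Disconnect(ctx *Context)
	Publish(ctx *Context, msg *Message) error
	Subscribe(ctx *Context, topic string, qos byte) error
}

// authHandler is a hypothetical handler with a fixed user table.
type authHandler struct {
	users map[string]string // username -> password
}

func (h *authHandler) Connect(ctx *Context, username, password string) error {
	if pw, ok := h.users[username]; !ok || pw != password {
		return errors.New("bad user name or password")
	}
	return nil
}

func (h *authHandler) Disconnect(ctx *Context) {}

func (h *authHandler) Publish(ctx *Context, msg *Message) error {
	// Example policy: clients may not publish below "$SYS/".
	if strings.HasPrefix(msg.Topic, "$SYS/") {
		return errors.New("not authorized")
	}
	return nil
}

func (h *authHandler) Subscribe(ctx *Context, topic string, qos byte) error {
	return nil
}

// Compile-time check that authHandler satisfies the interface.
var _ Handler = (*authHandler)(nil)

func main() {
	h := &authHandler{users: map[string]string{"alice": "secret"}}
	fmt.Println(h.Connect(&Context{ClientID: "c1"}, "alice", "secret"))
	fmt.Println(h.Connect(&Context{ClientID: "c2"}, "alice", "wrong"))
}
```

With the real package, the stand-in types would be replaced by the package's Context and Message, and the handler would be passed to ListenAndServe(":1883", h).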

type Message

type Message struct {
	Topic string
	Buf   []byte
	QoS   byte
	// contains filtered or unexported fields
}

type Publisher

type Publisher interface {
	Publish(msg *Message)
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(closer io.Closer, handler Handler) *Server

func (*Server) Alive

func (svr *Server) Alive() bool

func (*Server) Close

func (svr *Server) Close()

func (*Server) Publish

func (svr *Server) Publish(ctx *Context, msg *Message)

func (*Server) Run

func (svr *Server) Run()

func (*Server) Serve

func (svr *Server) Serve(rwc io.ReadWriteCloser)

func (*Server) Subscribe

func (svr *Server) Subscribe(ctx *Context, topic string, qos byte) *Subscription

func (*Server) Unsubscribe

func (svr *Server) Unsubscribe(subs *Subscription)

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(ctx *Context, qos byte) *Subscription

func (*Subscription) ChainLength

func (s *Subscription) ChainLength() int

func (*Subscription) Publish

func (s *Subscription) Publish(msg *Message)

func (*Subscription) Unsubscribe

func (sub *Subscription) Unsubscribe()

type SubscriptionChange

type SubscriptionChange struct {
	// contains filtered or unexported fields
}

type SubscriptionHandler

type SubscriptionHandler interface {
	Subscribe(ctx *Context, topic string, qos byte) *Subscription
	Unsubscribe(subs *Subscription)
}

type SubscriptionRequest

type SubscriptionRequest struct {
	// contains filtered or unexported fields
}

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(parent *Topic, name string) *Topic

func (*Topic) Enqueue

func (topic *Topic) Enqueue(queue **Subscription, s *Subscription)

func (*Topic) Find

func (topic *Topic) Find(s []string) *Subscription

func (*Topic) PrintIndent

func (topic *Topic) PrintIndent(builder *strings.Builder, indent string)

func (*Topic) Publish

func (topic *Topic) Publish(s []string, msg *Message)

func (*Topic) Remove

func (topic *Topic) Remove()

func (*Topic) String

func (topic *Topic) String() string

func (*Topic) Subscribe

func (topic *Topic) Subscribe(t []string, sub *Subscription)
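
Subscribe, Find and Publish take the topic as a slice of levels, which suggests that a topic string is split at "/" before it is looked up in the tree. Whether this package supports the MQTT wildcards is not stated here. The sketch below only illustrates the standard matching rules on such slices ("+" matches exactly one level, "#" matches all remaining levels) with hypothetical helpers and without the package's Topic type.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a topic, given as levels, satisfies a filter.
func matches(filter, topic []string) bool {
	for i, f := range filter {
		if f == "#" {
			// "#" is only valid as the last level and matches the rest,
			// including zero further levels.
			return i == len(filter)-1
		}
		if i >= len(topic) {
			return false
		}
		if f != "+" && f != topic[i] {
			return false
		}
	}
	return len(filter) == len(topic)
}

// matchTopic splits both strings at "/" and compares them level by level.
func matchTopic(filter, topic string) bool {
	return matches(strings.Split(filter, "/"), strings.Split(topic, "/"))
}

func main() {
	fmt.Println(matchTopic("home/+/temp", "home/kitchen/temp")) // true
	fmt.Println(matchTopic("home/#", "home/kitchen/temp"))      // true
	fmt.Println(matchTopic("home/+", "home/kitchen/temp"))      // false
}
```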
