server

package
v0.0.0-...-e5a160e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2018 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.

Index

Constants

View Source
const KILOBYTE = 1024
View Source
const MAX_REQUEST_SIZE = MEGABYTE * 2
View Source
const MEGABYTE = 1024 * KILOBYTE

Variables

View Source
var (
	V311_MAGIC   = []byte("MQTT")
	V311_VERSION = uint8(4)
	V3_MAGIC     = []byte("MQIsdp")
	V3_VERSION   = uint8(3)
)

Functions

This section is empty.

Types

type Application

type Application struct {
	Engine  *Momonga
	Servers []Server
	// contains filtered or unexported fields
}

Application manages start / stop, singnal handling and listeners.

+-----------+ |APPLICATION| start / stop, signal handling +-----------+ | LISTENER | listen and accept +-----------+ | HANDLER | parse and execute commands +-----------+ | ENGINE | implements commands api +-----------+

func NewApplication

func NewApplication(configPath string) *Application

func (*Application) Loop

func (self *Application) Loop()

func (*Application) RegisterServer

func (self *Application) RegisterServer(svr Server)

func (*Application) Start

func (self *Application) Start()

func (*Application) Stop

func (self *Application) Stop()

type Authenticator

type Authenticator interface {
	Init(config *configuration.Config)
	Authenticate(user_id, password []byte) (bool, error)
	Shutdown()
}

type DisconnectError

type DisconnectError struct {
}

func (*DisconnectError) Error

func (e *DisconnectError) Error() string

type DummyPlug

type DummyPlug struct {
	Identity string
	Switch   chan bool

	Running bool
	// contains filtered or unexported fields
}

func NewDummyPlug

func NewDummyPlug(engine *Momonga) *DummyPlug

func (*DummyPlug) AppendSubscribedTopic

func (self *DummyPlug) AppendSubscribedTopic(string, *SubscribeSet)

func (*DummyPlug) Close

func (self *DummyPlug) Close() error

func (*DummyPlug) DisableCleanSession

func (self *DummyPlug) DisableCleanSession()

func (*DummyPlug) GetId

func (self *DummyPlug) GetId() string

func (*DummyPlug) GetOutGoingTable

func (self *DummyPlug) GetOutGoingTable() *util.MessageTable

func (*DummyPlug) GetRealId

func (self *DummyPlug) GetRealId() string

func (*DummyPlug) GetState

func (self *DummyPlug) GetState() State

func (*DummyPlug) GetSubscribedTopics

func (self *DummyPlug) GetSubscribedTopics() map[string]*SubscribeSet

func (*DummyPlug) GetWillMessage

func (self *DummyPlug) GetWillMessage() *mqtt.WillMessage

func (*DummyPlug) HasWillMessage

func (self *DummyPlug) HasWillMessage() bool

func (*DummyPlug) IsAlived

func (self *DummyPlug) IsAlived() bool

func (*DummyPlug) IsBridge

func (self *DummyPlug) IsBridge() bool

func (*DummyPlug) ReadMessage

func (self *DummyPlug) ReadMessage() (mqtt.Message, error)

func (*DummyPlug) RemoveSubscribedTopic

func (self *DummyPlug) RemoveSubscribedTopic(string)

func (*DummyPlug) ResetState

func (self *DummyPlug) ResetState()

func (*DummyPlug) Run

func (self *DummyPlug) Run()

func (*DummyPlug) SetId

func (self *DummyPlug) SetId(id string)

func (*DummyPlug) SetKeepaliveInterval

func (self *DummyPlug) SetKeepaliveInterval(int)

func (*DummyPlug) SetState

func (self *DummyPlug) SetState(State)

func (*DummyPlug) SetWillMessage

func (self *DummyPlug) SetWillMessage(mqtt.WillMessage)

func (*DummyPlug) ShouldCleanSession

func (self *DummyPlug) ShouldCleanSession() bool

func (*DummyPlug) Stop

func (self *DummyPlug) Stop()

func (*DummyPlug) WriteMessageQueue

func (self *DummyPlug) WriteMessageQueue(request mqtt.Message)

func (*DummyPlug) WriteMessageQueue2

func (self *DummyPlug) WriteMessageQueue2(msg []byte)

type EmptyAuthenticator

type EmptyAuthenticator struct {
}

EmptyAuthenticator allows anything.

func (*EmptyAuthenticator) Authenticate

func (self *EmptyAuthenticator) Authenticate(user_id, password []byte) (bool, error)

func (*EmptyAuthenticator) Init

func (self *EmptyAuthenticator) Init(config *configuration.Config)

func (*EmptyAuthenticator) Shutdown

func (self *EmptyAuthenticator) Shutdown()

type Handler

type Handler struct {
	Engine     *Momonga
	Connection Connection
}

Handler dispatches messages which sent by client. this struct will be use client library soon.

とかいいつつ、ackとかはhandlerで返してねーとか立ち位置分かりづらい Engine側でMQTTの基本機能を全部やれればいいんだけど、そうすると client library別にしないと無理なんだよなー。 目指すところとしては、基本部分はデフォルトのHandlerで動くから それで動かないところだけうわがいてね!って所。 Handler自体は受け渡ししかやらんのでlockしなくて大丈夫なはず

func NewHandler

func NewHandler(conn Connection, engine *Momonga) *Handler

func (*Handler) Close

func (self *Handler) Close()

func (*Handler) Disconnect

func (self *Handler) Disconnect()

func (*Handler) HandshakeInternal

func (self *Handler) HandshakeInternal(p *codec.ConnectMessage)

func (*Handler) Parsed

func (self *Handler) Parsed()

func (*Handler) Pingreq

func (self *Handler) Pingreq()

func (*Handler) Puback

func (self *Handler) Puback(messageId uint16)

func (*Handler) Pubcomp

func (self *Handler) Pubcomp(messageId uint16)

func (*Handler) Publish

func (self *Handler) Publish(p *codec.PublishMessage)

func (*Handler) Pubrec

func (self *Handler) Pubrec(messageId uint16)

func (*Handler) Pubrel

func (self *Handler) Pubrel(messageId uint16)

func (*Handler) Subscribe

func (self *Handler) Subscribe(p *codec.SubscribeMessage)

func (*Handler) Unsubscribe

func (self *Handler) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload)

type HttpListener

type HttpListener struct {
	net.Listener
	WebSocketMount string
	// contains filtered or unexported fields
}

func NewHttpListener

func NewHttpListener(listener net.Listener) *HttpListener

func (*HttpListener) Accept

func (self *HttpListener) Accept() (c net.Conn, err error)

func (*HttpListener) Addr

func (self *HttpListener) Addr() net.Addr

func (*HttpListener) Close

func (self *HttpListener) Close() error

func (*HttpListener) File

func (self *HttpListener) File() (f *os.File, err error)

type HttpServer

type HttpServer struct {
	http.Server
	Engine  *Momonga
	Address string
	// contains filtered or unexported fields
}

func NewHttpServer

func NewHttpServer(engine *Momonga, config *configuration.Config, inherit bool) *HttpServer

func (*HttpServer) Graceful

func (self *HttpServer) Graceful()

func (*HttpServer) ListenAndServe

func (self *HttpServer) ListenAndServe() error

func (*HttpServer) Listener

func (self *HttpServer) Listener() Listener

func (*HttpServer) Serve

func (self *HttpServer) Serve(l net.Listener) error

func (*HttpServer) Stop

func (self *HttpServer) Stop()

type Listener

type Listener interface {
	Accept() (c net.Conn, err error)

	Close() error

	Addr() net.Addr

	File() (f *os.File, err error)
}

type MmuxConnection

type MmuxConnection struct {
	// Primary
	Connection       *Connection
	OfflineQueue     []mqtt.Message
	MaxOfflineQueue  int
	Identifier       string
	CleanSession     bool
	OutGoingTable    *util.MessageTable
	SubscribeMap     map[string]bool
	Created          time.Time
	Hash             uint32
	Mutex            sync.RWMutex
	SubscribedTopics map[string]*SubscribeSet
}

MQTT Multiplexer Connection

Multiplexer、というかなんだろ。EngineとConnectionとの仲介でおいとくやつ。 Sessionがあるのでこういうふうにしとくと楽かな、と

func NewMmuxConnection

func NewMmuxConnection() *MmuxConnection

func (*MmuxConnection) AppendSubscribedTopic

func (self *MmuxConnection) AppendSubscribedTopic(topic string, set *SubscribeSet)

func (*MmuxConnection) Attach

func (self *MmuxConnection) Attach(conn Connection)

func (*MmuxConnection) Close

func (self *MmuxConnection) Close() error

func (*MmuxConnection) Detach

func (self *MmuxConnection) Detach(conn Connection, dummy *DummyPlug)

func (*MmuxConnection) DisableCleanSession

func (self *MmuxConnection) DisableCleanSession()

func (*MmuxConnection) GetHash

func (self *MmuxConnection) GetHash() uint32

func (*MmuxConnection) GetId

func (self *MmuxConnection) GetId() string

func (*MmuxConnection) GetOutGoingTable

func (self *MmuxConnection) GetOutGoingTable() *util.MessageTable

func (*MmuxConnection) GetRealId

func (self *MmuxConnection) GetRealId() string

func (*MmuxConnection) GetState

func (self *MmuxConnection) GetState() State

func (*MmuxConnection) GetSubscribedTopics

func (self *MmuxConnection) GetSubscribedTopics() map[string]*SubscribeSet

func (*MmuxConnection) GetWillMessage

func (self *MmuxConnection) GetWillMessage() *mqtt.WillMessage

func (*MmuxConnection) HasWillMessage

func (self *MmuxConnection) HasWillMessage() bool

func (*MmuxConnection) IsAlived

func (self *MmuxConnection) IsAlived() bool

func (*MmuxConnection) IsBridge

func (self *MmuxConnection) IsBridge() bool

func (*MmuxConnection) IsSubscribed

func (self *MmuxConnection) IsSubscribed(topic string) bool

func (*MmuxConnection) ReadMessage

func (self *MmuxConnection) ReadMessage() (mqtt.Message, error)

func (*MmuxConnection) RemoveSubscribedTopic

func (self *MmuxConnection) RemoveSubscribedTopic(topic string)

func (*MmuxConnection) ResetState

func (self *MmuxConnection) ResetState()

func (*MmuxConnection) SetId

func (self *MmuxConnection) SetId(id string)

func (*MmuxConnection) SetKeepaliveInterval

func (self *MmuxConnection) SetKeepaliveInterval(interval int)

func (*MmuxConnection) SetState

func (self *MmuxConnection) SetState(state State)

func (*MmuxConnection) SetWillMessage

func (self *MmuxConnection) SetWillMessage(msg mqtt.WillMessage)

func (*MmuxConnection) ShouldCleanSession

func (self *MmuxConnection) ShouldCleanSession() bool

func (*MmuxConnection) WriteMessageQueue

func (self *MmuxConnection) WriteMessageQueue(request mqtt.Message)

type Momonga

type Momonga struct {
	OutGoingTable *util.MessageTable
	InflightTable map[string]*util.MessageTable
	TopicMatcher  TopicMatcher
	// TODO: improve this.
	Connections map[uint32]map[string]*MmuxConnection
	RetryMap    map[string][]*Retryable
	EnableSys   bool
	Started     time.Time
	DataStore   datastore.Datastore
	LockPool    map[uint32]*sync.RWMutex

	Authenticators []Authenticator
	// contains filtered or unexported fields
}

goroutine (2)

RunMaintenanceThread
Run

func NewMomonga

func NewMomonga(config *configuration.Config) *Momonga

QoS 1, 2 are available. but really suck implementation. reconsider qos design later.

func (*Momonga) Authenticate

func (self *Momonga) Authenticate(user_id, password []byte) (bool, error)

func (*Momonga) CleanSubscription

func (self *Momonga) CleanSubscription(conn Connection)

func (*Momonga) Config

func (self *Momonga) Config() *configuration.Config

func (*Momonga) DisableSys

func (self *Momonga) DisableSys()

func (*Momonga) Doom

func (self *Momonga) Doom()

func (*Momonga) GetConnectionByClientId

func (self *Momonga) GetConnectionByClientId(clientId string) (*MmuxConnection, error)

func (*Momonga) HandleConnection

func (self *Momonga) HandleConnection(conn Connection)

func (*Momonga) Handshake

func (self *Momonga) Handshake(p *codec.ConnectMessage, conn *MyConnection) *MmuxConnection

func (*Momonga) RemoveConnectionByClientId

func (self *Momonga) RemoveConnectionByClientId(clientId string)

func (*Momonga) RetainMatch

func (self *Momonga) RetainMatch(topic string) []*codec.PublishMessage

TODO: wanna implement trie. but regexp works well. retain should persist their data. though, how do we fetch it efficiently...

func (*Momonga) Run

func (self *Momonga) Run()

func (*Momonga) RunMaintenanceThread

func (self *Momonga) RunMaintenanceThread()

below methods are intend to maintain engine itself (remove needless connection, dispatch queue).

func (*Momonga) SendMessage

func (self *Momonga) SendMessage(topic string, message []byte, qos int)

func (*Momonga) SendPublishMessage

func (self *Momonga) SendPublishMessage(msg *codec.PublishMessage, client_id string, is_bridged bool)

func (*Momonga) SendWillMessage

func (self *Momonga) SendWillMessage(conn Connection)

func (*Momonga) SetConnectionByClientId

func (self *Momonga) SetConnectionByClientId(clientId string, conn *MmuxConnection)

func (*Momonga) Subscribe

func (self *Momonga) Subscribe(p *codec.SubscribeMessage, conn Connection)

func (*Momonga) Terminate

func (self *Momonga) Terminate()

func (*Momonga) Unsubscribe

func (self *Momonga) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload, conn Connection)

type MyBroker

type MyBroker struct {
	Clients            MyClients
	Messages           MyMessages
	Load               MyLoad
	SubscriptionsCount *expvar.Int
	Uptime             *expvar.Int
}

type MyClients

type MyClients struct {
	Connected    *expvar.Int
	Total        *expvar.Int
	Maximum      *expvar.Int
	Disconnected *expvar.Int
}

type MyConn

type MyConn struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*MyConn) Close

func (self *MyConn) Close() error

type MyHttpServer

type MyHttpServer struct {
	Engine         *Momonga
	WebSocketMount string
}

func (*MyHttpServer) ServeHTTP

func (self *MyHttpServer) ServeHTTP(w http.ResponseWriter, req *http.Request)

type MyListener

type MyListener struct {
	net.Listener
	// contains filtered or unexported fields
}

func (*MyListener) Accept

func (self *MyListener) Accept() (net.Conn, error)

func (*MyListener) File

func (self *MyListener) File() (f *os.File, err error)

type MyLoad

type MyLoad struct {
	BytesSend     *expvar.Int
	BytesReceived *expvar.Int
}

type MyMessages

type MyMessages struct {
	Received       *expvar.Int
	Sent           *expvar.Int
	Stored         *expvar.Int
	PublishDropped *expvar.Int
	RetainedCount  *expvar.Int
}

type MyMetrics

type MyMetrics struct {
	System MySystem

	NumGoroutine      *expvar.Int
	NumCgoCall        *expvar.Int
	Uptime            *expvar.Int
	MemFree           *expvar.Int
	MemUsed           *expvar.Int
	MemActualFree     *expvar.Int
	MemActualUsed     *expvar.Int
	MemTotal          *expvar.Int
	LoadOne           *expvar.Float
	LoadFive          *expvar.Float
	LoadFifteen       *expvar.Float
	CpuUser           *expvar.Float
	CpuNice           *expvar.Float
	CpuSys            *expvar.Float
	CpuIdle           *expvar.Float
	CpuWait           *expvar.Float
	CpuIrq            *expvar.Float
	CpuSoftIrq        *expvar.Float
	CpuStolen         *expvar.Float
	CpuTotal          *expvar.Float
	MessageSentPerSec *myexpvar.DiffInt
	ConnectPerSec     *myexpvar.DiffInt
	GoroutinePerConn  *expvar.Float
}
var Metrics *MyMetrics = &MyMetrics{
	System: MySystem{
		Broker: MyBroker{
			Clients: MyClients{
				Connected:    expvar.NewInt("sys.broker.clients.connected"),
				Total:        expvar.NewInt("sys.broker.clients.total"),
				Maximum:      expvar.NewInt("sys.broker.clients.maximum"),
				Disconnected: expvar.NewInt("sys.broker.clients.disconnected"),
			},
			Uptime: expvar.NewInt("sys.broker.uptime"),
			Messages: MyMessages{
				Received:       expvar.NewInt("sys.broker.messages.received"),
				Sent:           expvar.NewInt("sys.broker.messages.sent"),
				Stored:         expvar.NewInt("sys.broker.messages.stored"),
				PublishDropped: expvar.NewInt("sys.broker.messages.publish.dropped"),
				RetainedCount:  expvar.NewInt("sys.broker.messages.retained.count"),
			},
			Load: MyLoad{
				BytesSend:     expvar.NewInt("sys.broker.load.bytes_send"),
				BytesReceived: expvar.NewInt("sys.broker.load.bytes_received"),
			},
			SubscriptionsCount: expvar.NewInt("sys.broker.subscriptions.count"),
		},
	},

	NumGoroutine:  expvar.NewInt("numgoroutine"),
	NumCgoCall:    expvar.NewInt("numcgocall"),
	Uptime:        expvar.NewInt("uptime"),
	MemFree:       expvar.NewInt("memfree"),
	MemUsed:       expvar.NewInt("memused"),
	MemActualFree: expvar.NewInt("memactualfree"),
	MemActualUsed: expvar.NewInt("memactualused"),
	MemTotal:      expvar.NewInt("memtotal"),
	LoadOne:       expvar.NewFloat("loadone"),
	LoadFive:      expvar.NewFloat("loadfive"),
	LoadFifteen:   expvar.NewFloat("loadfifteen"),
	CpuUser:       expvar.NewFloat("cpuuser"),
	CpuNice:       expvar.NewFloat("cpunice"),
	CpuSys:        expvar.NewFloat("cpusys"),
	CpuIdle:       expvar.NewFloat("cpuidle"),
	CpuWait:       expvar.NewFloat("cpuwait"),
	CpuIrq:        expvar.NewFloat("cpuirq"),
	CpuSoftIrq:    expvar.NewFloat("cpusoftirq"),
	CpuStolen:     expvar.NewFloat("cpustolen"),
	CpuTotal:      expvar.NewFloat("cputotal"),

	MessageSentPerSec: myexpvar.NewDiffInt("msg_sent_per_sec"),
	ConnectPerSec:     myexpvar.NewDiffInt("connect_per_sec"),
	GoroutinePerConn:  expvar.NewFloat("goroutine_per_conn"),
}

TODO: should not use expvar as we can't hold multiple MyMetrics metrics.

type MySystem

type MySystem struct {
	Broker MyBroker
}

type Retryable

type Retryable struct {
	Id      string
	Payload interface{}
}

TODO: haven't used this yet.

type Server

type Server interface {
	ListenAndServe() error

	Serve(l net.Listener) error

	Graceful()

	Stop()

	Listener() Listener
}

type TcpServer

type TcpServer struct {
	ListenAddress string
	Engine        *Momonga
	// contains filtered or unexported fields
}

func NewTcpServer

func NewTcpServer(engine *Momonga, config *configuration.Config, inherit bool) *TcpServer

func (*TcpServer) Graceful

func (self *TcpServer) Graceful()

func (*TcpServer) ListenAndServe

func (self *TcpServer) ListenAndServe() error

func (*TcpServer) Listener

func (self *TcpServer) Listener() Listener

func (*TcpServer) Serve

func (self *TcpServer) Serve(l net.Listener) error

func (*TcpServer) Stop

func (self *TcpServer) Stop()

type TlsServer

type TlsServer struct {
	ListenAddress string
	Engine        *Momonga
	// contains filtered or unexported fields
}

func NewTlsServer

func NewTlsServer(engine *Momonga, config *configuration.Config, inherit bool) *TlsServer

func (*TlsServer) Graceful

func (self *TlsServer) Graceful()

func (*TlsServer) ListenAndServe

func (self *TlsServer) ListenAndServe() error

func (*TlsServer) Listener

func (self *TlsServer) Listener() Listener

func (*TlsServer) Serve

func (self *TlsServer) Serve(l net.Listener) error

func (*TlsServer) Stop

func (self *TlsServer) Stop()

type TopicMatcher

type TopicMatcher interface {
	// TODO: should force []*SubscribeSet
	Match(Topic string) []interface{}

	Add(Topic string, Value interface{})

	Remove(Topic string, val interface{})

	Dump(writer io.Writer)
}

type UnixServer

type UnixServer struct {
	Engine  *Momonga
	Address string
	// contains filtered or unexported fields
}

func NewUnixServer

func NewUnixServer(engine *Momonga, config *configuration.Config, inherit bool) *UnixServer

func (*UnixServer) Graceful

func (self *UnixServer) Graceful()

func (*UnixServer) ListenAndServe

func (self *UnixServer) ListenAndServe() error

func (*UnixServer) Listener

func (self *UnixServer) Listener() Listener

func (*UnixServer) Serve

func (self *UnixServer) Serve(l net.Listener) error

func (*UnixServer) Stop

func (self *UnixServer) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL