broker

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 34 Imported by: 16

Documentation

Index

Constants

View Source
const (
	SUB = "1"
	PUB = "2"
)
View Source
const (
	MessagePoolNum        = 1024
	MessagePoolMessageNum = 1024
)
View Source
const (
	// BrokerInfoTopic special pub topic for cluster info
	BrokerInfoTopic = "broker000100101info"
	// CLIENT is an end user.
	CLIENT = 0
	// ROUTER is another router in the cluster.
	ROUTER = 1
	//REMOTE is the router connect to other cluster
	REMOTE  = 2
	CLUSTER = 3
)
View Source
const (
	Connected    = 1
	Disconnected = 2
)
View Source
const (
	// ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors.
	ACCEPT_MIN_SLEEP = 100 * time.Millisecond
	// ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors
	ACCEPT_MAX_SLEEP = 10 * time.Second
	// DEFAULT_ROUTE_CONNECT Route solicitation intervals.
	DEFAULT_ROUTE_CONNECT = 5 * time.Second
	// DEFAULT_TLS_TIMEOUT
	DEFAULT_TLS_TIMEOUT = 5 * time.Second
)
View Source
const (
	CONNECT = uint8(iota + 1)
	CONNACK
	PUBLISH
	PUBACK
	PUBREC
	PUBREL
	PUBCOMP
	SUBSCRIBE
	SUBACK
	UNSUBSCRIBE
	UNSUBACK
	PINGREQ
	PINGRESP
	DISCONNECT
)
View Source
const (
	QosAtMostOnce byte = iota
	QosAtLeastOnce
	QosExactlyOnce
	QosFailure = 0x80
)
View Source
const (
	CONNECTIONS = "api/v1/connections"
)

Variables

Functions

func FileExist added in v1.5.4

func FileExist(name string) bool

func GenUniqueId

func GenUniqueId() string

func InitHTTPMoniter

func InitHTTPMoniter(b *Broker)

func NewInfo

func NewInfo(sid, url string) *packets.PublishPacket

func NewTLSConfig

func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)

func ProcessMessage

func ProcessMessage(msg *Message)

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *Config) (*Broker, error)

func (*Broker) BroadcastInfoMessage

func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket)

func (*Broker) BroadcastSubOrUnsubMessage

func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket)

func (*Broker) BroadcastUnSubscribe

func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string)

func (*Broker) CheckConnectAuth

func (b *Broker) CheckConnectAuth(clientID, username, password string) bool

func (*Broker) CheckRemoteExist

func (b *Broker) CheckRemoteExist(remoteID, url string) bool

func (*Broker) CheckTopicAuth

func (b *Broker) CheckTopicAuth(action, clientID, username, ip, topic string) bool

func (*Broker) ConnectToDiscovery

func (b *Broker) ConnectToDiscovery()

func (*Broker) DisConnClientByClientId

func (b *Broker) DisConnClientByClientId(clientId string)

func (*Broker) OnlineOfflineNotification

func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64)

func (*Broker) Publish

func (b *Broker) Publish(e *bridge.Elements) bool

func (*Broker) PublishMessage

func (b *Broker) PublishMessage(packet *packets.PublishPacket)

func (*Broker) PublishMessageByClientId

func (b *Broker) PublishMessageByClientId(packet *packets.PublishPacket, clientId string) error

func (*Broker) SendLocalSubsToRouter

func (b *Broker) SendLocalSubsToRouter(c *client)

func (*Broker) Start

func (b *Broker) Start()

func (*Broker) StartClientListening

func (b *Broker) StartClientListening(Tls bool)

func (*Broker) StartClusterListening

func (b *Broker) StartClusterListening()

func (*Broker) StartPipeSocketListening added in v1.5.5

func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool)

StartPipeSocketListening We use the open source npipe library to jump over pipe communication in linux

func (*Broker) StartUnixSocketClientListening added in v1.5.4

func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool)

func (*Broker) StartWebsocketListening

func (b *Broker) StartWebsocketListening()

func (*Broker) SubmitWork

func (b *Broker) SubmitWork(clientId string, msg *Message)

type Config

type Config struct {
	Worker          int       `json:"workerNum"`
	HTTPPort        string    `json:"httpPort"`
	Host            string    `json:"host"`
	Port            string    `json:"port"`
	Cluster         RouteInfo `json:"cluster"`
	Router          string    `json:"router"`
	TlsHost         string    `json:"tlsHost"`
	TlsPort         string    `json:"tlsPort"`
	WsPath          string    `json:"wsPath"`
	WsPort          string    `json:"wsPort"`
	WsTLS           bool      `json:"wsTLS"`
	TlsInfo         TLSInfo   `json:"tlsInfo"`
	Debug           bool      `json:"debug"`
	Plugin          Plugins   `json:"plugins"`
	UnixFilePath    string    `json:"unixFilePath"`
	WindowsPipeName string    `json:"windowsPipeName"`
}
var DefaultConfig *Config = &Config{
	Worker: 4096,
	Host:   "0.0.0.0",
	Port:   "1883",
}

func ConfigureConfig

func ConfigureConfig(args []string) (*Config, error)

func LoadConfig

func LoadConfig(filename string) (*Config, error)

type ConnClient added in v1.5.2

type ConnClient struct {
	Info        `json:"info"`
	LastMsgTime int64 `json:"lastMsg"`
}

type InflightStatus

type InflightStatus uint8
const (
	Publish InflightStatus = 0
	Pubrel  InflightStatus = 1
)

type Info added in v1.5.2

type Info struct {
	ClientID  string    `json:"clientId"`
	Username  string    `json:"username"`
	Password  []byte    `json:"password"`
	Keepalive uint16    `json:"keepalive"`
	WillMsg   PubPacket `json:"willMsg"`
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

type NamedPlugins

type NamedPlugins struct {
	Auth   string
	Bridge string
}

type OnlineOfflineMsg added in v1.5.2

type OnlineOfflineMsg struct {
	ClientID    string `json:"clientID"`
	Online      bool   `json:"online"`
	Timestamp   string `json:"timestamp"`
	ClientInfo  Info   `json:"info"`
	LastMsgTime int64  `json:"lastMsg"`
}

type Plugins

type Plugins struct {
	Auth   auth.Auth
	Bridge bridge.BridgeMQ
}

func (*Plugins) UnmarshalJSON

func (p *Plugins) UnmarshalJSON(b []byte) error

type PubPacket added in v1.5.2

type PubPacket struct {
	TopicName string `json:"topicName"`
	Payload   []byte `json:"payload"`
}

type RouteInfo

type RouteInfo struct {
	Host string `json:"host"`
	Port string `json:"port"`
}

type TLSInfo

type TLSInfo struct {
	Verify   bool   `json:"verify"`
	CaFile   string `json:"caFile"`
	CertFile string `json:"certFile"`
	KeyFile  string `json:"keyFile"`
}

Directories

Path Synopsis
lib

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL