broker

package
v0.0.0-...-5f750f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2021 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// SUB and PUB are untyped string constants with the values "1" and "2".
// NOTE(review): presumably the action codes for subscribe and publish (e.g.
// the action argument of Broker.CheckTopicAuth) — confirm against callers.
const (
	SUB = "1"
	PUB = "2"
)
View Source
// Message pool sizing; both constants are untyped integers equal to 1024.
// NOTE(review): the names suggest "number of message pools" and "messages per
// pool" respectively — confirm where they are consumed.
const (
	MessagePoolNum        = 1024
	MessagePoolMessageNum = 1024
)
View Source
// This group declares the cluster info topic (a string) together with the
// integer kind codes CLIENT, ROUTER, REMOTE and CLUSTER (0 through 3).
const (
	// BrokerInfoTopic is the special publish topic used for cluster info.
	BrokerInfoTopic = "broker000100101info"
	// CLIENT is an end user.
	CLIENT = 0
	// ROUTER is another router in the cluster.
	ROUTER = 1
	// REMOTE is a router that connects to another cluster.
	REMOTE  = 2
	CLUSTER = 3 // NOTE(review): undocumented upstream; meaning not visible here — confirm.
)
View Source
// Connected and Disconnected are untyped integer constants (1 and 2).
// NOTE(review): presumably connection-state values for a client — confirm
// which field stores them.
const (
	Connected    = 1
	Disconnected = 2
)
View Source
// Timing defaults. Every constant in this group is a time.Duration.
const (
	// ACCEPT_MIN_SLEEP is the minimum time to sleep after a temporary error (100ms).
	ACCEPT_MIN_SLEEP = 100 * time.Millisecond
	// ACCEPT_MAX_SLEEP is the maximum time to sleep after a temporary error (10s).
	ACCEPT_MAX_SLEEP = 10 * time.Second
	// DEFAULT_ROUTE_CONNECT is the interval between route solicitations (5s).
	DEFAULT_ROUTE_CONNECT = 5 * time.Second
	// DEFAULT_TLS_TIMEOUT is 5 seconds.
	// NOTE(review): presumably the TLS handshake timeout — confirm at the call site.
	DEFAULT_TLS_TIMEOUT = 5 * time.Second
)
View Source
// Packet type codes. Each constant is a uint8 and the values are assigned
// consecutively by iota, from CONNECT = 1 through DISCONNECT = 14.
// NOTE(review): the names and values line up with the MQTT 3.1.1 control
// packet types — confirm that this is the intended mapping.
const (
	CONNECT = uint8(iota + 1)
	CONNACK
	PUBLISH
	PUBACK
	PUBREC
	PUBREL
	PUBCOMP
	SUBSCRIBE
	SUBACK
	UNSUBSCRIBE
	UNSUBACK
	PINGREQ
	PINGRESP
	DISCONNECT
)
View Source
// Quality-of-service levels. The first three are byte constants numbered by
// iota: QosAtMostOnce = 0, QosAtLeastOnce = 1, QosExactlyOnce = 2.
const (
	QosAtMostOnce byte = iota
	QosAtLeastOnce
	QosExactlyOnce
	// QosFailure is 0x80. It has its own expression and no explicit type, so it
	// is an untyped integer constant, unlike the byte constants above.
	QosFailure = 0x80
)

Variables

Functions

func GenUniqueId

func GenUniqueId() string

func InitHTTPMoniter

func InitHTTPMoniter(b *Broker)

func NewInfo

func NewInfo(sid, url string, isforword bool) *packets.PublishPacket

func NewTLSConfig

func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error)

func ProcessMessage

func ProcessMessage(msg *Message)

Types

type Broker

// Broker is the type returned by NewBroker and the receiver of the package's
// broker methods (Start, Publish, CheckConnectAuth, ...). All of its fields
// are unexported, so it cannot be configured through a struct literal.
type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(config *Config) (*Broker, error)

func (*Broker) BroadcastInfoMessage

func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket)

func (*Broker) BroadcastSubOrUnsubMessage

func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket)

func (*Broker) BroadcastUnSubscribe

func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string)

func (*Broker) CheckConnectAuth

func (b *Broker) CheckConnectAuth(clientID, username, password string) bool

func (*Broker) CheckRemoteExist

func (b *Broker) CheckRemoteExist(remoteID, url string) bool

func (*Broker) CheckTopicAuth

func (b *Broker) CheckTopicAuth(action, clientID, username, ip, topic string) bool

func (*Broker) ConnectToDiscovery

func (b *Broker) ConnectToDiscovery()

func (*Broker) OnlineOfflineNotification

func (b *Broker) OnlineOfflineNotification(clientID string, online bool)

func (*Broker) Publish

func (b *Broker) Publish(e *bridge.Elements)

func (*Broker) PublishMessage

func (b *Broker) PublishMessage(packet *packets.PublishPacket)

func (*Broker) SendLocalSubsToRouter

func (b *Broker) SendLocalSubsToRouter(c *client)

func (*Broker) Start

func (b *Broker) Start()

func (*Broker) StartClientListening

func (b *Broker) StartClientListening(Tls bool)

func (*Broker) StartClusterListening

func (b *Broker) StartClusterListening()

func (*Broker) StartWebsocketListening

func (b *Broker) StartWebsocketListening()

func (*Broker) SubmitWork

func (b *Broker) SubmitWork(clientId string, msg *Message)

type Config

// Config is the broker configuration; it is accepted by NewBroker and
// returned by ConfigureConfig and LoadConfig. The struct tags give the JSON
// key for each field. Note that Worker is read from "workerNum" and Plugin
// from "plugins", and that every port is a string rather than an integer.
//
// Field groups, as far as the declaration shows:
//   - Worker: an int count (DefaultConfig uses 4096).
//   - Host, Port: strings (DefaultConfig uses "0.0.0.0" and "1883").
//   - HTTPPort, Router, TlsHost, TlsPort, WsPath, WsPort: strings.
//   - Cluster: a RouteInfo host/port pair.
//   - WsTLS, Debug: booleans.
//   - TlsInfo: a TLSInfo value (verify flag plus CA, cert and key file paths).
//   - Plugin: a Plugins value, which has its own UnmarshalJSON method.
//
// NOTE(review): the exact role of each listener field (plain, TLS, websocket,
// cluster) is inferred from names only — confirm against the Start* methods.
type Config struct {
	Worker   int       `json:"workerNum"`
	HTTPPort string    `json:"httpPort"`
	Host     string    `json:"host"`
	Port     string    `json:"port"`
	Cluster  RouteInfo `json:"cluster"`
	Router   string    `json:"router"`
	TlsHost  string    `json:"tlsHost"`
	TlsPort  string    `json:"tlsPort"`
	WsPath   string    `json:"wsPath"`
	WsPort   string    `json:"wsPort"`
	WsTLS    bool      `json:"wsTLS"`
	TlsInfo  TLSInfo   `json:"tlsInfo"`
	Debug    bool      `json:"debug"`
	Plugin   Plugins   `json:"plugins"`
}
// DefaultConfig sets Worker to 4096, Host to "0.0.0.0" and Port to "1883";
// every other field is left at its zero value. It is a package-level pointer,
// so any code that modifies it changes the defaults seen by all other users.
var DefaultConfig *Config = &Config{
	Worker: 4096,
	Host:   "0.0.0.0",
	Port:   "1883",
}

func ConfigureConfig

func ConfigureConfig(args []string) (*Config, error)

func LoadConfig

func LoadConfig(filename string) (*Config, error)

type InflightStatus

// InflightStatus is a uint8 status code. The values declared for it are
// Publish (0) and Pubrel (1).
// NOTE(review): presumably records which stage of a QoS handshake an in-flight
// message has reached — confirm where it is stored and read.
type InflightStatus uint8
const (
	Publish InflightStatus = 0
	Pubrel  InflightStatus = 1
)

type Message

// Message is the unit of work taken by ProcessMessage and Broker.SubmitWork.
// All of its fields are unexported.
type Message struct {
	// contains filtered or unexported fields
}

type NamedPlugins

// NamedPlugins holds an Auth and a Bridge entry as plain strings.
// NOTE(review): presumably the plugin names that Plugins.UnmarshalJSON decodes
// before resolving them to implementations — confirm in that method.
type NamedPlugins struct {
	Auth   string
	Bridge string
}

type Plugins

// Plugins holds the broker's pluggable components: an auth.Auth value and a
// bridge.BridgeMQ value. It is the type of Config.Plugin and defines its own
// UnmarshalJSON method, so it is not decoded field-by-field by encoding/json.
type Plugins struct {
	Auth   auth.Auth
	Bridge bridge.BridgeMQ
}

func (*Plugins) UnmarshalJSON

func (p *Plugins) UnmarshalJSON(b []byte) error

type RouteInfo

// RouteInfo is a host/port pair, both stored as strings and read from the
// JSON keys "host" and "port". It is the type of Config.Cluster.
type RouteInfo struct {
	Host string `json:"host"`
	Port string `json:"port"`
}

type TLSInfo

// TLSInfo carries the TLS settings passed to NewTLSConfig: a Verify flag and
// three strings, CaFile, CertFile and KeyFile, read from the JSON keys
// "verify", "caFile", "certFile" and "keyFile".
// NOTE(review): Verify presumably enables peer certificate verification and
// the strings are presumably file paths — confirm in NewTLSConfig.
type TLSInfo struct {
	Verify   bool   `json:"verify"`
	CaFile   string `json:"caFile"`
	CertFile string `json:"certFile"`
	KeyFile  string `json:"keyFile"`
}

Directories

Path Synopsis
lib

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL