broker

package
v0.0.0-...-fe63319 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2022 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

package broker provides a MQTT 3.1.1 compliant MQTT broker.

Index

Constants

View Source
const (
	// 当前服务器的版本
	Version = "1.1.1"
)

Variables

View Source
var (
	// ErrListenerIDExists indicates that a listener with the same id already exists.
	ErrListenerIDExists = errors.New("监听器已经退出")

	// ErrReadConnectInvalid indicates that the connection packet was invalid.
	ErrReadConnectInvalid = errors.New("connect packet 错误")

	// ErrConnectNotAuthorized indicates that the connection packet had incorrect auth values.
	ErrConnectNotAuthorized = errors.New("授权错误")

	// ErrInvalidTopic indicates that the specified topic was not valid.
	ErrInvalidTopic = errors.New("cannot publish to $ and $SYS topics")

	// ErrRejectPacket indicates that a packet should be dropped instead of processed.
	ErrRejectPacket = errors.New("packet 被拒绝")

	// ErrClientDisconnect indicates that a client disconnected from the server.
	ErrClientDisconnect = errors.New("客户端断开连接")

	// ErrClientReconnect indicates that a client attempted to reconnect while still connected.
	ErrClientReconnect = errors.New("client sent connect while connected")

	// ErrServerShutdown is propagated when the server shuts down.
	ErrServerShutdown = errors.New("服务器已经下线")

	// ErrSessionReestablished indicates that an existing client was replaced by a newly connected
	// client. The existing client is disconnected.
	ErrSessionReestablished = errors.New("client session re-established")

	// ErrConnectionFailed indicates that a client connection attempt failed for other reasons.
	ErrConnectionFailed = errors.New("connection attempt failed")

	// 定时发送$Sys 间隔
	SysTopicInterval time.Duration = 30000
)
View Source
var Logger *zap.Logger

Functions

func InitConfig

func InitConfig(f string, config *ServerConfig)

func InitLogger

func InitLogger(conf *ServerConfig)

Types

type BrokerConfig

type BrokerConfig struct {
	Protocol string
	Name     string
	Addr     string
	Port     string
	Enable   bool
	Stdout   string
	Stderr   string
}

type Options

type Options struct {
	// BufferSize overrides the default buffer size (circ.DefaultBufferSize) for the client buffers.
	BufferSize int

	// BufferBlockSize overrides the default buffer block size (DefaultBlockSize) for the client buffers.
	BufferBlockSize int

	// 消息飞行时间
	InflightTTL int64
}

type Server

type Server struct {
	Events    events.Events        // 事件回调列表
	Store     persistence.Store    // 持久化后端.
	Options   *Options             // 服务器配置信息.
	Listeners *listeners.Listeners // tcp服务监听器.
	Clients   *clients.Clients     // 当前broker下面的全部client
	Topics    *topics.Index        // topic树
	System    *system.Info         // 系统的信息 $SYS 内容.
	// contains filtered or unexported fields
}

使用 server.New() 初始化 broker

func New

func New() *Server

仅限测试用

func NewServer

func NewServer(opts *Options) *Server

返回一个server实例

func (*Server) AddListener

func (s *Server) AddListener(listener listeners.Listener, config *listeners.Config) error

开启多个网络监听器

func (*Server) AddStore

func (s *Server) AddStore(p persistence.Store) error

设置持久化后端 必须在启动server之前调用.

func (*Server) Close

func (s *Server) Close() error

Close attempts to gracefully shutdown the server, all listeners, clients, and stores.

func (*Server) EstablishConnection

func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) error

处理每一个新的客户端连接

func (*Server) Publish

func (s *Server) Publish(topic string, payload []byte, retain bool) error

Publish creates a publish packet from a payload and sends it to the inline.pub channel, where it is written directly to the outgoing byte buffers of any clients subscribed to the given topic. Because the message is written directly within the server, QoS is inherently 2 (exactly once).

func (*Server) ResendClientInflight

func (s *Server) ResendClientInflight(cl *clients.Client, force bool) error

ResendClientInflight attempts to resend all undelivered inflight messages to a client.

func (*Server) Serve

func (s *Server) Serve() error

开始事件调度 on all attached listeners, and publishing the system topics.

type ServerConfig

type ServerConfig struct {
	Stdout      string
	Stderr      string
	Broker      map[string]BrokerConfig
	Persistence map[string]interface{}
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL