mqtt_broker

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2020 License: Apache-2.0 Imports: 29 Imported by: 0

README

#mqtt-broker

mqtt-broker is mqtt server

  • support Qos0,QoS1

mqtt-event

type mean
connect mqtt client connect
subscribe mqtt client request subscribe
unsubscribe mqtt client request unsubscribe

todo

  • mqtt-event-queue for sub/pub mqtt-broker event
  • mqtt-broker save subscribe tree snapshot to file,and reload it when restart

Documentation

Index

Constants

View Source
const MQTTEventStream = "$streamIO-mqtt-broker-event"
View Source
const MaxEventSize = 1024 * 1024 * 128

Variables

View Source
var (
	Event_Type_name = map[int32]string{
		0: "SubscribeEvent",
		1: "UnSubscribeEvent",
		2: "RetainMessageEvent",
		3: "ClientStatusChangeEvent",
	}
	Event_Type_value = map[string]int32{
		"SubscribeEvent":          0,
		"UnSubscribeEvent":        1,
		"RetainMessageEvent":      2,
		"ClientStatusChangeEvent": 3,
	}
)

Enum value maps for Event_Type.

View Source
var (
	ClientStatusChangeEvent_Status_name = map[int32]string{
		0: "Online",
		1: "Offline",
	}
	ClientStatusChangeEvent_Status_value = map[string]int32{
		"Online":  0,
		"Offline": 1,
	}
)

Enum value maps for ClientStatusChangeEvent_Status.

View Source
var File_streamIO_mqtt_broker_event_proto protoreflect.FileDescriptor

Functions

func PPrintln

func PPrintln(obj interface{})

Types

type Broker

type Broker struct {
	Options
	// contains filtered or unexported fields
}

func New

func New(options Options) *Broker

func (*Broker) Start

func (broker *Broker) Start() error

type ClientStatusChangeEvent

type ClientStatusChangeEvent struct {
	SessionID int64                          `protobuf:"varint,1,opt,name=sessionID,proto3" json:"sessionID,omitempty"`
	Status    ClientStatusChangeEvent_Status `protobuf:"varint,2,opt,name=status,proto3,enum=mqtt_broker.ClientStatusChangeEvent_Status" json:"status,omitempty"`
	// contains filtered or unexported fields
}

func (*ClientStatusChangeEvent) Descriptor deprecated

func (*ClientStatusChangeEvent) Descriptor() ([]byte, []int)

Deprecated: Use ClientStatusChangeEvent.ProtoReflect.Descriptor instead.

func (*ClientStatusChangeEvent) GetSessionID

func (x *ClientStatusChangeEvent) GetSessionID() int64

func (*ClientStatusChangeEvent) GetStatus

func (*ClientStatusChangeEvent) ProtoMessage

func (*ClientStatusChangeEvent) ProtoMessage()

func (*ClientStatusChangeEvent) ProtoReflect

func (x *ClientStatusChangeEvent) ProtoReflect() protoreflect.Message

func (*ClientStatusChangeEvent) Reset

func (x *ClientStatusChangeEvent) Reset()

func (*ClientStatusChangeEvent) String

func (x *ClientStatusChangeEvent) String() string

type ClientStatusChangeEvent_Status

type ClientStatusChangeEvent_Status int32
const (
	ClientStatusChangeEvent_Online  ClientStatusChangeEvent_Status = 0
	ClientStatusChangeEvent_Offline ClientStatusChangeEvent_Status = 1
)

func (ClientStatusChangeEvent_Status) Descriptor

func (ClientStatusChangeEvent_Status) Enum

func (ClientStatusChangeEvent_Status) EnumDescriptor deprecated

func (ClientStatusChangeEvent_Status) EnumDescriptor() ([]byte, []int)

Deprecated: Use ClientStatusChangeEvent_Status.Descriptor instead.

func (ClientStatusChangeEvent_Status) Number

func (ClientStatusChangeEvent_Status) String

func (ClientStatusChangeEvent_Status) Type

type Event

type Event struct {
	Data []byte     `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	Type Event_Type `protobuf:"varint,2,opt,name=type,proto3,enum=mqtt_broker.Event_Type" json:"type,omitempty"`
	// contains filtered or unexported fields
}

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetData

func (x *Event) GetData() []byte

func (*Event) GetType

func (x *Event) GetType() Event_Type

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

type EventReader

type EventReader struct {
	// contains filtered or unexported fields
}

func (*EventReader) Close

func (eReader *EventReader) Close() error

type EventWithOffset

type EventWithOffset struct {
	// contains filtered or unexported fields
}

type Event_Type

type Event_Type int32
const (
	Event_SubscribeEvent          Event_Type = 0
	Event_UnSubscribeEvent        Event_Type = 1
	Event_RetainMessageEvent      Event_Type = 2
	Event_ClientStatusChangeEvent Event_Type = 3
)

func (Event_Type) Descriptor

func (Event_Type) Descriptor() protoreflect.EnumDescriptor

func (Event_Type) Enum

func (x Event_Type) Enum() *Event_Type

func (Event_Type) EnumDescriptor deprecated

func (Event_Type) EnumDescriptor() ([]byte, []int)

Deprecated: Use Event_Type.Descriptor instead.

func (Event_Type) Number

func (x Event_Type) Number() protoreflect.EnumNumber

func (Event_Type) String

func (x Event_Type) String() string

func (Event_Type) Type

type Node

type Node struct {
	CoW *copyOnWrite
	// contains filtered or unexported fields
}

type Offset

type Offset interface {
	// contains filtered or unexported methods
}

type Options

type Options struct {
	MetaServerAddr           string        `json:"meta_server_addr"`
	HOST                     string        `json:"host"`
	BindPort                 int           `json:"bind_port"`
	BindTLSPort              int           `json:"bind_tls_port"`
	DefaultKeepalive         uint16        `json:"default_keepalive"`
	MinKeepalive             uint16        `json:"min_keepalive"`
	CheckpointEventSize      int64         `json:"checkpoint_event_size"`
	SnapshotPath             string        `json:"snapshot_path"`
	BrokerId                 int64         `json:"broker_id"`
	LogFile                  string        `json:"log_file"`
	LogLevel                 logrus.Level  `json:"log_level"`
	ReadOffsetCommitInterval time.Duration `json:"read_offset_commit_interval"`
}

func DefaultOptions

func DefaultOptions() Options

func (Options) WithBindPort

func (options Options) WithBindPort(val int) Options

func (Options) WithBindTLSPort

func (options Options) WithBindTLSPort(val int) Options

func (Options) WithBrokerId

func (options Options) WithBrokerId(val int64) Options

func (Options) WithCheckpointEventSize

func (options Options) WithCheckpointEventSize(val int64) Options

func (Options) WithDefaultKeepalive

func (options Options) WithDefaultKeepalive(val uint16) Options

func (Options) WithHOST

func (options Options) WithHOST(val string) Options

func (Options) WithLogFile

func (options Options) WithLogFile(val string) Options

func (Options) WithLogLevel

func (options Options) WithLogLevel(val logrus.Level) Options

func (Options) WithMetaServerAddr

func (options Options) WithMetaServerAddr(val string) Options

func (Options) WithSnapshotPath

func (options Options) WithSnapshotPath(val string) Options

type RetainMessageEvent

type RetainMessageEvent struct {
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*RetainMessageEvent) Descriptor deprecated

func (*RetainMessageEvent) Descriptor() ([]byte, []int)

Deprecated: Use RetainMessageEvent.ProtoReflect.Descriptor instead.

func (*RetainMessageEvent) GetData

func (x *RetainMessageEvent) GetData() []byte

func (*RetainMessageEvent) ProtoMessage

func (*RetainMessageEvent) ProtoMessage()

func (*RetainMessageEvent) ProtoReflect

func (x *RetainMessageEvent) ProtoReflect() protoreflect.Message

func (*RetainMessageEvent) Reset

func (x *RetainMessageEvent) Reset()

func (*RetainMessageEvent) String

func (x *RetainMessageEvent) String() string

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot

func NewSnapshot(path string) *Snapshot

func (*Snapshot) WriteSnapshot

func (s *Snapshot) WriteSnapshot(header SnapshotHeader, topicTree *TopicTree, metaTree *btree.BTree) error

type SnapshotHeader

type SnapshotHeader struct {
	TS     time.Time `json:"ts"`
	Offset int64     `json:"offset"`
}

type SubscribeEvent

type SubscribeEvent struct {
	SessionId      int64                 `protobuf:"varint,1,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"`
	Qos0StreamInfo *store.StreamInfoItem `protobuf:"bytes,2,opt,name=qos0_stream_info,json=qos0StreamInfo,proto3" json:"qos0_stream_info,omitempty"`
	Qos1StreamInfo *store.StreamInfoItem `protobuf:"bytes,3,opt,name=qos1_stream_info,json=qos1StreamInfo,proto3" json:"qos1_stream_info,omitempty"`
	Topic          map[string]int32      `` /* 152-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*SubscribeEvent) Descriptor deprecated

func (*SubscribeEvent) Descriptor() ([]byte, []int)

Deprecated: Use SubscribeEvent.ProtoReflect.Descriptor instead.

func (*SubscribeEvent) GetQos0StreamInfo

func (x *SubscribeEvent) GetQos0StreamInfo() *store.StreamInfoItem

func (*SubscribeEvent) GetQos1StreamInfo

func (x *SubscribeEvent) GetQos1StreamInfo() *store.StreamInfoItem

func (*SubscribeEvent) GetSessionId

func (x *SubscribeEvent) GetSessionId() int64

func (*SubscribeEvent) GetTopic

func (x *SubscribeEvent) GetTopic() map[string]int32

func (*SubscribeEvent) ProtoMessage

func (*SubscribeEvent) ProtoMessage()

func (*SubscribeEvent) ProtoReflect

func (x *SubscribeEvent) ProtoReflect() protoreflect.Message

func (*SubscribeEvent) Reset

func (x *SubscribeEvent) Reset()

func (*SubscribeEvent) String

func (x *SubscribeEvent) String() string

type Subscriber

type Subscriber interface {
	ID() int64
	Topic() string
	Qos() int32
	Online() bool
	// contains filtered or unexported methods
}

type TopicTree

type TopicTree struct {
	// contains filtered or unexported fields
}

func NewTopicTree

func NewTopicTree() *TopicTree

func (*TopicTree) Clone

func (tree *TopicTree) Clone() *TopicTree

func (*TopicTree) Delete

func (tree *TopicTree) Delete(subscriber Subscriber)

func (*TopicTree) Insert

func (tree *TopicTree) Insert(sub Subscriber)

func (*TopicTree) Match

func (tree *TopicTree) Match(topic string) []map[int64]Subscriber

func (*TopicTree) RangeRetainMessage

func (tree *TopicTree) RangeRetainMessage(f func(packet *packets.PublishPacket) bool)

func (*TopicTree) UpdateRetainPacket

func (tree *TopicTree) UpdateRetainPacket(packet *packets.PublishPacket)

func (*TopicTree) Walk

func (tree *TopicTree) Walk(f func(path string, subscribers map[int64]Subscriber) bool)

type UnSubscribeEvent

type UnSubscribeEvent struct {
	SessionId int64    `protobuf:"varint,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"`
	Topic     []string `protobuf:"bytes,4,rep,name=topic,proto3" json:"topic,omitempty"`
	// contains filtered or unexported fields
}

func (*UnSubscribeEvent) Descriptor deprecated

func (*UnSubscribeEvent) Descriptor() ([]byte, []int)

Deprecated: Use UnSubscribeEvent.ProtoReflect.Descriptor instead.

func (*UnSubscribeEvent) GetSessionId

func (x *UnSubscribeEvent) GetSessionId() int64

func (*UnSubscribeEvent) GetTopic

func (x *UnSubscribeEvent) GetTopic() []string

func (*UnSubscribeEvent) ProtoMessage

func (*UnSubscribeEvent) ProtoMessage()

func (*UnSubscribeEvent) ProtoReflect

func (x *UnSubscribeEvent) ProtoReflect() protoreflect.Message

func (*UnSubscribeEvent) Reset

func (x *UnSubscribeEvent) Reset()

func (*UnSubscribeEvent) String

func (x *UnSubscribeEvent) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL