broker

package
v0.0.0-...-edd4e79 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KeyClientPrefix         = "client/"
	KeyClientUnAckMessageID = `/unack/message_id`

	KeyClientUnRecPacketID        = `/unrec/packet_id`
	KeyClientUnCompPacketID       = `/uncomp/packet_id`
	KeyClientLatestAliveTime      = `/latest_alive_time`
	KeyClientLatestAckedMessageID = `/latest_acked_message_id`
	KeyClientSubTopic             = `/sub_topic`
	KeyConnectProperties          = `/connect_properties`
	KeyWillMessage                = `/will_message`
)
View Source
const (
	KeyTopicPrefix        = "topic/"
	KeyTopicWillMessage   = `/will_message`
	KeyTopicRetainMessage = `/retain_message`
)
View Source
const (
	QoS0 = QoS(0)
	QoS1 = 1
	QoS2 = 2
)

Variables

This section is empty.

Functions

func ClientConnectPropertiesKey

func ClientConnectPropertiesKey(clientID string) string

func ClientKey

func ClientKey(clientID string) *strings.Builder

func ClientSubTopicKey

func ClientSubTopicKey(clientID, topic string) string

func ClientSubTopicKeyPrefix

func ClientSubTopicKeyPrefix(clientID string) string

func ClientTopicUnAckKey

func ClientTopicUnAckKey(clientID, topic, messageID string) string

func ClientTopicUnAckKeyPrefix

func ClientTopicUnAckKeyPrefix(clientID, topic string) string

func ClientTopicUnFinishedMessagePrefix

func ClientTopicUnFinishedMessagePrefix(clientID, topic string) string

func ClientWillMessageKey

func ClientWillMessageKey(clientID string) string

func Decode

func Decode(rawData []byte) (*packet.PublishMessage, error)

Decode bytes to publish packet

func Encode

func Encode(version serializer.SerialVersion, publish *packet.PublishMessage, buf *bytes.Buffer) error

Encode publish packet to bytes

func TopicKey

func TopicKey(topic string) *strings.Builder

func TopicRetainMessage

func TopicRetainMessage(topic string) *strings.Builder

func TopicRetainMessageMessageIDKey

func TopicRetainMessageMessageIDKey(topic, messageID string) *strings.Builder

func TopicWillMessage

func TopicWillMessage(topic string) *strings.Builder

func TopicWillMessageMessageIDKey

func TopicWillMessageMessageIDKey(topic, messageID string) *strings.Builder

func TrimTopicWillMessageIDKey

func TrimTopicWillMessageIDKey(topic, key string) string

func WithClientKey

func WithClientKey(key, clientID string) string

Types

type KeyValueStore

type KeyValueStore interface {
	PutKey(ctx context.Context, key, value string) error
	ReadKey(ctx context.Context, key string) (string, bool, error)
	DeleteKey(ctx context.Context, key string) error
	ReadPrefixKey(ctx context.Context, prefix string) (map[string]string, error)
}

type KeyValueStoreWithTimeout

type KeyValueStoreWithTimeout struct {
	KeyValueStore
	// contains filtered or unexported fields
}

func NewKeyValueStoreWithTimout

func NewKeyValueStoreWithTimout(store KeyValueStore, timeout time.Duration) *KeyValueStoreWithTimeout

func (*KeyValueStoreWithTimeout) DefaultDeleteKey

func (s *KeyValueStoreWithTimeout) DefaultDeleteKey(key string) error

func (*KeyValueStoreWithTimeout) DefaultPutKey

func (s *KeyValueStoreWithTimeout) DefaultPutKey(key, value string) error

func (*KeyValueStoreWithTimeout) DefaultReadKey

func (s *KeyValueStoreWithTimeout) DefaultReadKey(key string) (string, bool, error)

func (*KeyValueStoreWithTimeout) DefaultReadPrefixKey

func (s *KeyValueStoreWithTimeout) DefaultReadPrefixKey(prefix string) (map[string]string, error)

type LocalKeyValueStore

type LocalKeyValueStore struct {
	// contains filtered or unexported fields
}

func NewLocalKeyValueStore

func NewLocalKeyValueStore() *LocalKeyValueStore

func (*LocalKeyValueStore) DeleteKey

func (l *LocalKeyValueStore) DeleteKey(ctx context.Context, key string) error

func (*LocalKeyValueStore) PutKey

func (l *LocalKeyValueStore) PutKey(ctx context.Context, key, value string) error

func (*LocalKeyValueStore) ReadKey

func (l *LocalKeyValueStore) ReadKey(ctx context.Context, key string) (string, bool, error)

func (*LocalKeyValueStore) ReadPrefixKey

func (l *LocalKeyValueStore) ReadPrefixKey(ctx context.Context, prefix string) (map[string]string, error)

type LocalSubCenter

type LocalSubCenter struct {
	// contains filtered or unexported fields
}

func NewLocalSubCenter

func NewLocalSubCenter() *LocalSubCenter

func (*LocalSubCenter) CreateSub

func (l *LocalSubCenter) CreateSub(clientID string, topics []packets.SubOptions) error

func (*LocalSubCenter) DeleteClient

func (l *LocalSubCenter) DeleteClient(clientID string)

func (*LocalSubCenter) DeleteSub

func (l *LocalSubCenter) DeleteSub(clientID string, topics []string) error

func (*LocalSubCenter) Match

func (l *LocalSubCenter) Match(topic string) (clientIDQos map[string]int32)

func (*LocalSubCenter) MatchTopic

func (l *LocalSubCenter) MatchTopic(topic string) (topics map[string]int32)

type MessageStoreEvent

type MessageStoreEvent interface {
	CreateListenMessageStoreEvent(topic string, handler func(...interface{}))
	DeleteListenMessageStoreEvent(topic string, handler func(i ...interface{}))
}

type PropertyKey

type PropertyKey = string

type PublishElement

type PublishElement interface {
	GetTopic() string
	GetResponseTopic() string
	GetQos() int
	GetContentType() string
}

type PublishListener

type PublishListener interface {
	CreatePublishEvent(topic string, handler func(i ...interface{}))
	DeletePublishEvent(topic string, handler func(i ...interface{}))
}

PublishListener is the interface of the publish event listener. It is used to listen the publish event from broker. The publish event will be triggered when the client publish a message to the broker.

type PublishWriter

type PublishWriter interface {
	// WritePacket writes the packet to the writer.
	// Warning: packetID is original packetID, method should change it to the new one that does not used.
	WritePacket(packet packets.Packet)

	GetID() string
	Close() error
}

type QoS

type QoS = byte

func Int32ToQoS

func Int32ToQoS(qos int32) QoS

type Session

type Session interface {
	SessionTopic
	Release()
	SessionWillMessage
	SessionCreateConnectProperties
}

type SessionConnectProperties

type SessionConnectProperties struct {
	ExpiryInterval    int64  `json:"expiry_interval"`
	ReceiveMaximum    uint16 `json:"receive_maximum"`
	MaximumPacketSize uint32 `json:"maximum_packet_size"`
	TopicAliasMaximum uint16 `json:"topic_alias_maximum"`
	RequestResponse   bool   `json:"request_response"`
	RequestProblem    bool   `json:"request_problem"`
}

type SessionCreateConnectProperties

type SessionCreateConnectProperties interface {
	GetConnectProperties() (*SessionConnectProperties, error)
	SetConnectProperties(properties *SessionConnectProperties) error
}

type SessionKey

type SessionKey string

type SessionManager

type SessionManager interface {
	ReadSession(key string) (Session, bool)
	DeleteSession(key string)
	CreateSession(key string, session Session)
	NewSession(key string) Session
}

type SessionTopic

type SessionTopic interface {
	ReadSubTopics() (topics map[string]*proto.SubOption)
	CreateSubTopic(topic string, option *proto.SubOption)
	DeleteSubTopic(topic string)
	SessionTopicMessage
}

type SessionTopicLatestPushedMessage

type SessionTopicLatestPushedMessage interface {
	ReadTopicLatestPushedMessageID(topic string) (messageID string, ok bool)
	SetTopicLatestPushedMessageID(topic string, messageID string)
	DeleteTopicLatestPushedMessageID(topic string, messageID string)
}

SessionTopicLatestPushedMessage save the latest pushed messageID for topic

type SessionTopicUnFinishedMessage

type SessionTopicUnFinishedMessage interface {
	CreateTopicUnFinishedMessage(topic string, message []UnFinishedMessage)
	ReadTopicUnFinishedMessage(topic string) (message []UnFinishedMessage)
	DeleteTopicUnFinishedMessage(topic string, messageID string)
}

SessionTopicUnFinishedMessage save the unfinished message for topic

type SessionWillMessage

type SessionWillMessage interface {
	GetWillMessage() (*WillMessage, error)
	SetWillMessage(message *WillMessage) error
}

type Store

type Store interface {
	TopicMessageStore
}

type StoreSerializer

type StoreSerializer interface {
	Encode(publish *packet.PublishMessage, buf *bytes.Buffer) error
	Decode(rawData []byte) (*packet.PublishMessage, error)
}

type SubCenter

type SubCenter interface {
	CreateSub(clientID string, topics []packets.SubOptions) error
	DeleteSub(clientID string, topics []string) error
	Match(topic string) (clientIDQos map[string]int32)
	DeleteClient(clientID string)
	MatchTopic(topic string) (topics map[string]int32)
}

SubCenter is the interface of the subscription center. It is used to manage the subscription of the clients.

type SubClient

type SubClient interface {
	GetClientID() string
	GetQoS() int32
}

SubClient is the interface of the subscription client.

type TopicMessageStore

type TopicMessageStore interface {
	ReadFromTimestamp(ctx context.Context, topic string, timestamp time.Time, limit int) ([]packet.PublishMessage, error)
	ReadTopicMessagesByID(ctx context.Context, topic, id string, limit int, include bool) ([]packet.PublishMessage, error)
	CreatePacket(topic string, value []byte) (id string, err error)
	DeleteTopicMessageID(ctx context.Context, topic, messageID string) error
}

type TopicStoreInfo

type TopicStoreInfo interface {
	GetTopicMessageTotalCount(ctx context.Context, topic string) (int64, error)
	DeleteTopicMessages(ctx context.Context, topic string) error
}

type UnFinishedMessage

type UnFinishedMessage struct {
	MessageID   string
	PacketID    string
	PubReceived bool
}

type UserProperties

type UserProperties = packets.User

type WillMessage

type WillMessage struct {
	MessageID   string
	DelayTaskID string
	Topic       string         `json:"topic"`
	QoS         int            `json:"qos"`
	Property    WillProperties `json:"property"`
	Retain      bool           `json:"retain"`
}

func ConnectPacketToWillMessage

func ConnectPacketToWillMessage(connect *packets.Connect, messageID string) *WillMessage

func (*WillMessage) ToPublishPacket

func (w *WillMessage) ToPublishPacket() *packets.Publish

type WillProperties

type WillProperties struct {
	WillDelayInterval int64            `json:"will_delay_interval"`
	PayloadFormat     int64            `json:"payload_format"`
	ExpiryInterval    int64            `json:"expiry_interval"`
	ContentType       string           `json:"content_type"`
	ResponseTopic     string           `json:"response_topic"`
	CorrelationData   []byte           `json:"correlation_data"`
	UserProperties    []UserProperties `json:"user_properties"`
}

Directories

Path Synopsis
message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL