Documentation ¶
Overview ¶
Messages contains functionalities to store inbound and outbound packets. This is important for QoS > 0 levels as acknowledgments shall be sent either from Broker->Client or Client->Broker. It also provides UUID and persistency.
Index ¶
- Constants
- Variables
- func DNextLevelP(topic []byte) ([]byte, []byte, error)
- func NextLevelP(topic []byte, sep byte, wldcd byte) (nlvl []byte, rem []byte, err error)
- func TopicComponents(topic []byte) (res [][]byte, err error)
- type MessageBox
- func (mb *MessageBox) AddInbound(msg protobase.EDProtocol) bool
- func (mb *MessageBox) AddOutbound(msg protobase.EDProtocol) bool
- func (mb *MessageBox) DeleteIn(msg protobase.EDProtocol) bool
- func (mb *MessageBox) DeleteOut(msg protobase.EDProtocol) bool
- func (mb *MessageBox) GetAllOut() (msgs []protobase.EDProtocol)
- func (mb *MessageBox) GetAllOutStr() (msgs []string)
- func (mb *MessageBox) GetIDStoreI() (idstore protobase.MSGIDInterface)
- func (mb *MessageBox) GetIDStoreO() (idstore protobase.MSGIDInterface)
- func (mb *MessageBox) GetInbound(uid uuid.UUID) (protobase.EDProtocol, bool)
- func (mb *MessageBox) GetOutbound(uid uuid.UUID) (protobase.EDProtocol, bool)
- type MessageId
- type MessageQueue
- func (mq *MessageQueue) Get() (item interface{})
- func (mq *MessageQueue) Insert(aid int, item interface{})
- func (mq *MessageQueue) InsertIn(aid int, item interface{})
- func (mq *MessageQueue) InsertOut(aid int, item interface{})
- func (mq *MessageQueue) ReleaseIn(aid int) (ok bool)
- func (mq *MessageQueue) ReleaseOut(aid int) (ok bool)
- type MessageQueueAck
- func (mqa *MessageQueueAck) CreateInAck(aid int) (ok bool)
- func (mqa *MessageQueueAck) CreateOutAck(aid int) (ok bool)
- func (mqa *MessageQueueAck) GetInAck(aid int) (ack *sync.Cond)
- func (mqa *MessageQueueAck) GetOutAck(aid int) (ack *sync.Cond)
- func (mqa *MessageQueueAck) HasInAck(aid int) bool
- func (mqa *MessageQueueAck) HasOutAck(aid int) bool
- func (mqa *MessageQueueAck) RemoveInAck(aid int) (ok bool)
- func (mqa *MessageQueueAck) RemoveOutAck(aid int) (ok bool)
- type MessageStore
- func (self *MessageStore) AddClient(client string)
- func (self *MessageStore) AddInbound(client string, msg protobase.EDProtocol) bool
- func (self *MessageStore) AddOutbound(client string, msg protobase.EDProtocol) bool
- func (self *MessageStore) Close(client string) bool
- func (self *MessageStore) DeleteIn(client string, msg protobase.EDProtocol) bool
- func (self *MessageStore) DeleteOut(client string, msg protobase.EDProtocol) bool
- func (self *MessageStore) Exists(client string) (ok bool)
- func (self *MessageStore) GetAllOut(client string) (msgs []protobase.EDProtocol)
- func (self *MessageStore) GetAllOutStr(client string) (msgs []string)
- func (self *MessageStore) GetIDStoreI(client string) (idstore protobase.MSGIDInterface)
- func (self *MessageStore) GetIDStoreO(client string) (idstore protobase.MSGIDInterface)
- func (self *MessageStore) GetInbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)
- func (self *MessageStore) GetOutbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)
- func (self *MessageStore) Init()
- type MsgEntry
- type QueueBox
- type QueueBoxEntry
- type QueueId
- type Retain
- type Subscribe
Constants ¶
const ( LROOT byte = iota LGC LWLCD LLEAF LCHR LBRK )
Enum for topic components.
const ( TSEP byte = '/' TWLDCD byte = '*' )
Default seperator and wildcard.
const ( MSGMINLEN = 1 MSGMAXLEN = 65535 )
Constant values for minimum and maximum number of packets. In reality, this number will not be reached easily as the enteries gets reused when they become free.
Variables ¶
var ( ERNotFound error = errors.New("subscribe: node not found") ERINVNode error = errors.New("subscribe: inconsistent / invalid node") )
Retain error messages
var (
ESNotFound error = errors.New("no such topic or client.")
)
Subscribe error messages
Functions ¶
func DNextLevelP ¶
DNextLevelP is a wrapper for `NextLevelP` and supplies default topic constants to it.
func NextLevelP ¶
NextLevepP seperates individual components in a topic string.
func TopicComponents ¶
TopicComponents is a wrapper func for `DNextLevelP`. It returns individual topic components in a slice of byte arrays. Errors must be explicitly checked.
Types ¶
type MessageBox ¶
func NewMessageBox ¶
func NewMessageBox() *MessageBox
func (*MessageBox) AddInbound ¶
func (mb *MessageBox) AddInbound(msg protobase.EDProtocol) bool
func (*MessageBox) AddOutbound ¶
func (mb *MessageBox) AddOutbound(msg protobase.EDProtocol) bool
func (*MessageBox) DeleteIn ¶
func (mb *MessageBox) DeleteIn(msg protobase.EDProtocol) bool
func (*MessageBox) DeleteOut ¶
func (mb *MessageBox) DeleteOut(msg protobase.EDProtocol) bool
DeleteOut disassociates a client from a outgoing packet.
func (*MessageBox) GetAllOut ¶
func (mb *MessageBox) GetAllOut() (msgs []protobase.EDProtocol)
GetAllOut returns all of available outgoing packets of a given client.
func (*MessageBox) GetAllOutStr ¶
func (mb *MessageBox) GetAllOutStr() (msgs []string)
func (*MessageBox) GetIDStoreI ¶
func (mb *MessageBox) GetIDStoreI() (idstore protobase.MSGIDInterface)
func (*MessageBox) GetIDStoreO ¶
func (mb *MessageBox) GetIDStoreO() (idstore protobase.MSGIDInterface)
func (*MessageBox) GetInbound ¶
func (mb *MessageBox) GetInbound(uid uuid.UUID) (protobase.EDProtocol, bool)
func (*MessageBox) GetOutbound ¶
func (mb *MessageBox) GetOutbound(uid uuid.UUID) (protobase.EDProtocol, bool)
type MessageId ¶
MessageId is a struct which contains mapping of packet number to their unique identifier.
func NewMessageId ¶
func NewMessageId() *MessageId
NewMessageId returns a pointer to a new `MessageId` struct.
func (*MessageId) FreeId ¶
FreeId removes a `id` from internal mapping. Note that it has no effect on cursor ( cursor is not decremented ).
func (*MessageId) GetFNewID ¶
GetFNewID finds an empty slot and returns a new `uint16` associated with that slot as well as a `cursor`. Cursor increments on each new association and restarts when maximum message length is reached.
func (*MessageId) IsOccupied ¶
IsOccupied returns a `bool` indicating wether a certain id is in use or not.
type MessageQueue ¶
type MessageQueue struct { sync.RWMutex Ack *MessageQueueAck Q *containers.Queue Last interface{} }
func NewMessageQueue ¶
func NewMessageQueue() *MessageQueue
func (*MessageQueue) Get ¶
func (mq *MessageQueue) Get() (item interface{})
func (*MessageQueue) Insert ¶
func (mq *MessageQueue) Insert(aid int, item interface{})
func (*MessageQueue) InsertIn ¶
func (mq *MessageQueue) InsertIn(aid int, item interface{})
func (*MessageQueue) InsertOut ¶
func (mq *MessageQueue) InsertOut(aid int, item interface{})
func (*MessageQueue) ReleaseIn ¶
func (mq *MessageQueue) ReleaseIn(aid int) (ok bool)
func (*MessageQueue) ReleaseOut ¶
func (mq *MessageQueue) ReleaseOut(aid int) (ok bool)
type MessageQueueAck ¶
func NewMessageQueueAck ¶
func NewMessageQueueAck() *MessageQueueAck
func (*MessageQueueAck) CreateInAck ¶
func (mqa *MessageQueueAck) CreateInAck(aid int) (ok bool)
func (*MessageQueueAck) CreateOutAck ¶
func (mqa *MessageQueueAck) CreateOutAck(aid int) (ok bool)
func (*MessageQueueAck) HasInAck ¶
func (mqa *MessageQueueAck) HasInAck(aid int) bool
func (*MessageQueueAck) HasOutAck ¶
func (mqa *MessageQueueAck) HasOutAck(aid int) bool
func (*MessageQueueAck) RemoveInAck ¶
func (mqa *MessageQueueAck) RemoveInAck(aid int) (ok bool)
func (*MessageQueueAck) RemoveOutAck ¶
func (mqa *MessageQueueAck) RemoveOutAck(aid int) (ok bool)
type MessageStore ¶
MessageStore is a struct which acts as an entry into the persistent storage.
func NewInitedMessageStore ¶
func NewInitedMessageStore() *MessageStore
NewInitedMessageStore returns a pointer to a new `MessageStore` and allocates memory. This is equivalent of calling `Init()` after `NewMessageStore`
func NewMessageStore ¶
func NewMessageStore() *MessageStore
NewMessageStore returns a pointer to a new `MessageStore`.
func (*MessageStore) AddClient ¶
func (self *MessageStore) AddClient(client string)
AddClient initializes and adds a entry for a given client.
func (*MessageStore) AddInbound ¶
func (self *MessageStore) AddInbound(client string, msg protobase.EDProtocol) bool
AddInbound associates a client to a incoming packet. It checks if the given client is in the storage, otherwise returns `false`.
func (*MessageStore) AddOutbound ¶
func (self *MessageStore) AddOutbound(client string, msg protobase.EDProtocol) bool
AddOutbound associates a client to a ougoing packet.
func (*MessageStore) Close ¶
func (self *MessageStore) Close(client string) bool
Close removes the entry of the client from internal mappings. It returns `false` if client does not exist.
func (*MessageStore) DeleteIn ¶
func (self *MessageStore) DeleteIn(client string, msg protobase.EDProtocol) bool
DeleteIn disassociates a client from a incoming packet.
func (*MessageStore) DeleteOut ¶
func (self *MessageStore) DeleteOut(client string, msg protobase.EDProtocol) bool
DeleteOut disassociates a client from a outgoing packet.
func (*MessageStore) Exists ¶
func (self *MessageStore) Exists(client string) (ok bool)
Exists returns a `bool` indicating whether a client is already registered or not.
func (*MessageStore) GetAllOut ¶
func (self *MessageStore) GetAllOut(client string) (msgs []protobase.EDProtocol)
GetAllOut returns all of available outgoing packets of a given client.
func (*MessageStore) GetAllOutStr ¶
func (self *MessageStore) GetAllOutStr(client string) (msgs []string)
GetAllOutStr returns string UUID repr of all available outgoing packets of a given client.
func (*MessageStore) GetIDStoreI ¶
func (self *MessageStore) GetIDStoreI(client string) (idstore protobase.MSGIDInterface)
func (*MessageStore) GetIDStoreO ¶
func (self *MessageStore) GetIDStoreO(client string) (idstore protobase.MSGIDInterface)
func (*MessageStore) GetInbound ¶
func (self *MessageStore) GetInbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)
func (*MessageStore) GetOutbound ¶
func (self *MessageStore) GetOutbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)
func (*MessageStore) Init ¶
func (self *MessageStore) Init()
Init initializes all of internal struct members such as inbound and outbound mappings.
type QueueBoxEntry ¶
type QueueBoxEntry struct {
// contains filtered or unexported fields
}
type QueueId ¶
func NewQueueId ¶
func NewQueueId() *QueueId
NewQueueId returns a pointer to a new initialized and allocated `QueueId` struct.
func (*QueueId) FreeId ¶
FreeId removes a `id` from internal mapping. Note that it has no effect on cursor ( cursor is not decremented ).
func (*QueueId) GetFNewID ¶
GetFNewID finds an empty slot and returns a new `uint16` associated with that slot as well as a `cursor`. Cursor increments on each new association and restarts when maximum message length is reached.
func (*QueueId) IsOccupied ¶
IsOccupied returns a `bool` indicating wether a certain id is in use or not.
type Retain ¶
type Retain struct {
// contains filtered or unexported fields
}
Retain is the container for retained messages.
func NewRetain ¶
func NewRetain() *Retain
NewRetain allocates and initializes a new `Retain` struct and returns a pointer to it. It also allocates the internal `map`.
func (*Retain) Find ¶
func (self *Retain) Find(topic []byte) (packet protobase.EDProtocol, err error)
Find finds a node associated with `topic` and returns its associated packet.