messages

package
v0.0.0-...-7aac46a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: MIT Imports: 7 Imported by: 0

README

Messages

Persistent packet storage

Message order preservation

Description:

Preserve the original ordering in which messages were published to the wait queue while keeping memory usage and lookup complexity to a minimum.

Use Case:

Suppose clientA publishes a set of messages {message1, message2, ...., messagen} to a particular topic T which is subscribed by clientB. When clientB is offline and has a subscription with QoS > 0, messages are stored for redelivery whenever clientB becomes online. Due to the message sequence number ( Message ID ), messages can become out of order which violates consistency rule. Therefore it is crucial to have a mechanism to preserve message order, it should satisfy rule for two stages. The first stage is the partial ordering of unacknowledged messages, and the second stage is total order preservation of messages with the higher quality of service.

Change(s):
  • Improve message storage by implementing it with Ordered Sets as opposed to the current implementation which uses Hash Sets.

Test units

check to ensure that it passes all test cases and view the code coverage in html mode.

NOTE: msgidfull is the build tag for concurrent test case of GetNewID(uuid.UUID) which requires 65535 ( 0xFFFF )insertions. It must return 0 when all 65535 slots are occupied.

$ go test -v -coverprofile=cover.out . && go tool cover -html=cover.out
$ go test -v -tags msgidfull -coverprofile=cover.out . && go tool cover -html=cover.out

Documentation

Overview

Messages contains functionalities to store inbound and outbound packets. This is important for QoS > 0 levels as acknowledgments shall be sent either from Broker->Client or Client->Broker. It also provides UUID and persistency.

Index

Constants

View Source
const (
	LROOT byte = iota
	LGC
	LWLCD
	LLEAF
	LCHR
	LBRK
)

Enum for topic components.

View Source
const (
	TSEP   byte = '/'
	TWLDCD byte = '*'
)

Default seperator and wildcard.

View Source
const (
	MSGMINLEN = 1
	MSGMAXLEN = 65535
)

Constant values for minimum and maximum number of packets. In reality, this number will not be reached easily as the enteries gets reused when they become free.

Variables

View Source
var (
	ERNotFound error = errors.New("subscribe: node not found")
	ERINVNode  error = errors.New("subscribe: inconsistent / invalid node")
)

Retain error messages

View Source
var (
	ESNotFound error = errors.New("no such topic or client.")
)

Subscribe error messages

Functions

func DNextLevelP

func DNextLevelP(topic []byte) ([]byte, []byte, error)

DNextLevelP is a wrapper for `NextLevelP` and supplies default topic constants to it.

func NextLevelP

func NextLevelP(topic []byte, sep byte, wldcd byte) (nlvl []byte, rem []byte, err error)

NextLevepP seperates individual components in a topic string.

func TopicComponents

func TopicComponents(topic []byte) (res [][]byte, err error)

TopicComponents is a wrapper func for `DNextLevelP`. It returns individual topic components in a slice of byte arrays. Errors must be explicitly checked.

Types

type MessageBox

type MessageBox struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewMessageBox

func NewMessageBox() *MessageBox

func (*MessageBox) AddInbound

func (mb *MessageBox) AddInbound(msg protobase.EDProtocol) bool

func (*MessageBox) AddOutbound

func (mb *MessageBox) AddOutbound(msg protobase.EDProtocol) bool

func (*MessageBox) DeleteIn

func (mb *MessageBox) DeleteIn(msg protobase.EDProtocol) bool

func (*MessageBox) DeleteOut

func (mb *MessageBox) DeleteOut(msg protobase.EDProtocol) bool

DeleteOut disassociates a client from a outgoing packet.

func (*MessageBox) GetAllOut

func (mb *MessageBox) GetAllOut() (msgs []protobase.EDProtocol)

GetAllOut returns all of available outgoing packets of a given client.

func (*MessageBox) GetAllOutStr

func (mb *MessageBox) GetAllOutStr() (msgs []string)

func (*MessageBox) GetIDStoreI

func (mb *MessageBox) GetIDStoreI() (idstore protobase.MSGIDInterface)

func (*MessageBox) GetIDStoreO

func (mb *MessageBox) GetIDStoreO() (idstore protobase.MSGIDInterface)

func (*MessageBox) GetInbound

func (mb *MessageBox) GetInbound(uid uuid.UUID) (protobase.EDProtocol, bool)

func (*MessageBox) GetOutbound

func (mb *MessageBox) GetOutbound(uid uuid.UUID) (protobase.EDProtocol, bool)

type MessageId

type MessageId struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MessageId is a struct which contains mapping of packet number to their unique identifier.

func NewMessageId

func NewMessageId() *MessageId

NewMessageId returns a pointer to a new `MessageId` struct.

func (*MessageId) FreeId

func (m *MessageId) FreeId(id uint16)

FreeId removes a `id` from internal mapping. Note that it has no effect on cursor ( cursor is not decremented ).

func (*MessageId) GetFNewID

func (m *MessageId) GetFNewID(uid uuid.UUID) (id uint16, cursor int)

GetFNewID finds an empty slot and returns a new `uint16` associated with that slot as well as a `cursor`. Cursor increments on each new association and restarts when maximum message length is reached.

func (*MessageId) GetNewID

func (m *MessageId) GetNewID(uid uuid.UUID) (id uint16)

GetNewId finds an empty slot and returns a new `uint16`.

func (*MessageId) GetUUID

func (m *MessageId) GetUUID(id uint16) (uuid.UUID, bool)

GetUUID finds the associated `uuid.UUID` for a given`id`. It

func (*MessageId) IsOccupied

func (m *MessageId) IsOccupied(id uint16) bool

IsOccupied returns a `bool` indicating wether a certain id is in use or not.

type MessageQueue

type MessageQueue struct {
	sync.RWMutex

	Ack  *MessageQueueAck
	Q    *containers.Queue
	Last interface{}
}

func NewMessageQueue

func NewMessageQueue() *MessageQueue

func (*MessageQueue) Get

func (mq *MessageQueue) Get() (item interface{})

func (*MessageQueue) Insert

func (mq *MessageQueue) Insert(aid int, item interface{})

func (*MessageQueue) InsertIn

func (mq *MessageQueue) InsertIn(aid int, item interface{})

func (*MessageQueue) InsertOut

func (mq *MessageQueue) InsertOut(aid int, item interface{})

func (*MessageQueue) ReleaseIn

func (mq *MessageQueue) ReleaseIn(aid int) (ok bool)

func (*MessageQueue) ReleaseOut

func (mq *MessageQueue) ReleaseOut(aid int) (ok bool)

type MessageQueueAck

type MessageQueueAck struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewMessageQueueAck

func NewMessageQueueAck() *MessageQueueAck

func (*MessageQueueAck) CreateInAck

func (mqa *MessageQueueAck) CreateInAck(aid int) (ok bool)

func (*MessageQueueAck) CreateOutAck

func (mqa *MessageQueueAck) CreateOutAck(aid int) (ok bool)

func (*MessageQueueAck) GetInAck

func (mqa *MessageQueueAck) GetInAck(aid int) (ack *sync.Cond)

func (*MessageQueueAck) GetOutAck

func (mqa *MessageQueueAck) GetOutAck(aid int) (ack *sync.Cond)

func (*MessageQueueAck) HasInAck

func (mqa *MessageQueueAck) HasInAck(aid int) bool

func (*MessageQueueAck) HasOutAck

func (mqa *MessageQueueAck) HasOutAck(aid int) bool

func (*MessageQueueAck) RemoveInAck

func (mqa *MessageQueueAck) RemoveInAck(aid int) (ok bool)

func (*MessageQueueAck) RemoveOutAck

func (mqa *MessageQueueAck) RemoveOutAck(aid int) (ok bool)

type MessageStore

type MessageStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MessageStore is a struct which acts as an entry into the persistent storage.

func NewInitedMessageStore

func NewInitedMessageStore() *MessageStore

NewInitedMessageStore returns a pointer to a new `MessageStore` and allocates memory. This is equivalent of calling `Init()` after `NewMessageStore`

func NewMessageStore

func NewMessageStore() *MessageStore

NewMessageStore returns a pointer to a new `MessageStore`.

func (*MessageStore) AddClient

func (self *MessageStore) AddClient(client string)

AddClient initializes and adds a entry for a given client.

func (*MessageStore) AddInbound

func (self *MessageStore) AddInbound(client string, msg protobase.EDProtocol) bool

AddInbound associates a client to a incoming packet. It checks if the given client is in the storage, otherwise returns `false`.

func (*MessageStore) AddOutbound

func (self *MessageStore) AddOutbound(client string, msg protobase.EDProtocol) bool

AddOutbound associates a client to a ougoing packet.

func (*MessageStore) Close

func (self *MessageStore) Close(client string) bool

Close removes the entry of the client from internal mappings. It returns `false` if client does not exist.

func (*MessageStore) DeleteIn

func (self *MessageStore) DeleteIn(client string, msg protobase.EDProtocol) bool

DeleteIn disassociates a client from a incoming packet.

func (*MessageStore) DeleteOut

func (self *MessageStore) DeleteOut(client string, msg protobase.EDProtocol) bool

DeleteOut disassociates a client from a outgoing packet.

func (*MessageStore) Exists

func (self *MessageStore) Exists(client string) (ok bool)

Exists returns a `bool` indicating whether a client is already registered or not.

func (*MessageStore) GetAllOut

func (self *MessageStore) GetAllOut(client string) (msgs []protobase.EDProtocol)

GetAllOut returns all of available outgoing packets of a given client.

func (*MessageStore) GetAllOutStr

func (self *MessageStore) GetAllOutStr(client string) (msgs []string)

GetAllOutStr returns string UUID repr of all available outgoing packets of a given client.

func (*MessageStore) GetIDStoreI

func (self *MessageStore) GetIDStoreI(client string) (idstore protobase.MSGIDInterface)

func (*MessageStore) GetIDStoreO

func (self *MessageStore) GetIDStoreO(client string) (idstore protobase.MSGIDInterface)

func (*MessageStore) GetInbound

func (self *MessageStore) GetInbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)

func (*MessageStore) GetOutbound

func (self *MessageStore) GetOutbound(client string, uid uuid.UUID) (protobase.EDProtocol, bool)

func (*MessageStore) Init

func (self *MessageStore) Init()

Init initializes all of internal struct members such as inbound and outbound mappings.

type MsgEntry

type MsgEntry struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MsgEntry maps a client id to a protocol packet.

func (*MsgEntry) GenSeqID

func (self *MsgEntry) GenSeqID() int

GenSeqID creates and returns a sequence id used to preserve order.

type QueueBox

type QueueBox struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type QueueBoxEntry

type QueueBoxEntry struct {
	// contains filtered or unexported fields
}

type QueueId

type QueueId struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewQueueId

func NewQueueId() *QueueId

NewQueueId returns a pointer to a new initialized and allocated `QueueId` struct.

func (*QueueId) FreeId

func (qi *QueueId) FreeId(id uint16)

FreeId removes a `id` from internal mapping. Note that it has no effect on cursor ( cursor is not decremented ).

func (*QueueId) GetFNewID

func (qi *QueueId) GetFNewID() (id uint16, cursor int)

GetFNewID finds an empty slot and returns a new `uint16` associated with that slot as well as a `cursor`. Cursor increments on each new association and restarts when maximum message length is reached.

func (*QueueId) GetNewID

func (qi *QueueId) GetNewID() (id uint16)

GetNewId finds an empty slot and returns a new `uint16`.

func (*QueueId) IsOccupied

func (qi *QueueId) IsOccupied(id uint16) bool

IsOccupied returns a `bool` indicating wether a certain id is in use or not.

type Retain

type Retain struct {
	// contains filtered or unexported fields
}

Retain is the container for retained messages.

func NewRetain

func NewRetain() *Retain

NewRetain allocates and initializes a new `Retain` struct and returns a pointer to it. It also allocates the internal `map`.

func (*Retain) Find

func (self *Retain) Find(topic []byte) (packet protobase.EDProtocol, err error)

Find finds a node associated with `topic` and returns its associated packet.

func (*Retain) Insert

func (self *Retain) Insert(topic []byte, packet protobase.EDProtocol) (err error)

Insert inserts/replace a node and its associated `packet` at `topic` path.

func (*Retain) Remove

func (self *Retain) Remove(topic []byte) (err error)

Remove removes a node associated to `topic`.

type Subscribe

type Subscribe struct {
	// contains filtered or unexported fields
}

Subscribe is the second subscriptions struct.

func NewSub

func NewSub() *Subscribe

NewSub returns a pointer to a new `Subscribe` struct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL