message

package
v0.0.0-...-996fa4a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2024 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SubscriberDirect = SubscriberType(iota)
	SubscriberRemote
	SubscriberOffline
)

Subscriber types

View Source
const RetainedTTL = math.MaxUint32

RetainedTTL represents a TTL value to use for retained messages (max TTL).

Variables

View Source
var Query = Ssid{system, query}

Query represents a constant SSID for a query.

Functions

This section is empty.

Types

type Awaiter

type Awaiter interface {
	Gather(time.Duration) [][]byte
}

Awaiter represents an asynchronously awaiting response channel.

type Counter

type Counter struct {
	Ssid    Ssid
	Channel []byte
	Counter int
}

Counter represents a single subscription counter.

type Counters

type Counters struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Counters represents a subscription counting map.

func NewCounters

func NewCounters() *Counters

NewCounters creates a new container.

func (*Counters) All

func (s *Counters) All() []Counter

All returns all counters.

func (*Counters) Decrement

func (s *Counters) Decrement(ssid Ssid) (last bool)

Decrement decrements a subscription counter.

func (*Counters) Increment

func (s *Counters) Increment(ssid Ssid, channel []byte) (first bool)

Increment increments the subscription counter.

func (*Counters) IncrementOnce

func (s *Counters) IncrementOnce(ssid Ssid, channel []byte) (first bool)

IncrementOnce increments the subscription counter.

type Frame

type Frame []Message

Frame represents a message frame which is sent through the wire to the remote server and contains a set of messages.

func DecodeFrame

func DecodeFrame(buf []byte) (out Frame, err error)

DecodeFrame decodes the message frame from the decoder.

func NewFrame

func NewFrame(capacity int) Frame

NewFrame creates a new frame with the specified capacity

func (*Frame) Encode

func (f *Frame) Encode() []byte

Encode encodes the message frame

func (*Frame) Limit

func (f *Frame) Limit(n int)

Limit takes the last N elements, sorted by message time

func (Frame) Sort

func (f Frame) Sort()

Sort sorts the frame

func (Frame) Split

func (f Frame) Split(maxByteSize int) (head Frame, tail Frame)

Split splits the frame by a specified number of bytes into two slices.

type ID

type ID []byte

ID represents a message ID encoded at 128bit and lexigraphically sortable

func NewID

func NewID(ssid Ssid) ID

NewID creates a new message identifier for the current time.

func NewPrefix

func NewPrefix(ssid Ssid, from int64) ID

NewPrefix creates a new message identifier only containing the prefix.

func (ID) Contract

func (id ID) Contract() uint32

Contract retrieves the contract from the message ID.

func (ID) HasPrefix

func (id ID) HasPrefix(ssid Ssid, cutoff int64) bool

HasPrefix matches the prefix with the cutoff time.

func (ID) Match

func (id ID) Match(query Ssid, from, until int64) bool

Match matches the mesage ID with SSID and time bounds.

func (ID) SetTime

func (id ID) SetTime(t int64)

SetTime sets the time on the ID, useful for testing.

func (ID) Ssid

func (id ID) Ssid() Ssid

Ssid retrieves the SSID from the message ID.

func (ID) Time

func (id ID) Time() int64

Time gets the time of the key, adjusted.

type Message

type Message struct {
	ID      ID     `json:"id,omitempty"`   // The ID of the message
	Channel []byte `json:"chan,omitempty"` // The channel of the message
	Payload []byte `json:"data,omitempty"` // The payload of the message
	TTL     uint32 `json:"ttl,omitempty"`  // The time-to-live of the message
}

Message represents a message which has to be forwarded or stored.

func DecodeMessage

func DecodeMessage(buf []byte) (out Message, err error)

DecodeMessage decodes the message from the decoder.

func New

func New(ssid Ssid, channel, payload []byte) *Message

New creates a new message structure from the provided SSID, channel and payload.

func (*Message) Contract

func (m *Message) Contract() uint32

Contract retrieves the contract from the message ID.

func (*Message) Encode

func (m *Message) Encode() []byte

Encode encodes the message into a binary & compressed representation.

func (*Message) Expires

func (m *Message) Expires() time.Time

Expires calculates the expiration time.

func (*Message) GetBinaryCodec

func (m *Message) GetBinaryCodec() binary.Codec

GetBinaryCodec retrieves a custom binary codec.

func (*Message) Size

func (m *Message) Size() int64

Size returns the byte size of the message.

func (*Message) Ssid

func (m *Message) Ssid() Ssid

Ssid retrieves the SSID from the message ID.

func (*Message) Stored

func (m *Message) Stored() bool

Stored returns whether the message is or should be stored.

func (*Message) Time

func (m *Message) Time() int64

Time gets the time of the key, adjusted.

type Ssid

type Ssid []uint32

Ssid represents a subscription ID which contains a contract and a list of hashes for various parts of the channel.

func NewSsid

func NewSsid(contract uint32, query []uint32) Ssid

NewSsid creates a new SSID.

func NewSsidForPresence

func NewSsidForPresence(original Ssid) Ssid

NewSsidForPresence creates a new SSID for presence.

func NewSsidForShare

func NewSsidForShare(original Ssid) Ssid

NewSsidForShare creates a new SSID for shared subscriptions.

func (Ssid) Contract

func (s Ssid) Contract() uint32

Contract gets the contract part from SSID.

func (Ssid) Encode

func (s Ssid) Encode() string

Encode encodes the SSID to a binary format

func (Ssid) GetHashCode

func (s Ssid) GetHashCode() uint32

GetHashCode combines the SSID into a single hash.

type Subscriber

type Subscriber interface {
	ID() string
	Type() SubscriberType
	Send(*Message) error
}

Subscriber is a value associated with a subscription.

type SubscriberType

type SubscriberType uint8

SubscriberType represents a type of subscriber

type Subscribers

type Subscribers map[uint32]Subscriber

Subscribers represents a subscriber set which can contain only unique values.

func (*Subscribers) AddRange

func (s *Subscribers) AddRange(from Subscribers, filter func(s Subscriber) bool)

AddRange adds multiple subscribers from an existing list of subscribers, with filter applied.

func (*Subscribers) AddUnique

func (s *Subscribers) AddUnique(value Subscriber) bool

AddUnique adds a subscriber to the set.

func (*Subscribers) Contains

func (s *Subscribers) Contains(value Subscriber) (ok bool)

Contains checks whether a subscriber is in the set.

func (*Subscribers) Random

func (s *Subscribers) Random(rnd uint32) (v Subscriber)

Random picks a random subscriber from the map. The 'rnd' argument must be a 32-bit randomly generated unsigned integer in range of [0, math.MaxUint32).

func (*Subscribers) Remove

func (s *Subscribers) Remove(value Subscriber) bool

Remove removes a subscriber from the set.

func (*Subscribers) Reset

func (s *Subscribers) Reset()

Reset recycles the list of subscribers.

func (*Subscribers) Size

func (s *Subscribers) Size() int

Size returns the size of the subscriber list.

type Subscription

type Subscription struct {
	Ssid       Ssid       // Gets or sets the SSID (parsed channel) for this subscription.
	Subscriber Subscriber // Gets or sets the subscriber for this subscription.
}

Subscription represents a topic subscription.

type Trie

type Trie struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Trie represents an efficient collection of subscriptions with lookup capability.

func NewTrie

func NewTrie() *Trie

NewTrie creates a new subscriptions matcher using standard emitter strategy.

func NewTrieMQTT

func NewTrieMQTT() *Trie

NewTrieMQTT creates a new subscriptions matcher using standard MQTT strategy.

func (*Trie) Count

func (t *Trie) Count() int

Count returns the number of subscriptions.

func (*Trie) Lookup

func (t *Trie) Lookup(ssid Ssid, filter func(s Subscriber) bool) (subs Subscribers)

Lookup returns the Subscribers for the given topic.

func (*Trie) Subscribe

func (t *Trie) Subscribe(ssid Ssid, sub Subscriber) *Subscription

Subscribe adds the Subscriber to the topic and returns a Subscription.

func (*Trie) Unsubscribe

func (t *Trie) Unsubscribe(ssid Ssid, subscriber Subscriber)

Unsubscribe removes the Subscription.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL