sessions

package
v0.0.2
Published: Sep 13, 2021 | License: Apache-2.0 | Imports: 10 | Imported by: 0

Documentation

Constants

const MQ_TAG_CLU = message.RESERVED2

Variables

var (
	ErrSessionsProviderNotFound = errors.New("Session: Session provider not found")
	ErrKeyNotAvailable          = errors.New("Session: not item found for key.")
)
var Default = "default"

Register makes a session provider available under the provided name. If Register is called twice with the same name, or if the provider is nil, it panics.

Functions

func NewMemProvider

func NewMemProvider() *memProvider

func Register

func Register(name string, provider SessionsProvider)
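The panic behaviour documented for Register can be illustrated with a minimal, self-contained sketch. Everything here is an assumption made for illustration: the reduced `Provider` interface, the `emptyProvider` type, the package-level `providers` map, and the `panics` helper are hypothetical and are not taken from this package's source.

```go
package main

import "fmt"

// Provider is a hypothetical stand-in for SessionsProvider, reduced to one method.
type Provider interface {
	Count() int
}

// emptyProvider is a hypothetical provider that holds no sessions.
type emptyProvider struct{}

func (emptyProvider) Count() int { return 0 }

// providers is an assumed package-level registry keyed by provider name.
var providers = make(map[string]Provider)

// Register panics on a nil provider or a duplicate name.
func Register(name string, p Provider) {
	if p == nil {
		panic("sessions: Register provider is nil")
	}
	if _, dup := providers[name]; dup {
		panic("sessions: Register called twice for provider " + name)
	}
	providers[name] = p
}

// Unregister removes a provider; removing an unknown name is a no-op.
func Unregister(name string) {
	delete(providers, name)
}

// panics reports whether calling f panicked.
func panics(f func()) (did bool) {
	defer func() {
		if recover() != nil {
			did = true
		}
	}()
	f()
	return false
}

func main() {
	Register("mem", emptyProvider{})
	fmt.Println(panics(func() { Register("mem", emptyProvider{}) })) // duplicate name
	fmt.Println(panics(func() { Register("nil", nil) }))             // nil provider
	Unregister("mem")
	fmt.Println(panics(func() { Register("mem", emptyProvider{}) })) // name is free again
}
```

Panicking at registration time follows the convention of Go's `database/sql.Register`, where a bad registration is treated as a programming error caught at start-up rather than a runtime condition to handle.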

func SessionInit

func SessionInit(session string)

func Unregister

func Unregister(name string)

Types

type Ackqueue

type Ackqueue struct {
	// contains filtered or unexported fields
}

Ackqueue is a growing queue implemented based on a ring buffer. As the buffer gets full, it will auto-grow.
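As a rough illustration of a ring buffer that grows when full, here is a minimal sketch. The `ring` type, its fields, and the doubling policy are assumptions chosen for the example; the actual Ackqueue fields are unexported and may differ.

```go
package main

import "fmt"

// ring is a hypothetical growable ring buffer of packet IDs.
type ring struct {
	buf   []uint16
	head  int // index of the oldest element
	count int // number of stored elements
}

func newRing(capacity int) *ring {
	if capacity < 1 {
		capacity = 1
	}
	return &ring{buf: make([]uint16, capacity)}
}

// push appends v, doubling the buffer first if it is full.
func (r *ring) push(v uint16) {
	if r.count == len(r.buf) {
		r.grow()
	}
	tail := (r.head + r.count) % len(r.buf)
	r.buf[tail] = v
	r.count++
}

// pop removes and returns the oldest element, or false if the ring is empty.
func (r *ring) pop() (uint16, bool) {
	if r.count == 0 {
		return 0, false
	}
	v := r.buf[r.head]
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return v, true
}

// grow copies the elements oldest-first into a buffer twice the size.
func (r *ring) grow() {
	next := make([]uint16, 2*len(r.buf))
	for i := 0; i < r.count; i++ {
		next[i] = r.buf[(r.head+i)%len(r.buf)]
	}
	r.buf, r.head = next, 0
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	r.pop()   // frees a slot so the next push wraps around
	r.push(3) // stored at index 0
	r.push(4) // buffer is full, so it grows to capacity 4
	fmt.Println(len(r.buf), r.count)
}
```

Copying oldest-first during growth unwraps the ring, so the head index can be reset to zero and FIFO order is preserved.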

Ackqueue is used to store messages that are waiting for acks to come back. There are a few scenarios in which acks are required.

  1. Client sends SUBSCRIBE message to server, waits for SUBACK.
  2. Client sends UNSUBSCRIBE message to server, waits for UNSUBACK.
  3. Client sends PUBLISH QoS 1 message to server, waits for PUBACK.
  4. Server sends PUBLISH QoS 1 message to client, waits for PUBACK.
  5. Client sends PUBLISH QoS 2 message to server, waits for PUBREC.
  6. Server sends PUBREC message to client, waits for PUBREL.
  7. Client sends PUBREL message to server, waits for PUBCOMP.
  8. Server sends PUBLISH QoS 2 message to client, waits for PUBREC.
  9. Client sends PUBREC message to server, waits for PUBREL.
  10. Server sends PUBREL message to client, waits for PUBCOMP.
  11. Client sends PINGREQ message to server, waits for PINGRESP.
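The scenarios above reduce to a mapping from the packet sent to the packet awaited, independent of which side sends it. The sketch below encodes that mapping; the `packet` type and the `expectedAck` function are hypothetical names used for illustration, and packet kinds are plain strings rather than the `message` package's types.

```go
package main

import "fmt"

// packet is a hypothetical, simplified view of an MQTT control packet.
type packet struct {
	kind string // e.g. "PUBLISH", "SUBSCRIBE"
	qos  byte   // only meaningful for PUBLISH
}

// expectedAck returns the packet type the sender waits for,
// or false if the packet needs no acknowledgement.
func expectedAck(p packet) (string, bool) {
	switch p.kind {
	case "SUBSCRIBE":
		return "SUBACK", true
	case "UNSUBSCRIBE":
		return "UNSUBACK", true
	case "PUBLISH":
		switch p.qos {
		case 1:
			return "PUBACK", true
		case 2:
			return "PUBREC", true
		}
		return "", false // QoS 0 is not acknowledged
	case "PUBREC":
		return "PUBREL", true
	case "PUBREL":
		return "PUBCOMP", true
	case "PINGREQ":
		return "PINGRESP", true
	}
	return "", false
}

func main() {
	// Walk the QoS 2 handshake by treating each ack as the next packet sent.
	p := packet{kind: "PUBLISH", qos: 2}
	for {
		ack, ok := expectedAck(p)
		if !ok {
			break
		}
		fmt.Printf("%s waits for %s\n", p.kind, ack)
		p = packet{kind: ack}
	}
}
```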

func (*Ackqueue) Ack

func (this *Ackqueue) Ack(msg message.Message) error

Ack() takes the supplied ack message and updates the status of the messages that are waiting.

func (*Ackqueue) Acked

func (this *Ackqueue) Acked() []ackmsg

Acked() returns the list of messages that have completed the ack cycle.

func (*Ackqueue) Acked02

func (this *Ackqueue) Acked02() []ackmsg

Acked02() handles messages sent from the cluster.

func (*Ackqueue) SetCluserTag

func (this *Ackqueue) SetCluserTag(pktid uint16) bool

func (*Ackqueue) Wait

func (this *Ackqueue) Wait(msg message.Message, onComplete interface{}) error

Wait() copies the message into a waiting queue and waits for the corresponding ack message to be received.
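The Wait, Ack, and Acked cycle can be sketched as follows. This is a simplified model under stated assumptions, not the package's implementation: entries are keyed by packet ID, stored in a plain slice instead of a ring buffer, and `acked` returns only the completed entries at the front of the queue. The names `pending`, `ackQueue`, and `errNotWaiting` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// pending is a hypothetical record for one message awaiting its ack.
type pending struct {
	pktID      uint16
	done       bool
	onComplete func()
}

// ackQueue is a simplified, slice-backed stand-in for Ackqueue.
type ackQueue struct {
	items []pending
}

var errNotWaiting = errors.New("no message waiting for this packet ID")

// wait records a message that now expects an ack.
func (q *ackQueue) wait(pktID uint16, onComplete func()) {
	q.items = append(q.items, pending{pktID: pktID, onComplete: onComplete})
}

// ack marks the first waiting entry with a matching packet ID as done.
func (q *ackQueue) ack(pktID uint16) error {
	for i := range q.items {
		if q.items[i].pktID == pktID && !q.items[i].done {
			q.items[i].done = true
			return nil
		}
	}
	return errNotWaiting
}

// acked removes and returns the completed entries at the front of the queue,
// stopping at the first entry that is still waiting.
func (q *ackQueue) acked() []pending {
	n := 0
	for n < len(q.items) && q.items[n].done {
		n++
	}
	out := q.items[:n:n]
	q.items = q.items[n:]
	return out
}

func main() {
	q := &ackQueue{}
	for id := uint16(1); id <= 3; id++ {
		id := id
		q.wait(id, func() { fmt.Println("completed", id) })
	}

	q.ack(2)                    // out-of-order ack: entry 1 still blocks the front
	fmt.Println(len(q.acked())) // nothing is released yet

	q.ack(1)
	for _, p := range q.acked() { // releases entries 1 and 2 in order
		p.onComplete()
	}
	fmt.Println(len(q.items)) // entry 3 is still waiting
}
```

Releasing only the leading run of completed entries keeps completions in send order even when acks arrive out of order; whether the real Ackqueue makes the same choice is not stated in this documentation.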

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(providerName string) (*Manager, error)

func (*Manager) Close

func (this *Manager) Close() error

func (*Manager) Count

func (this *Manager) Count() int

func (*Manager) Del

func (this *Manager) Del(id string)

func (*Manager) Get

func (this *Manager) Get(id string) (*Session, error)

func (*Manager) New

func (this *Manager) New(id string) (*Session, error)

func (*Manager) Save

func (this *Manager) Save(id string) error

type Session

type Session struct {
	// Ack queue for outgoing PUBLISH QoS 1 messages
	Pub1ack *Ackqueue

	// Ack queue for incoming PUBLISH QoS 2 messages
	Pub2in *Ackqueue

	// Ack queue for outgoing PUBLISH QoS 2 messages
	Pub2out *Ackqueue

	// Ack queue for outgoing SUBSCRIBE messages
	Suback *Ackqueue

	// Ack queue for outgoing UNSUBSCRIBE messages
	Unsuback *Ackqueue

	// Ack queue for outgoing PINGREQ messages
	Pingack *Ackqueue

	// Cmsg is the CONNECT message
	Cmsg *message.ConnectMessage

	// Will message to publish if the connection is closed unexpectedly
	Will *message.PublishMessage

	// Retained publish message
	Retained *message.PublishMessage
	// contains filtered or unexported fields
}

Session is a client session.

func (*Session) AddTopic

func (this *Session) AddTopic(topic string, qos byte) error

func (*Session) ID

func (this *Session) ID() string

func (*Session) IDs

func (this *Session) IDs() []byte

func (*Session) Init

func (this *Session) Init(msg *message.ConnectMessage) error

func (*Session) RemoveTopic

func (this *Session) RemoveTopic(topic string) error

func (*Session) RetainMessage

func (this *Session) RetainMessage(msg *message.PublishMessage) error

func (*Session) Topics

func (this *Session) Topics() ([]string, []byte, error)

func (*Session) Update

func (this *Session) Update(msg *message.ConnectMessage) error

type SessionsProvider

type SessionsProvider interface {
	New(id string) (*Session, error)
	Get(id string) (*Session, error)
	Del(id string)
	Save(id string) error
	Count() int
	Close() error
}
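To show what satisfying this interface might involve, here is a minimal map-backed sketch. It is not the package's memProvider: the reduced `Session` struct, the `mapProvider` type, the `errNotFound` error, and the choices that Save is a no-op and Close drops all sessions are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Session is a hypothetical stand-in that keeps only the session ID.
type Session struct{ id string }

// SessionsProvider mirrors the interface listed above.
type SessionsProvider interface {
	New(id string) (*Session, error)
	Get(id string) (*Session, error)
	Del(id string)
	Save(id string) error
	Count() int
	Close() error
}

var errNotFound = errors.New("session not found")

// mapProvider is a hypothetical in-memory provider guarded by a mutex.
type mapProvider struct {
	mu sync.RWMutex
	st map[string]*Session
}

// Compile-time check that mapProvider implements the interface.
var _ SessionsProvider = (*mapProvider)(nil)

func newMapProvider() *mapProvider {
	return &mapProvider{st: make(map[string]*Session)}
}

// New creates a session, replacing any existing one with the same ID.
func (p *mapProvider) New(id string) (*Session, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	s := &Session{id: id}
	p.st[id] = s
	return s, nil
}

// Get returns the stored session or errNotFound.
func (p *mapProvider) Get(id string) (*Session, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	s, ok := p.st[id]
	if !ok {
		return nil, errNotFound
	}
	return s, nil
}

func (p *mapProvider) Del(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.st, id)
}

// Save is a no-op here because the map is already the only copy.
func (p *mapProvider) Save(id string) error { return nil }

func (p *mapProvider) Count() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.st)
}

// Close drops every session.
func (p *mapProvider) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.st = make(map[string]*Session)
	return nil
}

func main() {
	var p SessionsProvider = newMapProvider()
	p.New("client-1")
	p.New("client-2")
	fmt.Println(p.Count())
	p.Del("client-1")
	_, err := p.Get("client-1")
	fmt.Println(err)
}
```

The read-write mutex lets concurrent connections look up sessions without blocking each other, while New, Del, and Close take the exclusive lock.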
