realtime

package
v1.0.0-beta.122 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PresenceChannelData = "presence"
	MessageChannelData  = "message"
)

Variables

This section is empty.

Functions

func DecodeEvent

func DecodeEvent(encodingType internal.UserDataEncType, eventType api.EventType, message []byte) (proto.Message, error)

func DecodeRealtime

func DecodeRealtime(encodingType internal.UserDataEncType, message []byte) (*api.RealTimeMessage, error)

func EncodeAsJSON

func EncodeAsJSON(event proto.Message) ([]byte, error)

func EncodeAsMsgPack

func EncodeAsMsgPack(data any) ([]byte, error)

func EncodeEvent

func EncodeEvent(encodingType internal.UserDataEncType, event proto.Message) ([]byte, error)

func EncodeEventAsMsgPack

func EncodeEventAsMsgPack(event proto.Message) ([]byte, error)

func EncodeRealtime

func EncodeRealtime(encodingType internal.UserDataEncType, msg *api.RealTimeMessage) ([]byte, error)

func EncodeStreamMD

func EncodeStreamMD(md *StreamMessageMD) ([]byte, error)

func JsonByteToMsgPack

func JsonByteToMsgPack(data []byte) ([]byte, error)

func NewEventDataFromMessage

func NewEventDataFromMessage(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.Message) (*internal.StreamData, error)

func NewMessageData

func NewMessageData(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.MessageEvent) (*internal.StreamData, error)

func NewPresenceData

func NewPresenceData(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.MessageEvent) (*internal.StreamData, error)

func SanitizeUserData

func SanitizeUserData(toEnc internal.UserDataEncType, data *internal.StreamData) ([]byte, error)

SanitizeUserData is an optimization that lets us store the received raw data and return it as-is when the encoding used for writing is the same as the encoding used for reading. If the encoding differs, this method converts the data. A typical case that needs this conversion is when a message is published through a websocket using msgpack encoding and is later read back using the HTTP APIs; in that case we need to convert from msgpack to JSON.

func SendReply

func SendReply(conn *websocket.Conn, encType internal.UserDataEncType, eventType api.EventType, event proto.Message) error

Types

type Channel

type Channel struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewChannel

func NewChannel(encName string, stream cache.Stream) *Channel

func (*Channel) Close

func (ch *Channel) Close(ctx context.Context)

func (*Channel) DisconnectWatcher

func (ch *Channel) DisconnectWatcher(watcher string)

DisconnectWatcher ToDo: call it during leave.

func (*Channel) GetWatcher

func (ch *Channel) GetWatcher(ctx context.Context, watcherName string, resume string) (*ChannelWatcher, error)

func (*Channel) ListWatchers

func (ch *Channel) ListWatchers() []string

func (*Channel) Name

func (ch *Channel) Name() string

func (*Channel) PublishMessage

func (ch *Channel) PublishMessage(ctx context.Context, data *internal.StreamData) (string, error)

func (*Channel) PublishPresence

func (ch *Channel) PublishPresence(ctx context.Context, data *internal.StreamData) (string, error)

func (*Channel) Read

func (ch *Channel) Read(ctx context.Context, pos string) (*cache.StreamMessages, bool, error)

func (*Channel) StopWatcher

func (ch *Channel) StopWatcher(watcher string)

type ChannelFactory

type ChannelFactory struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewChannelFactory

func NewChannelFactory(cache cache.Cache, encoder metadata.CacheEncoder, heartbeatF *HeartbeatFactory) *ChannelFactory

func (*ChannelFactory) CreateChannel

func (factory *ChannelFactory) CreateChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)

CreateChannel returns an error if the stream already exists. Use GetOrCreateChannel for create-if-not-exists semantics.

func (*ChannelFactory) DeleteChannel

func (factory *ChannelFactory) DeleteChannel(ctx context.Context, ch *Channel)

func (*ChannelFactory) GetChannel

func (factory *ChannelFactory) GetChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)

func (*ChannelFactory) GetOrCreateChannel

func (factory *ChannelFactory) GetOrCreateChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)

func (*ChannelFactory) ListChannels

func (factory *ChannelFactory) ListChannels(ctx context.Context, tenantId uint32, projId uint32, prefix string) ([]string, error)

type ChannelRunner

type ChannelRunner struct {
	// contains filtered or unexported fields
}

func (*ChannelRunner) Run

func (runner *ChannelRunner) Run(ctx context.Context, tenant *metadata.Tenant) (Response, error)

func (*ChannelRunner) SetChannelReq

func (runner *ChannelRunner) SetChannelReq(req *api.GetRTChannelRequest)

func (*ChannelRunner) SetChannelsReq

func (runner *ChannelRunner) SetChannelsReq(req *api.GetRTChannelsRequest)

func (*ChannelRunner) SetListSubscriptionsReq

func (runner *ChannelRunner) SetListSubscriptionsReq(req *api.ListSubscriptionRequest)

type ChannelWatcher

type ChannelWatcher struct {
	// contains filtered or unexported fields
}

ChannelWatcher watches events for a single channel. It accepts a watch that is notified when a new event is read from the stream. Because a ChannelWatcher is mapped to a consumer group on a stream, its state is restored from the cache during restart, which means a watcher is created only if it doesn't already exist; otherwise the existing one is returned.

func CreateAndRegisterWatcher

func CreateAndRegisterWatcher(ctx context.Context, name string, pos string, stream cache.Stream) (*ChannelWatcher, error)

func CreateWatcher

func CreateWatcher(ctx context.Context, name string, pos string, existingPos string, stream cache.Stream) (*ChannelWatcher, error)

func (*ChannelWatcher) Disconnect

func (watcher *ChannelWatcher) Disconnect()

func (*ChannelWatcher) StartWatching

func (watcher *ChannelWatcher) StartWatching(watch Watch)

func (*ChannelWatcher) Stop

func (watcher *ChannelWatcher) Stop()

type ConnectionParams

type ConnectionParams struct {
	ProjectName string
	SessionId   string
	Position    string
	Encoding    string
}

ConnectionParams holds the project name, session ID, position, and encoding supplied for a connection. TODO: do we need serial here as well?

func (ConnectionParams) ToEncodingType

func (params ConnectionParams) ToEncodingType() internal.UserDataEncType

type DevicePusher

type DevicePusher struct {
	// contains filtered or unexported fields
}

func NewDevicePusher

func NewDevicePusher(session *Session, channel string) *DevicePusher

func (*DevicePusher) Watch

func (pusher *DevicePusher) Watch(events *cache.StreamMessages, err error) ([]string, error)

type HeartbeatFactory

type HeartbeatFactory struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewHeartbeatFactory

func NewHeartbeatFactory(cache cache.Cache, encoder metadata.CacheEncoder) *HeartbeatFactory

func (*HeartbeatFactory) GetHeartbeatTable

func (factory *HeartbeatFactory) GetHeartbeatTable(tenantId uint32, projectId uint32) *HeartbeatTable

type HeartbeatTable

type HeartbeatTable struct {
	// contains filtered or unexported fields
}

func NewHeartbeat

func NewHeartbeat(cache cache.Cache, tableName string) *HeartbeatTable

func (*HeartbeatTable) GroupsExpired

func (h *HeartbeatTable) GroupsExpired(groupsName []string) bool

func (*HeartbeatTable) Ping

func (h *HeartbeatTable) Ping(sessionId string) error

type MessagesRunner

type MessagesRunner struct {
	// contains filtered or unexported fields
}

MessagesRunner is to publish messages to a channel.

func (*MessagesRunner) Run

func (runner *MessagesRunner) Run(ctx context.Context, tenant *metadata.Tenant) (Response, error)

type RTMRunner

type RTMRunner interface {
	Run(ctx context.Context, tenant *metadata.Tenant) (Response, error)
}

RTMRunner is used to run the realtime HTTP APIs related to channels, such as accessing a channel, subscribing to a channel, etc.

type RTMRunnerFactory

type RTMRunnerFactory struct {
	// contains filtered or unexported fields
}

func NewRTMRunnerFactory

func NewRTMRunnerFactory(cache cache.Cache, factory *ChannelFactory) *RTMRunnerFactory

NewRTMRunnerFactory returns RTMRunnerFactory object.

func (*RTMRunnerFactory) GetChannelRunner

func (f *RTMRunnerFactory) GetChannelRunner() *ChannelRunner

func (*RTMRunnerFactory) GetMessagesRunner

func (f *RTMRunnerFactory) GetMessagesRunner(r *api.MessagesRequest) *MessagesRunner

func (*RTMRunnerFactory) GetReadMessagesRunner

func (f *RTMRunnerFactory) GetReadMessagesRunner(r *api.ReadMessagesRequest, streaming Streaming) *ReadMessagesRunner

type ReadMessagesRunner

type ReadMessagesRunner struct {
	// contains filtered or unexported fields
}

func (*ReadMessagesRunner) Run

func (runner *ReadMessagesRunner) Run(ctx context.Context, tenant *metadata.Tenant) (Response, error)

type Response

type Response struct {
	api.Response
}

type Session

type Session struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Session) Close

func (session *Session) Close() error

func (*Session) IsActive

func (session *Session) IsActive() bool

func (*Session) OnClose

func (session *Session) OnClose(code int, _ string) error

func (*Session) OnPing

func (session *Session) OnPing(_ string) error

func (*Session) OnPong

func (session *Session) OnPong(_ string) error

func (*Session) SendConnSuccess

func (session *Session) SendConnSuccess() error

func (*Session) Start

func (session *Session) Start(ctx context.Context) error

Start is the entry point for handling all the events from a device.

type Sessions

type Sessions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSessionMgr

func NewSessionMgr(cache cache.Cache, tenantMgr *metadata.TenantManager, txMgr *transaction.Manager, heartbeatF *HeartbeatFactory, factory *ChannelFactory) *Sessions

func (*Sessions) AddDevice

func (s *Sessions) AddDevice(ctx context.Context, conn *websocket.Conn, params ConnectionParams) (*Session, error)

func (*Sessions) CreateDeviceSession

func (s *Sessions) CreateDeviceSession(ctx context.Context, conn *websocket.Conn, params ConnectionParams) (*Session, error)

func (*Sessions) ExecuteRunner

func (s *Sessions) ExecuteRunner(ctx context.Context, runner RTMRunner) (Response, error)

func (*Sessions) RemoveDevice

func (s *Sessions) RemoveDevice(_ context.Context, session *Session)

func (*Sessions) TrackSessions

func (s *Sessions) TrackSessions()

type StreamMessageMD

type StreamMessageMD struct {
	ClientId string
	SocketId string
	// DataType is the type of data being stored, for example "presence" or "message" data.
	DataType string
	// EventName is the named identifier of this message, for example "enter"/"left" in the case of presence.
	EventName string
}

func DecodeStreamMD

func DecodeStreamMD(enc []byte) (*StreamMessageMD, error)

func NewStreamMessageMD

func NewStreamMessageMD(dataType string, clientId string, socketId string, eventName string) *StreamMessageMD

type Streaming

type Streaming interface {
	api.Realtime_ReadMessagesServer
}

type Watch

type Watch func(*cache.StreamMessages, error) ([]string, error)

Watch is called when an event is received by ChannelWatcher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL