core

package
v0.0.0-...-bc28a72 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2021 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

This package holds all the delivery logic, connection tracking, and pub/sub.

Index

Constants

View Source
const (
	CacheTimeout = 5 * time.Second
)

Variables

View Source
var (
	ErrInvalidLengthChannels        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowChannels          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupChannels = fmt.Errorf("proto: unexpected end of group")
)
View Source
var CacheLimit = 70 // Number of inserts before batching
View Source
var CacheQueueSize int64 = 50

CacheQueueSize - How much the cache of a channel queue can grow; the bigger it is, the fewer database requests we make

View Source
var NewEvent_NewEventType_name = map[int32]string{
	0: "JOIN_CHANNEL",
	1: "LEAVE_CHANNEL",
	2: "NEW_CHANNEL",
	3: "REMOVE_CHANNEL",
	4: "SUBSCRIBE",
	5: "PUBLISH",
	6: "ACK",
	7: "ONLINE_STATUS",
	8: "INITIAL_ONLINE_STATUS",
}
View Source
var NewEvent_NewEventType_value = map[string]int32{
	"JOIN_CHANNEL":          0,
	"LEAVE_CHANNEL":         1,
	"NEW_CHANNEL":           2,
	"REMOVE_CHANNEL":        3,
	"SUBSCRIBE":             4,
	"PUBLISH":               5,
	"ACK":                   6,
	"ONLINE_STATUS":         7,
	"INITIAL_ONLINE_STATUS": 8,
}

Functions

func ChannelExists

func ChannelExists(appID string, channelID string) bool

ChannelExists - Check if channel exists in cache or database

func CreateApp

func CreateApp(context *gin.Context)

CreateApp - Create a new app POST /app

func CreateApplication

func CreateApplication(appID, name string) error

func CreateChannel

func CreateChannel(appID string, channel *Channel) (bool, error)

CreateChannel - Validates input and tries to create a channel

func CreateChannelHandler

func CreateChannelHandler(context *gin.Context)

CreateChannelHandler - Create channel with given info POST /channel

func CreateClient

func CreateClient(appID string, clientID string, username string, extra string) (bool, error)

CreateClient - Create client and also cache it

func CreateClientHandler

func CreateClientHandler(context *gin.Context)

CreateClientHandler - Create new client POST /client

func CreateDevice

func CreateDevice(context *gin.Context)

CreateDevice - Add new device for notifications POST /device

func DeleteApp

func DeleteApp(context *gin.Context)

DeleteApp - Delete an app DELETE /app/:appID

func DeleteApplication

func DeleteApplication(appID string) error

func DeleteChannel

func DeleteChannel(appID string, channelID string) (bool, error)

DeleteChannel - Delete channel from database and cache, and notify all connected clients

func DeleteChannelHandler

func DeleteChannelHandler(context *gin.Context)

DeleteChannelHandler - Delete channel DELETE /channel/:channelID

func DeleteClient

func DeleteClient(appID string, clientID string) (bool, error)

DeleteClient - Remove client from database and cache

func DeleteClientHandler

func DeleteClientHandler(context *gin.Context)

DeleteClientHandler - Delete a client DELETE /client/:clientID

func GetApps

func GetApps(context *gin.Context)

GetApps - Get all apps GET /app

func GetClientHandler

func GetClientHandler(context *gin.Context)

GetClientHandler - Get client info GET /client/:clientID

func GetClients

func GetClients(context *gin.Context)

GetClients - Get an app's clients or all the clients GET /client

func GetLastMessages

func GetLastMessages(context *gin.Context)

GetLastMessages - Fetch last messages GET /channel/:channelID/last/:amount

func GetLastMessagesBeforeTimeStamp

func GetLastMessagesBeforeTimeStamp(context *gin.Context)

GetLastMessagesBeforeTimeStamp - Fetch last messages before a timestamp GET /channel/:channelID/sync/:lastTimeStamp/before/:amount

func GetLastMessagesSinceTimeStamp

func GetLastMessagesSinceTimeStamp(context *gin.Context)

GetLastMessagesSinceTimeStamp - Fetch last messages after a timestamp GET /channel/:channelID/sync/:lastTimeStamp/last/:amount

func GetMessagesBetweenTimeStamps

func GetMessagesBetweenTimeStamps(context *gin.Context)

GetMessagesBetweenTimeStamps - Fetch messages between timestamps GET /channel/:channelID/sync/:firstTimeStamp/to/:secondTimeStamp

func GetMessagesSinceTimeStamp

func GetMessagesSinceTimeStamp(context *gin.Context)

GetMessagesSinceTimeStamp - Fetch messages after timestamp GET /channel/:channelID/sync/:lastTimeStamp

func GetOpenChannels

func GetOpenChannels(context *gin.Context)

GetOpenChannels - Get all public channels GET /channel/open

func GetPrivateChannels

func GetPrivateChannels(context *gin.Context)

GetPrivateChannels - Get all private channels GET /channel/private

func InitEngine

func InitEngine(config EngineConfig)

func JoinChannel

func JoinChannel(appID string, channelID string, clientID string) (bool, error)

JoinChannel - Join client to a given channel, and update the cache and the currently connected and affected clients

func LeaveChannel

func LeaveChannel(appID string, channelID string, clientID string) (bool, error)

LeaveChannel - Remove client from a given channel, and update cache

func PostCloseChannel

func PostCloseChannel(context *gin.Context)

PostCloseChannel - Close channel POST /channel/:channelID/close

func PostEventHandler

func PostEventHandler(context *gin.Context)

PostEventHandler - Publish event into channel POST /channel/:channelID/publish

func PostJoinChannel

func PostJoinChannel(context *gin.Context)

PostJoinChannel - Join user to a channel POST /channel/:channelID/join/:clientID

func PostLeaveChannel

func PostLeaveChannel(context *gin.Context)

PostLeaveChannel - Remove user from channel POST /channel/:channelID/leave/:clientID

func PostOpenChannel

func PostOpenChannel(context *gin.Context)

PostOpenChannel - Open channel POST /channel/:channelID/open

func RemoveDevice

func RemoveDevice(context *gin.Context)

RemoveDevice - Remove device for notifications DELETE /device/:deviceID

func RemoveIndex

func RemoveIndex(s []string, index int) []string

RemoveIndex - Helper to remove index from slice

func SendPushNotification

func SendPushNotification(appID string, channelEvent *ChannelEvent) bool

func SetChannelCloseStatus

func SetChannelCloseStatus(appID string, channelID string, closed bool) (bool, error)

SetChannelCloseStatus - Set channel close status

func SetUpLogger

func SetUpLogger()

func UpdateApp

func UpdateApp(context *gin.Context)

UpdateApp - Update app PUT /app/:appID

func UpdateApplication

func UpdateApplication(appID, name string) error

func UpdateClient

func UpdateClient(appID string, clientID string, username string, extra string) (bool, error)

UpdateClient - Update client on the database and cache

func UpdateClientHandler

func UpdateClientHandler(context *gin.Context)

UpdateClientHandler - Update a client PUT /client/:clientID

Types

type App

type App struct {
	AppID string
	Name  string
}

App - Database representation of an App

func GetApplication

func GetApplication(appID string) (*App, error)

func GetApplications

func GetApplications() ([]*App, error)

type AppRepository

type AppRepository interface {
	CreateApp(id string, name string) error
	DeleteApp(id string) error
	GetApps() ([]*App, error)
	GetApp(id string) (*App, error)
	UpdateApp(id string, name string) error
}

AppRepository - Repository for handling App table

type AuthHook

type AuthHook interface {
	Authenticate(token, appID, deviceID string, request *http.Request) *auth.Identity
}

type CacheStorage

type CacheStorage interface {
	// Device
	CheckDeviceExistence(clientID string, id string) bool
	GetClientDevices(clientID string) []*Device
	RemoveDevice(clientID string, id string)
	AddDevice(clientID string, device *Device)
	// Client
	StoreClient(appID string, clientID string, client *Client)
	CheckClientExistence(appID string, clientID string) bool
	GetClient(appID string, clientID string) *Client
	RemoveClient(appID string, clientID string)
	// App
	StoreApp(appID string, name string)
	GetApp(appID string) *App
	RemoveApp(appID string)
	// Channel
	StoreChannel(appID string, channelID string, channel *Channel)
	GetChannel(appID string, channelID string) *Channel
	CheckChannelExistence(appID string, channelID string) bool
	RemoveChannel(appID string, channelID string)
	// Client -> Channel
	RemoveClientChannels(clientID string)
	AddClientChannels(clientID string, channelIDs []string)
	AddClientChannel(clientID string, channelID string)
	GetClientChannels(clientID string) ([]string, bool)
	RemoveClientChannel(clientID string, channelID string)
	// Channel Event
	StoreChannelEvent(channelID string, appID string, event *ChannelEvent)
	GetOldestChannelEvent(channelID string, appID string) *ChannelEvent
	GetChannelEventsSize(channelID string, appID string) uint64
	GetChannelEvents(channelID string, appID string, amount int64) []*ChannelEvent
}

CacheStorage - Cache for avoiding fetching data from database

type Channel

type Channel struct {
	ID         string `json:"id"`
	AppID      string `json:"appID"`
	Name       string `json:"name"`
	CreatedAt  int64  `json:"createdAt"`
	IsClosed   bool   `json:"isClosed"`
	Extra      string `json:"extra"`
	Persistent bool   `json:"isPersistent"`
	Private    bool   `json:"isPrivate"`
	Presence   bool   `json:"isPresence"`
	Push       bool   `json:"isPush"`
}

Channel - Database representation of a channel

func GetChannel

func GetChannel(appID string, channelID string) (*Channel, error)

GetChannel - Get channel first from cache, then retry on database and update cache

type ChannelEvent

type ChannelEvent struct {
	SenderID             string   `protobuf:"bytes,1,opt,name=senderID,proto3" json:"senderID,omitempty"`
	EventType            string   `protobuf:"bytes,2,opt,name=eventType,proto3" json:"eventType,omitempty"`
	Payload              string   `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
	ChannelID            string   `protobuf:"bytes,4,opt,name=channelID,proto3" json:"channelID,omitempty"`
	Timestamp            int64    `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*ChannelEvent) Descriptor

func (*ChannelEvent) Descriptor() ([]byte, []int)

func (*ChannelEvent) GetChannelID

func (m *ChannelEvent) GetChannelID() string

func (*ChannelEvent) GetEventType

func (m *ChannelEvent) GetEventType() string

func (*ChannelEvent) GetPayload

func (m *ChannelEvent) GetPayload() string

func (*ChannelEvent) GetSenderID

func (m *ChannelEvent) GetSenderID() string

func (*ChannelEvent) GetTimestamp

func (m *ChannelEvent) GetTimestamp() int64

func (*ChannelEvent) Marshal

func (m *ChannelEvent) Marshal() (dAtA []byte, err error)

func (*ChannelEvent) MarshalTo

func (m *ChannelEvent) MarshalTo(dAtA []byte) (int, error)

func (*ChannelEvent) MarshalToSizedBuffer

func (m *ChannelEvent) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ChannelEvent) ProtoMessage

func (*ChannelEvent) ProtoMessage()

func (*ChannelEvent) Reset

func (m *ChannelEvent) Reset()

func (*ChannelEvent) Size

func (m *ChannelEvent) Size() (n int)

func (*ChannelEvent) String

func (m *ChannelEvent) String() string

func (*ChannelEvent) Unmarshal

func (m *ChannelEvent) Unmarshal(dAtA []byte) error

func (*ChannelEvent) XXX_DiscardUnknown

func (m *ChannelEvent) XXX_DiscardUnknown()

func (*ChannelEvent) XXX_Marshal

func (m *ChannelEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ChannelEvent) XXX_Merge

func (m *ChannelEvent) XXX_Merge(src proto.Message)

func (*ChannelEvent) XXX_Size

func (m *ChannelEvent) XXX_Size() int

func (*ChannelEvent) XXX_Unmarshal

func (m *ChannelEvent) XXX_Unmarshal(b []byte) error

type ChannelRepository

type ChannelRepository interface {
	CreateChannel(id string, appID string, name string, createdAt int64, isClosed bool, extra string, persistent bool, private bool, presence bool, push bool) error

	GetChannelClients(appID string, channelID string) ([]string, error)
	DeleteChannel(appID string, id string) error
	DeleteAppChannels(appID string) error

	JoinClient(appID string, channelID string, clientID string) error
	LeaveClient(appID string, channelID string, clientID string) error

	SetChannelCloseStatus(appID string, channelID string, isClosed bool) error

	GetClientAllowedChannels(clientID string) ([]string, error)
	GetClientPrivateChannels(clientID string) ([]*Channel, error)
	GetClientPublicChannels(clientID string) ([]*Channel, error)

	GetAppPrivateChannels(appID string) ([]*Channel, error)
	GetAppPublicChannels(appID string) ([]*Channel, error)

	ExistsAppChannel(appID string, channelID string) (bool, error)
	GetAppChannel(appID string, channelID string) (*Channel, error)

	AddChannelEvent(appID string, channelID string, event *ChannelEvent) error
	AddChannelEvents(items []InsertItem) error

	GetChannelEventsAfter(appID string, channelID string, timestamp int64) ([]*ChannelEvent, error)
	GetChannelEventsAfterAndBefore(appID string, channelID string, timestampAfter int64, timestampBefore int64) ([]*ChannelEvent, error)
	GetChannelLastEvents(appID string, channelID string, amount int64) ([]*ChannelEvent, error)
	GetChannelLastEventsAfter(appID string, channelID string, amount int64, timestamp int64) ([]*ChannelEvent, error)
	GetChannelLastEventsBefore(appID string, channelID string, amount int64, timestamp int64) ([]*ChannelEvent, error)
}

ChannelRepository - Repository for handling Channel, Channel_Event and Channel_Client tables

type Client

type Client struct {
	ID       string
	Username string
	AppID    string
	Extra    string
}

Client - Database representation of a Client

func GetClient

func GetClient(appID string, clientID string) (*Client, error)

GetClient - Get client from cache first, then try database and update cache if found

type ClientJoin

type ClientJoin struct {
	ChannelID            string   `protobuf:"bytes,1,opt,name=channelID,proto3" json:"channelID,omitempty"`
	ClientID             string   `protobuf:"bytes,2,opt,name=clientID,proto3" json:"clientID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*ClientJoin) Descriptor

func (*ClientJoin) Descriptor() ([]byte, []int)

func (*ClientJoin) GetChannelID

func (m *ClientJoin) GetChannelID() string

func (*ClientJoin) GetClientID

func (m *ClientJoin) GetClientID() string

func (*ClientJoin) Marshal

func (m *ClientJoin) Marshal() (dAtA []byte, err error)

func (*ClientJoin) MarshalTo

func (m *ClientJoin) MarshalTo(dAtA []byte) (int, error)

func (*ClientJoin) MarshalToSizedBuffer

func (m *ClientJoin) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ClientJoin) ProtoMessage

func (*ClientJoin) ProtoMessage()

func (*ClientJoin) Reset

func (m *ClientJoin) Reset()

func (*ClientJoin) Size

func (m *ClientJoin) Size() (n int)

func (*ClientJoin) String

func (m *ClientJoin) String() string

func (*ClientJoin) Unmarshal

func (m *ClientJoin) Unmarshal(dAtA []byte) error

func (*ClientJoin) XXX_DiscardUnknown

func (m *ClientJoin) XXX_DiscardUnknown()

func (*ClientJoin) XXX_Marshal

func (m *ClientJoin) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ClientJoin) XXX_Merge

func (m *ClientJoin) XXX_Merge(src proto.Message)

func (*ClientJoin) XXX_Size

func (m *ClientJoin) XXX_Size() int

func (*ClientJoin) XXX_Unmarshal

func (m *ClientJoin) XXX_Unmarshal(b []byte) error

type ClientLeave

type ClientLeave struct {
	ChannelID            string   `protobuf:"bytes,1,opt,name=channelID,proto3" json:"channelID,omitempty"`
	ClientID             string   `protobuf:"bytes,2,opt,name=clientID,proto3" json:"clientID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*ClientLeave) Descriptor

func (*ClientLeave) Descriptor() ([]byte, []int)

func (*ClientLeave) GetChannelID

func (m *ClientLeave) GetChannelID() string

func (*ClientLeave) GetClientID

func (m *ClientLeave) GetClientID() string

func (*ClientLeave) Marshal

func (m *ClientLeave) Marshal() (dAtA []byte, err error)

func (*ClientLeave) MarshalTo

func (m *ClientLeave) MarshalTo(dAtA []byte) (int, error)

func (*ClientLeave) MarshalToSizedBuffer

func (m *ClientLeave) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ClientLeave) ProtoMessage

func (*ClientLeave) ProtoMessage()

func (*ClientLeave) Reset

func (m *ClientLeave) Reset()

func (*ClientLeave) Size

func (m *ClientLeave) Size() (n int)

func (*ClientLeave) String

func (m *ClientLeave) String() string

func (*ClientLeave) Unmarshal

func (m *ClientLeave) Unmarshal(dAtA []byte) error

func (*ClientLeave) XXX_DiscardUnknown

func (m *ClientLeave) XXX_DiscardUnknown()

func (*ClientLeave) XXX_Marshal

func (m *ClientLeave) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ClientLeave) XXX_Merge

func (m *ClientLeave) XXX_Merge(src proto.Message)

func (*ClientLeave) XXX_Size

func (m *ClientLeave) XXX_Size() int

func (*ClientLeave) XXX_Unmarshal

func (m *ClientLeave) XXX_Unmarshal(b []byte) error

type ClientRepository

type ClientRepository interface {
	CreateClient(id string, username string, appID string, extra string) error
	ExistsAppClient(AppID string, ClientID string) (bool, error)
	//GetClientExtra(id string) (string, error)
	GetAppClient(AppID string, ClientID string) (*Client, error)
	DeleteClient(id string) error
	DeleteAppClients(appID string) error
	UpdateClient(id string, username string, extra string) error
	//UpdateClientUsername(id string, username string) error
	//UpdateClientExtra(id string, extra string) error
	GetAppClients(appID string) ([]*Client, error)
	//GetAppClientsCount(appID string) (uint64, error)
	GetAllClients() ([]*Client, error)
}

ClientRepository - Repository for handling Client table

type ClientStatus

type ClientStatus struct {
	Status               bool     `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"`
	Timestamp            int64    `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*ClientStatus) Descriptor

func (*ClientStatus) Descriptor() ([]byte, []int)

func (*ClientStatus) GetStatus

func (m *ClientStatus) GetStatus() bool

func (*ClientStatus) GetTimestamp

func (m *ClientStatus) GetTimestamp() int64

func (*ClientStatus) Marshal

func (m *ClientStatus) Marshal() (dAtA []byte, err error)

func (*ClientStatus) MarshalTo

func (m *ClientStatus) MarshalTo(dAtA []byte) (int, error)

func (*ClientStatus) MarshalToSizedBuffer

func (m *ClientStatus) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ClientStatus) ProtoMessage

func (*ClientStatus) ProtoMessage()

func (*ClientStatus) Reset

func (m *ClientStatus) Reset()

func (*ClientStatus) Size

func (m *ClientStatus) Size() (n int)

func (*ClientStatus) String

func (m *ClientStatus) String() string

func (*ClientStatus) Unmarshal

func (m *ClientStatus) Unmarshal(dAtA []byte) error

func (*ClientStatus) XXX_DiscardUnknown

func (m *ClientStatus) XXX_DiscardUnknown()

func (*ClientStatus) XXX_Marshal

func (m *ClientStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ClientStatus) XXX_Merge

func (m *ClientStatus) XXX_Merge(src proto.Message)

func (*ClientStatus) XXX_Size

func (m *ClientStatus) XXX_Size() int

func (*ClientStatus) XXX_Unmarshal

func (m *ClientStatus) XXX_Unmarshal(b []byte) error

type Connection

type Connection interface {
	//Init(ws *websocket.Conn)
	Send([]byte)
	SendText([]byte)
	SetOnMessage(func([]byte))
	SetOnClose(func())
	SetOnHeartBeat(func())
	Close()
	IsConnected() bool
}

Connection - Interface for connections

type CreateChannelRequest

type CreateChannelRequest struct {
	ChannelID  string   `json:"channelID"`
	Name       string   `json:"name"`
	Persistent bool     `json:"persistent"`
	Private    bool     `json:"private"`
	Presence   bool     `json:"presence"`
	Users      []string `json:"users"`
	Extra      string   `json:"extra"`
	Push       bool     `json:"push"`
}

CreateChannelRequest - Create channel with given ID and settings

type DatabaseStorage

type DatabaseStorage interface {
	GetAppRepository() AppRepository
	GetClientRepository() ClientRepository
	GetChannelRepository() ChannelRepository
	GetDeviceRepository() DeviceRepository
}

DatabaseStorage - Persistent database storage interface

type Device

type Device struct {
	ID       string
	Token    string
	ClientID string
}

Device - Database representation of a device

type DeviceRepository

type DeviceRepository interface {
	CreateDevice(id string, token string, clientID string) error
	GetDevice(id string) (*Device, error)
	DeleteDevice(id string) error
	DeleteClientDevices(clientID string) error
	GetClientDevices(clientID string) ([]*Device, error)
	//GetClientDeviceTokens(clientID string) ([]string, error)
	GetClientsDeviceTokens(clientIDs []string, amount int) ([]string, error)
}

DeviceRepository - Repository for handling client devices

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine - Holds application components

func GetEngine

func GetEngine() *Engine

GetEngine - Get engine singleton

func (*Engine) GetAppRepository

func (engine *Engine) GetAppRepository() AppRepository

GetAppRepository - Get persistent repository

func (*Engine) GetAuthHook

func (engine *Engine) GetAuthHook() AuthHook

func (*Engine) GetCacheStorage

func (engine *Engine) GetCacheStorage() CacheStorage

GetCacheStorage - Get cache storage instance

func (*Engine) GetChannelRepository

func (engine *Engine) GetChannelRepository() ChannelRepository

GetChannelRepository - Get persistent repository

func (*Engine) GetClientRepository

func (engine *Engine) GetClientRepository() ClientRepository

GetClientRepository - Get persistent repository

func (*Engine) GetDeviceRepository

func (engine *Engine) GetDeviceRepository() DeviceRepository

GetDeviceRepository - Get persistent repository

func (*Engine) GetHubsHandler

func (engine *Engine) GetHubsHandler() *HubsHandler

func (*Engine) GetPresence

func (engine *Engine) GetPresence() PresenceHandler

GetPresence - Get presence handler

func (*Engine) GetPublisher

func (engine *Engine) GetPublisher() PublishHandler

GetPublisher - Get Publisher handler

func (*Engine) GetPushHandler

func (engine *Engine) GetPushHandler() PushNotificationHandler

GetPushHandler - Get Push notification handler

func (*Engine) GetServerID

func (engine *Engine) GetServerID() string

GetServerID - Get Server ID

func (*Engine) StoreEvent

func (engine *Engine) StoreEvent(appID string, event *ChannelEvent)

StoreEvent - Append channel event to insert queue

type EngineConfig

type EngineConfig struct {
	ServerID                string                  // ServerID for server identification, if not provided one will be generated
	HubsHandler             *HubsHandler            // If nil, then a default one is created
	DBStorage               DatabaseStorage         // Struct that holds repository
	CacheStorage            CacheStorage            // Channels, App, Sessions and events cache
	PublishHandler          PublishHandler          // Publish between servers handler
	PresenceHandler         PresenceHandler         // Handler for tracking user presence
	PushNotificationHandler PushNotificationHandler // Handler for sending push notifications
	DBWorkers               int                     // If set to -1 it will go to the default of 10
	InsertCacheLimit        int                     // Amount of events stored before batching into the database
	StorageInsert           StorageInsert           // Handler for events being stored; you can use this to batch events, or simply ignore them. For a default batching handler use StorageInsertQueue, which uses the InsertCacheLimit property
	AuthHook                AuthHook                // For the default connection, to authorize connections
}

EngineConfig - Config for the engine, including storage, cache, push notifications

type Envelope

type Envelope struct {
	IsMultiple           bool        `protobuf:"varint,1,opt,name=isMultiple,proto3" json:"isMultiple,omitempty"`
	Events               []*NewEvent `protobuf:"bytes,2,rep,name=events,proto3" json:"events,omitempty"`
	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
	XXX_unrecognized     []byte      `json:"-"`
	XXX_sizecache        int32       `json:"-"`
}

func (*Envelope) Descriptor

func (*Envelope) Descriptor() ([]byte, []int)

func (*Envelope) GetEvents

func (m *Envelope) GetEvents() []*NewEvent

func (*Envelope) GetIsMultiple

func (m *Envelope) GetIsMultiple() bool

func (*Envelope) Marshal

func (m *Envelope) Marshal() (dAtA []byte, err error)

func (*Envelope) MarshalTo

func (m *Envelope) MarshalTo(dAtA []byte) (int, error)

func (*Envelope) MarshalToSizedBuffer

func (m *Envelope) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Envelope) ProtoMessage

func (*Envelope) ProtoMessage()

func (*Envelope) Reset

func (m *Envelope) Reset()

func (*Envelope) Size

func (m *Envelope) Size() (n int)

func (*Envelope) String

func (m *Envelope) String() string

func (*Envelope) Unmarshal

func (m *Envelope) Unmarshal(dAtA []byte) error

func (*Envelope) XXX_DiscardUnknown

func (m *Envelope) XXX_DiscardUnknown()

func (*Envelope) XXX_Marshal

func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Envelope) XXX_Merge

func (m *Envelope) XXX_Merge(src proto.Message)

func (*Envelope) XXX_Size

func (m *Envelope) XXX_Size() int

func (*Envelope) XXX_Unmarshal

func (m *Envelope) XXX_Unmarshal(b []byte) error

type GetChannelsResponse

type GetChannelsResponse struct {
	Channels []*Channel `json:"channels"`
}

GetChannelsResponse - Channels response data holder

type Hub

type Hub struct {
	AppID string
	// contains filtered or unexported fields
}

Hub - Handles channels and publishing

func NewHub

func NewHub(AppID string, hook HubHook) *Hub

NewHub - Create a new Hub

func (*Hub) AddChannelToClient

func (hub *Hub) AddChannelToClient(clientID string, channelID string)

AddChannelToClient - Add channel to currently connected client

func (*Hub) AddClient

func (hub *Hub) AddClient(session *Session)

AddClient - Add client to connected map

func (*Hub) Close

func (hub *Hub) Close()

Close - Remove all channels and connections

func (*Hub) ContainsChannel

func (hub *Hub) ContainsChannel(channelID string) *HubChannel

ContainsChannel - Get HubChannel if exists in memory

func (*Hub) DeleteChannel

func (hub *Hub) DeleteChannel(channelID string)

DeleteChannel - Remove channel including subscriptions

func (*Hub) Publish

func (hub *Hub) Publish(channelID string, channelEvent *ChannelEvent, shouldStore bool, session *Session) bool

Publish - Send the given payload to subscribed session

func (*Hub) RemoveChannelFromClient

func (hub *Hub) RemoveChannelFromClient(clientID string, channelID string)

RemoveChannelFromClient - Remove channel from currently connected client

func (*Hub) RemoveClient

func (hub *Hub) RemoveClient(session *Session)

RemoveClient - Remove client from connected clients and channels

func (*Hub) Subscribe

func (hub *Hub) Subscribe(channelID string, session *Session) *HubChannel

Subscribe - Add subscriber to given channel

func (*Hub) Unsubscribe

func (hub *Hub) Unsubscribe(channelID string, session *Session)

Unsubscribe - Remove subscriber from given channel

type HubChannel

type HubChannel struct {
	Data *Channel
	// contains filtered or unexported fields
}

HubChannel - Handler for topic

func NewChannel

func NewChannel(ID string, AppID string, hub *Hub) *HubChannel

NewChannel - Create and initialize channel

func RemoveChannelIndex

func RemoveChannelIndex(s []*HubChannel, index int) []*HubChannel

RemoveChannelIndex - Helper to remove index from slice

func (*HubChannel) DeleteChannel

func (channel *HubChannel) DeleteChannel()

DeleteChannel - Unsubscribe all clients and stop accepting subscriptions

func (*HubChannel) ExternalPublish

func (channel *HubChannel) ExternalPublish(channelEvent *ChannelEvent) bool

ExternalPublish - Publish to be used by HTTP and Publisher so we don't republish nor store in db/cache

func (*HubChannel) ExternalPublishStatusChange

func (channel *HubChannel) ExternalPublishStatusChange(statusUpdate *OnlineStatusUpdate) bool

ExternalPublishStatusChange - Publish new event about user status update, it doesn't resend data back to publisher

func (*HubChannel) NewClient

func (channel *HubChannel) NewClient(session *Session)

NewClient - Add client to channel

func (*HubChannel) Publish

func (channel *HubChannel) Publish(channelEvent *ChannelEvent, shouldStore bool) bool

Publish - Send message to all connected clients

func (*HubChannel) PublishJoinLeave

func (channel *HubChannel) PublishJoinLeave(eventType NewEvent_NewEventType, payload []byte)

PublishJoinLeave - Publish Join or Leave events to connected clients

func (*HubChannel) PublishStatusChange

func (channel *HubChannel) PublishStatusChange(statusUpdate *OnlineStatusUpdate) bool

PublishStatusChange - Publish new event about user status update

func (*HubChannel) RemoveClient

func (channel *HubChannel) RemoveClient(session *Session)

RemoveClient - Remove client from channel

type HubHook

type HubHook interface {
	// Called when there are no more sessions on this hub, then it is removed to save memory
	OnClose(hub *Hub)

	// When a channel is deleted/unused for some time or closed
	// It closes all connections and then calls this function
	OnChannelRemoved(channelID string, hub *Hub)

	// Called when a new session connects, you may use it to set a SessionHook
	OnSessionAdded(session *Session, hub *Hub)
	// Called when a session disconnects
	OnSessionRemoved(session *Session, hub *Hub)

	// Called before publishing, you may return false to prevent from publishing
	// Also you may set shouldStore to false in order to prevent the event from being stored on the DB
	// The first return param is if publish should be cancelled or not, the second is if the event should be stored or not
	// For default behaviour return the shouldStore property
	OnPublish(channelID string, channelEvent *ChannelEvent, shouldStore bool, session *Session) (bool, bool)
	// Called before subscribing, you may return false to prevent the session from subscribing
	OnSubscribe(channelID string, session *Session) bool
	// Called after a session unsubscribe
	OnUnsubscribe(channelID string, session *Session)
}

type HubsHandler

type HubsHandler struct {
	// contains filtered or unexported fields
}

HubsHandler - Handle the hubs per application

func NewHubsHandler

func NewHubsHandler(hook HubsHandlerHook) *HubsHandler

func (*HubsHandler) ContainsHub

func (handler *HubsHandler) ContainsHub(AppID string) *Hub

ContainsHub - Return hub if exists

func (*HubsHandler) GetHub

func (handler *HubsHandler) GetHub(AppID string) *Hub

GetHub - Get the Hub with the given AppID, if not found creates one

func (*HubsHandler) NewHub

func (handler *HubsHandler) NewHub(AppID string) *Hub

NewHub - Create a new hub in this server and add it to the map

func (*HubsHandler) RemoveHub

func (handler *HubsHandler) RemoveHub(AppID string)

RemoveHub - Remove hub from active hubs and close all channels and connections

type HubsHandlerHook

type HubsHandlerHook interface {
	OnNewHub(hub *Hub) HubHook
	OnRemoveHub(hub *Hub)
}

type InitialPresenceStatus

type InitialPresenceStatus struct {
	ChannelID            string                   `protobuf:"bytes,1,opt,name=channelID,proto3" json:"channelID,omitempty"`
	ClientStatus         map[string]*ClientStatus `` /* 165-byte string literal not displayed */
	XXX_NoUnkeyedLiteral struct{}                 `json:"-"`
	XXX_unrecognized     []byte                   `json:"-"`
	XXX_sizecache        int32                    `json:"-"`
}

func (*InitialPresenceStatus) Descriptor

func (*InitialPresenceStatus) Descriptor() ([]byte, []int)

func (*InitialPresenceStatus) GetChannelID

func (m *InitialPresenceStatus) GetChannelID() string

func (*InitialPresenceStatus) GetClientStatus

func (m *InitialPresenceStatus) GetClientStatus() map[string]*ClientStatus

func (*InitialPresenceStatus) Marshal

func (m *InitialPresenceStatus) Marshal() (dAtA []byte, err error)

func (*InitialPresenceStatus) MarshalTo

func (m *InitialPresenceStatus) MarshalTo(dAtA []byte) (int, error)

func (*InitialPresenceStatus) MarshalToSizedBuffer

func (m *InitialPresenceStatus) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*InitialPresenceStatus) ProtoMessage

func (*InitialPresenceStatus) ProtoMessage()

func (*InitialPresenceStatus) Reset

func (m *InitialPresenceStatus) Reset()

func (*InitialPresenceStatus) Size

func (m *InitialPresenceStatus) Size() (n int)

func (*InitialPresenceStatus) String

func (m *InitialPresenceStatus) String() string

func (*InitialPresenceStatus) Unmarshal

func (m *InitialPresenceStatus) Unmarshal(dAtA []byte) error

func (*InitialPresenceStatus) XXX_DiscardUnknown

func (m *InitialPresenceStatus) XXX_DiscardUnknown()

func (*InitialPresenceStatus) XXX_Marshal

func (m *InitialPresenceStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*InitialPresenceStatus) XXX_Merge

func (m *InitialPresenceStatus) XXX_Merge(src proto.Message)

func (*InitialPresenceStatus) XXX_Size

func (m *InitialPresenceStatus) XXX_Size() int

func (*InitialPresenceStatus) XXX_Unmarshal

func (m *InitialPresenceStatus) XXX_Unmarshal(b []byte) error

type InsertItem

type InsertItem struct {
	Event *ChannelEvent
	AppID string
}

InsertItem - Insert Item queued

type LastDevicePresence

type LastDevicePresence struct {
	ClientID  string `json:"clientID"`
	DeviceID  string `json:"deviceID"`
	Timestamp int64  `json:"timestamp"`
}

LastDevicePresence - Represents last client device heart beat

type NewEvent

type NewEvent struct {
	Type                 NewEvent_NewEventType `protobuf:"varint,1,opt,name=type,proto3,enum=NewEvent_NewEventType" json:"type,omitempty"`
	Payload              []byte                `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
	XXX_NoUnkeyedLiteral struct{}              `json:"-"`
	XXX_unrecognized     []byte                `json:"-"`
	XXX_sizecache        int32                 `json:"-"`
}

func (*NewEvent) Descriptor

func (*NewEvent) Descriptor() ([]byte, []int)

func (*NewEvent) GetPayload

func (m *NewEvent) GetPayload() []byte

func (*NewEvent) GetType

func (m *NewEvent) GetType() NewEvent_NewEventType

func (*NewEvent) Marshal

func (m *NewEvent) Marshal() (dAtA []byte, err error)

func (*NewEvent) MarshalTo

func (m *NewEvent) MarshalTo(dAtA []byte) (int, error)

func (*NewEvent) MarshalToSizedBuffer

func (m *NewEvent) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*NewEvent) ProtoMessage

func (*NewEvent) ProtoMessage()

func (*NewEvent) Reset

func (m *NewEvent) Reset()

func (*NewEvent) Size

func (m *NewEvent) Size() (n int)

func (*NewEvent) String

func (m *NewEvent) String() string

func (*NewEvent) Unmarshal

func (m *NewEvent) Unmarshal(dAtA []byte) error

func (*NewEvent) XXX_DiscardUnknown

func (m *NewEvent) XXX_DiscardUnknown()

func (*NewEvent) XXX_Marshal

func (m *NewEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*NewEvent) XXX_Merge

func (m *NewEvent) XXX_Merge(src proto.Message)

func (*NewEvent) XXX_Size

func (m *NewEvent) XXX_Size() int

func (*NewEvent) XXX_Unmarshal

func (m *NewEvent) XXX_Unmarshal(b []byte) error

type NewEvent_NewEventType

type NewEvent_NewEventType int32
const (
	NewEvent_JOIN_CHANNEL          NewEvent_NewEventType = 0
	NewEvent_LEAVE_CHANNEL         NewEvent_NewEventType = 1
	NewEvent_NEW_CHANNEL           NewEvent_NewEventType = 2
	NewEvent_REMOVE_CHANNEL        NewEvent_NewEventType = 3
	NewEvent_SUBSCRIBE             NewEvent_NewEventType = 4
	NewEvent_PUBLISH               NewEvent_NewEventType = 5
	NewEvent_ACK                   NewEvent_NewEventType = 6
	NewEvent_ONLINE_STATUS         NewEvent_NewEventType = 7
	NewEvent_INITIAL_ONLINE_STATUS NewEvent_NewEventType = 8
)

func (NewEvent_NewEventType) EnumDescriptor

func (NewEvent_NewEventType) EnumDescriptor() ([]byte, []int)

func (NewEvent_NewEventType) String

func (x NewEvent_NewEventType) String() string

type OnlineStatusUpdate

type OnlineStatusUpdate struct {
	ChannelID            string   `protobuf:"bytes,1,opt,name=channelID,proto3" json:"channelID,omitempty"`
	ClientID             string   `protobuf:"bytes,2,opt,name=clientID,proto3" json:"clientID,omitempty"`
	Status               bool     `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"`
	Timestamp            int64    `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*OnlineStatusUpdate) Descriptor

func (*OnlineStatusUpdate) Descriptor() ([]byte, []int)

func (*OnlineStatusUpdate) GetChannelID

func (m *OnlineStatusUpdate) GetChannelID() string

func (*OnlineStatusUpdate) GetClientID

func (m *OnlineStatusUpdate) GetClientID() string

func (*OnlineStatusUpdate) GetStatus

func (m *OnlineStatusUpdate) GetStatus() bool

func (*OnlineStatusUpdate) GetTimestamp

func (m *OnlineStatusUpdate) GetTimestamp() int64

func (*OnlineStatusUpdate) Marshal

func (m *OnlineStatusUpdate) Marshal() (dAtA []byte, err error)

func (*OnlineStatusUpdate) MarshalTo

func (m *OnlineStatusUpdate) MarshalTo(dAtA []byte) (int, error)

func (*OnlineStatusUpdate) MarshalToSizedBuffer

func (m *OnlineStatusUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*OnlineStatusUpdate) ProtoMessage

func (*OnlineStatusUpdate) ProtoMessage()

func (*OnlineStatusUpdate) Reset

func (m *OnlineStatusUpdate) Reset()

func (*OnlineStatusUpdate) Size

func (m *OnlineStatusUpdate) Size() (n int)

func (*OnlineStatusUpdate) String

func (m *OnlineStatusUpdate) String() string

func (*OnlineStatusUpdate) Unmarshal

func (m *OnlineStatusUpdate) Unmarshal(dAtA []byte) error

func (*OnlineStatusUpdate) XXX_DiscardUnknown

func (m *OnlineStatusUpdate) XXX_DiscardUnknown()

func (*OnlineStatusUpdate) XXX_Marshal

func (m *OnlineStatusUpdate) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*OnlineStatusUpdate) XXX_Merge

func (m *OnlineStatusUpdate) XXX_Merge(src proto.Message)

func (*OnlineStatusUpdate) XXX_Size

func (m *OnlineStatusUpdate) XXX_Size() int

func (*OnlineStatusUpdate) XXX_Unmarshal

func (m *OnlineStatusUpdate) XXX_Unmarshal(b []byte) error

type PresenceHandler

type PresenceHandler interface {

	// Channel Presence
	GetChannelClientsPresence(appID string, channelID string) map[string]int64
	AddOnlineChannelDevice(appID string, channelID string, clientID string, deviceID string)
	RemoveOnlineChannelDevice(appID string, channelID string, clientID string, deviceID string)
	GetChannelAmountOfClientDevices(appID string, channelID string, clientID string) int64
	IsClientDeviceConnectToChannel(appID string, channelID string, clientID string, deviceID string) bool

	// This Instant Online Status
	SetDeviceOnline(clientID string, deviceID string)
	SetDeviceOffline(clientID string, deviceID string)
	GetClientOnlineDevices(clientID string) ([]string, error)

	IsOnline(clientID string) bool
	AddDevice(clientID string, deviceID string)
	RemoveDevice(clientID string, deviceID string)

	// Last timestamps
	GetClientDevicesPresences(clientID string) ([]*LastDevicePresence, error)
	UpdateDeviceTimestamp(clientID string, deviceID string)
	// Client timestamps for heart beats
	UpdateClientTimestamp(clientID string)
	GetClientTimestamp(clientID string) int64
}

PresenceHandler - Handle client online status

type PublishAck

type PublishAck struct {
	ReplyTo              uint32   `protobuf:"varint,1,opt,name=replyTo,proto3" json:"replyTo,omitempty"`
	Status               bool     `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishAck) Descriptor

func (*PublishAck) Descriptor() ([]byte, []int)

func (*PublishAck) GetReplyTo

func (m *PublishAck) GetReplyTo() uint32

func (*PublishAck) GetStatus

func (m *PublishAck) GetStatus() bool

func (*PublishAck) Marshal

func (m *PublishAck) Marshal() (dAtA []byte, err error)

func (*PublishAck) MarshalTo

func (m *PublishAck) MarshalTo(dAtA []byte) (int, error)

func (*PublishAck) MarshalToSizedBuffer

func (m *PublishAck) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PublishAck) ProtoMessage

func (*PublishAck) ProtoMessage()

func (*PublishAck) Reset

func (m *PublishAck) Reset()

func (*PublishAck) Size

func (m *PublishAck) Size() (n int)

func (*PublishAck) String

func (m *PublishAck) String() string

func (*PublishAck) Unmarshal

func (m *PublishAck) Unmarshal(dAtA []byte) error

func (*PublishAck) XXX_DiscardUnknown

func (m *PublishAck) XXX_DiscardUnknown()

func (*PublishAck) XXX_Marshal

func (m *PublishAck) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishAck) XXX_Merge

func (m *PublishAck) XXX_Merge(src proto.Message)

func (*PublishAck) XXX_Size

func (m *PublishAck) XXX_Size() int

func (*PublishAck) XXX_Unmarshal

func (m *PublishAck) XXX_Unmarshal(b []byte) error

type PublishHandler

type PublishHandler interface {
	PublishChannelPresenceChange(appID string, channelID string, clientID string, isJoin bool)
	PublishChannelAccessChange(appID string, channelID string, clientID string, isAdd bool)
	PublishChannelEvent(appID string, channelID string, channelEvent *ChannelEvent)
	PublishChannelOnlineChange(appID string, channelID string, statusUpdate *OnlineStatusUpdate)
	Subscribe(appID string, channelID string)
	Unsubscribe(appID string, channelID string)
}

PublishHandler - Interface for publishing events between servers

type PublishRequest

type PublishRequest struct {
	ID                   uint32   `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
	EventType            string   `protobuf:"bytes,2,opt,name=eventType,proto3" json:"eventType,omitempty"`
	ChannelID            string   `protobuf:"bytes,3,opt,name=channelID,proto3" json:"channelID,omitempty"`
	Payload              string   `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishRequest) Descriptor

func (*PublishRequest) Descriptor() ([]byte, []int)

func (*PublishRequest) GetChannelID

func (m *PublishRequest) GetChannelID() string

func (*PublishRequest) GetEventType

func (m *PublishRequest) GetEventType() string

func (*PublishRequest) GetID

func (m *PublishRequest) GetID() uint32

func (*PublishRequest) GetPayload

func (m *PublishRequest) GetPayload() string

func (*PublishRequest) Marshal

func (m *PublishRequest) Marshal() (dAtA []byte, err error)

func (*PublishRequest) MarshalTo

func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error)

func (*PublishRequest) MarshalToSizedBuffer

func (m *PublishRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PublishRequest) ProtoMessage

func (*PublishRequest) ProtoMessage()

func (*PublishRequest) Reset

func (m *PublishRequest) Reset()

func (*PublishRequest) Size

func (m *PublishRequest) Size() (n int)

func (*PublishRequest) String

func (m *PublishRequest) String() string

func (*PublishRequest) Unmarshal

func (m *PublishRequest) Unmarshal(dAtA []byte) error

func (*PublishRequest) XXX_DiscardUnknown

func (m *PublishRequest) XXX_DiscardUnknown()

func (*PublishRequest) XXX_Marshal

func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishRequest) XXX_Merge

func (m *PublishRequest) XXX_Merge(src proto.Message)

func (*PublishRequest) XXX_Size

func (m *PublishRequest) XXX_Size() int

func (*PublishRequest) XXX_Unmarshal

func (m *PublishRequest) XXX_Unmarshal(b []byte) error

type PushNotificationHandler

type PushNotificationHandler interface {
	EnqueueRequest(request *PushRequestItem)
}

type PushRequestItem

type PushRequestItem struct {
	ChannelID string
	EventType string
	Payload   string
	Timestamp int64
	ClientIDs []string
}

PushRequestItem - Push notification request item

type Session

type Session struct {
	ID string

	SubscribedChannels []*HubChannel
	AllowedChannels    []string
	SessionIdentifier  string // We create the string once and store it, instead of creating it every time
	// contains filtered or unexported fields
}

Session - an updated session handling

func (*Session) AddChannel

func (session *Session) AddChannel(channelID string)

AddChannel - Add channel while client is connected

func (*Session) CanPublish

func (session *Session) CanPublish(channelID string, event *ChannelEvent, publishRequest *PublishRequest)

CanPublish - Check if the user is allowed to publish; if so, publish. Also, if a requestID is given, we notify the channel (if it is persistent) to store the event. Otherwise we publish but won't store the event, nor send the notification back.

func (*Session) CanSubscribe

func (session *Session) CanSubscribe(channelID string) bool

CanSubscribe - Check if user is allowed to subscribe, if so subscribe

func (*Session) Close

func (session *Session) Close()

Close - closes session and connection

func (*Session) GetHub

func (session *Session) GetHub() *Hub

func (*Session) GetIdentifier

func (session *Session) GetIdentifier() string

GetIdentifier - Get client and device identifier

func (*Session) Init

func (session *Session) Init(connection Connection, deviceID string, identity *auth.Identity, clientID string, hub *Hub)

Init - initialize properties and start sending messages

func (*Session) Publish

func (session *Session) Publish(data []byte)

Publish - Send data to subscribed client

func (*Session) RemoveChannel

func (session *Session) RemoveChannel(channelID string)

RemoveChannel - Remove channel while client is connected

func (*Session) Send

func (session *Session) Send(channelEvent *ChannelEvent) error

func (*Session) SetHook

func (session *Session) SetHook(hook SessionHook)

type SessionData

type SessionData struct {
	SessionID       string
	AppID           string
	DeviceID        string
	AllowedChannels []string
}

type SessionHook

type SessionHook interface {
	OnInitialized(session *Session)
	OnClose(session *Session)

	// Called while checking if the user can subscribe; isAllowedChannel indicates whether the channel is in the allowed list.
	// You must return a bool indicating whether the user can subscribe; you may return isAllowedChannel for the default behaviour.
	CanSubscribe(channelID string, session *Session, isAllowedChannel bool) bool

	// Called while checking if the user can publish; isAllowedChannel indicates whether the channel is in the allowed list.
	// You must return a bool indicating whether the user can publish; you may return isAllowedChannel for the default behaviour.
	CanPublish(channelID string, session *Session, isAllowedChannel bool) bool
}

type StorageInsert

type StorageInsert interface {
	StoreEvent(appID string, event *ChannelEvent)
	Start(channelRepository ChannelRepository)
}

type StorageInsertQueue

type StorageInsertQueue struct {
	// contains filtered or unexported fields
}

StorageInsertQueue - Receives all insert requests and sends them into the database

func NewStorageInsertQueue

func NewStorageInsertQueue() *StorageInsertQueue

func (*StorageInsertQueue) Start

func (storage *StorageInsertQueue) Start(repo ChannelRepository)

func (*StorageInsertQueue) StoreEvent

func (storage *StorageInsertQueue) StoreEvent(appID string, event *ChannelEvent)

type SubscribeRequest

type SubscribeRequest struct {
	ChannelID            string   `protobuf:"bytes,1,opt,name=channelID,proto3" json:"channelID,omitempty"`
	ID                   uint32   `protobuf:"varint,2,opt,name=ID,proto3" json:"ID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*SubscribeRequest) Descriptor

func (*SubscribeRequest) Descriptor() ([]byte, []int)

func (*SubscribeRequest) GetChannelID

func (m *SubscribeRequest) GetChannelID() string

func (*SubscribeRequest) GetID

func (m *SubscribeRequest) GetID() uint32

func (*SubscribeRequest) Marshal

func (m *SubscribeRequest) Marshal() (dAtA []byte, err error)

func (*SubscribeRequest) MarshalTo

func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error)

func (*SubscribeRequest) MarshalToSizedBuffer

func (m *SubscribeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*SubscribeRequest) ProtoMessage

func (*SubscribeRequest) ProtoMessage()

func (*SubscribeRequest) Reset

func (m *SubscribeRequest) Reset()

func (*SubscribeRequest) Size

func (m *SubscribeRequest) Size() (n int)

func (*SubscribeRequest) String

func (m *SubscribeRequest) String() string

func (*SubscribeRequest) Unmarshal

func (m *SubscribeRequest) Unmarshal(dAtA []byte) error

func (*SubscribeRequest) XXX_DiscardUnknown

func (m *SubscribeRequest) XXX_DiscardUnknown()

func (*SubscribeRequest) XXX_Marshal

func (m *SubscribeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SubscribeRequest) XXX_Merge

func (m *SubscribeRequest) XXX_Merge(src proto.Message)

func (*SubscribeRequest) XXX_Size

func (m *SubscribeRequest) XXX_Size() int

func (*SubscribeRequest) XXX_Unmarshal

func (m *SubscribeRequest) XXX_Unmarshal(b []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL