adapter

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// Default values exported by the adapter package.
//
// NOTE(review): presumably these are the fallbacks applied when the matching
// RedisStreamsAdapterOptions fields (StreamName, MaxLength, ReadCount,
// SessionKeyPrefix) are left at their zero values — confirm against the
// adapter constructor. DEFAULT_MESSAGE_ID_PREFIX has no matching option field
// in RedisStreamsAdapterOptions.
const (
	DEFAULT_STREAM_NAME        = "socket.io"
	DEFAULT_MAX_LEN            = 10000
	DEFAULT_READ_COUNT         = 100
	DEFAULT_SESSION_KEY_PREFIX = "sio:session:"
	DEFAULT_MESSAGE_ID_PREFIX  = "sio:message:"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

// Adapter is the room and broadcast backend returned by a Creator.
// RedisStreamAdapter declares every method of this interface.
type Adapter interface {
	// NOTE(review): presumably the number of server nodes the adapter knows
	// about — confirm against the implementations.
	ServerCount() int
	// NOTE(review): presumably releases whatever the adapter holds; whether it
	// may be called more than once is not visible here — confirm.
	Close()

	// AddAll, Delete and DeleteAll change the room membership recorded for a
	// single socket ID.
	// NOTE(review): presumably AddAll joins sid to every room in rooms, Delete
	// removes it from one room, and DeleteAll removes it from all rooms —
	// confirm.
	AddAll(sid SocketID, rooms []Room)
	Delete(sid SocketID, room Room)
	DeleteAll(sid SocketID)

	// Broadcast sends a packet (header plus values v) to the sockets selected
	// by opts.
	// NOTE(review): the exact matching rules for opts.Rooms / opts.Except are
	// implementation-defined — confirm.
	Broadcast(header *parser.PacketHeader, v []any, opts *BroadcastOptions)

	// The return value 'sids' is a thread-safe mapset.Set.
	Sockets(rooms mapset.Set[Room]) (sids mapset.Set[SocketID])
	// The return value 'rooms' is a thread-safe mapset.Set.
	SocketRooms(sid SocketID) (rooms mapset.Set[Room], ok bool)

	// FetchSockets returns the sockets selected by opts.
	FetchSockets(opts *BroadcastOptions) (sockets []Socket)

	// AddSockets, DelSockets and DisconnectSockets act on the sockets selected
	// by opts.
	// NOTE(review): presumably these back BroadcastOperator.SocketsJoin,
	// SocketsLeave and DisconnectSockets respectively — confirm.
	AddSockets(opts *BroadcastOptions, rooms ...Room)
	DelSockets(opts *BroadcastOptions, rooms ...Room)
	DisconnectSockets(opts *BroadcastOptions, close bool)

	// NOTE(review): presumably delivers the packet to the other servers of the
	// cluster rather than to clients — confirm.
	ServerSideEmit(header *parser.PacketHeader, v []any)

	// Save the client session in order to restore it upon reconnection.
	PersistSession(session *SessionToPersist)

	// Restore the session and find the packets that were missed by the client.
	//
	// ok returns false when there is no session or the session has expired.
	RestoreSession(pid PrivateSessionID, offset string) (session *SessionToPersist, ok bool)
}

type BroadcastFlags

// BroadcastFlags holds the boolean modifiers stored in BroadcastOptions.Flags.
type BroadcastFlags struct {
	// This flag is unused at the moment, but for compatibility with the socket.io API, it stays here.
	Compress bool
	// NOTE(review): presumably set by BroadcastOperator.Local, which restricts
	// the broadcast to the current node — confirm.
	Local    bool
}

type BroadcastOperator

type BroadcastOperator struct {
	// contains filtered or unexported fields
}

func NewBroadcastOperator

func NewBroadcastOperator(
	nsp string,
	adapter Adapter,
	isEventReserved func(string) bool,
) *BroadcastOperator

func (*BroadcastOperator) Compress

func (b *BroadcastOperator) Compress(compress bool) *BroadcastOperator

The compression flag is currently unused, so setting it has no effect on compression.

func (*BroadcastOperator) DisconnectSockets

func (b *BroadcastOperator) DisconnectSockets(close bool)

Makes the matching socket instances disconnect from the namespace.

If close is true, the underlying connection is closed as well. Otherwise, only the namespace is disconnected.

func (*BroadcastOperator) Emit

func (b *BroadcastOperator) Emit(eventName string, _v ...any)

Emits an event to all chosen clients.

func (*BroadcastOperator) Except

func (b *BroadcastOperator) Except(room ...Room) *BroadcastOperator

Sets a modifier for a subsequent event emission that the event will only be broadcast to clients that have not joined the given rooms.

func (*BroadcastOperator) FetchSockets

func (b *BroadcastOperator) FetchSockets() []Socket

Returns the matching socket instances. This method works across a cluster of several Socket.IO servers.

func (*BroadcastOperator) In

func (b *BroadcastOperator) In(room ...Room) *BroadcastOperator

Alias of To(...)

func (*BroadcastOperator) Local

Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node (when scaling to multiple nodes).

See: https://socket.io/docs/v4/using-multiple-nodes

func (*BroadcastOperator) SocketsJoin

func (b *BroadcastOperator) SocketsJoin(room ...Room)

Makes the matching socket instances join the specified rooms.

func (*BroadcastOperator) SocketsLeave

func (b *BroadcastOperator) SocketsLeave(room ...Room)

Makes the matching socket instances leave the specified rooms.

func (*BroadcastOperator) To

func (b *BroadcastOperator) To(room ...Room) *BroadcastOperator

Sets a modifier for a subsequent event emission that the event will only be broadcast to clients that have joined the given room.

To emit to multiple rooms, you can call To several times.

type BroadcastOptions

// BroadcastOptions describes which sockets an adapter operation targets. It is
// the opts argument of Adapter.Broadcast, FetchSockets, AddSockets, DelSockets
// and DisconnectSockets.
type BroadcastOptions struct {
	// NOTE(review): presumably the rooms accumulated through
	// BroadcastOperator.To / In — confirm.
	Rooms  mapset.Set[Room]
	// NOTE(review): presumably the rooms accumulated through
	// BroadcastOperator.Except, whose members are skipped — confirm.
	Except mapset.Set[Room]
	// Flags carries the Compress and Local modifiers.
	Flags  BroadcastFlags
}

func NewBroadcastOptions

func NewBroadcastOptions() *BroadcastOptions

func (BroadcastOptions) MarshalBinary

func (b BroadcastOptions) MarshalBinary() ([]byte, error)

func (*BroadcastOptions) UnmarshalJSON

func (b *BroadcastOptions) UnmarshalJSON(data []byte) error

type Creator

// Creator builds an Adapter from a SocketStore and a parser.Creator.
// NewInMemoryAdapterCreator, NewRedisStreamAdapterCreator and
// NewSessionAwareAdapterCreator each return a Creator.
type Creator func(socketStore SocketStore, parserCreator parser.Creator) Adapter

func NewInMemoryAdapterCreator

func NewInMemoryAdapterCreator() Creator

func NewRedisStreamAdapterCreator

func NewRedisStreamAdapterCreator(redisClient redis.Cmdable, opts *RedisStreamsAdapterOptions) Creator

func NewSessionAwareAdapterCreator

func NewSessionAwareAdapterCreator(maxDisconnectionDuration time.Duration) Creator

type PersistedPacket

// PersistedPacket is the element type of SessionToPersist.MissedPackets.
// Its HasExpired method takes a maximum disconnection duration.
type PersistedPacket struct {
	// NOTE(review): presumably the offset a client passes to
	// Adapter.RestoreSession — confirm.
	ID        string
	// NOTE(review): presumably the reference time used by HasExpired — confirm.
	EmittedAt time.Time
	// Opts holds the broadcast options associated with the packet.
	Opts      *BroadcastOptions

	// NOTE(review): Header and Data look like the arguments of
	// Adapter.Broadcast, and EncodedData like their encoded form — confirm
	// which of Data / EncodedData is populated by each adapter.
	Header      *parser.PacketHeader
	Data        []any
	EncodedData [][]byte
}

func (*PersistedPacket) HasExpired

func (p *PersistedPacket) HasExpired(maxDisconnectDuration time.Duration) bool

type PrivateSessionID

type PrivateSessionID string

A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery upon reconnection.

type RedisStreamAdapter

type RedisStreamAdapter struct {
	// contains filtered or unexported fields
}

func (*RedisStreamAdapter) AddAll

func (a *RedisStreamAdapter) AddAll(sid SocketID, rooms []Room)

func (*RedisStreamAdapter) AddSockets

func (a *RedisStreamAdapter) AddSockets(opts *BroadcastOptions, rooms ...Room)

func (*RedisStreamAdapter) Broadcast

func (a *RedisStreamAdapter) Broadcast(header *parser.PacketHeader, v []any, opts *BroadcastOptions)

func (*RedisStreamAdapter) Close

func (a *RedisStreamAdapter) Close()

func (*RedisStreamAdapter) DelSockets

func (a *RedisStreamAdapter) DelSockets(opts *BroadcastOptions, rooms ...Room)

func (*RedisStreamAdapter) Delete

func (a *RedisStreamAdapter) Delete(sid SocketID, room Room)

func (*RedisStreamAdapter) DeleteAll

func (a *RedisStreamAdapter) DeleteAll(sid SocketID)

func (*RedisStreamAdapter) DisconnectSockets

func (a *RedisStreamAdapter) DisconnectSockets(opts *BroadcastOptions, close bool)

func (*RedisStreamAdapter) FetchSockets

func (a *RedisStreamAdapter) FetchSockets(opts *BroadcastOptions) (sockets []Socket)

func (*RedisStreamAdapter) PersistSession

func (a *RedisStreamAdapter) PersistSession(session *SessionToPersist)

func (*RedisStreamAdapter) RestoreSession

func (a *RedisStreamAdapter) RestoreSession(pid PrivateSessionID, offset string) (session *SessionToPersist, ok bool)

func (*RedisStreamAdapter) ServerCount

func (a *RedisStreamAdapter) ServerCount() int

func (*RedisStreamAdapter) ServerSideEmit

func (a *RedisStreamAdapter) ServerSideEmit(header *parser.PacketHeader, v []any)

func (*RedisStreamAdapter) SocketRooms

func (a *RedisStreamAdapter) SocketRooms(sid SocketID) (rooms mapset.Set[Room], ok bool)

The return value 'rooms' must be a thread safe mapset.Set.

func (*RedisStreamAdapter) Sockets

func (a *RedisStreamAdapter) Sockets(rooms mapset.Set[Room]) (sids mapset.Set[SocketID])

The return value 'sids' must be a thread safe mapset.Set.

type RedisStreamBuffer

// RedisStreamBuffer is a list of byte slices with its own MarshalBinary and
// UnmarshalJSON methods.
// NOTE(review): presumably the serialized form of RedisStreamMessage.Buffers —
// confirm.
type RedisStreamBuffer [][]byte

func (RedisStreamBuffer) MarshalBinary

func (b RedisStreamBuffer) MarshalBinary() ([]byte, error)

func (*RedisStreamBuffer) UnmarshalJSON

func (b *RedisStreamBuffer) UnmarshalJSON(data []byte) error

type RedisStreamMessage

// RedisStreamMessage is the message type exchanged through the Redis stream:
// Parse fills it from a redis.XMessage and ToStreamData converts it to a
// map[string]interface{}.
type RedisStreamMessage struct {
	// NOTE(review): presumably identifies the sending server/session so that a
	// node can recognise its own messages — confirm.
	SessionId string
	Header    *parser.PacketHeader
	// NOTE(review): presumably the encoded packet payload — confirm.
	Buffers   [][]byte
	Opts      *BroadcastOptions
	EmittedAt time.Time
}

func (*RedisStreamMessage) Parse

func (m *RedisStreamMessage) Parse(msg redis.XMessage) error

func (RedisStreamMessage) ToStreamData

func (m RedisStreamMessage) ToStreamData() map[string]interface{}

type RedisStreamsAdapterOptions

// RedisStreamsAdapterOptions is the options type accepted by
// NewRedisStreamAdapterCreator.
//
// NOTE(review): presumably zero-valued fields fall back to the DEFAULT_*
// constants of this package (DEFAULT_STREAM_NAME, DEFAULT_MAX_LEN,
// DEFAULT_READ_COUNT, DEFAULT_SESSION_KEY_PREFIX) — confirm against the
// constructor.
type RedisStreamsAdapterOptions struct {
	StreamName            string
	MaxLength             int64
	ReadCount             int64
	SessionKeyPrefix      string
	// NOTE(review): no default for this field is visible among the package
	// constants — confirm what a zero value means.
	MaxDisconnectDuration time.Duration
}

type Room

// Room is the name of a room. Sockets join and leave rooms through
// Socket.Join and Socket.Leave.
type Room string

type SessionToPersist

// SessionToPersist is the client session handed to Adapter.PersistSession and
// returned by Adapter.RestoreSession.
type SessionToPersist struct {
	// SID is the public socket ID and PID the private session ID.
	SID SocketID
	PID PrivateSessionID

	// NOTE(review): presumably the rooms the socket had joined when the
	// session was saved — confirm.
	Rooms []Room

	// NOTE(review): presumably filled in by RestoreSession with the packets
	// the client missed while disconnected — confirm.
	MissedPackets []*PersistedPacket
}

func (SessionToPersist) MarshalBinary

func (s SessionToPersist) MarshalBinary() ([]byte, error)

func (*SessionToPersist) UnmarshalBinary

func (s *SessionToPersist) UnmarshalBinary(data []byte) error

type Socket

// Socket describes a single Socket.IO socket as seen by this package.
// Adapter.FetchSockets and SocketStore.Get/GetAll return values of this type,
// and TestSocket declares the same method set.
type Socket interface {
	// ID returns the socket's public ID.
	ID() SocketID

	// Join one or more rooms.
	Join(room ...Room)
	// Leave a single room.
	Leave(room Room)

	// Emit a message.
	// To emit binary data, use sio.Binary instead of []byte.
	Emit(eventName string, v ...any)

	// Sets a modifier for a subsequent event emission so that the event
	// will only be broadcast to clients that have joined the given room.
	//
	// To emit to multiple rooms, you can call To several times.
	To(room ...Room) *BroadcastOperator

	// Alias of To(...)
	In(room ...Room) *BroadcastOperator

	// Sets a modifier for a subsequent event emission so that the event
	// will only be broadcast to clients that have not joined the given rooms.
	Except(room ...Room) *BroadcastOperator

	// Sets a modifier for a subsequent event emission so that
	// the event data will be broadcast to every socket except the sender.
	Broadcast() *BroadcastOperator

	// Disconnect from namespace.
	//
	// If `close` is true, all namespaces are going to be disconnected (a DISCONNECT packet will be sent),
	// and the underlying Engine.IO connection will be terminated.
	//
	// If `close` is false, only the current namespace will be disconnected (a DISCONNECT packet will be sent),
	// and the underlying Engine.IO connection will be kept open.
	Disconnect(close bool)
}

type SocketID

type SocketID string

A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging.

type SocketStore

// SocketStore is the socket registry passed to a Creator when an adapter is
// built. TestSocketStore declares the same methods.
type SocketStore interface {
	// Send Engine.IO packets to a specific socket.
	// NOTE(review): presumably ok is false when no socket with that ID is
	// present — confirm.
	SendBuffers(sid SocketID, buffers [][]byte) (ok bool)

	// Get looks up one socket by ID.
	// NOTE(review): presumably ok reports whether it was found — confirm.
	Get(sid SocketID) (so Socket, ok bool)
	// GetAll returns a slice of sockets.
	// NOTE(review): presumably every socket currently in the store — confirm.
	GetAll() []Socket

	// NOTE(review): presumably drops the socket with the given ID from the
	// store — confirm.
	Remove(sid SocketID)
}

type TestSocket

type TestSocket struct {
	Rooms     []Room
	Connected bool
	// contains filtered or unexported fields
}

func NewTestSocket

func NewTestSocket(id SocketID) *TestSocket

func (*TestSocket) Broadcast

func (s *TestSocket) Broadcast() *BroadcastOperator

func (*TestSocket) Disconnect

func (s *TestSocket) Disconnect(close bool)

func (*TestSocket) Emit

func (s *TestSocket) Emit(eventName string, v ...any)

func (*TestSocket) Except

func (s *TestSocket) Except(room ...Room) *BroadcastOperator

func (*TestSocket) ID

func (s *TestSocket) ID() SocketID

func (*TestSocket) In

func (s *TestSocket) In(room ...Room) *BroadcastOperator

func (*TestSocket) Join

func (s *TestSocket) Join(room ...Room)

func (*TestSocket) Leave

func (s *TestSocket) Leave(room Room)

func (*TestSocket) To

func (s *TestSocket) To(room ...Room) *BroadcastOperator

type TestSocketStore

type TestSocketStore struct {
	// contains filtered or unexported fields
}

func NewTestSocketStore

func NewTestSocketStore() *TestSocketStore

func (*TestSocketStore) Get

func (s *TestSocketStore) Get(sid SocketID) (so Socket, ok bool)

func (*TestSocketStore) GetAll

func (s *TestSocketStore) GetAll() []Socket

func (*TestSocketStore) Remove

func (s *TestSocketStore) Remove(sid SocketID)

func (*TestSocketStore) SendBuffers

func (s *TestSocketStore) SendBuffers(sid SocketID, buffers [][]byte) (ok bool)

func (*TestSocketStore) Set

func (s *TestSocketStore) Set(so Socket)

func (*TestSocketStore) SetSendBuffers

func (s *TestSocketStore) SetSendBuffers(sendBuffers func(sid SocketID, buffers [][]byte) (ok bool))

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL