node

package
v0.0.0-...-a3e1a49 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DISCONNECT_MODE_ALWAYS = "always"
	DISCONNECT_MODE_AUTO   = "auto"
	DISCONNECT_MODE_NEVER  = "never"
)

Variables

Functions

This section is empty.

Types

type AppNode

type AppNode interface {
	HandlePubSub(msg []byte)
	LookupSession(id string) *Session
	Authenticate(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
	Authenticated(s *Session, identifiers string)
	Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
	Disconnect(s *Session) error
}

AppNode describes a basic node interface

type AuthOption

type AuthOption = func(*AuthOptions)

func WithDisconnectOnFailure

func WithDisconnectOnFailure(disconnect bool) AuthOption

type AuthOptions

type AuthOptions struct {
	DisconnectOnFailure bool
}

type Config

type Config struct {
	// Define when to invoke Disconnect callback
	DisconnectMode string `toml:"disconnect_mode"`
	// The number of goroutines to use for disconnect calls on shutdown
	ShutdownDisconnectPoolSize int `toml:"shutdown_disconnect_gopool_size"`
	// How often server should send Action Cable ping messages (seconds)
	PingInterval int `toml:"ping_interval"`
	// How ofter to refresh node stats (seconds)
	StatsRefreshInterval int `toml:"stats_refresh_interval"`
	// The max size of the Go routines pool for hub
	HubGopoolSize int `toml:"broadcast_gopool_size"`
	// How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milli seconds, 'ns' => nano seconds)
	PingTimestampPrecision string `toml:"ping_timestamp_precision"`
	// For how long to wait for pong message before disconnecting (seconds)
	PongTimeout int `toml:"pong_timeout"`
	// For how long to wait for disconnect callbacks to be processed before exiting (seconds)
	ShutdownTimeout int `toml:"shutdown_timeout"`
}

Config contains general application/node settings

func NewConfig

func NewConfig() Config

NewConfig builds a new config

func (Config) ToToml

func (c Config) ToToml() string

type Connection

type Connection interface {
	Write(msg []byte, deadline time.Time) error
	WriteBinary(msg []byte, deadline time.Time) error
	Read() ([]byte, error)
	Close(code int, reason string)
}

Connection represents underlying connection

type Controller

type Controller interface {
	Start() error
	Shutdown() error
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
	Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
	Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)
	Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig, l *slog.Logger) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown(ctx context.Context) error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig

type DisconnectQueueConfig struct {
	// Limit the number of Disconnect RPC calls per second
	Rate int
	// The size of the channel's buffer for disconnect requests
	Backlog int
	// How much time wait to call all enqueued calls at exit (in seconds) [DEPREACTED]
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

func (DisconnectQueueConfig) ToToml

func (c DisconnectQueueConfig) ToToml() string

type Disconnector

type Disconnector interface {
	Run() error
	Shutdown(ctx context.Context) error
	Enqueue(*Session) error
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type Executor

type Executor interface {
	HandleCommand(*Session, *common.Message) error
	Disconnect(*Session) error
}

Executor handles incoming commands (messages)

type InlineDisconnector

type InlineDisconnector struct {
	// contains filtered or unexported fields
}

InlineDisconnector performs Disconnect calls synchronously

func NewInlineDisconnector

func NewInlineDisconnector(n *Node) *InlineDisconnector

NewInlineDisconnector returns new InlineDisconnector

func (*InlineDisconnector) Enqueue

func (d *InlineDisconnector) Enqueue(s *Session) error

Enqueue disconnects session immediately

func (*InlineDisconnector) Run

func (d *InlineDisconnector) Run() error

Run does nothing

func (*InlineDisconnector) Shutdown

func (d *InlineDisconnector) Shutdown(ctx context.Context) error

Shutdown does nothing

func (*InlineDisconnector) Size

func (d *InlineDisconnector) Size() int

Size returns 0

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(config *Config, opts ...NodeOption) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectResult, error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) Authenticated

func (n *Node) Authenticated(s *Session, ids string)

Mark session as authenticated and register it with a hub. Useful when you perform authentication manually, not using a controller.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream (locally)

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds session to disconnector queue and unregister session from hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow execute disconnect on controller

func (*Node) ExecuteRemoteCommand

func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)

Execute remote command (locally)

func (*Node) HandleBroadcast

func (n *Node) HandleBroadcast(raw []byte)

HandleBroadcast parses incoming broadcast message, record it and re-transmit to other nodes

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)

HandleCommand parses incoming message from client and execute the command (if recognized)

func (*Node) HandlePubSub

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses incoming pubsub message and broadcast it to all clients (w/o using a broker)

func (*Node) History

func (n *Node) History(s *Session, msg *common.Message) error

History fetches the stream history for the specified identifier

func (*Node) ID

func (n *Node) ID() string

ID returns node identifier

func (*Node) Instrumenter

func (n *Node) Instrumenter() metrics.Instrumenter

Return current instrumenter for the node

func (*Node) IsShuttingDown

func (n *Node) IsShuttingDown() bool

func (*Node) LookupSession

func (n *Node) LookupSession(id string) *Session

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (*common.CommandResult, error)

Perform executes client command

func (*Node) Presence

func (n *Node) Presence(s *Session, msg *common.Message) error

Presence returns the presence information for the specified identifier

func (*Node) PresenceJoin

func (n *Node) PresenceJoin(s *Session, msg *common.Message) error

PresenceJoin adds the session to the presence state for the specified identifier

func (*Node) PresenceLeave

func (n *Node) PresenceLeave(s *Session, msg *common.Message) error

PresenceLeave removes the session to the presence state for the specified identifier

func (*Node) RemoteDisconnect

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect find a session by identifier and closes it

func (*Node) SetBroker

func (n *Node) SetBroker(b broker.Broker)

func (*Node) SetDisconnector

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector set disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown(ctx context.Context) (err error)

Shutdown stops all services (hub, controller)

func (*Node) Size

func (n *Node) Size() int

func (*Node) Start

func (n *Node) Start() error

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)

Subscribe subscribes session to a channel

func (*Node) TryRestoreSession

func (n *Node) TryRestoreSession(s *Session) (restored bool)

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)

Unsubscribe unsubscribes session from a channel

func (*Node) Whisper

func (n *Node) Whisper(s *Session, msg *common.Message) error

Whisper broadcasts the message to the specified whispering stream to all clients except the sender

type NodeOption

type NodeOption = func(*Node)

func WithController

func WithController(c Controller) NodeOption

func WithID

func WithID(id string) NodeOption

func WithInstrumenter

func WithInstrumenter(i metrics.Instrumenter) NodeOption

func WithLogger

func WithLogger(l *slog.Logger) NodeOption

type NoopDisconnectQueue

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown

func (d *NoopDisconnectQueue) Shutdown(ctx context.Context) error

Shutdown does nothing

func (*NoopDisconnectQueue) Size

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type NullController

type NullController struct {
	// contains filtered or unexported fields
}

func NewNullController

func NewNullController(l *slog.Logger) *NullController

func (*NullController) Authenticate

func (c *NullController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)

func (*NullController) Disconnect

func (c *NullController) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error

func (*NullController) Perform

func (c *NullController) Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)

func (*NullController) Shutdown

func (c *NullController) Shutdown() (err error)

func (*NullController) Start

func (c *NullController) Start() (err error)

func (*NullController) Subscribe

func (c *NullController) Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)

func (*NullController) Unsubscribe

func (c *NullController) Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)

type Session

type Session struct {
	Connected bool
	// Could be used to store arbitrary data within a session
	InternalState map[string]interface{}
	Log           *slog.Logger
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string, opts ...SessionOption) *Session

NewSession build a new Session struct from ws connetion and http request

func (*Session) AuthenticateOnConnect

func (s *Session) AuthenticateOnConnect() bool

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect schedules connection disconnect

func (*Session) DisconnectNow

func (s *Session) DisconnectNow(reason string, code int)

func (*Session) DisconnectWithMessage

func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)

func (*Session) GetEnv

func (s *Session) GetEnv() *common.SessionEnv

func (*Session) GetID

func (s *Session) GetID() string

func (*Session) GetIdentifiers

func (s *Session) GetIdentifiers() string

func (*Session) IsClosed

func (s *Session) IsClosed() bool

func (*Session) IsConnected

func (s *Session) IsConnected() bool

func (*Session) IsDisconnectable

func (s *Session) IsDisconnectable() bool

func (*Session) IsResumeable

func (s *Session) IsResumeable() bool

func (*Session) MarkDisconnectable

func (s *Session) MarkDisconnectable(val bool)

func (*Session) MergeEnv

func (s *Session) MergeEnv(env *common.SessionEnv)

Merge connection and channel states into current env. This method locks the state for writing (so, goroutine-safe)

func (*Session) PrevSid

func (s *Session) PrevSid() string

func (*Session) ReadInternalState

func (s *Session) ReadInternalState(key string) (interface{}, bool)

ReadInternalState reads internal state value by key

func (*Session) ReadMessage

func (s *Session) ReadMessage(message []byte) error

ReadMessage reads messages from ws connection and send them to node

func (*Session) RestoreFromCache

func (s *Session) RestoreFromCache(cached []byte) error

func (*Session) Send

func (s *Session) Send(msg encoders.EncodedMessage)

Send schedules a data transmission

func (*Session) SendJSONTransmission

func (s *Session) SendJSONTransmission(msg string)

SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and send them to the client connection

func (*Session) Serve

func (s *Session) Serve(callback func()) error

Serve enters a loop to read incoming data

func (*Session) SetEnv

func (s *Session) SetEnv(env *common.SessionEnv)

func (*Session) SetID

func (s *Session) SetID(id string)

func (*Session) SetIdentifiers

func (s *Session) SetIdentifiers(ids string)

func (*Session) String

func (s *Session) String() string

String returns session string representation (for %v in Printf-like functions)

func (*Session) ToCacheEntry

func (s *Session) ToCacheEntry() ([]byte, error)

func (*Session) UnderlyingConn

func (s *Session) UnderlyingConn() Connection

func (*Session) WriteInternalState

func (s *Session) WriteInternalState(key string, val interface{})

WriteInternalState

type SessionOption

type SessionOption = func(*Session)

func WithEncoder

func WithEncoder(enc encoders.Encoder) SessionOption

WithEncoder allows to set a custom encoder for a session

func WithExecutor

func WithExecutor(ex Executor) SessionOption

WithExecutor allows to set a custom executor for a session

func WithHandshakeMessageDeadline

func WithHandshakeMessageDeadline(deadline time.Time) SessionOption

WithHandshakeMessageDeadline allows to set a custom deadline for handshake messages. This option also indicates that we MUST NOT perform Authenticate on connect.

func WithMetrics

func WithMetrics(m metrics.Instrumenter) SessionOption

WithMetrics allows to set a custom metrics instrumenter for a session

func WithPingInterval

func WithPingInterval(interval time.Duration) SessionOption

WithPingInterval allows to set a custom ping interval for a session or disable pings at all (by passing 0)

func WithPingPrecision

func WithPingPrecision(val string) SessionOption

WithPingPrecision allows to configure precision for timestamps attached to pings

func WithPongTimeout

func WithPongTimeout(timeout time.Duration) SessionOption

WithPongTimeout allows to set a custom pong timeout for a session

func WithPrevSID

func WithPrevSID(sid string) SessionOption

WithPrevSID allows providing the previous session ID to restore from

func WithResumable

func WithResumable(val bool) SessionOption

WithResumable allows marking session as resumable (so we store its state in cache)

type SubscriptionState

type SubscriptionState struct {
	// contains filtered or unexported fields
}

func NewSubscriptionState

func NewSubscriptionState() *SubscriptionState

func (*SubscriptionState) AddChannel

func (st *SubscriptionState) AddChannel(id string)

func (*SubscriptionState) AddChannelStream

func (st *SubscriptionState) AddChannelStream(id string, stream string)

func (*SubscriptionState) Channels

func (st *SubscriptionState) Channels() []string

func (*SubscriptionState) HasChannel

func (st *SubscriptionState) HasChannel(id string) bool

func (*SubscriptionState) RemoveChannel

func (st *SubscriptionState) RemoveChannel(id string)

func (*SubscriptionState) RemoveChannelStream

func (st *SubscriptionState) RemoveChannelStream(id string, stream string)

func (*SubscriptionState) RemoveChannelStreams

func (st *SubscriptionState) RemoveChannelStreams(id string) []string

func (*SubscriptionState) StreamsFor

func (st *SubscriptionState) StreamsFor(id string) []string

func (*SubscriptionState) ToMap

func (st *SubscriptionState) ToMap() map[string][]string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL