routing

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Key names for the two hashes described below.
// NOTE(review): presumably Redis hash keys used by RedisRouter — confirm
// against the router implementation.
const (
	// hash of node_id => Node proto
	NodesKey = "nodes"

	// hash of room_name => node_id
	NodeRoomKey = "room_node_map"
)
View Source
// DefaultMessageChannelSize is the default message channel size (200).
// NOTE(review): presumably the size handed to NewMessageChannel when the
// caller has no specific requirement — confirm against callers.
const DefaultMessageChannelSize = 200

Variables

View Source
// Sentinel errors exported by the routing package. Each message states the
// condition it reports; callers should match them with errors.Is rather than
// comparing message text.
var (
	ErrNotFound             = errors.New("could not find object")
	ErrIPNotSet             = errors.New("ip address is required and not set")
	ErrHandlerNotDefined    = errors.New("handler not defined")
	ErrIncorrectRTCNode     = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound         = errors.New("could not locate the node")
	ErrNodeLimitReached     = errors.New("reached configured limit for node")
	ErrInvalidRouterMessage = errors.New("invalid router message")
	ErrChannelClosed        = errors.New("channel closed")
	ErrChannelFull          = errors.New("channel is full")
)

Functions

This section is empty.

Types

type LocalNode

// LocalNode is a named pointer type over livekit.Node. NewLocalNode builds one
// from a *config.Config, and both router constructors take it as the current node.
type LocalNode *livekit.Node

func NewLocalNode

func NewLocalNode(conf *config.Config) (LocalNode, error)

type LocalRouter

// LocalRouter is constructed with NewLocalRouter from a LocalNode. The methods
// listed on *LocalRouter cover every method of the Router interface (including
// the embedded MessageRouter), plus GetNode and WriteNodeRTC.
type LocalRouter struct {
	// contains filtered or unexported fields
}

LocalRouter is a router of messages on the same node; it is a basic implementation intended for local testing.

func NewLocalRouter

func NewLocalRouter(currentNode LocalNode) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error

func (*LocalRouter) Drain

func (r *LocalRouter) Drain()

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)

func (*LocalRouter) GetRegion

func (r *LocalRouter) GetRegion() string

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) OnNewParticipantRTC

func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback)

func (*LocalRouter) OnRTCMessage

func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

func (*LocalRouter) WriteNodeRTC

func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error

func (*LocalRouter) WriteParticipantRTC

func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error

func (*LocalRouter) WriteRoomRTC

func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error

type MessageChannel

// MessageChannel is created with NewMessageChannel(size). Its method set
// (WriteMessage, ReadChan, Close) covers both MessageSink and MessageSource,
// and it additionally offers IsClosed and an OnClose callback hook.
// NOTE(review): WriteMessage presumably reports ErrChannelClosed and
// ErrChannelFull — confirm against the implementation.
type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewMessageChannel

func NewMessageChannel(size int) *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) IsClosed

func (m *MessageChannel) IsClosed() bool

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageRouter

// MessageRouter is the message-passing subset of Router (Router embeds it):
// starting a participant's signal connection and writing RTC node messages
// addressed to a participant or to a room.
type MessageRouter interface {
	// StartParticipantSignal is called when a participant's signal connection is
	// ready to start. It returns the connection ID together with a sink for
	// requests and a source for responses.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)

	// Write a message to a participant or room
	WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
	WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
}

type MessageSink

type MessageSink interface {
	// WriteMessage writes one protobuf message to the sink and returns an error
	// if the write fails.
	WriteMessage(msg proto.Message) error
	// Close closes the sink.
	Close()
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport

type MessageSource

// MessageSource is the read-side counterpart of MessageSink: protobuf messages
// written to a sink are received from the source's ReadChan.
type MessageSource interface {
	// ReadChan exposes a one way channel to make it easier to use with select
	ReadChan() <-chan proto.Message
	// Close closes the source.
	Close()
}

type NewParticipantCallback

// NewParticipantCallback is the handler type registered through
// Router.OnNewParticipantRTC, which is called to start a new participant's RTC
// connection. It receives the room name, the participant's init data, a source
// of requests and a sink for responses, and returns an error.
type NewParticipantCallback func(
	ctx context.Context,
	roomName livekit.RoomName,
	pi ParticipantInit,
	requestSource MessageSource,
	responseSink MessageSink,
) error

type ParticipantInit

// ParticipantInit bundles the values passed along when a participant's
// connection is started (see StartParticipantSignal and NewParticipantCallback).
// It can be built from a *livekit.StartSession with
// ParticipantInitFromStartSession and converted back with ToStartSession.
type ParticipantInit struct {
	Identity       livekit.ParticipantIdentity
	Name           livekit.ParticipantName
	Reconnect      bool
	AutoSubscribe  bool
	Client         *livekit.ClientInfo
	Grants         *auth.ClaimGrants
	Region         string
	AdaptiveStream bool
	ID             livekit.ParticipantID
}

func ParticipantInitFromStartSession

func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)

func (*ParticipantInit) ToStartSession

func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)

type RTCMessageCallback

// RTCMessageCallback is the handler type registered through
// Router.OnRTCMessage, which is called to execute actions on the RTC node.
// It receives the room name, the participant identity and the RTC node
// message; it returns nothing.
type RTCMessageCallback func(
	ctx context.Context,
	roomName livekit.RoomName,
	identity livekit.ParticipantIdentity,
	msg *livekit.RTCNodeMessage,
)

type RTCNodeSink

// RTCNodeSink is created by NewRTCNodeSink from a Redis client, a node ID and
// a participant key. Its WriteMessage and Close methods match the MessageSink
// method set; it also offers an OnClose callback hook.
// NOTE(review): presumably delivers messages to the given RTC node over
// Redis — confirm against the implementation.
type RTCNodeSink struct {
	// contains filtered or unexported fields
}

func NewRTCNodeSink

func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey) *RTCNodeSink

func (*RTCNodeSink) Close

func (s *RTCNodeSink) Close()

func (*RTCNodeSink) OnClose

func (s *RTCNodeSink) OnClose(f func())

func (*RTCNodeSink) WriteMessage

func (s *RTCNodeSink) WriteMessage(msg proto.Message) error

type RedisRouter

type RedisRouter struct {
	// LocalRouter is embedded, so methods that *RedisRouter does not declare
	// itself (in this listing: GetRegion, OnNewParticipantRTC, OnRTCMessage)
	// are promoted from it.
	LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.

func NewRedisRouter

func NewRedisRouter(currentNode LocalNode, rc redis.UniversalClient) *RedisRouter

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error

func (*RedisRouter) Drain

func (r *RedisRouter) Drain()

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)

StartParticipantSignal sets up the signal connection's paths to the RTC node and starts routing messages to that message queue.

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

func (*RedisRouter) WriteNodeRTC

func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error

func (*RedisRouter) WriteParticipantRTC

func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error

func (*RedisRouter) WriteRoomRTC

func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error

type Router

// Router extends MessageRouter with node registration, room-to-node mapping,
// lifecycle control and callback registration. Both *LocalRouter and
// *RedisRouter list methods matching this interface; CreateRouter returns a
// Router built from a Redis client and a LocalNode.
type Router interface {
	MessageRouter

	// Registration of nodes; each call reports failure through its error.
	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error

	// ListNodes returns the known nodes.
	ListNodes() ([]*livekit.Node, error)

	// Mapping between a room name and the node assigned to it.
	GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
	SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error
	ClearRoomState(ctx context.Context, roomName livekit.RoomName) error

	// GetRegion returns the region as a string.
	GetRegion() string

	// Router lifecycle.
	Start() error
	Drain()
	Stop()

	// OnNewParticipantRTC is called to start a new participant's RTC connection
	OnNewParticipantRTC(callback NewParticipantCallback)

	// OnRTCMessage is called to execute actions on the RTC node
	OnRTCMessage(callback RTCMessageCallback)
}

Router allows multiple nodes to coordinate the participant session

func CreateRouter

func CreateRouter(rc redis.UniversalClient, node LocalNode) Router

type SignalNodeSink

// SignalNodeSink is created by NewSignalNodeSink from a Redis client, a node
// ID and a connection ID. Its WriteMessage and Close methods match the
// MessageSink method set; it also offers an OnClose callback hook.
// NOTE(review): presumably delivers messages to the signal node for the given
// connection over Redis — confirm against the implementation.
type SignalNodeSink struct {
	// contains filtered or unexported fields
}

func NewSignalNodeSink

func NewSignalNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink

func (*SignalNodeSink) Close

func (s *SignalNodeSink) Close()

func (*SignalNodeSink) OnClose

func (s *SignalNodeSink) OnClose(f func())

func (*SignalNodeSink) WriteMessage

func (s *SignalNodeSink) WriteMessage(msg proto.Message) error

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL