utils

package
v0.0.0-...-2d8ace7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2023 License: Apache-2.0 Imports: 36 Imported by: 9

Documentation

Index

Constants

View Source
const (
	RoomPrefix         = "RM_"
	NodePrefix         = "ND_"
	ParticipantPrefix  = "PA_"
	TrackPrefix        = "TR_"
	APIKeyPrefix       = "API"
	EgressPrefix       = "EG_"
	IngressPrefix      = "IN_"
	RPCPrefix          = "RPC_"
	WHIPResourcePrefix = "WH_"
)

Variables

View Source
var (
	PromMessageBusCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "livekit",
			Subsystem: "messagebus",
			Name:      "messages",
		},
		[]string{"type", "status"},
	)
)

Functions

func EnableLockTracker

func EnableLockTracker()

EnableLockTracker enable lock tracking background worker. This should be called during init

func GetMimeTypeForAudioCodec

func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string

func GetMimeTypeForVideoCodec

func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string

func HashedID

func HashedID(id string) string

Creates a hashed ID from a unique string

func LocalNodeID

func LocalNodeID() (string, error)

func MarshalGuid

func MarshalGuid[T livekit.Guid](id T) livekit.GuidBlock

func NewGuid

func NewGuid(prefix string) string

func NumMutexes

func NumMutexes() int

func ParallelExec

func ParallelExec[T any](vals []T, parallelThreshold, step uint64, fn func(T))

ParallelExec will executes the given function with each element of vals, if len(vals) >= parallelThreshold, will execute them in parallel, with the given step size. So fn must be thread-safe.

func RandomSecret

func RandomSecret() string

func UnmarshalGuid

func UnmarshalGuid[T livekit.Guid](b livekit.GuidBlock) T

Types

type CPUStats

type CPUStats struct {
	// contains filtered or unexported fields
}

func NewCPUStats

func NewCPUStats(updateCallback func(idle float64)) (*CPUStats, error)

func (*CPUStats) GetCPUIdle

func (c *CPUStats) GetCPUIdle() float64

func (*CPUStats) NumCPU

func (c *CPUStats) NumCPU() int

func (*CPUStats) Stop

func (c *CPUStats) Stop()

type ErrArray

type ErrArray struct {
	// contains filtered or unexported fields
}

func (*ErrArray) AppendErr

func (e *ErrArray) AppendErr(err error)

func (*ErrArray) ToError

func (e *ErrArray) ToError() psrpc.Error

type EventEmitter

type EventEmitter[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewDefaultEventEmitter

func NewDefaultEventEmitter[K comparable, V any]() *EventEmitter[K, V]

func NewEventEmitter

func NewEventEmitter[K comparable, V any](params EventEmitterParams) *EventEmitter[K, V]

func (*EventEmitter[K, V]) Emit

func (e *EventEmitter[K, V]) Emit(k K, v V)

func (*EventEmitter[K, V]) Observe

func (e *EventEmitter[K, V]) Observe(k K) EventObserver[V]

func (*EventEmitter[K, V]) ObservedKeys

func (e *EventEmitter[K, V]) ObservedKeys() []K

type EventEmitterParams

type EventEmitterParams struct {
	QueueSize int
	Logger    logger.Logger
}

type EventObserver

type EventObserver[V any] struct {
	// contains filtered or unexported fields
}

func NewEventObserver

func NewEventObserver[V any](stopFunc func()) (EventObserver[V], func(v V))

func (EventObserver[V]) Events

func (o EventObserver[V]) Events() <-chan V

func (EventObserver[V]) Stop

func (o EventObserver[V]) Stop()

type FlowGraph

type FlowGraph struct {
	// contains filtered or unexported fields
}

func NewFlowGraph

func NewFlowGraph(n int64) FlowGraph

func (*FlowGraph) AddEdge

func (g *FlowGraph) AddEdge(s, t, cap, cost int64)

type Graph

type Graph[K comparable, N GraphNodeProps[K], E GraphEdgeProps] struct {
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph[K comparable, N GraphNodeProps[K], E GraphEdgeProps]() *Graph[K, N, E]

func (*Graph[K, N, E]) DeleteEdge

func (g *Graph[K, N, E]) DeleteEdge(src, dst K)

func (*Graph[K, N, E]) Edge

func (g *Graph[K, N, E]) Edge(src, dst K) (p E, ok bool)

func (*Graph[K, N, E]) InEdges

func (g *Graph[K, N, E]) InEdges(dst K) map[K]E

func (*Graph[K, N, E]) InsertEdge

func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)

func (*Graph[K, N, E]) InsertNode

func (g *Graph[K, N, E]) InsertNode(props N)

func (*Graph[K, N, E]) Node

func (g *Graph[K, N, E]) Node(id K) N

func (*Graph[K, N, E]) OutEdges

func (g *Graph[K, N, E]) OutEdges(src K) map[K]E

func (*Graph[K, N, E]) ShortestPath

func (g *Graph[K, N, E]) ShortestPath(src, dst K) ([]N, int64)

func (*Graph[K, N, E]) Size

func (g *Graph[K, N, E]) Size() int

func (*Graph[K, N, E]) TopologicalSort

func (g *Graph[K, N, E]) TopologicalSort() []N

type GraphEdge

type GraphEdge[N, E any] struct {
	// contains filtered or unexported fields
}

type GraphEdgeProps

type GraphEdgeProps interface {
	Length() int64
}

type GraphNode

type GraphNode[T any] struct {
	// contains filtered or unexported fields
}

type GraphNodeProps

type GraphNodeProps[K comparable] interface {
	ID() K
}

type MessageBus

type MessageBus interface {
	Subscribe(ctx context.Context, channel string) (PubSub, error)
	// SubscribeQueue is like subscribe, but ensuring only a single instance gets to process the message
	SubscribeQueue(ctx context.Context, channel string) (PubSub, error)
	Publish(ctx context.Context, channel string, msg proto.Message) error
}

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

type MinCostMaxFlow

type MinCostMaxFlow struct {
	// contains filtered or unexported fields
}

func (*MinCostMaxFlow) ComputeMaxFlow

func (f *MinCostMaxFlow) ComputeMaxFlow(g FlowGraph, s, t int64) (flow, cost int64)

func (*MinCostMaxFlow) Flow

func (f *MinCostMaxFlow) Flow(s, t int64) int64

type Mutex

type Mutex struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Mutex) Lock

func (m *Mutex) Lock()

func (*Mutex) Unlock

func (m *Mutex) Unlock()

type ProtoProxy

type ProtoProxy[T proto.Message] struct {
	// contains filtered or unexported fields
}

ProtoProxy is a caching proxy for protobuf messages that may be expensive to compute. It is used to avoid unnecessary re-generation of Protobufs

func NewProtoProxy

func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T]

NewProtoProxy creates a new ProtoProxy that regenerates underlying values at a cadence of refreshInterval this should be used for updates that should be sent periodically, but does not have the urgency of immediate delivery updateFn should provide computations required to generate the protobuf if refreshInterval is 0, then proxy will only update on MarkDirty(true)

func (*ProtoProxy[T]) Get

func (p *ProtoProxy[T]) Get() T

func (*ProtoProxy[T]) MarkDirty

func (p *ProtoProxy[T]) MarkDirty(immediate bool)

func (*ProtoProxy[T]) Stop

func (p *ProtoProxy[T]) Stop()

func (*ProtoProxy[T]) Updated

func (p *ProtoProxy[T]) Updated() <-chan struct{}

type PubSub

type PubSub interface {
	Channel() <-chan interface{}
	Payload(msg interface{}) []byte
	Close() error
}

type RWMutex

type RWMutex struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*RWMutex) Lock

func (m *RWMutex) Lock()

func (*RWMutex) RLock

func (m *RWMutex) RLock()

func (*RWMutex) RUnlock

func (m *RWMutex) RUnlock()

func (*RWMutex) Unlock

func (m *RWMutex) Unlock()

type RedisMessageBus

type RedisMessageBus struct {
	// contains filtered or unexported fields
}

func (*RedisMessageBus) Lock

func (r *RedisMessageBus) Lock(ctx context.Context, key string, expiration time.Duration) (bool, error)

func (*RedisMessageBus) Publish

func (r *RedisMessageBus) Publish(ctx context.Context, channel string, msg proto.Message) error

func (*RedisMessageBus) Subscribe

func (r *RedisMessageBus) Subscribe(ctx context.Context, channel string) (PubSub, error)

func (*RedisMessageBus) SubscribeQueue

func (r *RedisMessageBus) SubscribeQueue(ctx context.Context, channel string) (PubSub, error)

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

func (*RedisPubSub) Channel

func (r *RedisPubSub) Channel() <-chan interface{}

func (*RedisPubSub) Close

func (r *RedisPubSub) Close() error

func (*RedisPubSub) Payload

func (r *RedisPubSub) Payload(msg interface{}) []byte

type StuckLock

type StuckLock struct {
	// contains filtered or unexported fields
}

func ScanTrackedLocks

func ScanTrackedLocks(threshold time.Duration) []*StuckLock

ScanTrackedLocks check all lock trackers

func ScanTrackedLocksI

func ScanTrackedLocksI(threshold time.Duration, n int) []*StuckLock

ScanTrackedLocksI check lock trackers incrementally n at a time

func (*StuckLock) FirstLockedAtStack

func (d *StuckLock) FirstLockedAtStack() string

func (*StuckLock) HeldSince

func (d *StuckLock) HeldSince() time.Time

func (*StuckLock) NumGoroutineHeld

func (d *StuckLock) NumGoroutineHeld() int

func (*StuckLock) NumGoroutineWaiting

func (d *StuckLock) NumGoroutineWaiting() int

type TimedVersion

type TimedVersion struct {
	// contains filtered or unexported fields
}

func NewTimedVersionFromProto

func NewTimedVersionFromProto(proto *livekit.TimedVersion) *TimedVersion

func NewTimedVersionFromTime

func NewTimedVersionFromTime(t time.Time) *TimedVersion

func TimedVersionFromProto

func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion

func TimedVersionFromTime

func TimedVersionFromTime(t time.Time) TimedVersion

func (*TimedVersion) After

func (t *TimedVersion) After(other *TimedVersion) bool

func (*TimedVersion) Compare

func (t *TimedVersion) Compare(other *TimedVersion) int

func (*TimedVersion) IsZero

func (t *TimedVersion) IsZero() bool

func (*TimedVersion) Load

func (t *TimedVersion) Load() TimedVersion

func (*TimedVersion) Store

func (t *TimedVersion) Store(other *TimedVersion)

func (*TimedVersion) String

func (t *TimedVersion) String() string

func (*TimedVersion) Time

func (t *TimedVersion) Time() time.Time

func (*TimedVersion) ToProto

func (t *TimedVersion) ToProto() *livekit.TimedVersion

func (*TimedVersion) Update

func (t *TimedVersion) Update(other *TimedVersion) bool

type TimedVersionGenerator

type TimedVersionGenerator interface {
	New() *TimedVersion
	Next() TimedVersion
}

func NewDefaultTimedVersionGenerator

func NewDefaultTimedVersionGenerator() TimedVersionGenerator

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func (*WorkerGroup) Go

func (w *WorkerGroup) Go(fn func())

func (*WorkerGroup) Wait

func (w *WorkerGroup) Wait()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL