streamallocator

package
v1.4.3-dev5
Published: May 19, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Design of Prober

Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.

When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.

There are two options for probing

  • Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
  • Resume a paused stream or forward a higher spatial layer: This requires finding a stream whose bitrate matches the probing rate. Also, a stream could produce a key frame unexpectedly, boosting the rate within the probing window.

The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.

Implementation: There are a couple of options

  • Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, it cannot probe when all streams are paused (which could be due to downstream bandwidth constraints, or because the corresponding upstream tracks have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on the probing window boundary, as the packet forwarding path may not have a packet to forward. But, that should not be a major concern as long as some stream(s) is/are forwarded, as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
  • Run it in a goroutine. But, that would have to wake up very often to prevent bunching up of probe packets. So, there is a scalability concern, as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.

The implementation here follows the second approach of using a goroutine.

Pacing

Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).

But, there are a few significant challenges

  1. Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
  2. Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.

So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pace data out at a steady rate.

A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.

Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the prober may wake up at a high frequency, it does so only for short windows. For example, probing at 5 Mbps for 1/2 second and sending a 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for only 1/2 second or so.

5 Mbps over 1/2 second = 2.5 Mbits
2.5 Mbits = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes

A few things to note

  1. When a probe cluster is added, the expected media rate is provided. So, the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
  2. The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It may be something like 500 kbps. That will increase the wake-up interval to 16 ms in the above example.
  3. In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.

Index

Constants

const (
	ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps

	NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate

	ProbeWaitBase      = 5 * time.Second
	ProbeBackoffFactor = 1.5
	ProbeWaitMax       = 30 * time.Second
	ProbeSettleWait    = 250
	ProbeTrendWait     = 2 * time.Second

	ProbePct         = 120
	ProbeMinBps      = 200 * 1000 // 200 kbps
	ProbeMinDuration = 20 * time.Second
	ProbeMaxDuration = 21 * time.Second

	PriorityMin                = uint8(1)
	PriorityMax                = uint8(255)
	PriorityDefaultScreenshare = PriorityMax
	PriorityDefaultVideo       = PriorityMin

	FlagAllowOvershootWhileOptimal              = true
	FlagAllowOvershootWhileDeficient            = false
	FlagAllowOvershootExemptTrackWhileDeficient = true
	FlagAllowOvershootInProbe                   = true
	FlagAllowOvershootInCatchup                 = false
	FlagAllowOvershootInBoost                   = true
)

Variables

var (
	ChannelObserverParamsProbe = ChannelObserverParams{
		Name:                           "probe",
		EstimateRequiredSamples:        3,
		EstimateDownwardTrendThreshold: 0.0,
		EstimateCollapseThreshold:      0,
		NackWindowMinDuration:          500 * time.Millisecond,
		NackWindowMaxDuration:          1 * time.Second,
		NackRatioThreshold:             0.04,
	}

	ChannelObserverParamsNonProbe = ChannelObserverParams{
		Name:                           "non-probe",
		EstimateRequiredSamples:        8,
		EstimateDownwardTrendThreshold: -0.5,
		EstimateCollapseThreshold:      250 * time.Millisecond,
		NackWindowMinDuration:          1 * time.Second,
		NackWindowMaxDuration:          2 * time.Second,
		NackRatioThreshold:             0.08,
	}
)

Functions

This section is empty.

Types

type AddTrackParams

type AddTrackParams struct {
	Source      livekit.TrackSource
	Priority    uint8
	IsSimulcast bool
	PublisherID livekit.ParticipantID
}

type ChannelCongestionReason

type ChannelCongestionReason int
const (
	ChannelCongestionReasonNone ChannelCongestionReason = iota
	ChannelCongestionReasonEstimate
	ChannelCongestionReasonLoss
)

func (ChannelCongestionReason) String

func (c ChannelCongestionReason) String() string

type ChannelObserver

type ChannelObserver struct {
	// contains filtered or unexported fields
}

func NewChannelObserver

func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver

func (*ChannelObserver) AddEstimate

func (c *ChannelObserver) AddEstimate(estimate int64)

func (*ChannelObserver) AddNack

func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)

func (*ChannelObserver) GetHighestEstimate

func (c *ChannelObserver) GetHighestEstimate() int64

func (*ChannelObserver) GetLowestEstimate

func (c *ChannelObserver) GetLowestEstimate() int64

func (*ChannelObserver) GetNackHistory

func (c *ChannelObserver) GetNackHistory() []string

func (*ChannelObserver) GetNackRatio

func (c *ChannelObserver) GetNackRatio() float64

func (*ChannelObserver) GetTrend

func (*ChannelObserver) SeedEstimate

func (c *ChannelObserver) SeedEstimate(estimate int64)

func (*ChannelObserver) ToString

func (c *ChannelObserver) ToString() string

type ChannelObserverParams

type ChannelObserverParams struct {
	Name                           string
	EstimateRequiredSamples        int
	EstimateDownwardTrendThreshold float64
	EstimateCollapseThreshold      time.Duration
	NackWindowMinDuration          time.Duration
	NackWindowMaxDuration          time.Duration
	NackRatioThreshold             float64
}

type ChannelTrend

type ChannelTrend int
const (
	ChannelTrendNeutral ChannelTrend = iota
	ChannelTrendClearing
	ChannelTrendCongesting
)

func (ChannelTrend) String

func (c ChannelTrend) String() string

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster

func (*Cluster) GetInfo

func (c *Cluster) GetInfo() ProbeClusterInfo

func (*Cluster) GetSleepDuration

func (c *Cluster) GetSleepDuration() time.Duration

func (*Cluster) IsFinished

func (c *Cluster) IsFinished() bool

func (*Cluster) PacketsSent

func (c *Cluster) PacketsSent(size int)

func (*Cluster) ProbeSent

func (c *Cluster) ProbeSent(size int)

func (*Cluster) Process

func (c *Cluster) Process(pl ProberListener)

func (*Cluster) Start

func (c *Cluster) Start()

func (*Cluster) String

func (c *Cluster) String() string

type Event

type Event struct {
	Signal  streamAllocatorSignal
	TrackID livekit.TrackID
	Data    interface{}
}

func (Event) String

func (e Event) String() string

type MaxDistanceSorter

type MaxDistanceSorter []*Track

func (MaxDistanceSorter) Len

func (m MaxDistanceSorter) Len() int

func (MaxDistanceSorter) Less

func (m MaxDistanceSorter) Less(i, j int) bool

func (MaxDistanceSorter) Swap

func (m MaxDistanceSorter) Swap(i, j int)

type MinDistanceSorter

type MinDistanceSorter []*Track

func (MinDistanceSorter) Len

func (m MinDistanceSorter) Len() int

func (MinDistanceSorter) Less

func (m MinDistanceSorter) Less(i, j int) bool

func (MinDistanceSorter) Swap

func (m MinDistanceSorter) Swap(i, j int)

type NackTracker

type NackTracker struct {
	// contains filtered or unexported fields
}

func NewNackTracker

func NewNackTracker(params NackTrackerParams) *NackTracker

func (*NackTracker) Add

func (n *NackTracker) Add(packets uint32, repeatedNacks uint32)

func (*NackTracker) GetHistory

func (n *NackTracker) GetHistory() []string

func (*NackTracker) GetRatio

func (n *NackTracker) GetRatio() float64

func (*NackTracker) IsTriggered

func (n *NackTracker) IsTriggered() bool

func (*NackTracker) ToString

func (n *NackTracker) ToString() string

type NackTrackerParams

type NackTrackerParams struct {
	Name              string
	Logger            logger.Logger
	WindowMinDuration time.Duration
	WindowMaxDuration time.Duration
	RatioThreshold    float64
}

type ProbeClusterId

type ProbeClusterId uint32
const (
	ProbeClusterIdInvalid ProbeClusterId = 0
)

type ProbeClusterInfo

type ProbeClusterInfo struct {
	Id        ProbeClusterId
	BytesSent int
	Duration  time.Duration
}

type ProbeClusterMode

type ProbeClusterMode int
const (
	ProbeClusterModeUniform ProbeClusterMode = iota
	ProbeClusterModeLinearChirp
)

func (ProbeClusterMode) String

func (p ProbeClusterMode) String() string

type Prober

type Prober struct {
	// contains filtered or unexported fields
}

func NewProber

func NewProber(params ProberParams) *Prober

func (*Prober) AddCluster

func (p *Prober) AddCluster(mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) ProbeClusterId

func (*Prober) IsRunning

func (p *Prober) IsRunning() bool

func (*Prober) PacketsSent

func (p *Prober) PacketsSent(size int)

func (*Prober) ProbeSent

func (p *Prober) ProbeSent(size int)

func (*Prober) Reset

func (p *Prober) Reset()

func (*Prober) SetProberListener

func (p *Prober) SetProberListener(listener ProberListener)

type ProberListener

type ProberListener interface {
	OnSendProbe(bytesToSend int)
	OnProbeClusterDone(info ProbeClusterInfo)
	OnActiveChanged(isActive bool)
}

type ProberParams

type ProberParams struct {
	Logger logger.Logger
}

type RateMonitor

type RateMonitor struct {
	// contains filtered or unexported fields
}

func NewRateMonitor

func NewRateMonitor() *RateMonitor

func (*RateMonitor) GetHistory

func (r *RateMonitor) GetHistory() []string

func (*RateMonitor) GetQueuingGuess

func (r *RateMonitor) GetQueuingGuess() float64

STREAM-ALLOCATOR-TODO: This should be updated periodically to flush any pending. Reason is that the estimate could be higher than the actual rate by a significant amount. So, updating periodically to flush out samples that will not contribute to queueing would be good.

func (*RateMonitor) Update

func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedBytesRetransmitted uint32, unmanagedBytesSent uint32, unmanagedBytesRetransmitted uint32)

type StreamAllocator

type StreamAllocator struct {
	// contains filtered or unexported fields
}

func NewStreamAllocator

func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator

func (*StreamAllocator) AddTrack

func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams)

func (*StreamAllocator) OnActiveChanged

func (s *StreamAllocator) OnActiveChanged(isActive bool)

called when prober active state changes

func (*StreamAllocator) OnAvailableLayersChanged

func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack)

called when feeding track's layer availability changes

func (*StreamAllocator) OnBitrateAvailabilityChanged

func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack)

called when feeding track's bitrate measurement of any layer is available

func (*StreamAllocator) OnMaxPublishedSpatialChanged

func (s *StreamAllocator) OnMaxPublishedSpatialChanged(downTrack *sfu.DownTrack)

called when feeding track's max published spatial layer changes

func (*StreamAllocator) OnMaxPublishedTemporalChanged

func (s *StreamAllocator) OnMaxPublishedTemporalChanged(downTrack *sfu.DownTrack)

called when feeding track's max published temporal layer changes

func (*StreamAllocator) OnNACK

func (s *StreamAllocator) OnNACK(downTrack *sfu.DownTrack, nackInfos []sfu.NackInfo)

called by a video DownTrack when it processes NACKs

func (*StreamAllocator) OnPacketsSent

func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int)

called by a video DownTrack to report packet send

func (*StreamAllocator) OnProbeClusterDone

func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo)

called when prober finishes a probe cluster

func (*StreamAllocator) OnREMB

func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)

called when a new REMB is received (receive side bandwidth estimation)

func (*StreamAllocator) OnRTCPReceiverReport

func (s *StreamAllocator) OnRTCPReceiverReport(downTrack *sfu.DownTrack, rr rtcp.ReceptionReport)

called by a video DownTrack when it receives an RTCP Receiver Report

STREAM-ALLOCATOR-TODO: this should probably be done for audio tracks also

func (*StreamAllocator) OnResume

func (s *StreamAllocator) OnResume(downTrack *sfu.DownTrack)

called when forwarder resumes a track

func (*StreamAllocator) OnSendProbe

func (s *StreamAllocator) OnSendProbe(bytesToSend int)

called when prober wants to send packet(s)

func (*StreamAllocator) OnStreamStateChange

func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)

func (*StreamAllocator) OnSubscribedLayerChanged

func (s *StreamAllocator) OnSubscribedLayerChanged(downTrack *sfu.DownTrack, layer buffer.VideoLayer)

called when subscribed layer changes (limiting max layer)

func (*StreamAllocator) OnSubscriptionChanged

func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack)

called when subscription settings change (muting/unmuting of track)

func (*StreamAllocator) OnTransportCCFeedback

func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC)

called when a new transport-cc feedback is received

func (*StreamAllocator) RemoveTrack

func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack)

func (*StreamAllocator) SetAllowPause

func (s *StreamAllocator) SetAllowPause(allowPause bool)

func (*StreamAllocator) SetBandwidthEstimator

func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)

func (*StreamAllocator) SetChannelCapacity

func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64)

func (*StreamAllocator) SetTrackPriority

func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8)

func (*StreamAllocator) Start

func (s *StreamAllocator) Start()

func (*StreamAllocator) Stop

func (s *StreamAllocator) Stop()

type StreamAllocatorParams

type StreamAllocatorParams struct {
	Config config.CongestionControlConfig
	Logger logger.Logger
}

type StreamState

type StreamState int
const (
	StreamStateActive StreamState = iota
	StreamStatePaused
)

func (StreamState) String

func (s StreamState) String() string

type StreamStateInfo

type StreamStateInfo struct {
	ParticipantID livekit.ParticipantID
	TrackID       livekit.TrackID
	State         StreamState
}

type StreamStateUpdate

type StreamStateUpdate struct {
	StreamStates []*StreamStateInfo
}

func NewStreamStateUpdate

func NewStreamStateUpdate() *StreamStateUpdate

func (*StreamStateUpdate) Empty

func (s *StreamStateUpdate) Empty() bool

func (*StreamStateUpdate) HandleStreamingChange

func (s *StreamStateUpdate) HandleStreamingChange(isPaused bool, track *Track)

type Track

type Track struct {
	// contains filtered or unexported fields
}

func NewTrack

func NewTrack(
	downTrack *sfu.DownTrack,
	source livekit.TrackSource,
	isSimulcast bool,
	publisherID livekit.ParticipantID,
	logger logger.Logger,
) *Track

func (*Track) AllocateNextHigher

func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (sfu.VideoAllocation, bool)

func (*Track) AllocateOptimal

func (t *Track) AllocateOptimal(allowOvershoot bool) sfu.VideoAllocation

func (*Track) BandwidthRequested

func (t *Track) BandwidthRequested() int64

func (*Track) DistanceToDesired

func (t *Track) DistanceToDesired() float64

func (*Track) DownTrack

func (t *Track) DownTrack() *sfu.DownTrack

func (*Track) GetAndResetBytesSent

func (t *Track) GetAndResetBytesSent() (uint32, uint32)

func (*Track) GetAndResetNackStats

func (t *Track) GetAndResetNackStats() (lowest uint16, highest uint16, numNacked int, numNacks int, numRuns int)

func (*Track) GetHistory

func (t *Track) GetHistory() string

func (*Track) GetNackDelta

func (t *Track) GetNackDelta() (uint32, uint32)

func (*Track) GetNextHigherTransition

func (t *Track) GetNextHigherTransition(allowOvershoot bool) (sfu.VideoTransition, bool)

func (*Track) GetRTCPReceiverReportDelta

func (t *Track) GetRTCPReceiverReportDelta() (uint32, uint32, uint32)

func (*Track) ID

func (t *Track) ID() livekit.TrackID

func (*Track) IsDeficient

func (t *Track) IsDeficient() bool

func (*Track) IsManaged

func (t *Track) IsManaged() bool

func (*Track) Pause

func (t *Track) Pause() sfu.VideoAllocation

func (*Track) Priority

func (t *Track) Priority() uint8

func (*Track) ProcessRTCPReceiverReport

func (t *Track) ProcessRTCPReceiverReport(rr rtcp.ReceptionReport)

func (*Track) ProvisionalAllocate

func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64

func (*Track) ProvisionalAllocateCommit

func (t *Track) ProvisionalAllocateCommit() sfu.VideoAllocation

func (*Track) ProvisionalAllocateGetBestWeightedTransition

func (t *Track) ProvisionalAllocateGetBestWeightedTransition() sfu.VideoTransition

func (*Track) ProvisionalAllocateGetCooperativeTransition

func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) sfu.VideoTransition

func (*Track) ProvisionalAllocatePrepare

func (t *Track) ProvisionalAllocatePrepare()

func (*Track) PublisherID

func (t *Track) PublisherID() livekit.ParticipantID

func (*Track) SetDirty

func (t *Track) SetDirty(isDirty bool) bool

func (*Track) SetMaxLayer

func (t *Track) SetMaxLayer(layer buffer.VideoLayer) bool

func (*Track) SetPaused

func (t *Track) SetPaused(isPaused bool) bool

func (*Track) SetPriority

func (t *Track) SetPriority(priority uint8) bool

func (*Track) UpdateHistory

func (t *Track) UpdateHistory()

func (*Track) UpdateNack

func (t *Track) UpdateNack(nackInfos []sfu.NackInfo)

func (*Track) WritePaddingRTP

func (t *Track) WritePaddingRTP(bytesToSend int) int

type TrackSorter

type TrackSorter []*Track

func (TrackSorter) Len

func (t TrackSorter) Len() int

func (TrackSorter) Less

func (t TrackSorter) Less(i, j int) bool

func (TrackSorter) Swap

func (t TrackSorter) Swap(i, j int)

type TrendDetector

type TrendDetector struct {
	// contains filtered or unexported fields
}

func NewTrendDetector

func NewTrendDetector(params TrendDetectorParams) *TrendDetector

func (*TrendDetector) AddValue

func (t *TrendDetector) AddValue(value int64)

func (*TrendDetector) GetDirection

func (t *TrendDetector) GetDirection() TrendDirection

func (*TrendDetector) GetHighest

func (t *TrendDetector) GetHighest() int64

func (*TrendDetector) GetLowest

func (t *TrendDetector) GetLowest() int64

func (*TrendDetector) GetValues

func (t *TrendDetector) GetValues() []int64

func (*TrendDetector) Seed

func (t *TrendDetector) Seed(value int64)

func (*TrendDetector) ToString

func (t *TrendDetector) ToString() string

type TrendDetectorParams

type TrendDetectorParams struct {
	Name                   string
	Logger                 logger.Logger
	RequiredSamples        int
	DownwardTrendThreshold float64
	CollapseThreshold      time.Duration
}

type TrendDirection

type TrendDirection int
const (
	TrendDirectionNeutral TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)

func (TrendDirection) String

func (t TrendDirection) String() string
