stream

package
v0.0.0-...-ced71c4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 14 Imported by: 6

Documentation

Overview

Mostly take from github.com/Workiva/go-datastructures.

Index

Constants

View Source
const DefaultHLSStreamCap = uint(500)
View Source
const DefaultHLSStreamWin = uint(3)
View Source
const DefaultSegWaitTime = time.Second * 10

const DefaultMediaWinLen = uint(5)

View Source
const SegWaitInterval = time.Second

Variables

View Source
var (
	// ErrDisposed is returned when an operation is performed on a disposed
	// queue.
	ErrDisposed = errors.New(`queue: disposed`)

	// ErrTimeout is returned when an applicable queue operation times out.
	ErrTimeout = errors.New(`queue: poll timed out`)

	// ErrEmptyQueue is returned when an non-applicable queue operation was called
	// due to the queue's empty item state
	ErrEmptyQueue = errors.New(`queue: empty queue`)
)
View Source
var (
	HLS  = MakeVideoFormatType(avFormatTypeMagic + 1)
	RTMP = MakeVideoFormatType(avFormatTypeMagic + 2)
	DASH = MakeVideoFormatType(avFormatTypeMagic + 3)
)
View Source
var ErrAddHLSSegment = errors.New("ErrAddHLSSegment")
View Source
var ErrBadHLSBuffer = errors.New("BadHLSBuffer")
View Source
var ErrBufferEmpty = errors.New("Stream Buffer Empty")
View Source
var ErrBufferFull = errors.New("Stream Buffer Full")
View Source
var ErrBufferItemType = errors.New("Buffer Item Type Not Recognized")
View Source
var ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF")
View Source
var ErrEOF = errors.New("ErrEOF")
View Source
var ErrHttpReqFailed = errors.New("Http Request Failed")
View Source
var ErrNotFound = errors.New("Not Found")
View Source
var ErrVideoManifest = errors.New("ErrVideoManifest")

Functions

func ExecuteInParallel

func ExecuteInParallel(q *Queue, fn func(interface{}))

ExecuteInParallel will (in parallel) call the provided function with each item in the queue until the queue is exhausted. When the queue is exhausted execution is complete and all goroutines will be killed. This means that the queue will be disposed so cannot be used again.

Types

type AppData

type AppData interface {
	StreamID() string
}

type BasicHLSVideoManifest

type BasicHLSVideoManifest struct {
	// contains filtered or unexported fields
}

func NewBasicHLSVideoManifest

func NewBasicHLSVideoManifest(id string) *BasicHLSVideoManifest

func (*BasicHLSVideoManifest) AddVideoStream

func (m *BasicHLSVideoManifest) AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error

func (*BasicHLSVideoManifest) DeleteVideoStream

func (m *BasicHLSVideoManifest) DeleteVideoStream(strmID string) error

func (*BasicHLSVideoManifest) GetManifest

func (m *BasicHLSVideoManifest) GetManifest() (*m3u8.MasterPlaylist, error)

func (*BasicHLSVideoManifest) GetManifestID

func (m *BasicHLSVideoManifest) GetManifestID() string

func (*BasicHLSVideoManifest) GetStreamVariant

func (m *BasicHLSVideoManifest) GetStreamVariant(strmID string) (*m3u8.Variant, error)

func (*BasicHLSVideoManifest) GetVideoFormat

func (m *BasicHLSVideoManifest) GetVideoFormat() VideoFormat

func (*BasicHLSVideoManifest) GetVideoStream

func (m *BasicHLSVideoManifest) GetVideoStream(strmID string) (HLSVideoStream, error)

func (*BasicHLSVideoManifest) GetVideoStreams

func (m *BasicHLSVideoManifest) GetVideoStreams() []HLSVideoStream

func (BasicHLSVideoManifest) String

func (m BasicHLSVideoManifest) String() string

type BasicHLSVideoStream

type BasicHLSVideoStream struct {
	// contains filtered or unexported fields
}

BasicHLSVideoStream is a basic implementation of HLSVideoStream

func NewBasicHLSVideoStream

func NewBasicHLSVideoStream(strmID string, wSize uint) *BasicHLSVideoStream

func (*BasicHLSVideoStream) AddHLSSegment

func (s *BasicHLSVideoStream) AddHLSSegment(seg *HLSSegment) error

AddHLSSegment adds the hls segment to the right stream

func (*BasicHLSVideoStream) AppData

func (s *BasicHLSVideoStream) AppData() AppData

func (*BasicHLSVideoStream) End

func (s *BasicHLSVideoStream) End()

func (*BasicHLSVideoStream) GetHLSSegment

func (s *BasicHLSVideoStream) GetHLSSegment(segName string) (*HLSSegment, error)

GetHLSSegment gets the HLS segment. It blocks until something is found, or timeout happens.

func (*BasicHLSVideoStream) GetStreamFormat

func (s *BasicHLSVideoStream) GetStreamFormat() VideoFormat

GetStreamFormat always returns HLS

func (*BasicHLSVideoStream) GetStreamID

func (s *BasicHLSVideoStream) GetStreamID() string

GetStreamID returns the streamID

func (*BasicHLSVideoStream) GetStreamPlaylist

func (s *BasicHLSVideoStream) GetStreamPlaylist() (*m3u8.MediaPlaylist, error)

GetStreamPlaylist returns the media playlist represented by the streamID

func (*BasicHLSVideoStream) SetSubscriber

func (s *BasicHLSVideoStream) SetSubscriber(f func(seg *HLSSegment, eof bool))

SetSubscriber sets the callback function that will be called when a new hls segment is inserted

func (BasicHLSVideoStream) String

func (s BasicHLSVideoStream) String() string

type BasicRTMPVideoStream

type BasicRTMPVideoStream struct {
	EOF chan struct{}

	RTMPTimeout time.Duration
	// contains filtered or unexported fields
}

func NewBasicRTMPVideoStream

func NewBasicRTMPVideoStream(data AppData) *BasicRTMPVideoStream

NewBasicRTMPVideoStream creates a new BasicRTMPVideoStream. The default RTMPTimeout is set to 10 milliseconds because we assume all RTMP streams are local.

func (*BasicRTMPVideoStream) AppData

func (s *BasicRTMPVideoStream) AppData() AppData

func (*BasicRTMPVideoStream) Close

func (s *BasicRTMPVideoStream) Close()

func (*BasicRTMPVideoStream) GetStreamFormat

func (s *BasicRTMPVideoStream) GetStreamFormat() VideoFormat

func (*BasicRTMPVideoStream) GetStreamID

func (s *BasicRTMPVideoStream) GetStreamID() string

func (BasicRTMPVideoStream) Height

func (s BasicRTMPVideoStream) Height() int

func (*BasicRTMPVideoStream) ReadRTMPFromStream

func (s *BasicRTMPVideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) (eof chan struct{}, err error)

ReadRTMPFromStream reads the content from the RTMP stream out into the dst.

func (BasicRTMPVideoStream) String

func (s BasicRTMPVideoStream) String() string

func (BasicRTMPVideoStream) Width

func (s BasicRTMPVideoStream) Width() int

func (*BasicRTMPVideoStream) WriteRTMPToStream

func (s *BasicRTMPVideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) (eof chan struct{}, err error)

WriteRTMPToStream writes a video stream from src into the stream.

type Broadcaster

type Broadcaster interface {
	Broadcast(seqNo uint64, data []byte) error
	IsLive() bool
	Finish() error
	String() string
}

Broadcaster takes a streamID and a reader, and broadcasts the data to whatever underlining network. Note the data param doesn't have to be the raw data. The implementation can choose to encode any struct. Example:

s := GetStream("StrmID")
b := ppspp.NewBroadcaster("StrmID", s.Metadata())
for seqNo, data := range s.Segments() {
	b.Broadcast(seqNo, data)
}
b.Finish()

type HLSDemuxer

type HLSDemuxer interface {
	PollPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error)
	WaitAndPopSegment(ctx context.Context, name string) ([]byte, error)
	WaitAndGetSegment(ctx context.Context, name string) ([]byte, error)
}

type HLSMuxer

type HLSMuxer interface {
	WriteSegment(seqNo uint64, name string, duration float64, s []byte) error
}

type HLSSegment

type HLSSegment struct {
	SeqNo       uint64
	Name        string
	Data        []byte
	Duration    float64
	IsZeroFrame bool
}

We couldn't just use the m3u8 definition

type HLSVideoManifest

type HLSVideoManifest interface {
	VideoManifest
	GetManifest() (*m3u8.MasterPlaylist, error)
	GetVideoStream(strmID string) (HLSVideoStream, error)
	// AddVideoStream(strmID string, variant *m3u8.Variant) (HLSVideoStream, error)
	AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error
	GetStreamVariant(strmID string) (*m3u8.Variant, error)
	GetVideoStreams() []HLSVideoStream
	DeleteVideoStream(strmID string) error
}

type HLSVideoStream

type HLSVideoStream interface {
	VideoStream
	GetStreamPlaylist() (*m3u8.MediaPlaylist, error)
	// GetStreamVariant() *m3u8.Variant
	GetHLSSegment(segName string) (*HLSSegment, error)
	AddHLSSegment(seg *HLSSegment) error
	SetSubscriber(f func(seg *HLSSegment, eof bool))
	End()
}

HLSVideoStream contains the master playlist, media playlists in it, and the segments in them. Each media playlist also has a streamID. You can only add media playlists to the stream.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the struct responsible for tracking the state of the queue.

func NewQueue

func NewQueue(hint int64) *Queue

New is a constructor for a new threadsafe queue.

func (*Queue) Dispose

func (q *Queue) Dispose() []interface{}

Dispose will dispose of this queue and returns the items disposed. Any subsequent calls to Get or Put will return an error.

func (*Queue) Disposed

func (q *Queue) Disposed() bool

Disposed returns a bool indicating if this queue has had disposed called on it.

func (*Queue) Empty

func (q *Queue) Empty() bool

Empty returns a bool indicating if this bool is empty.

func (*Queue) Get

func (q *Queue) Get(number int64) ([]interface{}, error)

Get retrieves items from the queue. If there are some items in the queue, get will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue.

func (*Queue) Len

func (q *Queue) Len() int64

Len returns the number of items in this queue.

func (*Queue) Peek

func (q *Queue) Peek() (interface{}, error)

Peek returns a the first item in the queue by value without modifying the queue.

func (*Queue) Poll

func (q *Queue) Poll(ctx context.Context, number int64, timeout time.Duration) ([]interface{}, error)

Poll retrieves items from the queue. If there are some items in the queue, Poll will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue or the provided timeout is reached. A non-positive timeout will block until items are added. If a timeout occurs, ErrTimeout is returned.

func (*Queue) Put

func (q *Queue) Put(items ...interface{}) error

Put will add the specified items to the queue.

func (*Queue) TakeUntil

func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error)

TakeUntil takes a function and returns a list of items that match the checker until the checker returns false. This does not wait if there are no items in the queue.

type RTMPEOF

type RTMPEOF struct{}

type RTMPVideoStream

type RTMPVideoStream interface {
	VideoStream
	ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) (eof chan struct{}, err error)
	WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) (eof chan struct{}, err error)
	Close()
	Height() int
	Width() int
}

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, gotData func(seqNo uint64, data []byte, eof bool)) error
	IsLive() bool
	Unsubscribe() error
	String() string
}

Subscriber subscribes to a stream defined by strmID. It returns a reader that contains the stream. Example 1:

sub, metadata := ppspp.NewSubscriber("StrmID")
stream := NewStream("StrmID", metadata)
ctx, cancel := context.WithCancel(context.Background()
err := sub.Subscribe(ctx, func(seqNo uint64, data []byte){
	stream.WriteSeg(seqNo, data)
})
time.Sleep(time.Second * 5)
cancel()

Example 2:

sub.Unsubscribe() //This is the same with calling cancel()

type VideoFormat

type VideoFormat uint32

func MakeVideoFormatType

func MakeVideoFormatType(base uint32) (c VideoFormat)

type VideoManifest

type VideoManifest interface {
	GetManifestID() string
	GetVideoFormat() VideoFormat
	String() string
}

type VideoStream

type VideoStream interface {
	AppData() AppData
	GetStreamID() string
	GetStreamFormat() VideoFormat
	String() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL