media

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DirectionInput  = "rx"
	DirectionOutput = "tx"
)

Constants

View Source
const (
	// KeySIPSuppressUplinkEcho: when true, output-router does not send decoded uplink
	// microphone AudioPackets to RTP output; only synthesized (TTS) audio is sent.
	KeySIPSuppressUplinkEcho = "sip_suppress_uplink_echo"
)

Session value keys (sync.Map on MediaSession)

Variables

View Source
var (
	AllStates     = "*"
	Begin         = "begin"
	End           = "end"
	Hangup        = "hangup"
	StartSpeaking = "speaking.start"
	StartSilence  = "silence.start"
	Transcribing  = "transcribing" // params: sentence string
	Synthesizing  = "synthesizing" // params: result string
	StartPlay     = "play.start"
	StopPlay      = "play.stop"
	Completed     = "completed"
	// interrupt
	Interruption = "interruption"
)

State types

View Source
var (
	MediaDataTypeState  = "state"
	MediaDataTypePacket = "packet"
	MediaDataTypeMetric = "metric"
)
View Source
var (
	ErrNotInputTransport  = errors.New("not input transport")
	ErrNotOutputTransport = errors.New("not output transport")
	ErrCodecNotSupported  = errors.New("codec not supported")
)
View Source
var (
	AgentRunning    = "_agent_running"
	WorkingState    = "_working_state"
	UpstreamRunning = "_upstream_running"
)

Functions

func CastOption

func CastOption[T any](options map[string]any) (val T)

func ComputeSampleByteCount

func ComputeSampleByteCount(sampleRate, bitDepth, channels int) int

ComputeSampleByteCount returns the number of bytes produced by one millisecond of linear-PCM audio at the given configuration. It is used by both the recognizer (to compute byte budgets per second) and the synthesizer (to size frame slices).

Formula: bytes_per_ms = sampleRate * (bitDepth/8) * channels / 1000.

Inputs <= 0 are treated as zero, so the function never panics; callers should treat a zero result as a sign of misconfiguration.

func IsAudioCodecSupported

func IsAudioCodecSupported(codec AudioCodec) bool

IsAudioCodecSupported checks if an audio codec is supported

func IsVideoCodecSupported

func IsVideoCodecSupported(codec VideoCodec) bool

IsVideoCodecSupported checks if a video codec is supported

func NormalizeFramePeriod

func NormalizeFramePeriod(s string) time.Duration

NormalizeFramePeriod parses a frame-period string (e.g. "20ms", "60ms") and validates it against the supported range [10ms, 300ms]. Empty, unparseable, zero, or out-of-range values are replaced with a safe default of 20ms, so downstream RTP/codec packetizers always receive a usable cadence.

The string form is what synthesizer/recognizer Options carry over the wire (JSON / form encoding). Callers that already hold a time.Duration d can simply pass d.String().

func ResamplePCM

func ResamplePCM(data []byte, inputRate, outputRate int) ([]byte, error)

ResamplePCM converts audio data from one sample rate to another

func SetDefaultResampler

func SetDefaultResampler(factory ConverterFactory)

SetDefaultResampler sets the default converter factory

Types

type AsyncTaskRunner

type AsyncTaskRunner[T any] struct {
	InitCallback      func(h MediaHandler) error
	TerminateCallback func(h MediaHandler) error
	StateCallback     func(h MediaHandler, event StateChange) error
	RequestBuilder    func(h MediaHandler, packet MediaPacket) (*PacketRequest[T], error)
	TaskExecutor      func(ctx context.Context, h MediaHandler, req PacketRequest[T]) error

	WorkerPoolSize int
	TaskTimeout    time.Duration
	MaxTaskTimeout time.Duration
	ConcurrentMode bool
	// contains filtered or unexported fields
}

AsyncTaskRunner handles asynchronous task execution using a true multi-worker pool pattern.

func NewAsyncTaskRunner

func NewAsyncTaskRunner[T any](queueSize int) AsyncTaskRunner[T]

NewAsyncTaskRunner creates a new task runner with worker pool configuration

func (*AsyncTaskRunner[T]) CancelActiveTask

func (tr *AsyncTaskRunner[T]) CancelActiveTask()

CancelActiveTask stops all active worker tasks in the multi-worker pool.

func (*AsyncTaskRunner[T]) HandleMediaData

func (tr *AsyncTaskRunner[T]) HandleMediaData(h MediaHandler, data MediaData)

HandleMediaData routes media data to the appropriate handlers.

func (*AsyncTaskRunner[T]) HandlePacket

func (tr *AsyncTaskRunner[T]) HandlePacket(h MediaHandler, packet MediaPacket)

HandlePacket processes packet data through task queue

func (*AsyncTaskRunner[T]) HandleState

func (tr *AsyncTaskRunner[T]) HandleState(h MediaHandler, event StateChange)

HandleState processes state change events

func (*AsyncTaskRunner[T]) ReleaseResources

func (tr *AsyncTaskRunner[T]) ReleaseResources()

ReleaseResources frees allocated resources and stops all workers

type AudioCodec

type AudioCodec string

AudioCodec represents supported audio codecs

const (
	// Uncompressed
	AudioCodecPCM  AudioCodec = "pcm"
	AudioCodecPCMU AudioCodec = "pcmu"
	AudioCodecPCMA AudioCodec = "pcma"

	// Lossless compression
	AudioCodecFLAC AudioCodec = "flac"
	AudioCodecAPE  AudioCodec = "ape"
	AudioCodecWAV  AudioCodec = "wav"

	// Lossy compression (Low bitrate)
	AudioCodecMP3    AudioCodec = "mp3"
	AudioCodecAAC    AudioCodec = "aac"
	AudioCodecOPUS   AudioCodec = "opus"
	AudioCodecVORBIS AudioCodec = "vorbis"

	// Lossy compression (High quality)
	AudioCodecFDAC AudioCodec = "fdac"
	AudioCodecALAC AudioCodec = "alac"

	// Telephony
	AudioCodecGSM  AudioCodec = "gsm"
	AudioCodecAMR  AudioCodec = "amr"
	AudioCodecSILK AudioCodec = "silk"

	// Proprietary
	AudioCodecWMA AudioCodec = "wma"
	AudioCodecAC3 AudioCodec = "ac3"
	AudioCodecDTS AudioCodec = "dts"
)

type AudioPacket

type AudioPacket struct {
	PlayID   string `json:"id,omitempty"`
	Sequence int    `json:"sequence"`
	Payload  []byte `json:"payload"`
	// RTPSamples, when > 0, is the RTP clock increment for this frame (RFC 3550 timestamp delta).
	// Encoders with variable frame sizes (e.g. OPUS) should set this so the RTP layer does not guess from duration.
	RTPSamples    uint32 `json:"rtpSamples,omitempty"`
	IsFirstPacket bool   `json:"isFirstPacket,omitempty"`
	IsEndPacket   bool   `json:"isEndPacket,omitempty"`
	IsSynthesized bool   `json:"isSynthesized,omitempty"`
	IsSilence     bool   `json:"isSilence,omitempty"`
	SourceText    string `json:"sourceText,omitempty"`
}

func (*AudioPacket) Body

func (d *AudioPacket) Body() []byte

func (*AudioPacket) String

func (d *AudioPacket) String() string

type AudioProfile

type AudioProfile string

AudioProfile represents audio codec profiles

const (
	// AAC profiles
	AudioProfileAAC_LC    AudioProfile = "aac_lc"
	AudioProfileAAC_HE    AudioProfile = "aac_he"
	AudioProfileAAC_HE_V2 AudioProfile = "aac_he_v2"
	AudioProfileAAC_LD    AudioProfile = "aac_ld"
	AudioProfileAAC_ELD   AudioProfile = "aac_eld"

	// Opus profiles
	AudioProfileOpusNarrow AudioProfile = "opus_narrow"
	AudioProfileOpusWide   AudioProfile = "opus_wide"

	// MP3 profiles
	AudioProfileMP3_MPEG1  AudioProfile = "mp3_mpeg1"
	AudioProfileMP3_MPEG2  AudioProfile = "mp3_mpeg2"
	AudioProfileMP3_MPEG25 AudioProfile = "mp3_mpeg25"
)

type BaseProcessor

type BaseProcessor struct {
	// contains filtered or unexported fields
}

BaseProcessor provides default implementation for common processor functionality

func NewBaseProcessor

func NewBaseProcessor(name string, priority ProcessorPriority) *BaseProcessor

NewBaseProcessor creates a base processor

func (*BaseProcessor) CanHandle

func (bp *BaseProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool

CanHandle checks if the processor can handle the event

func (*BaseProcessor) Name

func (bp *BaseProcessor) Name() string

Name returns the processor name

func (*BaseProcessor) Priority

func (bp *BaseProcessor) Priority() ProcessorPriority

Priority returns the processor priority

func (*BaseProcessor) WithCondition

func (bp *BaseProcessor) WithCondition(condition ProcessorCondition) *BaseProcessor

WithCondition sets a condition for the processor

type ClosePacket

type ClosePacket struct {
	Reason string `json:"reason"`
}

func (*ClosePacket) Body

func (f *ClosePacket) Body() []byte

func (*ClosePacket) String

func (f *ClosePacket) String() string

type CodecConfig

type CodecConfig struct {
	Codec         string `json:"codec" form:"codec" default:"pcm"`
	SampleRate    int    `json:"sampleRate" form:"sample_rate" default:"16000"`
	Channels      int    `json:"channels" form:"channels" default:"1"`
	BitDepth      int    `json:"bitDepth" form:"bit_depth" default:"16"`
	FrameDuration string `json:"frameDuration" form:"frame_duration"`
	PayloadType   uint8  `json:"payloadType" form:"payload_type"`
	// OpusDecodeChannels: if 1 or 2, Opus inbound decode uses this many channels (from remote SDP offer),
	// then downmixes to mono for PCM. Encoder/answer Channels stay 1.
	OpusDecodeChannels int `json:"opusDecodeChannels,omitempty" form:"opus_decode_channels"`
	// OpusPCMBridgeDecodeStereo: SIP two-leg PCM bridge only — SDP OPUS/48000/2 payloads use a stereo
	// TOC; force a 2-channel libopus decoder then downmix to mono PCM. CallSession/ASR leaves this false.
	OpusPCMBridgeDecodeStereo bool `json:"-"`
}

CodecConfig defines codec configuration

func DefaultCodecConfig

func DefaultCodecConfig() CodecConfig

func (CodecConfig) String

func (c CodecConfig) String() string

type CodecInfo

type CodecInfo struct {
	// Codec identifier (can be audio or video codec name)
	Codec string

	// Human-readable name
	Name string

	// Description
	Description string

	// Whether the codec is lossy
	IsLossy bool

	// Supported profiles
	Profiles []string

	// Supported levels
	Levels []string

	// Typical bitrate range (kbps)
	BitrateMin int
	BitrateMax int

	// Whether hardware acceleration is available
	HardwareAcceleration bool

	// Supported container formats
	Containers []string
}

CodecInfo contains detailed information about a codec

func AudioCodecInfo

func AudioCodecInfo(codec AudioCodec) *CodecInfo

AudioCodecInfo returns information about an audio codec

func VideoCodecInfo

func VideoCodecInfo(codec VideoCodec) *CodecInfo

VideoCodecInfo returns information about a video codec

type CodecLevel

type CodecLevel string

CodecLevel represents codec level/tier

const (
	// H.264 levels
	CodecLevelH264_1  CodecLevel = "h264_1"
	CodecLevelH264_1b CodecLevel = "h264_1b"
	CodecLevelH264_11 CodecLevel = "h264_11"
	CodecLevelH264_12 CodecLevel = "h264_12"
	CodecLevelH264_13 CodecLevel = "h264_13"
	CodecLevelH264_2  CodecLevel = "h264_2"
	CodecLevelH264_21 CodecLevel = "h264_21"
	CodecLevelH264_22 CodecLevel = "h264_22"
	CodecLevelH264_3  CodecLevel = "h264_3"
	CodecLevelH264_31 CodecLevel = "h264_31"
	CodecLevelH264_32 CodecLevel = "h264_32"
	CodecLevelH264_4  CodecLevel = "h264_4"
	CodecLevelH264_41 CodecLevel = "h264_41"
	CodecLevelH264_42 CodecLevel = "h264_42"
	CodecLevelH264_5  CodecLevel = "h264_5"
	CodecLevelH264_51 CodecLevel = "h264_51"
	CodecLevelH264_52 CodecLevel = "h264_52"

	// H.265 levels
	CodecLevelH265_1  CodecLevel = "h265_1"
	CodecLevelH265_2  CodecLevel = "h265_2"
	CodecLevelH265_21 CodecLevel = "h265_21"
	CodecLevelH265_3  CodecLevel = "h265_3"
	CodecLevelH265_31 CodecLevel = "h265_31"
	CodecLevelH265_4  CodecLevel = "h265_4"
	CodecLevelH265_41 CodecLevel = "h265_41"
	CodecLevelH265_5  CodecLevel = "h265_5"
	CodecLevelH265_51 CodecLevel = "h265_51"
	CodecLevelH265_52 CodecLevel = "h265_52"
)

type CompletedData

type CompletedData struct {
	SenderName   string        `json:"senderName"` // eg: tts.aws, asr.qcloud
	Duration     time.Duration `json:"duration"`   // total duration
	Source       MediaPacket   `json:"-"`          // last packet
	Result       any           `json:"result"`     // result
	AssistantId  uint          `json:"assistantId"`
	AssistantVid uint          `json:"assistantVid"`
	DialogID     string        `json:"dialogID"`
}

func (*CompletedData) MarshalJSON

func (d *CompletedData) MarshalJSON() ([]byte, error)

func (CompletedData) String

func (d CompletedData) String() string

type ConverterFactory

type ConverterFactory func(inputRate, outputRate int) SampleRateConverter

ConverterFactory creates a sample rate converter

type DTMFPacket

type DTMFPacket struct {
	Digit string `json:"digit"` // "0"–"9", "*", "#", "A"–"D"
	End   bool   `json:"end"`   // RFC 2833 E (end) bit — prefer handling when true to avoid duplicate keys
}

DTMFPacket carries one RFC 2833 / RFC 4733 telephone-event (RTP payload type typically 101). Emitted by SIP RTP input when the peer sends out-of-band DTMF; not passed through audio codecs.

func (*DTMFPacket) Body

func (d *DTMFPacket) Body() []byte

func (*DTMFPacket) String

func (d *DTMFPacket) String() string

type EncoderFunc

type EncoderFunc func(packet MediaPacket) ([]MediaPacket, error)

type ErrorHandler

type ErrorHandler func(sender any, err error)

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus manages event distribution using pub/sub pattern

func NewEventBus

func NewEventBus(ctx context.Context, queueSize, workers int) *EventBus

NewEventBus creates a new event bus

func (*EventBus) Close

func (eb *EventBus) Close()

Close stops the event bus

func (*EventBus) Publish

func (eb *EventBus) Publish(event *MediaEvent)

Publish sends an event to all subscribers

func (*EventBus) PublishError

func (eb *EventBus) PublishError(sessionID string, err error, sender interface{})

PublishError publishes an error event

func (*EventBus) PublishPacket

func (eb *EventBus) PublishPacket(sessionID string, packet MediaPacket, sender interface{})

PublishPacket publishes a packet event

func (*EventBus) PublishState

func (eb *EventBus) PublishState(sessionID string, state StateChange, sender interface{})

PublishState publishes a state change event

func (*EventBus) Subscribe

func (eb *EventBus) Subscribe(eventType EventType, handler EventHandler)

Subscribe registers an event handler for a specific event type

func (*EventBus) Unsubscribe

func (eb *EventBus) Unsubscribe(eventType EventType, handler EventHandler)

Unsubscribe removes an event handler

type EventHandler

type EventHandler func(ctx context.Context, event *MediaEvent) error

EventHandler processes events from the event bus

type EventType

type EventType string

EventType represents the type of event

const (
	EventTypePacket    EventType = "packet"
	EventTypeState     EventType = "state"
	EventTypeError     EventType = "error"
	EventTypeLifecycle EventType = "lifecycle"
)

type FuncProcessor

type FuncProcessor struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

FuncProcessor is a processor implemented as a function

func NewFuncProcessor

func NewFuncProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, event *MediaEvent) error) *FuncProcessor

NewFuncProcessor creates a function-based processor

func (*FuncProcessor) Process

func (fp *FuncProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error

Process executes the processor function

type InterpolatingConverter

type InterpolatingConverter struct {
	// contains filtered or unexported fields
}

InterpolatingConverter performs optimized interpolation for sample rate conversion

func (*InterpolatingConverter) Close

func (ic *InterpolatingConverter) Close() error

func (*InterpolatingConverter) ConvertSamples

func (ic *InterpolatingConverter) ConvertSamples(samples []byte) []byte

ConvertSamples performs interpolation (linear by default, cubic if enabled)

func (*InterpolatingConverter) Samples

func (ic *InterpolatingConverter) Samples() []byte

func (*InterpolatingConverter) Write

func (ic *InterpolatingConverter) Write(p []byte) (n int, err error)

Write implements SampleRateConverter

type LocalMediaCache

type LocalMediaCache struct {
	Disabled  bool
	CacheRoot string
}

func MediaCache

func MediaCache() *LocalMediaCache

func (*LocalMediaCache) BuildKey

func (c *LocalMediaCache) BuildKey(params ...string) string

func (*LocalMediaCache) Get

func (c *LocalMediaCache) Get(key string) ([]byte, error)

func (*LocalMediaCache) Store

func (c *LocalMediaCache) Store(key string, data []byte) error

type MediaData

type MediaData struct {
	CreatedAt time.Time
	Sender    any
	Type      string
	State     StateChange
	Packet    MediaPacket
	Duration  *time.Duration
}

func (*MediaData) String

func (d *MediaData) String() string

type MediaEvent

type MediaEvent struct {
	Type      EventType
	Timestamp time.Time
	SessionID string
	Payload   interface{}
	Metadata  map[string]interface{}
}

MediaEvent represents an event in the event bus

type MediaHandler

type MediaHandler interface {
	GetContext() context.Context
	GetSession() *MediaSession
	CauseError(sender any, err error)
	EmitState(sender any, state string, params ...any)
	EmitPacket(sender any, packet MediaPacket)
	SendToOutput(sender any, packet MediaPacket)
	AddMetric(key string, duration time.Duration)
	InjectPacket(f PacketFilter)
}

type MediaHandlerFunc

type MediaHandlerFunc func(h MediaHandler, data MediaData)

type MediaPacket

type MediaPacket interface {
	fmt.Stringer
	Body() []byte
}

MediaPacket is the interface implemented by all media data packets.

type MediaSession

type MediaSession struct {
	ID                 string             `json:"id"`
	Running            bool               `json:"running"`
	QueueSize          int                `json:"queueSize"`
	SampleRate         int                // sample rate of the session
	MaxSessionDuration int                `json:"maxSessionDuration"` // Set the maximum session duration in seconds.
	EffectAudios       map[string]*[]byte `json:"-"`
	StartAt            time.Time          `json:"startAt"`
	// contains filtered or unexported fields
}

func NewDefaultSession

func NewDefaultSession() *MediaSession

func (*MediaSession) AddInputTransport

func (s *MediaSession) AddInputTransport(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession

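AddInputTransport registers an input transport, optionally with packet filters, and returns the session for chaining. Input is a backward-compatible alias for this method.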

func (*MediaSession) AddMetric

func (s *MediaSession) AddMetric(key string, duration time.Duration)

func (*MediaSession) AddOutputTransport

func (s *MediaSession) AddOutputTransport(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession

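AddOutputTransport registers an output transport, optionally with packet filters, and returns the session for chaining. Output is a backward-compatible alias for this method.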

func (*MediaSession) CauseError

func (s *MediaSession) CauseError(sender any, err error)

func (*MediaSession) Close

func (s *MediaSession) Close() error

func (*MediaSession) Codec

func (s *MediaSession) Codec() CodecConfig

func (*MediaSession) Context

func (s *MediaSession) Context(parent context.Context) *MediaSession

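Context is one of the chainable configuration methods: it takes a parent context and returns the session so calls can be chained.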

func (*MediaSession) Decode

func (s *MediaSession) Decode(dec EncoderFunc) *MediaSession

func (*MediaSession) Delete

func (s *MediaSession) Delete(key string)

func (*MediaSession) EmitPacket

func (s *MediaSession) EmitPacket(sender any, packet MediaPacket)

func (*MediaSession) EmitState

func (s *MediaSession) EmitState(sender any, state string, params ...any)

func (*MediaSession) Encode

func (s *MediaSession) Encode(enc EncoderFunc) *MediaSession

func (*MediaSession) Error

func (s *MediaSession) Error(handles ...ErrorHandler) *MediaSession

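Error registers handlers that receive the sender and the error whenever an error is caused, and returns the session for chaining.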

func (*MediaSession) Get

func (s *MediaSession) Get(key string) (val any, ok bool)

func (*MediaSession) GetContext

func (s *MediaSession) GetContext() context.Context

func (*MediaSession) GetSession

func (s *MediaSession) GetSession() *MediaSession

func (*MediaSession) GetString

func (s *MediaSession) GetString(key string) string

func (*MediaSession) GetUint

func (s *MediaSession) GetUint(key string) uint

func (*MediaSession) InjectPacket

func (s *MediaSession) InjectPacket(f PacketFilter)

InjectPacket is a no-op on MediaSession; it exists so that MediaSession satisfies the MediaHandler interface.

func (*MediaSession) Input

func (s *MediaSession) Input(rx MediaTransport, filterFuncs ...PacketFilter) *MediaSession

Input is an alias for backward compatibility

func (*MediaSession) IsValid

func (s *MediaSession) IsValid() error

func (*MediaSession) NotifyServeStarting

func (s *MediaSession) NotifyServeStarting()

NotifyServeStarting records that Serve() is about to run. Call synchronously before starting the Serve goroutine so WaitServeShutdown can block until that Serve instance has torn down.

func (*MediaSession) On

func (s *MediaSession) On(state string, handles ...StateChangeHandler) *MediaSession

func (*MediaSession) Output

func (s *MediaSession) Output(tx MediaTransport, filterFuncs ...PacketFilter) *MediaSession

Output is an alias for backward compatibility

func (*MediaSession) Pipeline

func (s *MediaSession) Pipeline(handles ...MediaHandlerFunc) *MediaSession

Pipeline is an alias for backward compatibility

func (*MediaSession) PostHook

func (s *MediaSession) PostHook(hooks ...SessionHook) *MediaSession

func (*MediaSession) RegisterProcessor

func (s *MediaSession) RegisterProcessor(processor Processor) *MediaSession

RegisterProcessor registers a processor in the registry

func (*MediaSession) SendToOutput

func (s *MediaSession) SendToOutput(sender any, packet MediaPacket)

func (*MediaSession) Serve

func (s *MediaSession) Serve() error

Serve starts the session and blocks the calling goroutine until the session finishes.

func (*MediaSession) Set

func (s *MediaSession) Set(key string, val any)

func (*MediaSession) SetSessionID

func (s *MediaSession) SetSessionID(id string) *MediaSession

func (*MediaSession) String

func (s *MediaSession) String() string

func (*MediaSession) Trace

func (s *MediaSession) Trace(trace MediaHandlerFunc) *MediaSession

func (*MediaSession) UseMiddleware

func (s *MediaSession) UseMiddleware(handles ...MediaHandlerFunc) *MediaSession

Deprecated: UseMiddleware is deprecated; use RegisterProcessor instead.

func (*MediaSession) WaitServeShutdown

func (s *MediaSession) WaitServeShutdown(ctx context.Context) error

WaitServeShutdown blocks until Serve has finished and released transport readers/writers, or until ctx ends. If Serve was never scheduled for this session, returns nil immediately.

type MediaTransport

type MediaTransport interface {
	io.Closer
	String() string
	Attach(s *MediaSession)
	Next(ctx context.Context) (MediaPacket, error)
	Send(ctx context.Context, packet MediaPacket) (int, error)
	Codec() CodecConfig
	Close() error
}

MediaTransport interface for media transport

type PacketFilter

type PacketFilter func(packet MediaPacket) (bool, error)

type PacketProcessor

type PacketProcessor struct {
	*BaseProcessor
	// contains filtered or unexported fields
}

PacketProcessor is a specialized processor for packet events

func NewPacketProcessor

func NewPacketProcessor(name string, priority ProcessorPriority, fn func(ctx context.Context, session *MediaSession, packet MediaPacket) error) *PacketProcessor

NewPacketProcessor creates a packet processor

func (*PacketProcessor) CanHandle

func (pp *PacketProcessor) CanHandle(ctx context.Context, event *MediaEvent) bool

CanHandle checks if this is a packet event

func (*PacketProcessor) Process

func (pp *PacketProcessor) Process(ctx context.Context, session *MediaSession, event *MediaEvent) error

Process handles packet events

type PacketRequest

type PacketRequest[R any] struct {
	H         MediaHandler
	Interrupt bool
	Req       R
}

PacketRequest is kept for backward compatibility; its Interrupt field maps to the runner's internal shouldStop flag.

type PipelineStage

type PipelineStage struct {
	// contains filtered or unexported fields
}

PipelineStage represents a single stage in the processing pipeline. It uses an asynchronous, event-driven architecture instead of a synchronous chain.

func (*PipelineStage) AddMetric

func (sp *PipelineStage) AddMetric(key string, duration time.Duration)

func (*PipelineStage) CauseError

func (sp *PipelineStage) CauseError(sender any, err error)

func (*PipelineStage) EmitPacket

func (sp *PipelineStage) EmitPacket(sender any, packet MediaPacket)

EmitPacket enqueues packet for asynchronous processing

func (*PipelineStage) EmitState

func (sp *PipelineStage) EmitState(sender any, state string, params ...any)

func (*PipelineStage) GetContext

func (sp *PipelineStage) GetContext() context.Context

func (*PipelineStage) GetSession

func (sp *PipelineStage) GetSession() *MediaSession

func (*PipelineStage) InjectPacket

func (sp *PipelineStage) InjectPacket(f PacketFilter)

InjectPacket sets pre-processing filter function

func (*PipelineStage) SendToOutput

func (sp *PipelineStage) SendToOutput(sender any, packet MediaPacket)

func (*PipelineStage) String

func (sp *PipelineStage) String() string

type Processor

type Processor interface {
	// Name returns the processor name
	Name() string

	// Priority returns processing priority (higher = processed first)
	Priority() ProcessorPriority

	// CanHandle checks if this processor can handle the event
	CanHandle(ctx context.Context, event *MediaEvent) bool

	// Process handles the event
	Process(ctx context.Context, session *MediaSession, event *MediaEvent) error
}

Processor handles media events

type ProcessorCondition

type ProcessorCondition func(ctx context.Context, event *MediaEvent) bool

ProcessorCondition determines if a processor should handle an event

type ProcessorPriority

type ProcessorPriority int

ProcessorPriority defines processing priority

const (
	PriorityLow    ProcessorPriority = 0
	PriorityNormal ProcessorPriority = 50
	PriorityHigh   ProcessorPriority = 100
)

type ProcessorRegistry

type ProcessorRegistry struct {
	// contains filtered or unexported fields
}

ProcessorRegistry manages registered processors

func NewProcessorRegistry

func NewProcessorRegistry() *ProcessorRegistry

NewProcessorRegistry creates a new processor registry

func (*ProcessorRegistry) GetAllProcessors

func (pr *ProcessorRegistry) GetAllProcessors() []Processor

GetAllProcessors returns all registered processors

func (*ProcessorRegistry) GetProcessors

func (pr *ProcessorRegistry) GetProcessors(ctx context.Context, event *MediaEvent) []Processor

GetProcessors returns all processors that can handle the event, in priority order

func (*ProcessorRegistry) Register

func (pr *ProcessorRegistry) Register(processor Processor)

Register adds a processor to the registry

func (*ProcessorRegistry) Unregister

func (pr *ProcessorRegistry) Unregister(name string)

Unregister removes a processor

type RouteRule

type RouteRule struct {
	Condition func(packet MediaPacket) bool
	Targets   []string // Transport IDs
	Strategy  RoutingStrategy
}

RouteRule defines routing rules

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router manages packet routing

func NewRouter

func NewRouter(defaultStrategy RoutingStrategy) *Router

NewRouter creates a new router

func (*Router) AddRule

func (r *Router) AddRule(rule RouteRule)

AddRule adds a routing rule

func (*Router) Route

func (r *Router) Route(packet MediaPacket, availableTransports []*TransportConnector) []*TransportConnector

Route determines where to send a packet

type RoutingStrategy

type RoutingStrategy int

RoutingStrategy defines how packets are routed

const (
	// StrategyBroadcast sends to all outputs
	StrategyBroadcast RoutingStrategy = iota
	// StrategyRoundRobin distributes across outputs
	StrategyRoundRobin
	// StrategyFirstAvailable uses first available output
	StrategyFirstAvailable
)

type SampleRateConverter

type SampleRateConverter interface {
	io.WriteCloser
	Samples() []byte
}

SampleRateConverter defines interface for converting sample rates

func DefaultResampler

func DefaultResampler(inputRate, outputRate int) SampleRateConverter

DefaultResampler creates the standard PCM16 rate converter (cubic interpolation).

func NewCubicInterpolatingConverter

func NewCubicInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter

NewCubicInterpolatingConverter creates a converter with cubic interpolation (better quality)

func NewInterpolatingConverter

func NewInterpolatingConverter(sourceRate, targetRate int) SampleRateConverter

NewInterpolatingConverter creates a new interpolating converter with linear interpolation (fast)

type SessionHook

type SessionHook func(session *MediaSession)

type StateChange

type StateChange struct {
	State  string `json:"state"`
	Params []any  `json:"params,omitempty"`
}

func (*StateChange) SafeGetStr

func (s *StateChange) SafeGetStr(idx int) string

type StateChangeHandler

type StateChangeHandler func(event StateChange)

type StreamFormat

type StreamFormat struct {
	SampleRate    int
	BitDepth      int
	Channels      int
	FrameDuration time.Duration
}

type TextPacket

type TextPacket struct {
	PlayID         string    `json:"id,omitempty"`
	Text           string    `json:"text"`
	IsTranscribed  bool      `json:"isTranscribed"`
	IsLLMGenerated bool      `json:"isLLMGenerated"`
	IsPartial      bool      `json:"isPartial"`
	IsEnd          bool      `json:"isEnd"`
	Sequence       int       `json:"sequence"`
	StartAt        time.Time `json:"startAt"`
}

func (*TextPacket) Body

func (t *TextPacket) Body() []byte

func (*TextPacket) String

func (t *TextPacket) String() string

type TranscribingData

type TranscribingData struct {
	SenderName string        `json:"senderName"` // eg: tts.aws, asr.qcloud
	Duration   time.Duration `json:"duration"`   // total duration
	Source     MediaPacket   `json:"-"`          // last packet
	Result     any           `json:"result"`     // result
	Direction  string        `json:"direction"`  // direction
	DialogID   string        `json:"dialogID"`
}

func (*TranscribingData) MarshalJSON

func (d *TranscribingData) MarshalJSON() ([]byte, error)

func (TranscribingData) String

func (d TranscribingData) String() string

type TransportConnector

type TransportConnector struct {
	ID        string
	Transport MediaTransport
	Direction string // "input" or "output"
	Active    bool
	// contains filtered or unexported fields
}

TransportConnector represents a connection to a transport

func NewTransportConnector

func NewTransportConnector(id string, transport MediaTransport, direction string) *TransportConnector

NewTransportConnector creates a new transport connector

func (*TransportConnector) IsActive

func (tc *TransportConnector) IsActive() bool

IsActive checks if connector is active

func (*TransportConnector) SetActive

func (tc *TransportConnector) SetActive(active bool)

SetActive sets the active state

func (*TransportConnector) String

func (tc *TransportConnector) String() string

String returns string representation

type TransportManager

type TransportManager struct {
	// contains filtered or unexported fields
}

TransportManager manages transport connections

func (*TransportManager) String

func (tl *TransportManager) String() string

type TurnDetectionData

type TurnDetectionData struct {
	SenderName string `json:"senderName"`
	CostTime   int64  `json:"cost_time"`
	Status     string `json:"status"`
	Text       string `json:"text"`
	DialogID   string `json:"dialogID"`
}

type VideoCodec

type VideoCodec string

VideoCodec represents supported video codecs

const (
	// Uncompressed
	VideoCodecRaw VideoCodec = "raw"

	// H.26x family
	VideoCodecH261 VideoCodec = "h261"
	VideoCodecH263 VideoCodec = "h263"
	VideoCodecH264 VideoCodec = "h264"
	VideoCodecH265 VideoCodec = "h265"
	VideoCodecH266 VideoCodec = "h266"

	// VP family
	VideoCodecVP8 VideoCodec = "vp8"
	VideoCodecVP9 VideoCodec = "vp9"

	// AV family
	VideoCodecAV1 VideoCodec = "av1"

	// MPEG family
	VideoCodecMPEG1 VideoCodec = "mpeg1"
	VideoCodecMPEG2 VideoCodec = "mpeg2"
	VideoCodecMPEG4 VideoCodec = "mpeg4"

	// Proprietary
	VideoCodecWMV    VideoCodec = "wmv"
	VideoCodecRV     VideoCodec = "rv"
	VideoCodecProRes VideoCodec = "prores"
	VideoCodecDNxHD  VideoCodec = "dnxhd"

	// Older/Legacy
	VideoCodecSorenson VideoCodec = "sorenson"
	VideoCodecCinepak  VideoCodec = "cinepak"
)

type VideoProfile

type VideoProfile string

VideoProfile represents video codec profiles

const (
	// H.264 profiles
	VideoProfileH264_Baseline VideoProfile = "h264_baseline"
	VideoProfileH264_Main     VideoProfile = "h264_main"
	VideoProfileH264_High     VideoProfile = "h264_high"

	// H.265 profiles
	VideoProfileH265_Main      VideoProfile = "h265_main"
	VideoProfileH265_Main10    VideoProfile = "h265_main10"
	VideoProfileH265_MainStill VideoProfile = "h265_main_still"

	// VP9 profiles
	VideoProfileVP9_Profile0 VideoProfile = "vp9_profile0"
	VideoProfileVP9_Profile1 VideoProfile = "vp9_profile1"
	VideoProfileVP9_Profile2 VideoProfile = "vp9_profile2"
	VideoProfileVP9_Profile3 VideoProfile = "vp9_profile3"

	// AV1 profiles
	VideoProfileAV1_Main VideoProfile = "av1_main"
	VideoProfileAV1_High VideoProfile = "av1_high"
	VideoProfileAV1_Pro  VideoProfile = "av1_pro"
)

Directories

Path Synopsis
dsp — Package dsp houses lightweight digital-signal-processing helpers used by the media pipeline (RMS, energy, gain, etc.).
vad — Package vad provides lightweight RMS-based voice activity hints for sip1 barge-in (interrupting TTS / local WAV playback toward the callee while synthetic audio is playing).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL