Documentation
¶
Index ¶
- func BuildBinaryMessage(msg BinaryMessage) ([]byte, error)
- type BinaryMessage
- type BinaryProtocolAdapter
- func (a *BinaryProtocolAdapter) Destroy()
- func (a *BinaryProtocolAdapter) HandleBinaryMessage(message []byte) error
- func (a *BinaryProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
- func (a *BinaryProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
- func (a *BinaryProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
- func (a *BinaryProtocolAdapter) RemoveDataChannelForStream(streamId string)
- func (a *BinaryProtocolAdapter) SendHTTPRequest(id string, data []byte) error
- func (a *BinaryProtocolAdapter) SendHTTPResponse(id string, data []byte) error
- func (a *BinaryProtocolAdapter) SendTCPData(streamId string, data []byte) error
- func (a *BinaryProtocolAdapter) SetDataChannel(dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
- func (a *BinaryProtocolAdapter) SetDataChannelForStream(streamId string, dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
- type CompressionAlgorithm
- type FlowController
- func (fc *FlowController) CanSend(streamId string, size int) bool
- func (fc *FlowController) Clear()
- func (fc *FlowController) GetWindowState(streamId string) *WindowState
- func (fc *FlowController) InitializeStream(streamId string, maxWindowSize ...int)
- func (fc *FlowController) OnAck(streamId string, ackedSize int)
- func (fc *FlowController) Receive(streamId string, size int) bool
- func (fc *FlowController) ReleaseReceiveWindow(streamId string, size int)
- func (fc *FlowController) RemoveStream(streamId string)
- func (fc *FlowController) Send(streamId string, size int) bool
- func (fc *FlowController) TrySend(streamId string, size int) bool
- type LegacyProtocolAdapter
- func (a *LegacyProtocolAdapter) Destroy()
- func (a *LegacyProtocolAdapter) HandleEvent(event string, payload interface{}) error
- func (a *LegacyProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
- func (a *LegacyProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
- func (a *LegacyProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
- func (a *LegacyProtocolAdapter) SendHTTPRequest(id string, data []byte) error
- func (a *LegacyProtocolAdapter) SendHTTPResponse(id string, data []byte) error
- func (a *LegacyProtocolAdapter) SendTCPData(streamId string, data []byte) error
- type MessageFlags
- type MessageType
- type ProtocolAdapter
- type ProtocolAdapterFactory
- type Stream
- type StreamChunk
- type StreamManager
- func (sm *StreamManager) AddChunk(streamId string, sequence int, data []byte, isLast bool)
- func (sm *StreamManager) CreateStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
- func (sm *StreamManager) Destroy()
- func (sm *StreamManager) GetStream(streamId string) *Stream
- func (sm *StreamManager) RemoveStream(streamId string)
- func (sm *StreamManager) SplitIntoChunks(data []byte, chunkSize int) [][]byte
- type StreamState
- type WindowState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildBinaryMessage ¶
func BuildBinaryMessage(msg BinaryMessage) ([]byte, error)
BuildBinaryMessage builds a binary message from the struct
Types ¶
type BinaryMessage ¶
type BinaryMessage struct {
Type MessageType
StreamID string
Sequence uint32
Flags MessageFlags
Data []byte
}
BinaryMessage represents a binary message Format: [MessageType(1 byte)] + [StreamIDLength(1 byte)] + [StreamID(variable)] + [Sequence(4 bytes)] + [Flags(1 byte)] + [DataLength(4 bytes)] + [Data(variable)]
func ParseBinaryMessage ¶
func ParseBinaryMessage(buffer []byte) (*BinaryMessage, error)
ParseBinaryMessage parses a binary message from bytes
type BinaryProtocolAdapter ¶
type BinaryProtocolAdapter struct {
// contains filtered or unexported fields
}
BinaryProtocolAdapter implements the binary protocol adapter
func NewBinaryProtocolAdapter ¶
func NewBinaryProtocolAdapter(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) *BinaryProtocolAdapter
NewBinaryProtocolAdapter creates a new binary protocol adapter
func (*BinaryProtocolAdapter) Destroy ¶
func (a *BinaryProtocolAdapter) Destroy()
Destroy cleans up resources
func (*BinaryProtocolAdapter) HandleBinaryMessage ¶
func (a *BinaryProtocolAdapter) HandleBinaryMessage(message []byte) error
HandleBinaryMessage handles a received binary message (public method for external callers)
func (*BinaryProtocolAdapter) OnHTTPRequest ¶
func (a *BinaryProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
OnHTTPRequest registers an HTTP request handler
func (*BinaryProtocolAdapter) OnHTTPResponse ¶
func (a *BinaryProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
OnHTTPResponse registers an HTTP response handler
func (*BinaryProtocolAdapter) OnTCPData ¶
func (a *BinaryProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
OnTCPData registers a TCP data handler
func (*BinaryProtocolAdapter) RemoveDataChannelForStream ¶
func (a *BinaryProtocolAdapter) RemoveDataChannelForStream(streamId string)
RemoveDataChannelForStream removes the data channel mapping for a stream Also cleans up flow control window and stream manager resources for the stream
func (*BinaryProtocolAdapter) SendHTTPRequest ¶
func (a *BinaryProtocolAdapter) SendHTTPRequest(id string, data []byte) error
SendHTTPRequest sends an HTTP request
func (*BinaryProtocolAdapter) SendHTTPResponse ¶
func (a *BinaryProtocolAdapter) SendHTTPResponse(id string, data []byte) error
SendHTTPResponse sends an HTTP response
func (*BinaryProtocolAdapter) SendTCPData ¶
func (a *BinaryProtocolAdapter) SendTCPData(streamId string, data []byte) error
SendTCPData sends TCP data For new protocol, if data channel is available, sends via data channel; otherwise uses monitor channel
func (*BinaryProtocolAdapter) SetDataChannel ¶
func (a *BinaryProtocolAdapter) SetDataChannel(dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
SetDataChannel sets the shared data channel connection (legacy fallback)
func (*BinaryProtocolAdapter) SetDataChannelForStream ¶
func (a *BinaryProtocolAdapter) SetDataChannelForStream(streamId string, dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
SetDataChannelForStream sets the data channel connection for a specific stream (new protocol)
type CompressionAlgorithm ¶
type CompressionAlgorithm string
CompressionAlgorithm represents compression algorithm type
const ( CompressionBrotli CompressionAlgorithm = "brotli" CompressionGzip CompressionAlgorithm = "gzip" CompressionNone CompressionAlgorithm = "none" )
type FlowController ¶
type FlowController struct {
// contains filtered or unexported fields
}
FlowController implements backpressure control mechanism
func NewFlowController ¶
func NewFlowController(defaultMaxWindowSize int, onBackpressure func(streamId string, pause bool)) *FlowController
NewFlowController creates a new flow controller
func (*FlowController) CanSend ¶
func (fc *FlowController) CanSend(streamId string, size int) bool
CanSend checks if data can be sent (read-only check, no state modification)
func (*FlowController) GetWindowState ¶
func (fc *FlowController) GetWindowState(streamId string) *WindowState
GetWindowState gets the window state for a stream
func (*FlowController) InitializeStream ¶
func (fc *FlowController) InitializeStream(streamId string, maxWindowSize ...int)
InitializeStream initializes the window for a stream
func (*FlowController) OnAck ¶
func (fc *FlowController) OnAck(streamId string, ackedSize int)
OnAck receives acknowledgment (updates send window)
func (*FlowController) Receive ¶
func (fc *FlowController) Receive(streamId string, size int) bool
Receive receives data (updates receive window)
func (*FlowController) ReleaseReceiveWindow ¶
func (fc *FlowController) ReleaseReceiveWindow(streamId string, size int)
ReleaseReceiveWindow releases receive window (data has been processed)
func (*FlowController) RemoveStream ¶
func (fc *FlowController) RemoveStream(streamId string)
RemoveStream removes the window for a stream
type LegacyProtocolAdapter ¶
type LegacyProtocolAdapter struct {
// contains filtered or unexported fields
}
LegacyProtocolAdapter implements the legacy protocol (JSON + Base64) Maintains compatibility with old clients
func NewLegacyProtocolAdapter ¶
func NewLegacyProtocolAdapter(conn *websocket.Conn, isClient bool) *LegacyProtocolAdapter
NewLegacyProtocolAdapter creates a new legacy protocol adapter
func (*LegacyProtocolAdapter) Destroy ¶
func (a *LegacyProtocolAdapter) Destroy()
Destroy cleans up resources
func (*LegacyProtocolAdapter) HandleEvent ¶
func (a *LegacyProtocolAdapter) HandleEvent(event string, payload interface{}) error
HandleEvent handles incoming events (public method for external callers)
func (*LegacyProtocolAdapter) OnHTTPRequest ¶
func (a *LegacyProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
OnHTTPRequest registers an HTTP request handler
func (*LegacyProtocolAdapter) OnHTTPResponse ¶
func (a *LegacyProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
OnHTTPResponse registers an HTTP response handler
func (*LegacyProtocolAdapter) OnTCPData ¶
func (a *LegacyProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
OnTCPData registers a TCP data handler
func (*LegacyProtocolAdapter) SendHTTPRequest ¶
func (a *LegacyProtocolAdapter) SendHTTPRequest(id string, data []byte) error
SendHTTPRequest sends an HTTP request Legacy protocol: Base64 encode -> compress -> JSON
func (*LegacyProtocolAdapter) SendHTTPResponse ¶
func (a *LegacyProtocolAdapter) SendHTTPResponse(id string, data []byte) error
SendHTTPResponse sends an HTTP response Legacy protocol: Base64 encode -> compress -> JSON
func (*LegacyProtocolAdapter) SendTCPData ¶
func (a *LegacyProtocolAdapter) SendTCPData(streamId string, data []byte) error
SendTCPData sends TCP data Legacy protocol doesn't support TCP over WebSocket
type MessageFlags ¶
type MessageFlags uint8
MessageFlags represents message flags
const ( MessageFlagFIN MessageFlags = 0x01 // Stream end MessageFlagACK MessageFlags = 0x02 // Acknowledgment MessageFlagBackpressure MessageFlags = 0x04 // Backpressure )
type MessageType ¶
type MessageType uint8
MessageType represents the type of binary message
const ( MessageTypeControl MessageType = 0x00 // Control message (auth, heartbeat, etc.) MessageTypeHTTPRequest MessageType = 0x01 // HTTP request MessageTypeHTTPResponse MessageType = 0x02 // HTTP response MessageTypeTCPData MessageType = 0x03 // TCP data MessageTypeTCPOpen MessageType = 0x04 // TCP connection open MessageTypeTCPClose MessageType = 0x05 // TCP connection close MessageTypeFlowControl MessageType = 0x06 // Flow control message )
type ProtocolAdapter ¶
type ProtocolAdapter interface {
// SendHTTPRequest sends an HTTP request
SendHTTPRequest(id string, data []byte) error
// SendHTTPResponse sends an HTTP response
SendHTTPResponse(id string, data []byte) error
// SendTCPData sends TCP data
SendTCPData(streamId string, data []byte) error
// OnHTTPRequest registers an HTTP request handler
OnHTTPRequest(handler func(id string, data []byte) error)
// OnHTTPResponse registers an HTTP response handler
OnHTTPResponse(handler func(id string, data []byte) error)
// OnTCPData registers a TCP data handler
// Returns an unsubscribe function
OnTCPData(handler func(streamId string, data []byte) error) func()
// Destroy cleans up resources
Destroy()
}
ProtocolAdapter interface for protocol adaptation Used to unify message transmission handling across different protocol versions
func Create ¶
func Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter
Create is a convenience function to create a protocol adapter
type ProtocolAdapterFactory ¶
type ProtocolAdapterFactory struct{}
ProtocolAdapterFactory creates protocol adapters based on capability negotiation
func NewProtocolAdapterFactory ¶
func NewProtocolAdapterFactory() *ProtocolAdapterFactory
NewProtocolAdapterFactory creates a new protocol adapter factory
func (*ProtocolAdapterFactory) Create ¶
func (f *ProtocolAdapterFactory) Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter
Create creates a protocol adapter based on capabilities If capabilities is nil or has no flags, returns a LegacyProtocolAdapter Otherwise, returns a BinaryProtocolAdapter
type Stream ¶
type Stream struct {
ID string
State StreamState
Chunks map[uint32]*StreamChunk
ExpectedSequence uint32
TotalChunks int
OnComplete func(data []byte)
OnError func(error error)
CompletedData []byte
// contains filtered or unexported fields
}
Stream represents a data stream
type StreamChunk ¶
StreamChunk represents a chunk of stream data
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages multiple data streams
func NewStreamManager ¶
func NewStreamManager(defaultChunkSize int) *StreamManager
NewStreamManager creates a new stream manager
func (*StreamManager) AddChunk ¶
func (sm *StreamManager) AddChunk(streamId string, sequence int, data []byte, isLast bool)
AddChunk adds a chunk to a stream
func (*StreamManager) CreateStream ¶
func (sm *StreamManager) CreateStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
CreateStream creates a new stream
func (*StreamManager) Destroy ¶
func (sm *StreamManager) Destroy()
Destroy destroys the stream manager
func (*StreamManager) GetStream ¶
func (sm *StreamManager) GetStream(streamId string) *Stream
GetStream gets a stream by ID
func (*StreamManager) RemoveStream ¶
func (sm *StreamManager) RemoveStream(streamId string)
RemoveStream removes a stream
func (*StreamManager) SplitIntoChunks ¶
func (sm *StreamManager) SplitIntoChunks(data []byte, chunkSize int) [][]byte
SplitIntoChunks splits data into chunks
type StreamState ¶
type StreamState string
StreamState represents the state of a stream
const ( StreamStateInitializing StreamState = "initializing" StreamStateActive StreamState = "active" StreamStatePaused StreamState = "paused" StreamStateCompleted StreamState = "completed" StreamStateError StreamState = "error" )
type WindowState ¶
type WindowState struct {
SendWindow int // Send window size (bytes sent but not acknowledged)
ReceiveWindow int // Receive window size (bytes that can be received)
MaxWindowSize int // Maximum window size
}
WindowState represents the state of a flow control window