Documentation
¶
Index ¶
- Variables
- func BuildBinaryMessage(msg BinaryMessage) ([]byte, error)
- func DecompressBinaryPayloadForCapabilities(caps *client.Capabilities, data []byte) ([]byte, error)
- type BinaryMessage
- type BinaryProtocolAdapter
- func (a *BinaryProtocolAdapter) Destroy()
- func (a *BinaryProtocolAdapter) HandleBinaryMessage(message []byte) error
- func (a *BinaryProtocolAdapter) NegotiatedFlags() int
- func (a *BinaryProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
- func (a *BinaryProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
- func (a *BinaryProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
- func (a *BinaryProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
- func (a *BinaryProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)
- func (a *BinaryProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
- func (a *BinaryProtocolAdapter) OnTCPClose(handler func(streamId string) error) func()
- func (a *BinaryProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
- func (a *BinaryProtocolAdapter) RemoveDataChannelForStream(streamId string)
- func (a *BinaryProtocolAdapter) SendHTTPRequest(id string, data []byte) error
- func (a *BinaryProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error
- func (a *BinaryProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error
- func (a *BinaryProtocolAdapter) SendHTTPResponse(id string, data []byte) error
- func (a *BinaryProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error
- func (a *BinaryProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error
- func (a *BinaryProtocolAdapter) SendTCPData(streamId string, data []byte) error
- func (a *BinaryProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)
- func (a *BinaryProtocolAdapter) SetDataChannel(dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
- func (a *BinaryProtocolAdapter) SetDataChannelForStream(streamId string, dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
- type CompressionAlgorithm
- type FlowController
- func (fc *FlowController) CanSend(streamId string, size int) bool
- func (fc *FlowController) Clear()
- func (fc *FlowController) GetWindowState(streamId string) *WindowState
- func (fc *FlowController) InitializeStream(streamId string, maxWindowSize ...int)
- func (fc *FlowController) OnAck(streamId string, ackedSize int)
- func (fc *FlowController) Receive(streamId string, size int) bool
- func (fc *FlowController) ReleaseReceiveWindow(streamId string, size int)
- func (fc *FlowController) RemoveStream(streamId string)
- func (fc *FlowController) Send(streamId string, size int) bool
- func (fc *FlowController) TrySend(streamId string, size int) bool
- type LegacyProtocolAdapter
- func (a *LegacyProtocolAdapter) Destroy()
- func (a *LegacyProtocolAdapter) HandleEvent(event string, payload interface{}) error
- func (a *LegacyProtocolAdapter) NegotiatedFlags() int
- func (a *LegacyProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
- func (a *LegacyProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
- func (a *LegacyProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
- func (a *LegacyProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
- func (a *LegacyProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)
- func (a *LegacyProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
- func (a *LegacyProtocolAdapter) OnTCPClose(_ func(streamId string) error) func()
- func (a *LegacyProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
- func (a *LegacyProtocolAdapter) SendHTTPRequest(id string, data []byte) error
- func (a *LegacyProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error
- func (a *LegacyProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error
- func (a *LegacyProtocolAdapter) SendHTTPResponse(id string, data []byte) error
- func (a *LegacyProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error
- func (a *LegacyProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error
- func (a *LegacyProtocolAdapter) SendTCPData(streamId string, data []byte) error
- func (a *LegacyProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)
- func (a *LegacyProtocolAdapter) SetLegacyPeerVersion(version string)
- type MessageFlags
- type MessageType
- type ProtocolAdapter
- type ProtocolAdapterFactory
- type Stream
- type StreamChunk
- type StreamManager
- func (sm *StreamManager) AddChunk(streamId string, sequence int, data []byte, isLast bool)
- func (sm *StreamManager) CreateStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
- func (sm *StreamManager) Destroy()
- func (sm *StreamManager) EnsureStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
- func (sm *StreamManager) GetStream(streamId string) *Stream
- func (sm *StreamManager) RemoveStream(streamId string)
- func (sm *StreamManager) SplitIntoChunks(data []byte, chunkSize int) [][]byte
- type StreamState
- type WindowState
Constants ¶
This section is empty.
Variables ¶
var ErrSemanticHTTPNotSupported = errors.New("semantic HTTP streaming not supported")
ErrSemanticHTTPNotSupported is returned by legacy adapters for semantic streaming sends.
Functions ¶
func BuildBinaryMessage ¶
func BuildBinaryMessage(msg BinaryMessage) ([]byte, error)
BuildBinaryMessage builds a binary message from the struct
func DecompressBinaryPayloadForCapabilities ¶ added in v1.26.15
func DecompressBinaryPayloadForCapabilities(caps *client.Capabilities, data []byte) ([]byte, error)
DecompressBinaryPayloadForCapabilities decompresses a binary frame payload when compression is negotiated (e.g. semantic HTTP head).
Types ¶
type BinaryMessage ¶
type BinaryMessage struct {
Type MessageType
StreamID string
Sequence uint32
Flags MessageFlags
Data []byte
}
BinaryMessage represents a binary message Format: [MessageType(1 byte)] + [StreamIDLength(1 byte)] + [StreamID(variable)] + [Sequence(4 bytes)] + [Flags(1 byte)] + [DataLength(4 bytes)] + [Data(variable)]
func ParseBinaryMessage ¶
func ParseBinaryMessage(buffer []byte) (*BinaryMessage, error)
ParseBinaryMessage parses a binary message from bytes
type BinaryProtocolAdapter ¶
type BinaryProtocolAdapter struct {
// contains filtered or unexported fields
}
BinaryProtocolAdapter implements the binary protocol adapter
func NewBinaryProtocolAdapter ¶
func NewBinaryProtocolAdapter(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) *BinaryProtocolAdapter
NewBinaryProtocolAdapter creates a new binary protocol adapter
func (*BinaryProtocolAdapter) Destroy ¶
func (a *BinaryProtocolAdapter) Destroy()
Destroy cleans up resources
func (*BinaryProtocolAdapter) HandleBinaryMessage ¶
func (a *BinaryProtocolAdapter) HandleBinaryMessage(message []byte) error
HandleBinaryMessage handles a received binary message (public method for external callers)
func (*BinaryProtocolAdapter) NegotiatedFlags ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) NegotiatedFlags() int
NegotiatedFlags returns capability flags negotiated for this adapter (0 if nil caps).
func (*BinaryProtocolAdapter) OnHTTPRequest ¶
func (a *BinaryProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
OnHTTPRequest registers an HTTP request handler
func (*BinaryProtocolAdapter) OnHTTPRequestBody ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
OnHTTPRequestBody registers a handler for semantic-streaming HTTP request body chunks.
func (*BinaryProtocolAdapter) OnHTTPRequestHead ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
OnHTTPRequestHead registers a handler for semantic-streaming HTTP request headers (server -> client).
func (*BinaryProtocolAdapter) OnHTTPResponse ¶
func (a *BinaryProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
OnHTTPResponse registers an HTTP response handler
func (*BinaryProtocolAdapter) OnHTTPResponseBody ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)
OnHTTPResponseBody registers a handler for semantic-streaming HTTP response body chunks.
func (*BinaryProtocolAdapter) OnHTTPResponseHead ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
OnHTTPResponseHead registers a handler for semantic-streaming HTTP response headers (client -> server).
func (*BinaryProtocolAdapter) OnTCPClose ¶ added in v1.27.0
func (a *BinaryProtocolAdapter) OnTCPClose(handler func(streamId string) error) func()
OnTCPClose registers a handler when the client signals that a TCP stream must be torn down.
func (*BinaryProtocolAdapter) OnTCPData ¶
func (a *BinaryProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
OnTCPData registers a TCP data handler
func (*BinaryProtocolAdapter) RemoveDataChannelForStream ¶
func (a *BinaryProtocolAdapter) RemoveDataChannelForStream(streamId string)
RemoveDataChannelForStream removes the data channel mapping for a stream Also cleans up flow control window and stream manager resources for the stream
func (*BinaryProtocolAdapter) SendHTTPRequest ¶
func (a *BinaryProtocolAdapter) SendHTTPRequest(id string, data []byte) error
SendHTTPRequest sends an HTTP request
func (*BinaryProtocolAdapter) SendHTTPRequestBody ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error
SendHTTPRequestBody sends a tunneled HTTP request body chunk.
func (*BinaryProtocolAdapter) SendHTTPRequestHead ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error
SendHTTPRequestHead sends tunneled HTTP request headers (semantic streaming).
func (*BinaryProtocolAdapter) SendHTTPResponse ¶
func (a *BinaryProtocolAdapter) SendHTTPResponse(id string, data []byte) error
SendHTTPResponse sends an HTTP response
func (*BinaryProtocolAdapter) SendHTTPResponseBody ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error
SendHTTPResponseBody sends a tunneled HTTP response body chunk.
func (*BinaryProtocolAdapter) SendHTTPResponseHead ¶ added in v1.26.15
func (a *BinaryProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error
SendHTTPResponseHead sends tunneled HTTP response headers (semantic streaming).
func (*BinaryProtocolAdapter) SendTCPData ¶
func (a *BinaryProtocolAdapter) SendTCPData(streamId string, data []byte) error
SendTCPData sends TCP data For new protocol, if data channel is available, sends via data channel; otherwise uses monitor channel
func (*BinaryProtocolAdapter) SetConnWriteMu ¶ added in v1.26.14
func (a *BinaryProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)
SetConnWriteMu sets the mutex used to serialize writes on the monitor connection.
func (*BinaryProtocolAdapter) SetDataChannel ¶
func (a *BinaryProtocolAdapter) SetDataChannel(dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
SetDataChannel sets the shared data channel connection (legacy fallback)
func (*BinaryProtocolAdapter) SetDataChannelForStream ¶
func (a *BinaryProtocolAdapter) SetDataChannelForStream(streamId string, dataConn *websocket.Conn, dataWriteMu *sync.Mutex)
SetDataChannelForStream sets the data channel connection for a specific stream (new protocol)
type CompressionAlgorithm ¶
type CompressionAlgorithm string
CompressionAlgorithm represents compression algorithm type
const ( CompressionBrotli CompressionAlgorithm = "brotli" CompressionGzip CompressionAlgorithm = "gzip" CompressionNone CompressionAlgorithm = "none" )
type FlowController ¶
type FlowController struct {
// contains filtered or unexported fields
}
FlowController implements backpressure control mechanism
func NewFlowController ¶
func NewFlowController(defaultMaxWindowSize int, onBackpressure func(streamId string, pause bool)) *FlowController
NewFlowController creates a new flow controller
func (*FlowController) CanSend ¶
func (fc *FlowController) CanSend(streamId string, size int) bool
CanSend is a read-only snapshot of whether a send might fit the window. Do not use CanSend followed by Send/TrySend for admission control — use TrySend only so check and window update happen atomically under the same lock.
func (*FlowController) GetWindowState ¶
func (fc *FlowController) GetWindowState(streamId string) *WindowState
GetWindowState gets the window state for a stream
func (*FlowController) InitializeStream ¶
func (fc *FlowController) InitializeStream(streamId string, maxWindowSize ...int)
InitializeStream initializes the window for a stream
func (*FlowController) OnAck ¶
func (fc *FlowController) OnAck(streamId string, ackedSize int)
OnAck receives acknowledgment (updates send window)
func (*FlowController) Receive ¶
func (fc *FlowController) Receive(streamId string, size int) bool
Receive receives data (updates receive window)
func (*FlowController) ReleaseReceiveWindow ¶
func (fc *FlowController) ReleaseReceiveWindow(streamId string, size int)
ReleaseReceiveWindow releases receive window (data has been processed)
func (*FlowController) RemoveStream ¶
func (fc *FlowController) RemoveStream(streamId string)
RemoveStream removes the window for a stream
type LegacyProtocolAdapter ¶
type LegacyProtocolAdapter struct {
// contains filtered or unexported fields
}
LegacyProtocolAdapter implements the legacy protocol (JSON + Base64) Maintains compatibility with old clients
func NewLegacyProtocolAdapter ¶
func NewLegacyProtocolAdapter(conn *websocket.Conn, isClient bool) *LegacyProtocolAdapter
NewLegacyProtocolAdapter creates a new legacy protocol adapter
func (*LegacyProtocolAdapter) Destroy ¶
func (a *LegacyProtocolAdapter) Destroy()
Destroy cleans up resources
func (*LegacyProtocolAdapter) HandleEvent ¶
func (a *LegacyProtocolAdapter) HandleEvent(event string, payload interface{}) error
HandleEvent handles incoming events (public method for external callers)
func (*LegacyProtocolAdapter) NegotiatedFlags ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) NegotiatedFlags() int
NegotiatedFlags returns 0 for legacy protocol.
func (*LegacyProtocolAdapter) OnHTTPRequest ¶
func (a *LegacyProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)
OnHTTPRequest registers an HTTP request handler
func (*LegacyProtocolAdapter) OnHTTPRequestBody ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
func (*LegacyProtocolAdapter) OnHTTPRequestHead ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
func (*LegacyProtocolAdapter) OnHTTPResponse ¶
func (a *LegacyProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)
OnHTTPResponse registers an HTTP response handler
func (*LegacyProtocolAdapter) OnHTTPResponseBody ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)
func (*LegacyProtocolAdapter) OnHTTPResponseHead ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
func (*LegacyProtocolAdapter) OnTCPClose ¶ added in v1.27.0
func (a *LegacyProtocolAdapter) OnTCPClose(_ func(streamId string) error) func()
OnTCPClose is a no-op for legacy protocol (TCP-over-WS close signals are not used).
func (*LegacyProtocolAdapter) OnTCPData ¶
func (a *LegacyProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()
OnTCPData registers a TCP data handler
func (*LegacyProtocolAdapter) SendHTTPRequest ¶
func (a *LegacyProtocolAdapter) SendHTTPRequest(id string, data []byte) error
SendHTTPRequest sends an HTTP request Legacy protocol: Base64 encode -> compress -> JSON
func (*LegacyProtocolAdapter) SendHTTPRequestBody ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error
func (*LegacyProtocolAdapter) SendHTTPRequestHead ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error
func (*LegacyProtocolAdapter) SendHTTPResponse ¶
func (a *LegacyProtocolAdapter) SendHTTPResponse(id string, data []byte) error
SendHTTPResponse sends an HTTP response Legacy protocol: Base64 encode -> compress -> JSON
func (*LegacyProtocolAdapter) SendHTTPResponseBody ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error
func (*LegacyProtocolAdapter) SendHTTPResponseHead ¶ added in v1.26.15
func (a *LegacyProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error
func (*LegacyProtocolAdapter) SendTCPData ¶
func (a *LegacyProtocolAdapter) SendTCPData(streamId string, data []byte) error
SendTCPData sends TCP data Legacy protocol doesn't support TCP over WebSocket
func (*LegacyProtocolAdapter) SetConnWriteMu ¶ added in v1.26.14
func (a *LegacyProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)
SetConnWriteMu sets the mutex used to serialize writes on the monitor connection.
func (*LegacyProtocolAdapter) SetLegacyPeerVersion ¶ added in v1.28.3
func (a *LegacyProtocolAdapter) SetLegacyPeerVersion(version string)
SetLegacyPeerVersion switches legacy wire encoding to match historical Node client behavior: - < 1.3.0: plain base64(raw HTTP) - >= 1.3.0: base64(brotli(base64(raw HTTP)))
type MessageFlags ¶
type MessageFlags uint8
MessageFlags represents message flags
const ( MessageFlagFIN MessageFlags = 0x01 // Stream end MessageFlagACK MessageFlags = 0x02 // Acknowledgment MessageFlagBackpressure MessageFlags = 0x04 // Backpressure )
type MessageType ¶
type MessageType uint8
MessageType represents the type of binary message
const ( MessageTypeControl MessageType = 0x00 // Control message (auth, heartbeat, etc.) MessageTypeHTTPRequest MessageType = 0x01 // HTTP request MessageTypeHTTPResponse MessageType = 0x02 // HTTP response MessageTypeTCPData MessageType = 0x03 // TCP data MessageTypeTCPOpen MessageType = 0x04 // TCP connection open MessageTypeTCPClose MessageType = 0x05 // TCP connection close MessageTypeFlowControl MessageType = 0x06 // Flow control message MessageTypeHTTPRequestHead MessageType = 0x07 // HTTP request headers only (semantic streaming) MessageTypeHTTPRequestBody MessageType = 0x08 // HTTP request body chunk MessageTypeHTTPResponseHead MessageType = 0x09 // HTTP response headers only MessageTypeHTTPResponseBody MessageType = 0x0a // HTTP response body chunk )
type ProtocolAdapter ¶
type ProtocolAdapter interface {
// SendHTTPRequest sends an HTTP request
SendHTTPRequest(id string, data []byte) error
// SendHTTPResponse sends an HTTP response
SendHTTPResponse(id string, data []byte) error
// SendHTTPRequestHead sends HTTP request headers only (semantic body streaming).
SendHTTPRequestHead(id string, head []byte, fin bool) error
// SendHTTPRequestBody sends one HTTP request body chunk (last chunk sets fin).
SendHTTPRequestBody(id string, chunk []byte, fin bool) error
// SendHTTPResponseHead sends HTTP response headers only.
SendHTTPResponseHead(id string, head []byte, fin bool) error
// SendHTTPResponseBody sends one HTTP response body chunk.
SendHTTPResponseBody(id string, chunk []byte, fin bool) error
// SendTCPData sends TCP data
SendTCPData(streamId string, data []byte) error
// OnHTTPRequest registers an HTTP request handler
OnHTTPRequest(handler func(id string, data []byte) error)
// OnHTTPResponse registers an HTTP response handler
OnHTTPResponse(handler func(id string, data []byte) error)
OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)
// OnTCPData registers a TCP data handler
// Returns an unsubscribe function
OnTCPData(handler func(streamId string, data []byte) error) func()
// OnTCPClose registers a handler for client-side TCP stream teardown (e.g. upstream dial failed).
// Legacy adapters return a no-op unsubscribe.
OnTCPClose(handler func(streamId string) error) func()
// Destroy cleans up resources
Destroy()
// SetConnWriteMu serializes WriteMessage on the monitor WebSocket when non-nil (server-side; required by gorilla/websocket).
SetConnWriteMu(mu *sync.Mutex)
// NegotiatedFlags returns the bitmask from capability negotiation (0 for legacy).
NegotiatedFlags() int
}
ProtocolAdapter interface for protocol adaptation Used to unify message transmission handling across different protocol versions
func Create ¶
func Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter
Create is a convenience function to create a protocol adapter
type ProtocolAdapterFactory ¶
type ProtocolAdapterFactory struct{}
ProtocolAdapterFactory creates protocol adapters based on capability negotiation
func NewProtocolAdapterFactory ¶
func NewProtocolAdapterFactory() *ProtocolAdapterFactory
NewProtocolAdapterFactory creates a new protocol adapter factory
func (*ProtocolAdapterFactory) Create ¶
func (f *ProtocolAdapterFactory) Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter
Create creates a protocol adapter based on capabilities If capabilities is nil or has no flags, returns a LegacyProtocolAdapter Otherwise, returns a BinaryProtocolAdapter
type Stream ¶
type Stream struct {
ID string
State StreamState
Chunks map[uint32]*StreamChunk
ExpectedSequence uint32
TotalChunks int
OnComplete func(data []byte)
OnError func(error error)
CompletedData []byte
// contains filtered or unexported fields
}
Stream represents a data stream
type StreamChunk ¶
StreamChunk represents a chunk of stream data
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages multiple data streams
func NewStreamManager ¶
func NewStreamManager(defaultChunkSize int) *StreamManager
NewStreamManager creates a new stream manager
func (*StreamManager) AddChunk ¶
func (sm *StreamManager) AddChunk(streamId string, sequence int, data []byte, isLast bool)
AddChunk adds a chunk to an existing stream. The stream must be created first (e.g. via EnsureStream from handleBinaryMessage); auto-create was removed because it called CreateStream while holding the manager lock (deadlock) and used nil onComplete.
func (*StreamManager) CreateStream ¶
func (sm *StreamManager) CreateStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
CreateStream creates a new stream
func (*StreamManager) Destroy ¶
func (sm *StreamManager) Destroy()
Destroy destroys the stream manager
func (*StreamManager) EnsureStream ¶ added in v1.27.0
func (sm *StreamManager) EnsureStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream
EnsureStream returns the existing stream or creates one with the given callbacks. If the stream already exists, callbacks are ignored (first registration wins).
func (*StreamManager) GetStream ¶
func (sm *StreamManager) GetStream(streamId string) *Stream
GetStream gets a stream by ID
func (*StreamManager) RemoveStream ¶
func (sm *StreamManager) RemoveStream(streamId string)
RemoveStream removes a stream
func (*StreamManager) SplitIntoChunks ¶
func (sm *StreamManager) SplitIntoChunks(data []byte, chunkSize int) [][]byte
SplitIntoChunks splits data into chunks
type StreamState ¶
type StreamState string
StreamState represents the state of a stream
const ( StreamStateInitializing StreamState = "initializing" StreamStateActive StreamState = "active" StreamStatePaused StreamState = "paused" StreamStateCompleted StreamState = "completed" StreamStateError StreamState = "error" )
type WindowState ¶
type WindowState struct {
SendWindow int // Send window size (bytes sent but not acknowledged)
ReceiveWindow int // Receive window size (bytes that can be received)
MaxWindowSize int // Maximum window size
}
WindowState represents the state of a flow control window