protocol

package
v1.29.3

Warning: This package is not in the latest version of its module.
Published: Jun 8, 2026 | License: MIT | Imports: 18 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrSemanticHTTPNotSupported = errors.New("semantic HTTP streaming not supported")

ErrSemanticHTTPNotSupported is returned by legacy adapters for semantic streaming sends.

Functions

func BuildBinaryMessage

func BuildBinaryMessage(msg BinaryMessage) ([]byte, error)

BuildBinaryMessage encodes a BinaryMessage struct into its wire-format bytes.

func DecompressBinaryPayloadForCapabilities added in v1.26.15

func DecompressBinaryPayloadForCapabilities(caps *client.Capabilities, data []byte) ([]byte, error)

DecompressBinaryPayloadForCapabilities decompresses a binary frame payload when compression is negotiated (e.g. semantic HTTP head).

Types

type BinaryMessage

type BinaryMessage struct {
	Type     MessageType
	StreamID string
	Sequence uint32
	Flags    MessageFlags
	Data     []byte
}

BinaryMessage represents a binary message. Wire format, in order:

	[MessageType (1 byte)]
	[StreamIDLength (1 byte)]
	[StreamID (variable)]
	[Sequence (4 bytes)]
	[Flags (1 byte)]
	[DataLength (4 bytes)]
	[Data (variable)]
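The field layout documented for BinaryMessage can be illustrated with a minimal, standalone sketch. It does not call this package: the `frame` type and the `encode` and `decode` helpers are hypothetical names, and big-endian integer encoding is an assumption, since the documentation does not state a byte order.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// frame mirrors the documented BinaryMessage fields using plain types (hypothetical).
type frame struct {
	Type     uint8
	StreamID string
	Sequence uint32
	Flags    uint8
	Data     []byte
}

// encode lays the fields out in the documented order.
// Big-endian integers are an assumption; the docs do not state a byte order.
func encode(f frame) ([]byte, error) {
	if len(f.StreamID) > 255 {
		return nil, errors.New("stream ID does not fit a 1-byte length")
	}
	buf := make([]byte, 0, 11+len(f.StreamID)+len(f.Data))
	buf = append(buf, f.Type, byte(len(f.StreamID)))
	buf = append(buf, f.StreamID...)
	buf = binary.BigEndian.AppendUint32(buf, f.Sequence)
	buf = append(buf, f.Flags)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(f.Data)))
	buf = append(buf, f.Data...)
	return buf, nil
}

// decode reverses encode and rejects truncated input.
func decode(b []byte) (frame, error) {
	var f frame
	if len(b) < 2 {
		return f, errors.New("short header")
	}
	f.Type = b[0]
	idLen := int(b[1])
	rest := b[2:]
	if len(rest) < idLen+9 { // stream ID + sequence(4) + flags(1) + data length(4)
		return f, errors.New("truncated frame")
	}
	f.StreamID = string(rest[:idLen])
	rest = rest[idLen:]
	f.Sequence = binary.BigEndian.Uint32(rest[:4])
	f.Flags = rest[4]
	dataLen := binary.BigEndian.Uint32(rest[5:9])
	rest = rest[9:]
	if uint64(len(rest)) < uint64(dataLen) {
		return f, errors.New("truncated data")
	}
	f.Data = append([]byte(nil), rest[:dataLen]...)
	return f, nil
}

func main() {
	b, err := encode(frame{Type: 0x03, StreamID: "s1", Sequence: 7, Flags: 0x01, Data: []byte("hi")})
	if err != nil {
		panic(err)
	}
	f, err := decode(b)
	fmt.Println(len(b), f.StreamID, f.Sequence, string(f.Data), err)
}
```

The fixed part of the header is 11 bytes (1 + 1 + 4 + 1 + 4), so a frame with a 2-byte stream ID and 2 bytes of data occupies 15 bytes in this sketch.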

func ParseBinaryMessage

func ParseBinaryMessage(buffer []byte) (*BinaryMessage, error)

ParseBinaryMessage parses a binary message from bytes

type BinaryProtocolAdapter

type BinaryProtocolAdapter struct {
	// contains filtered or unexported fields
}

BinaryProtocolAdapter implements the binary protocol adapter

func NewBinaryProtocolAdapter

func NewBinaryProtocolAdapter(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) *BinaryProtocolAdapter

NewBinaryProtocolAdapter creates a new binary protocol adapter

func (*BinaryProtocolAdapter) Destroy

func (a *BinaryProtocolAdapter) Destroy()

Destroy cleans up resources

func (*BinaryProtocolAdapter) HandleBinaryMessage

func (a *BinaryProtocolAdapter) HandleBinaryMessage(message []byte) error

HandleBinaryMessage handles a received binary message (public method for external callers)

func (*BinaryProtocolAdapter) NegotiatedFlags added in v1.26.15

func (a *BinaryProtocolAdapter) NegotiatedFlags() int

NegotiatedFlags returns capability flags negotiated for this adapter (0 if nil caps).

func (*BinaryProtocolAdapter) OnHTTPRequest

func (a *BinaryProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)

OnHTTPRequest registers an HTTP request handler

func (*BinaryProtocolAdapter) OnHTTPRequestBody added in v1.26.15

func (a *BinaryProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)

OnHTTPRequestBody registers a handler for semantic-streaming HTTP request body chunks.

func (*BinaryProtocolAdapter) OnHTTPRequestHead added in v1.26.15

func (a *BinaryProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)

OnHTTPRequestHead registers a handler for semantic-streaming HTTP request headers (server -> client).

func (*BinaryProtocolAdapter) OnHTTPResponse

func (a *BinaryProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)

OnHTTPResponse registers an HTTP response handler

func (*BinaryProtocolAdapter) OnHTTPResponseBody added in v1.26.15

func (a *BinaryProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)

OnHTTPResponseBody registers a handler for semantic-streaming HTTP response body chunks.

func (*BinaryProtocolAdapter) OnHTTPResponseHead added in v1.26.15

func (a *BinaryProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)

OnHTTPResponseHead registers a handler for semantic-streaming HTTP response headers (client -> server).

func (*BinaryProtocolAdapter) OnTCPClose added in v1.27.0

func (a *BinaryProtocolAdapter) OnTCPClose(handler func(streamId string) error) func()

OnTCPClose registers a handler that is called when the client signals that a TCP stream must be torn down. It returns an unsubscribe function.

func (*BinaryProtocolAdapter) OnTCPData

func (a *BinaryProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()

OnTCPData registers a TCP data handler

func (*BinaryProtocolAdapter) RemoveDataChannelForStream

func (a *BinaryProtocolAdapter) RemoveDataChannelForStream(streamId string)

RemoveDataChannelForStream removes the data channel mapping for a stream. It also cleans up the flow control window and stream manager resources for that stream.

func (*BinaryProtocolAdapter) SendHTTPRequest

func (a *BinaryProtocolAdapter) SendHTTPRequest(id string, data []byte) error

SendHTTPRequest sends an HTTP request

func (*BinaryProtocolAdapter) SendHTTPRequestBody added in v1.26.15

func (a *BinaryProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error

SendHTTPRequestBody sends a tunneled HTTP request body chunk.

func (*BinaryProtocolAdapter) SendHTTPRequestHead added in v1.26.15

func (a *BinaryProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error

SendHTTPRequestHead sends tunneled HTTP request headers (semantic streaming).

func (*BinaryProtocolAdapter) SendHTTPResponse

func (a *BinaryProtocolAdapter) SendHTTPResponse(id string, data []byte) error

SendHTTPResponse sends an HTTP response

func (*BinaryProtocolAdapter) SendHTTPResponseBody added in v1.26.15

func (a *BinaryProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error

SendHTTPResponseBody sends a tunneled HTTP response body chunk.

func (*BinaryProtocolAdapter) SendHTTPResponseHead added in v1.26.15

func (a *BinaryProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error

SendHTTPResponseHead sends tunneled HTTP response headers (semantic streaming).

func (*BinaryProtocolAdapter) SendTCPData

func (a *BinaryProtocolAdapter) SendTCPData(streamId string, data []byte) error

SendTCPData sends TCP data. Under the new protocol, it sends via the data channel when one is available; otherwise it uses the monitor channel.

func (*BinaryProtocolAdapter) SetConnWriteMu added in v1.26.14

func (a *BinaryProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)

SetConnWriteMu sets the mutex used to serialize writes on the monitor connection.
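gorilla/websocket connections support only one concurrent writer, which is why writes are funneled through a shared mutex. The following is a minimal sketch of that pattern, not this package's implementation: `lockedWriter` and `concurrentWrites` are hypothetical names, and a `bytes.Buffer` stands in for the monitor connection.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// lockedWriter serializes Write calls using a mutex shared by every writer
// of the same underlying connection (hypothetical helper).
type lockedWriter struct {
	mu *sync.Mutex
	w  io.Writer
}

func (l *lockedWriter) Write(p []byte) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.w.Write(p)
}

// concurrentWrites has n goroutines write one 4-byte frame each
// and returns the total number of bytes written.
func concurrentWrites(n int) int {
	var mu sync.Mutex
	var buf bytes.Buffer // stands in for the monitor connection
	lw := &lockedWriter{mu: &mu, w: &buf}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lw.Write([]byte("ping"))
		}()
	}
	wg.Wait()
	return buf.Len()
}

func main() {
	fmt.Println(concurrentWrites(8))
}
```

Passing the mutex by pointer matters: every component that writes to the same connection has to lock the same mutex, otherwise the serialization is lost.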

func (*BinaryProtocolAdapter) SetDataChannel

func (a *BinaryProtocolAdapter) SetDataChannel(dataConn *websocket.Conn, dataWriteMu *sync.Mutex)

SetDataChannel sets the shared data channel connection (legacy fallback)

func (*BinaryProtocolAdapter) SetDataChannelForStream

func (a *BinaryProtocolAdapter) SetDataChannelForStream(streamId string, dataConn *websocket.Conn, dataWriteMu *sync.Mutex)

SetDataChannelForStream sets the data channel connection for a specific stream (new protocol)

type CompressionAlgorithm

type CompressionAlgorithm string

CompressionAlgorithm represents compression algorithm type

const (
	CompressionBrotli CompressionAlgorithm = "brotli"
	CompressionGzip   CompressionAlgorithm = "gzip"
	CompressionNone   CompressionAlgorithm = "none"
)

type FlowController

type FlowController struct {
	// contains filtered or unexported fields
}

FlowController implements backpressure control mechanism

func NewFlowController

func NewFlowController(defaultMaxWindowSize int, onBackpressure func(streamId string, pause bool)) *FlowController

NewFlowController creates a new flow controller

func (*FlowController) CanSend

func (fc *FlowController) CanSend(streamId string, size int) bool

CanSend is a read-only snapshot of whether a send might fit the window. Do not use CanSend followed by Send/TrySend for admission control — use TrySend only so check and window update happen atomically under the same lock.

func (*FlowController) Clear

func (fc *FlowController) Clear()

Clear clears all windows

func (*FlowController) GetWindowState

func (fc *FlowController) GetWindowState(streamId string) *WindowState

GetWindowState gets the window state for a stream

func (*FlowController) InitializeStream

func (fc *FlowController) InitializeStream(streamId string, maxWindowSize ...int)

InitializeStream initializes the window for a stream

func (*FlowController) OnAck

func (fc *FlowController) OnAck(streamId string, ackedSize int)

OnAck receives acknowledgment (updates send window)

func (*FlowController) Receive

func (fc *FlowController) Receive(streamId string, size int) bool

Receive receives data (updates receive window)

func (*FlowController) ReleaseReceiveWindow

func (fc *FlowController) ReleaseReceiveWindow(streamId string, size int)

ReleaseReceiveWindow releases receive window (data has been processed)

func (*FlowController) RemoveStream

func (fc *FlowController) RemoveStream(streamId string)

RemoveStream removes the window for a stream

func (*FlowController) Send

func (fc *FlowController) Send(streamId string, size int) bool

Send sends data (updates send window).

Deprecated: Use TrySend for an atomic check-and-update operation.

func (*FlowController) TrySend

func (fc *FlowController) TrySend(streamId string, size int) bool

TrySend attempts to send data atomically, checking and updating the window in one operation. It returns true if the send was admitted and false if the window is full.
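The difference between a snapshot check and an atomic check-and-update can be sketched as follows. This is an illustration under assumptions, not the FlowController source: the `window` type and its methods are hypothetical, and the window is modeled as a count of unacknowledged bytes capped by a maximum.

```go
package main

import (
	"fmt"
	"sync"
)

// window is a hypothetical stand-in for one stream's send accounting.
type window struct {
	mu       sync.Mutex
	inFlight int // bytes sent but not yet acknowledged
	max      int // maximum bytes allowed in flight
}

// canSend is only a snapshot: the answer may be stale once the lock is released.
func (w *window) canSend(size int) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.inFlight+size <= w.max
}

// trySend checks and reserves under one lock, so two callers
// cannot both claim the same free space.
func (w *window) trySend(size int) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.inFlight+size > w.max {
		return false
	}
	w.inFlight += size
	return true
}

// onAck frees space once the peer acknowledges bytes.
func (w *window) onAck(size int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.inFlight -= size
	if w.inFlight < 0 {
		w.inFlight = 0
	}
}

// admitted starts the given number of concurrent senders and
// counts how many of them trySend lets through.
func admitted(w *window, senders, size int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	n := 0
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if w.trySend(size) {
				mu.Lock()
				n++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return n
}

func main() {
	w := &window{max: 100}
	fmt.Println(admitted(w, 10, 40)) // only two 40-byte sends fit in 100 bytes
}
```

If callers used canSend followed by a separate update, several goroutines could all observe free space before any of them reserved it, which is the race the documentation warns about.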

type LegacyProtocolAdapter

type LegacyProtocolAdapter struct {
	// contains filtered or unexported fields
}

LegacyProtocolAdapter implements the legacy protocol (JSON + Base64). It maintains compatibility with old clients.

func NewLegacyProtocolAdapter

func NewLegacyProtocolAdapter(conn *websocket.Conn, isClient bool) *LegacyProtocolAdapter

NewLegacyProtocolAdapter creates a new legacy protocol adapter

func (*LegacyProtocolAdapter) Destroy

func (a *LegacyProtocolAdapter) Destroy()

Destroy cleans up resources

func (*LegacyProtocolAdapter) HandleEvent

func (a *LegacyProtocolAdapter) HandleEvent(event string, payload interface{}) error

HandleEvent handles incoming events (public method for external callers)

func (*LegacyProtocolAdapter) NegotiatedFlags added in v1.26.15

func (a *LegacyProtocolAdapter) NegotiatedFlags() int

NegotiatedFlags returns 0 for legacy protocol.

func (*LegacyProtocolAdapter) OnHTTPRequest

func (a *LegacyProtocolAdapter) OnHTTPRequest(handler func(id string, data []byte) error)

OnHTTPRequest registers an HTTP request handler

func (*LegacyProtocolAdapter) OnHTTPRequestBody added in v1.26.15

func (a *LegacyProtocolAdapter) OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)

func (*LegacyProtocolAdapter) OnHTTPRequestHead added in v1.26.15

func (a *LegacyProtocolAdapter) OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)

func (*LegacyProtocolAdapter) OnHTTPResponse

func (a *LegacyProtocolAdapter) OnHTTPResponse(handler func(id string, data []byte) error)

OnHTTPResponse registers an HTTP response handler

func (*LegacyProtocolAdapter) OnHTTPResponseBody added in v1.26.15

func (a *LegacyProtocolAdapter) OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)

func (*LegacyProtocolAdapter) OnHTTPResponseHead added in v1.26.15

func (a *LegacyProtocolAdapter) OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)

func (*LegacyProtocolAdapter) OnTCPClose added in v1.27.0

func (a *LegacyProtocolAdapter) OnTCPClose(_ func(streamId string) error) func()

OnTCPClose is a no-op for legacy protocol (TCP-over-WS close signals are not used).

func (*LegacyProtocolAdapter) OnTCPData

func (a *LegacyProtocolAdapter) OnTCPData(handler func(streamId string, data []byte) error) func()

OnTCPData registers a TCP data handler

func (*LegacyProtocolAdapter) SendHTTPRequest

func (a *LegacyProtocolAdapter) SendHTTPRequest(id string, data []byte) error

SendHTTPRequest sends an HTTP request. Legacy protocol: Base64 encode -> compress -> JSON.

func (*LegacyProtocolAdapter) SendHTTPRequestBody added in v1.26.15

func (a *LegacyProtocolAdapter) SendHTTPRequestBody(id string, chunk []byte, fin bool) error

func (*LegacyProtocolAdapter) SendHTTPRequestHead added in v1.26.15

func (a *LegacyProtocolAdapter) SendHTTPRequestHead(id string, head []byte, fin bool) error

func (*LegacyProtocolAdapter) SendHTTPResponse

func (a *LegacyProtocolAdapter) SendHTTPResponse(id string, data []byte) error

SendHTTPResponse sends an HTTP response. Legacy protocol: Base64 encode -> compress -> JSON.

func (*LegacyProtocolAdapter) SendHTTPResponseBody added in v1.26.15

func (a *LegacyProtocolAdapter) SendHTTPResponseBody(id string, chunk []byte, fin bool) error

func (*LegacyProtocolAdapter) SendHTTPResponseHead added in v1.26.15

func (a *LegacyProtocolAdapter) SendHTTPResponseHead(id string, head []byte, fin bool) error

func (*LegacyProtocolAdapter) SendTCPData

func (a *LegacyProtocolAdapter) SendTCPData(streamId string, data []byte) error

SendTCPData sends TCP data. The legacy protocol doesn't support TCP over WebSocket.

func (*LegacyProtocolAdapter) SetConnWriteMu added in v1.26.14

func (a *LegacyProtocolAdapter) SetConnWriteMu(mu *sync.Mutex)

SetConnWriteMu sets the mutex used to serialize writes on the monitor connection.

func (*LegacyProtocolAdapter) SetLegacyPeerVersion added in v1.28.3

func (a *LegacyProtocolAdapter) SetLegacyPeerVersion(version string)

SetLegacyPeerVersion switches legacy wire encoding to match historical Node client behavior:

  - < 1.3.0: plain base64(raw HTTP)
  - >= 1.3.0: base64(brotli(base64(raw HTTP)))
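The version-dependent encoding rule can be sketched like this. It is an illustration only: `atLeast` and `encodeLegacy` are hypothetical helpers, standard (padded) Base64 is assumed, and the `compress` parameter stands in for a Brotli encoder, which is not part of the Go standard library.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
)

// atLeast reports whether version v (for example "1.3.0") is >= want.
// Missing or unparseable components count as 0, a choice made for this sketch.
func atLeast(v string, want [3]int) bool {
	parts := strings.SplitN(strings.TrimPrefix(v, "v"), ".", 3)
	for i := 0; i < 3; i++ {
		n := 0
		if i < len(parts) {
			n, _ = strconv.Atoi(parts[i])
		}
		if n != want[i] {
			return n > want[i]
		}
	}
	return true
}

// encodeLegacy picks the wire encoding by peer version.
// compress is a placeholder for a Brotli encoder supplied by the caller.
func encodeLegacy(version string, raw []byte, compress func([]byte) []byte) string {
	inner := base64.StdEncoding.EncodeToString(raw)
	if !atLeast(version, [3]int{1, 3, 0}) {
		return inner // < 1.3.0: plain base64(raw)
	}
	// >= 1.3.0: base64(compress(base64(raw)))
	return base64.StdEncoding.EncodeToString(compress([]byte(inner)))
}

func main() {
	identity := func(b []byte) []byte { return b } // no real compression in this demo
	fmt.Println(encodeLegacy("1.2.9", []byte("hi"), identity))
	fmt.Println(encodeLegacy("1.3.0", []byte("hi"), identity))
}
```

Comparing version components numerically rather than as strings avoids ordering "1.10.0" before "1.3.0".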

type MessageFlags

type MessageFlags uint8

MessageFlags represents message flags

const (
	MessageFlagFIN          MessageFlags = 0x01 // Stream end
	MessageFlagACK          MessageFlags = 0x02 // Acknowledgment
	MessageFlagBackpressure MessageFlags = 0x04 // Backpressure
)
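Because the flag values are distinct powers of two, they can presumably be combined into a single byte. The sketch below redeclares the three values locally under hypothetical names (`flags`, `flagFIN`, and so on) and assumes, rather than confirms from the source, that the package treats them as a bitmask.

```go
package main

import "fmt"

// flags is a local copy of the documented flag values (hypothetical names).
type flags uint8

const (
	flagFIN          flags = 0x01 // stream end
	flagACK          flags = 0x02 // acknowledgment
	flagBackpressure flags = 0x04 // backpressure
)

// has reports whether the given bit is set in f.
func (f flags) has(bit flags) bool { return f&bit != 0 }

func main() {
	f := flagFIN | flagACK // a final frame that also acknowledges data
	fmt.Println(f.has(flagFIN), f.has(flagACK), f.has(flagBackpressure))
}
```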

type MessageType

type MessageType uint8

MessageType represents the type of binary message

const (
	MessageTypeControl          MessageType = 0x00 // Control message (auth, heartbeat, etc.)
	MessageTypeHTTPRequest      MessageType = 0x01 // HTTP request
	MessageTypeHTTPResponse     MessageType = 0x02 // HTTP response
	MessageTypeTCPData          MessageType = 0x03 // TCP data
	MessageTypeTCPOpen          MessageType = 0x04 // TCP connection open
	MessageTypeTCPClose         MessageType = 0x05 // TCP connection close
	MessageTypeFlowControl      MessageType = 0x06 // Flow control message
	MessageTypeHTTPRequestHead  MessageType = 0x07 // HTTP request headers only (semantic streaming)
	MessageTypeHTTPRequestBody  MessageType = 0x08 // HTTP request body chunk
	MessageTypeHTTPResponseHead MessageType = 0x09 // HTTP response headers only
	MessageTypeHTTPResponseBody MessageType = 0x0a // HTTP response body chunk
)

type ProtocolAdapter

type ProtocolAdapter interface {
	// SendHTTPRequest sends an HTTP request
	SendHTTPRequest(id string, data []byte) error

	// SendHTTPResponse sends an HTTP response
	SendHTTPResponse(id string, data []byte) error

	// SendHTTPRequestHead sends HTTP request headers only (semantic body streaming).
	SendHTTPRequestHead(id string, head []byte, fin bool) error
	// SendHTTPRequestBody sends one HTTP request body chunk (last chunk sets fin).
	SendHTTPRequestBody(id string, chunk []byte, fin bool) error
	// SendHTTPResponseHead sends HTTP response headers only.
	SendHTTPResponseHead(id string, head []byte, fin bool) error
	// SendHTTPResponseBody sends one HTTP response body chunk.
	SendHTTPResponseBody(id string, chunk []byte, fin bool) error

	// SendTCPData sends TCP data
	SendTCPData(streamId string, data []byte) error

	// OnHTTPRequest registers an HTTP request handler
	OnHTTPRequest(handler func(id string, data []byte) error)

	// OnHTTPResponse registers an HTTP response handler
	OnHTTPResponse(handler func(id string, data []byte) error)

	OnHTTPRequestHead(handler func(id string, head []byte, fin bool) error)
	OnHTTPRequestBody(handler func(id string, chunk []byte, fin bool) error)
	OnHTTPResponseHead(handler func(id string, head []byte, fin bool) error)
	OnHTTPResponseBody(handler func(id string, chunk []byte, fin bool) error)

	// OnTCPData registers a TCP data handler
	// Returns an unsubscribe function
	OnTCPData(handler func(streamId string, data []byte) error) func()

	// OnTCPClose registers a handler for client-side TCP stream teardown (e.g. upstream dial failed).
	// Legacy adapters return a no-op unsubscribe.
	OnTCPClose(handler func(streamId string) error) func()

	// Destroy cleans up resources
	Destroy()

	// SetConnWriteMu serializes WriteMessage on the monitor WebSocket when non-nil (server-side; required by gorilla/websocket).
	SetConnWriteMu(mu *sync.Mutex)

	// NegotiatedFlags returns the bitmask from capability negotiation (0 for legacy).
	NegotiatedFlags() int
}

ProtocolAdapter is the interface for protocol adaptation. It unifies message transmission handling across different protocol versions.

func Create

func Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter

Create is a convenience function to create a protocol adapter

type ProtocolAdapterFactory

type ProtocolAdapterFactory struct{}

ProtocolAdapterFactory creates protocol adapters based on capability negotiation

func NewProtocolAdapterFactory

func NewProtocolAdapterFactory() *ProtocolAdapterFactory

NewProtocolAdapterFactory creates a new protocol adapter factory

func (*ProtocolAdapterFactory) Create

func (f *ProtocolAdapterFactory) Create(conn *websocket.Conn, capabilities *client.Capabilities, isClient bool) ProtocolAdapter

Create creates a protocol adapter based on capabilities. If capabilities is nil or has no flags, it returns a LegacyProtocolAdapter; otherwise, it returns a BinaryProtocolAdapter.

type Stream

type Stream struct {
	ID               string
	State            StreamState
	Chunks           map[uint32]*StreamChunk
	ExpectedSequence uint32
	TotalChunks      int
	OnComplete       func(data []byte)
	OnError          func(error error)
	CompletedData    []byte
	// contains filtered or unexported fields
}

Stream represents a data stream

func NewStream

func NewStream(id string) *Stream

NewStream creates a new stream

func (*Stream) AddChunk

func (s *Stream) AddChunk(sequence uint32, data []byte, isLast bool)

AddChunk adds a chunk to the stream

func (*Stream) Destroy

func (s *Stream) Destroy()

Destroy destroys the stream

func (*Stream) Pause

func (s *Stream) Pause()

Pause pauses the stream

func (*Stream) Resume

func (s *Stream) Resume()

Resume resumes the stream

type StreamChunk

type StreamChunk struct {
	Sequence uint32
	Data     []byte
	IsLast   bool
}

StreamChunk represents a chunk of stream data
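The Stream fields (a chunk map keyed by sequence, an expected sequence, and a last-chunk marker) suggest in-order reassembly of chunks that may arrive out of order. Here is a minimal sketch of that idea, not the package's code: the `assembler` type is hypothetical, sequences are assumed to start at 0, and duplicate or late chunks are simply dropped.

```go
package main

import "fmt"

// chunk holds one buffered piece of a stream (hypothetical type).
type chunk struct {
	data   []byte
	isLast bool
}

// assembler buffers out-of-order chunks and emits them in sequence order.
type assembler struct {
	pending  map[uint32]chunk
	expected uint32 // next sequence number to append; assumed to start at 0
	out      []byte
	done     bool
}

func newAssembler() *assembler {
	return &assembler{pending: make(map[uint32]chunk)}
}

// add stores a chunk, then drains every chunk that is now contiguous.
func (a *assembler) add(seq uint32, data []byte, isLast bool) {
	if a.done || seq < a.expected {
		return // ignore chunks after completion and already-consumed sequences
	}
	a.pending[seq] = chunk{data: data, isLast: isLast}
	for {
		c, ok := a.pending[a.expected]
		if !ok {
			return // gap: wait for the missing sequence
		}
		delete(a.pending, a.expected)
		a.out = append(a.out, c.data...)
		a.expected++
		if c.isLast {
			a.done = true
			return
		}
	}
}

func main() {
	a := newAssembler()
	a.add(1, []byte("b"), false)
	a.add(2, []byte("c"), true)
	a.add(0, []byte("a"), false) // fills the gap and completes the stream
	fmt.Println(string(a.out), a.done)
}
```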

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages multiple data streams

func NewStreamManager

func NewStreamManager(defaultChunkSize int) *StreamManager

NewStreamManager creates a new stream manager

func (*StreamManager) AddChunk

func (sm *StreamManager) AddChunk(streamId string, sequence int, data []byte, isLast bool)

AddChunk adds a chunk to an existing stream. The stream must be created first (e.g. via EnsureStream from handleBinaryMessage); auto-create was removed because it called CreateStream while holding the manager lock (deadlock) and used nil onComplete.

func (*StreamManager) CreateStream

func (sm *StreamManager) CreateStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream

CreateStream creates a new stream

func (*StreamManager) Destroy

func (sm *StreamManager) Destroy()

Destroy destroys the stream manager

func (*StreamManager) EnsureStream added in v1.27.0

func (sm *StreamManager) EnsureStream(streamId string, onComplete func(data []byte), onError func(error error)) *Stream

EnsureStream returns the existing stream or creates one with the given callbacks. If the stream already exists, callbacks are ignored (first registration wins).
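The "first registration wins" behavior can be sketched as a get-or-create performed inside one critical section. The `manager` and `stream` types below are hypothetical simplifications, not the StreamManager implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a pared-down stand-in that only keeps the completion callback.
type stream struct {
	id         string
	onComplete func(data []byte)
}

// manager maps stream IDs to streams (hypothetical type).
type manager struct {
	mu      sync.Mutex
	streams map[string]*stream
}

func newManager() *manager {
	return &manager{streams: make(map[string]*stream)}
}

// ensure returns the existing stream or registers a new one while holding
// the lock once, so lookup and creation cannot interleave with other callers.
func (m *manager) ensure(id string, onComplete func([]byte)) *stream {
	m.mu.Lock()
	defer m.mu.Unlock()
	if s, ok := m.streams[id]; ok {
		return s // callbacks passed by later callers are ignored
	}
	s := &stream{id: id, onComplete: onComplete}
	m.streams[id] = s
	return s
}

func main() {
	m := newManager()
	a := m.ensure("s1", func([]byte) { fmt.Println("first callback") })
	b := m.ensure("s1", func([]byte) { fmt.Println("second callback") })
	fmt.Println(a == b)
	b.onComplete(nil)
}
```

Doing the lookup and the insert without calling back into another locking method of the same manager also avoids the self-deadlock mentioned in the AddChunk notes.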

func (*StreamManager) GetStream

func (sm *StreamManager) GetStream(streamId string) *Stream

GetStream gets a stream by ID

func (*StreamManager) RemoveStream

func (sm *StreamManager) RemoveStream(streamId string)

RemoveStream removes a stream

func (*StreamManager) SplitIntoChunks

func (sm *StreamManager) SplitIntoChunks(data []byte, chunkSize int) [][]byte

SplitIntoChunks splits data into chunks
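A fixed-size split can be sketched as below. The `splitIntoChunks` function is a hypothetical standalone version: how the real method treats a non-positive chunkSize (for example, whether it falls back to the manager's default chunk size) is not documented, so this sketch simply returns nil in that case.

```go
package main

import "fmt"

// splitIntoChunks cuts data into slices of at most chunkSize bytes.
// The returned slices share memory with data; no bytes are copied.
func splitIntoChunks(data []byte, chunkSize int) [][]byte {
	if chunkSize <= 0 || len(data) == 0 {
		return nil // assumption: the real method's behavior here is unknown
	}
	chunks := make([][]byte, 0, (len(data)+chunkSize-1)/chunkSize)
	for start := 0; start < len(data); start += chunkSize {
		end := start + chunkSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, data[start:end])
	}
	return chunks
}

func main() {
	for _, c := range splitIntoChunks([]byte("abcdefg"), 3) {
		fmt.Println(string(c))
	}
}
```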

type StreamState

type StreamState string

StreamState represents the state of a stream

const (
	StreamStateInitializing StreamState = "initializing"
	StreamStateActive       StreamState = "active"
	StreamStatePaused       StreamState = "paused"
	StreamStateCompleted    StreamState = "completed"
	StreamStateError        StreamState = "error"
)

type WindowState

type WindowState struct {
	SendWindow    int // Send window size (bytes sent but not acknowledged)
	ReceiveWindow int // Receive window size (bytes that can be received)
	MaxWindowSize int // Maximum window size
}

WindowState represents the state of a flow control window
