Documentation
¶
Overview ¶
Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.
Index ¶
- func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
- type ContinuationState
- type FrameWriter
- type Handler
- type HandlerFunc
- type Manager
- func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)
- func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)
- func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)
- func (m *Manager) CountActiveStreams() int
- func (m *Manager) CreateStream(id uint32) *Stream
- func (m *Manager) DeleteStream(id uint32)
- func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
- func (m *Manager) GetConnectionWindow() int32
- func (m *Manager) GetLastClientStreamID() uint32
- func (m *Manager) GetLastStreamID() uint32
- func (m *Manager) GetMaxConcurrentStreams() uint32
- func (m *Manager) GetOrCreateStream(id uint32) *Stream
- func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetStream(id uint32) (*Stream, bool)
- func (m *Manager) MarkStreamBuffered(id uint32)
- func (m *Manager) MarkStreamEmpty(id uint32)
- func (m *Manager) SetMaxConcurrentStreams(n uint32)
- func (m *Manager) StreamCount() int
- func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)
- func (m *Manager) UpdateConnectionWindow(delta int32)
- type Phase
- type Priority
- type PriorityTree
- func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
- func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
- func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
- func (pt *PriorityTree) GetWeight(streamID uint32) uint8
- func (pt *PriorityTree) RemoveStream(streamID uint32)
- func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
- func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
- type Processor
- func (p *Processor) FlushBufferedData(_ uint32)
- func (p *Processor) GetConnection() ResponseWriter
- func (p *Processor) GetCurrentConn() ResponseWriter
- func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)
- func (p *Processor) GetManager() *Manager
- func (p *Processor) GetStreamPriority(streamID uint32) int
- func (p *Processor) IsExpectingContinuation() bool
- func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error
- func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
- func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
- type ResponseWriter
- type State
- type Stream
- func (s *Stream) AddData(data []byte) error
- func (s *Stream) AddHeader(name, value string)
- func (s *Stream) BufferOutbound(data []byte, endStream bool)
- func (s *Stream) Cancel()
- func (s *Stream) Context() context.Context
- func (s *Stream) DeductWindow(n int32)
- func (s *Stream) ForEachHeader(fn func(name, value string))
- func (s *Stream) GetData() []byte
- func (s *Stream) GetHeaders() [][2]string
- func (s *Stream) GetPhase() Phase
- func (s *Stream) GetState() State
- func (s *Stream) GetWindowSize() int32
- func (s *Stream) HeadersLen() int
- func (s *Stream) MarkBuffered()
- func (s *Stream) MarkEmpty()
- func (s *Stream) Release()
- func (s *Stream) SetPhase(p Phase)
- func (s *Stream) SetState(state State)
- func (s *Stream) WriteLock()
- func (s *Stream) WriteUnlock()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParsePriorityFromHeaders ¶
func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
ParsePriorityFromHeaders extracts priority information from a HEADERS frame.
Types ¶
type ContinuationState ¶
type ContinuationState struct {
// contains filtered or unexported fields
}
ContinuationState tracks the state of CONTINUATION frames.
type FrameWriter ¶
type FrameWriter interface {
WriteSettings(settings ...http2.Setting) error
WriteSettingsAck() error
WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
WriteData(streamID uint32, endStream bool, data []byte) error
WriteWindowUpdate(streamID uint32, increment uint32) error
WriteRSTStream(streamID uint32, code http2.ErrCode) error
WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
WritePing(ack bool, data [8]byte) error
WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}
FrameWriter is an interface for writing HTTP/2 frames.
type HandlerFunc ¶
HandlerFunc is an adapter to use functions as stream handlers.
func (HandlerFunc) HandleStream ¶
func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error
HandleStream calls the handler function.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages multiple HTTP/2 streams.
func (*Manager) AccumulateWindowUpdate ¶
AccumulateWindowUpdate accumulates window credits without sending immediately.
func (*Manager) ConsumeSendWindow ¶
ConsumeSendWindow decrements connection and stream windows after sending DATA.
func (*Manager) ConsumeSendWindowFast ¶
ConsumeSendWindowFast decrements the connection and stream windows after sending DATA. It takes the *Stream directly and avoids the Manager lock.
func (*Manager) CountActiveStreams ¶
CountActiveStreams returns the number of streams considered active for concurrency limits.
func (*Manager) CreateStream ¶
CreateStream creates a new stream with the given ID.
func (*Manager) DeleteStream ¶
DeleteStream removes a stream and releases its pooled buffers.
func (*Manager) FlushWindowUpdates ¶
func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if the threshold is met. It returns true if updates were sent.
func (*Manager) GetConnectionWindow ¶
GetConnectionWindow returns the current connection window size.
func (*Manager) GetLastClientStreamID ¶
GetLastClientStreamID returns the highest client-initiated stream ID observed.
func (*Manager) GetLastStreamID ¶
GetLastStreamID returns the highest stream ID.
func (*Manager) GetMaxConcurrentStreams ¶
GetMaxConcurrentStreams returns the currently configured max concurrent streams value.
func (*Manager) GetOrCreateStream ¶
GetOrCreateStream gets an existing stream or creates a new one.
func (*Manager) GetSendWindowsAndMaxFrame ¶
func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.
func (*Manager) GetSendWindowsAndMaxFrameFast ¶
func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrameFast returns the current connection window, stream window, and max frame size. It avoids the Manager lock by using atomics and direct stream access.
func (*Manager) MarkStreamBuffered ¶
MarkStreamBuffered adds a stream to the set of streams with buffered data.
func (*Manager) MarkStreamEmpty ¶
MarkStreamEmpty removes a stream from the set of streams with buffered data.
func (*Manager) SetMaxConcurrentStreams ¶
SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.
func (*Manager) StreamCount ¶
StreamCount returns the number of streams in the manager.
func (*Manager) TryOpenStream ¶
TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.
func (*Manager) UpdateConnectionWindow ¶
UpdateConnectionWindow updates the connection-level flow control window.
type Phase ¶
type Phase int
Phase represents the response phase for a stream to ensure proper write ordering.
Response phase constants for ensuring correct write ordering.
type PriorityTree ¶
type PriorityTree struct {
// contains filtered or unexported fields
}
PriorityTree manages stream priorities and dependencies.
func NewPriorityTree ¶
func NewPriorityTree() *PriorityTree
NewPriorityTree creates a new priority tree.
func (*PriorityTree) CalculateStreamPriority ¶
func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
CalculateStreamPriority computes a priority score for stream scheduling.
func (*PriorityTree) GetChildren ¶
func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
GetChildren returns the streams that depend on the given stream.
func (*PriorityTree) GetPriority ¶
func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
GetPriority retrieves priority information for a stream.
func (*PriorityTree) GetWeight ¶
func (pt *PriorityTree) GetWeight(streamID uint32) uint8
GetWeight returns the weight of a stream, defaulting to 16.
func (*PriorityTree) RemoveStream ¶
func (pt *PriorityTree) RemoveStream(streamID uint32)
RemoveStream removes a stream and reorganizes its dependencies.
func (*PriorityTree) SetPriority ¶
func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
SetPriority assigns or updates priority information for a stream.
func (*PriorityTree) UpdateFromFrame ¶
func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
UpdateFromFrame updates stream priority from frame parameters.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor processes incoming HTTP/2 frames and manages streams.
func NewProcessor ¶
func NewProcessor(handler Handler, writer FrameWriter, conn ResponseWriter) *Processor
NewProcessor creates a new stream processor.
func (*Processor) FlushBufferedData ¶
FlushBufferedData exposes the internal flushBufferedData for external callers. The uint32 parameter is declared with the blank identifier and is ignored.
func (*Processor) GetConnection ¶
func (p *Processor) GetConnection() ResponseWriter
GetConnection returns the permanent connection writer.
func (*Processor) GetCurrentConn ¶
func (p *Processor) GetCurrentConn() ResponseWriter
GetCurrentConn returns the current connection.
func (*Processor) GetExpectedContinuationStreamID ¶
GetExpectedContinuationStreamID returns the stream ID we're expecting CONTINUATION frames on.
func (*Processor) GetManager ¶
GetManager returns the stream manager.
func (*Processor) GetStreamPriority ¶
GetStreamPriority returns the priority score for a stream.
func (*Processor) IsExpectingContinuation ¶
IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.
func (*Processor) ProcessFrame ¶
ProcessFrame processes an incoming HTTP/2 frame.
func (*Processor) ProcessFrameWithConn ¶
func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
ProcessFrameWithConn processes a frame with a connection context.
type ResponseWriter ¶
type ResponseWriter interface {
WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
MarkStreamClosed(streamID uint32)
IsStreamClosed(streamID uint32) bool
WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
CloseConn() error
}
ResponseWriter is an interface for writing responses.
type State ¶
type State int
State represents the state of an HTTP/2 stream as defined in RFC 7540.
HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.
type Stream ¶
type Stream struct {
ID uint32
State State
Headers [][2]string
Trailers [][2]string
Data *bytes.Buffer
OutboundBuffer *bytes.Buffer
OutboundEndStream bool
HeadersSent bool
EndStream bool
IsStreaming bool
HandlerStarted bool
DeferResponse bool
WindowSize int32
ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes
ResponseWriter ResponseWriter
ReceivedDataLen int
ReceivedInitialHeaders bool
ClosedByReset bool
// contains filtered or unexported fields
}
Stream represents an HTTP/2 stream with its associated state and data.
func (*Stream) BufferOutbound ¶ added in v0.2.0
BufferOutbound stores data that couldn't be sent due to flow control.
func (*Stream) DeductWindow ¶ added in v0.2.0
DeductWindow subtracts n from the flow control window.
func (*Stream) ForEachHeader ¶
ForEachHeader calls fn for each header under a read lock.
func (*Stream) GetHeaders ¶
GetHeaders returns a copy of the headers.
func (*Stream) GetWindowSize ¶
GetWindowSize returns the current flow control window size.
func (*Stream) HeadersLen ¶
HeadersLen returns the number of headers on the stream.
func (*Stream) MarkBuffered ¶
func (s *Stream) MarkBuffered()
MarkBuffered marks the stream as having buffered data.
func (*Stream) MarkEmpty ¶
func (s *Stream) MarkEmpty()
MarkEmpty marks the stream as having no buffered data.
func (*Stream) Release ¶ added in v0.3.2
func (s *Stream) Release()
Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.
func (*Stream) WriteLock ¶
func (s *Stream) WriteLock()
WriteLock acquires the per-stream write lock.
func (*Stream) WriteUnlock ¶
func (s *Stream) WriteUnlock()
WriteUnlock releases the per-stream write lock.