Documentation ¶
Overview ¶
Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.
Index ¶
- Variables
- func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
- func ResetForPool(s *Stream)
- func ResetH1Stream(s *Stream)
- func ResetH2StreamInline(s *Stream, id uint32)
- type ContinuationState
- type FrameWriter
- type H2Controller
- type Handler
- type HandlerFunc
- type Hijacker
- type Manager
- func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)
- func (m *Manager) Close()
- func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)
- func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)
- func (m *Manager) CountActiveStreams() int
- func (m *Manager) CreateStream(id uint32) *Stream
- func (m *Manager) DeleteStream(id uint32)
- func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
- func (m *Manager) GetConnectionWindow() int32
- func (m *Manager) GetLastClientStreamID() uint32
- func (m *Manager) GetLastStreamID() uint32
- func (m *Manager) GetMaxConcurrentStreams() uint32
- func (m *Manager) GetMaxFrameSize() uint32
- func (m *Manager) GetOrCreateStream(id uint32) *Stream
- func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetStream(id uint32) (*Stream, bool)
- func (m *Manager) MarkStreamBuffered(id uint32)
- func (m *Manager) MarkStreamEmpty(id uint32)
- func (m *Manager) RemoveStreamFromMap(id uint32)
- func (m *Manager) SetMaxConcurrentStreams(n uint32)
- func (m *Manager) StreamCount() int
- func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)
- func (m *Manager) UpdateConnectionWindow(delta int32)
- type Phase
- type Priority
- type PriorityTree
- func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
- func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
- func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
- func (pt *PriorityTree) GetWeight(streamID uint32) uint8
- func (pt *PriorityTree) RemoveStream(streamID uint32)
- func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
- func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
- type Processor
- func (p *Processor) FlushBufferedData(_ uint32)
- func (p *Processor) FlushInlineCleanup()
- func (p *Processor) GetConnection() ResponseWriter
- func (p *Processor) GetCurrentConn() ResponseWriter
- func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)
- func (p *Processor) GetManager() *Manager
- func (p *Processor) GetStreamPriority(streamID uint32) int
- func (p *Processor) GoAwayErr(lastStreamID uint32, code http2.ErrCode, debug []byte, err error) error
- func (p *Processor) HandleRawPing(flags byte, payload []byte) error
- func (p *Processor) HandleRawWindowUpdate(streamID uint32, payload []byte) error
- func (p *Processor) IsExpectingContinuation() bool
- func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error
- func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
- func (p *Processor) ProcessRawHeaders(streamID uint32, endStream bool, headerBlock []byte) error
- func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
- func (p *Processor) SetHasMoreFrames(v bool)
- type ResponseWriter
- type State
- type Stream
- func (s *Stream) AddData(data []byte) error
- func (s *Stream) AddHeader(name, value string)
- func (s *Stream) AddHeadersBatch(headers [][2]string)
- func (s *Stream) AddWindowSize(delta int32) int32
- func (s *Stream) BufferOutbound(data []byte, endStream bool)
- func (s *Stream) Cancel()
- func (s *Stream) CompareAndSwapWindowSize(old, val int32) bool
- func (s *Stream) Context() context.Context
- func (s *Stream) DeductWindow(n int32)
- func (s *Stream) ForEachHeader(fn func(name, value string))
- func (s *Stream) GetBuf() *bytes.Buffer
- func (s *Stream) GetData() []byte
- func (s *Stream) GetHandlerStarted() bool
- func (s *Stream) GetHeaders() [][2]string
- func (s *Stream) GetHeadersSent() bool
- func (s *Stream) GetOrCreateWindowUpdateChan() chan int32
- func (s *Stream) GetPhase() Phase
- func (s *Stream) GetState() State
- func (s *Stream) GetWindowSize() int32
- func (s *Stream) HasDoneCh() bool
- func (s *Stream) HeadersLen() int
- func (s *Stream) IsCancelled() bool
- func (s *Stream) IsH1() bool
- func (s *Stream) LoadWindowSize() int32
- func (s *Stream) MarkBuffered()
- func (s *Stream) MarkEmpty()
- func (s *Stream) ProtoMajor() uint8
- func (s *Stream) Release()
- func (s *Stream) SetHandlerStarted()
- func (s *Stream) SetHeadersSent()
- func (s *Stream) SetPhase(p Phase)
- func (s *Stream) SetProtoMajor(v uint8)
- func (s *Stream) SetState(state State)
- func (s *Stream) SetWindowSize(v int32)
- func (s *Stream) StoreState(state State)
- func (s *Stream) WriteLock()
- func (s *Stream) WriteUnlock()
- type Streamer
Constants ¶
This section is empty.
Variables ¶
var ErrHijackNotSupported = errors.New("celeris: hijack not supported by this engine")
ErrHijackNotSupported is returned when the engine does not support connection takeover.
Functions ¶
func ParsePriorityFromHeaders ¶
func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
ParsePriorityFromHeaders extracts priority information from a HEADERS frame.
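As a rough illustration of what the four return values correspond to on the wire, the sketch below decodes the priority fields of a raw HEADERS payload as laid out in RFC 7540 §6.2. The helper parsePriorityFields and its flag constants are hypothetical and are not part of this package. The real function takes an *http2.HeadersFrame and may differ in detail, for example in whether the weight is returned as the raw wire value.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// HEADERS frame flag bits, per RFC 7540 §6.2.
const (
	flagPadded   byte = 0x08
	flagPriority byte = 0x20
)

// parsePriorityFields is a hypothetical helper (not part of package stream).
// It reads the 5-byte priority block that follows the optional Pad Length
// octet when the PRIORITY flag is set. The weight is returned as the raw
// wire value; RFC 7540 defines the effective weight as that value plus one.
func parsePriorityFields(flags byte, payload []byte) (dependency uint32, weight uint8, exclusive bool, hasPriority bool) {
	if flags&flagPriority == 0 {
		return 0, 0, false, false
	}
	if flags&flagPadded != 0 {
		if len(payload) < 1 {
			return 0, 0, false, false
		}
		payload = payload[1:] // skip the Pad Length octet
	}
	if len(payload) < 5 {
		return 0, 0, false, false
	}
	raw := binary.BigEndian.Uint32(payload[:4])
	// The top bit is the exclusive flag; the low 31 bits are the dependency.
	return raw & 0x7fffffff, payload[4], raw&0x80000000 != 0, true
}

func main() {
	dep, weight, excl, ok := parsePriorityFields(flagPriority, []byte{0x80, 0x00, 0x00, 0x03, 0x0f})
	fmt.Println(dep, weight, excl, ok) // prints "3 15 true true"
}
```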
func ResetForPool ¶ added in v1.2.2
func ResetForPool(s *Stream)
ResetForPool releases pooled buffers and returns the stream to its pool WITHOUT cancelling the context. Use it in test harnesses where derived contexts (e.g. from context.WithTimeout) may have propagation goroutines that race with clearing the cancellation flag.
func ResetH1Stream ¶ added in v1.1.0
func ResetH1Stream(s *Stream)
ResetH1Stream performs a lightweight per-request reset for H1 stream reuse.
func ResetH2StreamInline ¶ added in v1.1.0
func ResetH2StreamInline(s *Stream, id uint32)
ResetH2StreamInline performs a lightweight reset for inline H2 stream reuse. It is similar to ResetH1Stream but applies to the H2 inline handler path: it resets per-request fields while preserving the stream's pool buffers.
Types ¶
type ContinuationState ¶
type ContinuationState struct {
// contains filtered or unexported fields
}
ContinuationState tracks the state of CONTINUATION frames.
type FrameWriter ¶
type FrameWriter interface {
WriteSettings(settings ...http2.Setting) error
WriteSettingsAck() error
WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
WriteData(streamID uint32, endStream bool, data []byte) error
WriteWindowUpdate(streamID uint32, increment uint32) error
WriteRSTStream(streamID uint32, code http2.ErrCode) error
WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
WritePing(ack bool, data [8]byte) error
WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}
FrameWriter is an interface for writing HTTP/2 frames.
type H2Controller ¶ added in v1.3.0
type H2Controller interface {
SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
MarkStreamClosed(streamID uint32)
IsStreamClosed(streamID uint32) bool
WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
CloseConn() error
}
H2Controller provides HTTP/2 connection-level control operations. Only H2-capable engines implement this; H1 adapters, test recorders, and stdlib bridges do not.
type HandlerFunc ¶
HandlerFunc is an adapter to use functions as stream handlers.
func (HandlerFunc) HandleStream ¶
func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error
HandleStream calls the handler function.
type Hijacker ¶ added in v1.1.0
Hijacker is implemented by ResponseWriters that support connection takeover. This enables protocols like WebSocket that require raw TCP access.
type Manager ¶
type Manager struct {
RemoteAddr string
// contains filtered or unexported fields
}
Manager manages multiple HTTP/2 streams.
func (*Manager) AccumulateWindowUpdate ¶
AccumulateWindowUpdate accumulates window credits without sending immediately.
func (*Manager) Close ¶ added in v1.1.0
func (m *Manager) Close()
Close releases all streams still held by the manager. Called when the H2 connection is closed to prevent stream objects from leaking in the map.
func (*Manager) ConsumeSendWindow ¶
ConsumeSendWindow decrements connection and stream windows after sending DATA.
func (*Manager) ConsumeSendWindowFast ¶
ConsumeSendWindowFast decrements connection and stream windows after sending DATA. It avoids the Manager lock.
func (*Manager) CountActiveStreams ¶
CountActiveStreams returns the number of streams considered active for concurrency limits.
func (*Manager) CreateStream ¶
CreateStream creates a new stream with the given ID.
func (*Manager) DeleteStream ¶
DeleteStream removes a stream and releases its pooled buffers. If the stream has an async handler goroutine running (asyncRunning=true), the stream is removed from the map but NOT released; the goroutine releases it upon completion via ReleaseAsyncStream.
func (*Manager) FlushWindowUpdates ¶
func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if the threshold is met. It returns true if updates were sent.
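The accumulate-then-flush idea can be sketched as follows. This is only an illustration under stated assumptions: the updateBatcher type, its threshold field, and the reading of force as "send whatever is pending regardless of the threshold" are all hypothetical, and the real method writes through a FrameWriter instead of a callback.

```go
package main

import "fmt"

// updateBatcher is a hypothetical, single-goroutine stand-in for the
// credit accumulation done by the Manager.
type updateBatcher struct {
	pending   uint32 // credits accumulated but not yet announced
	threshold uint32 // assumed minimum worth sending on its own
}

// accumulate records credit without sending anything.
func (b *updateBatcher) accumulate(n uint32) { b.pending += n }

// flush calls send once with the pending credit when the threshold is
// reached, or when force is set and anything is pending.
func (b *updateBatcher) flush(send func(increment uint32) error, force bool) (bool, error) {
	if b.pending == 0 {
		return false, nil
	}
	if !force && b.pending < b.threshold {
		return false, nil
	}
	if err := send(b.pending); err != nil {
		return false, err
	}
	b.pending = 0
	return true, nil
}

func main() {
	b := &updateBatcher{threshold: 100}
	send := func(inc uint32) error {
		fmt.Println("WINDOW_UPDATE increment", inc)
		return nil
	}
	b.accumulate(40)
	sent, _ := b.flush(send, false)
	fmt.Println("sent:", sent)
	b.accumulate(70)
	sent, _ = b.flush(send, false)
	fmt.Println("sent:", sent)
}
```

Batching credits this way trades a small delay in replenishing the peer's window for fewer WINDOW_UPDATE frames on the wire.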
func (*Manager) GetConnectionWindow ¶
GetConnectionWindow atomically returns the current connection window size.
func (*Manager) GetLastClientStreamID ¶
GetLastClientStreamID returns the highest client-initiated stream ID observed.
func (*Manager) GetLastStreamID ¶
GetLastStreamID returns the highest stream ID.
func (*Manager) GetMaxConcurrentStreams ¶
GetMaxConcurrentStreams returns the currently configured max concurrent streams value.
func (*Manager) GetMaxFrameSize ¶ added in v1.1.0
GetMaxFrameSize returns the current max frame size (atomic).
func (*Manager) GetOrCreateStream ¶
GetOrCreateStream gets an existing stream or creates a new one.
func (*Manager) GetSendWindowsAndMaxFrame ¶
func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.
func (*Manager) GetSendWindowsAndMaxFrameFast ¶
func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrameFast returns current connection window, stream window, and max frame size. It avoids the Manager lock by using atomics and direct stream access.
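A caller would typically combine the three returned values to decide how many bytes the next DATA frame may carry. The helper below, sendable, is a hypothetical sketch and not part of this package. It takes the smallest of the pending length, both windows, and the frame size limit, and treats a non-positive window as "send nothing", since RFC 7540 §6.9.2 allows windows to become negative.

```go
package main

import "fmt"

// sendable returns how many of the pending bytes fit in one DATA frame
// given the current flow-control windows and the peer's max frame size.
func sendable(pending int, connWindow, streamWindow int32, maxFrame uint32) int {
	n := int64(pending)
	for _, limit := range []int64{int64(connWindow), int64(streamWindow), int64(maxFrame)} {
		if limit < n {
			n = limit
		}
	}
	if n < 0 {
		return 0 // a negative window means the sender must wait
	}
	return int(n)
}

func main() {
	fmt.Println(sendable(50000, 65535, 20000, 16384)) // prints "16384"
	fmt.Println(sendable(100, 65535, -5, 16384))      // prints "0"
}
```

After writing a frame of that size, the same number of bytes would be passed to ConsumeSendWindow or ConsumeSendWindowFast.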
func (*Manager) MarkStreamBuffered ¶
MarkStreamBuffered adds a stream to the set of streams with buffered data.
func (*Manager) MarkStreamEmpty ¶
MarkStreamEmpty removes a stream from the set of streams with buffered data.
func (*Manager) RemoveStreamFromMap ¶ added in v1.1.0
RemoveStreamFromMap removes a stream from the manager's map without releasing it. Used by async handler goroutines that manage their own stream lifecycle.
func (*Manager) SetMaxConcurrentStreams ¶
SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.
func (*Manager) StreamCount ¶
StreamCount returns the number of streams in the manager.
func (*Manager) TryOpenStream ¶
TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.
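One common way to make "check the limit and mark active" a single atomic step is a compare-and-swap loop on the active count. The sketch below shows that technique in isolation. The limiter type is hypothetical, and the source does not state whether TryOpenStream uses a CAS loop or a lock internally.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter is a hypothetical stand-in for the Manager's concurrency accounting.
type limiter struct {
	active atomic.Uint32 // streams currently counted as active
	max    atomic.Uint32 // MAX_CONCURRENT_STREAMS in effect
}

// tryOpen increments the active count only if doing so stays within max.
// The CAS loop ensures two racing callers cannot both take the last slot.
func (l *limiter) tryOpen() bool {
	for {
		cur := l.active.Load()
		if cur >= l.max.Load() {
			return false
		}
		if l.active.CompareAndSwap(cur, cur+1) {
			return true
		}
	}
}

// release gives a slot back when a stream stops being active.
func (l *limiter) release() {
	l.active.Add(^uint32(0)) // adding the all-ones value subtracts 1
}

func main() {
	l := &limiter{}
	l.max.Store(2)
	fmt.Println(l.tryOpen(), l.tryOpen(), l.tryOpen()) // prints "true true false"
	l.release()
	fmt.Println(l.tryOpen()) // prints "true"
}
```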
func (*Manager) UpdateConnectionWindow ¶
UpdateConnectionWindow atomically updates the connection-level flow control window.
type Phase ¶
type Phase int
Phase represents the response phase for a stream to ensure proper write ordering.
Response phase constants for ensuring correct write ordering.
type PriorityTree ¶
type PriorityTree struct {
// contains filtered or unexported fields
}
PriorityTree manages stream priorities and dependencies.
func NewPriorityTree ¶
func NewPriorityTree() *PriorityTree
NewPriorityTree creates a new priority tree.
func (*PriorityTree) CalculateStreamPriority ¶
func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
CalculateStreamPriority computes a priority score for stream scheduling.
func (*PriorityTree) GetChildren ¶
func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
GetChildren returns the streams that depend on the given stream.
func (*PriorityTree) GetPriority ¶
func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
GetPriority retrieves priority information for a stream.
func (*PriorityTree) GetWeight ¶
func (pt *PriorityTree) GetWeight(streamID uint32) uint8
GetWeight returns the weight of a stream, defaulting to 16.
func (*PriorityTree) RemoveStream ¶
func (pt *PriorityTree) RemoveStream(streamID uint32)
RemoveStream removes a stream and reorganizes its dependencies.
func (*PriorityTree) SetPriority ¶
func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
SetPriority assigns or updates priority information for a stream.
func (*PriorityTree) UpdateFromFrame ¶
func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
UpdateFromFrame updates stream priority from frame parameters.
type Processor ¶
type Processor struct {
InlineCachedCtx any // per-connection cached app context for inline handlers (avoids sync.Pool)
InlineWriter h2Conn // direct-to-outBuf writer for inline handlers (set by conn layer)
InlineCount uint64 // number of requests handled inline (for metrics)
MaxRequestBodySize int64 // 0 = use default (100 MB)
// contains filtered or unexported fields
}
Processor processes incoming HTTP/2 frames and manages streams.
func NewProcessor ¶
func NewProcessor(handler Handler, writer FrameWriter, conn h2Conn) *Processor
NewProcessor creates a new stream processor. The conn parameter must implement both ResponseWriter and H2Controller (all H2 engine adapters do).
func (*Processor) FlushBufferedData ¶
FlushBufferedData exposes flushBufferedData for external callers.
func (*Processor) FlushInlineCleanup ¶ added in v1.1.0
func (p *Processor) FlushInlineCleanup()
FlushInlineCleanup transitions and removes streams that completed inline during the frame loop but had cleanup deferred (pending outbound data or more frames to process). Called after the frame loop, under H2State.mu.
func (*Processor) GetConnection ¶
func (p *Processor) GetConnection() ResponseWriter
GetConnection returns the permanent connection writer.
func (*Processor) GetCurrentConn ¶
func (p *Processor) GetCurrentConn() ResponseWriter
GetCurrentConn returns the current connection.
func (*Processor) GetExpectedContinuationStreamID ¶
GetExpectedContinuationStreamID returns the stream ID on which CONTINUATION frames are expected.
func (*Processor) GetManager ¶
GetManager returns the stream manager.
func (*Processor) GetStreamPriority ¶
GetStreamPriority returns the priority score for a stream.
func (*Processor) GoAwayErr ¶ added in v1.1.0
func (p *Processor) GoAwayErr(lastStreamID uint32, code http2.ErrCode, debug []byte, err error) error
GoAwayErr sends a GOAWAY frame and returns the given error. SendGoAway handles flushing internally.
func (*Processor) HandleRawPing ¶ added in v1.1.0
HandleRawPing processes a PING frame directly from raw bytes. Payload must be exactly 8 bytes per RFC 7540 §6.7.
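The PING rules in RFC 7540 §6.7 are small enough to sketch directly: a payload that is not 8 bytes is an error, a PING carrying the ACK flag needs no reply, and any other PING is answered with the same 8 bytes and ACK set. The function pingReply below is a hypothetical illustration and does not reflect how HandleRawPing is structured internally.

```go
package main

import (
	"errors"
	"fmt"
)

// flagAck is the ACK flag bit of a PING frame (RFC 7540 §6.7).
const flagAck byte = 0x01

var errPingSize = errors.New("PING payload must be exactly 8 bytes")

// pingReply validates a raw PING and reports whether a reply is due.
// When send is true, reply holds the opaque data to echo back with ACK set.
func pingReply(flags byte, payload []byte) (reply [8]byte, send bool, err error) {
	if len(payload) != 8 {
		return reply, false, errPingSize
	}
	if flags&flagAck != 0 {
		return reply, false, nil // this is already an acknowledgement
	}
	copy(reply[:], payload)
	return reply, true, nil
}

func main() {
	reply, send, err := pingReply(0, []byte{1, 2, 3, 4, 5, 6, 7, 8})
	fmt.Println(reply, send, err) // prints "[1 2 3 4 5 6 7 8] true <nil>"
}
```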
func (*Processor) HandleRawWindowUpdate ¶ added in v1.1.0
HandleRawWindowUpdate processes a WINDOW_UPDATE frame directly from raw bytes. Payload must be exactly 4 bytes per RFC 7540 §6.9.
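Decoding the 4-byte payload is a matter of masking off the reserved top bit and rejecting a zero increment, as RFC 7540 §6.9 requires. The helper parseWindowIncrement is a hypothetical sketch. How this package maps these conditions to stream or connection errors is not shown in the source.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var (
	errWindowUpdateSize = errors.New("WINDOW_UPDATE payload must be exactly 4 bytes")
	errZeroIncrement    = errors.New("WINDOW_UPDATE increment must be non-zero")
)

// parseWindowIncrement extracts the 31-bit window size increment.
func parseWindowIncrement(payload []byte) (uint32, error) {
	if len(payload) != 4 {
		return 0, errWindowUpdateSize
	}
	// The most significant bit is reserved and must be ignored.
	inc := binary.BigEndian.Uint32(payload) & 0x7fffffff
	if inc == 0 {
		return 0, errZeroIncrement
	}
	return inc, nil
}

func main() {
	inc, err := parseWindowIncrement([]byte{0x80, 0x00, 0x10, 0x00})
	fmt.Println(inc, err) // prints "4096 <nil>"
}
```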
func (*Processor) IsExpectingContinuation ¶
IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.
func (*Processor) ProcessFrame ¶
ProcessFrame processes an incoming HTTP/2 frame.
func (*Processor) ProcessFrameWithConn ¶
func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
ProcessFrameWithConn processes a frame with a connection context.
func (*Processor) ProcessRawHeaders ¶ added in v1.1.0
ProcessRawHeaders handles a HEADERS frame from raw bytes, bypassing the x/net framer's *HeadersFrame allocation. Only valid for simple HEADERS (END_HEADERS set, no PADDED, no PRIORITY, not during CONTINUATION). All RFC 7540 validations are preserved.
func (*Processor) SendGoAway ¶
SendGoAway sends a GOAWAY frame.
func (*Processor) SetHasMoreFrames ¶ added in v1.1.0
SetHasMoreFrames tells the processor whether more frames follow the current one in the recv buffer. Used to suppress inline handler execution when the frame loop has more frames to process.
type ResponseWriter ¶
type ResponseWriter interface {
WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
}
ResponseWriter is the minimal interface for writing HTTP responses. All engine adapters, test recorders, and stdlib bridges implement this.
type State ¶
type State int
State represents the state of an HTTP/2 stream as defined in RFC 7540.
HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.
type Stream ¶
type Stream struct {
ID uint32
Headers [][2]string
Trailers [][2]string
Data *bytes.Buffer
OutboundBuffer *bytes.Buffer
OutboundEndStream bool
EndStream bool
IsStreaming bool
DeferResponse bool
ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes
ResponseWriter ResponseWriter
RemoteAddr string
ReceivedDataLen int
ReceivedInitialHeaders bool
ClosedByReset bool
IsHEAD bool
CachedCtx any // per-connection cached context (avoids pool Get/Put per request)
OnDetach func() // called by Context.Detach to install write-thread safety
// contains filtered or unexported fields
}
Stream represents an HTTP/2 stream with its associated state and data.
func NewH1Stream ¶ added in v1.1.0
NewH1Stream creates a lightweight stream optimized for H1 requests.
func (*Stream) AddHeadersBatch ¶ added in v1.1.0
AddHeadersBatch adds multiple headers under a single lock acquisition.
func (*Stream) AddWindowSize ¶ added in v1.1.0
AddWindowSize atomically adds delta to the window size and returns the new value.
func (*Stream) BufferOutbound ¶ added in v0.2.0
BufferOutbound stores data that couldn't be sent due to flow control.
func (*Stream) CompareAndSwapWindowSize ¶ added in v1.1.0
CompareAndSwapWindowSize atomically compares and swaps the window size.
func (*Stream) DeductWindow ¶ added in v0.2.0
DeductWindow subtracts n from the flow control window.
func (*Stream) ForEachHeader ¶
ForEachHeader calls fn for each header under a read lock.
func (*Stream) GetHandlerStarted ¶ added in v1.1.0
GetHandlerStarted reports whether the handler has started.
func (*Stream) GetHeaders ¶
GetHeaders returns the headers. For single-threaded H1 streams, it returns the slice directly (no lock, no copy). For H2 streams, it returns a safe copy under lock.
func (*Stream) GetHeadersSent ¶ added in v1.1.0
GetHeadersSent reports whether headers have been sent.
func (*Stream) GetOrCreateWindowUpdateChan ¶ added in v1.1.0
GetOrCreateWindowUpdateChan lazily allocates the ReceivedWindowUpd channel. Used by streaming handlers that need to wait for flow control updates.
func (*Stream) GetWindowSize ¶
GetWindowSize returns the current flow control window size.
func (*Stream) HasDoneCh ¶ added in v1.2.2
HasDoneCh reports whether a Done channel was created, indicating a derived context (e.g. context.WithTimeout) is watching this stream.
func (*Stream) HeadersLen ¶
HeadersLen returns the number of headers on the stream.
func (*Stream) IsCancelled ¶ added in v1.1.0
IsCancelled reports whether the stream has been cancelled.
func (*Stream) IsH1 ¶ added in v1.1.0
IsH1 returns true if this is an H1 stream (single-threaded, persistent per connection).
func (*Stream) LoadWindowSize ¶ added in v1.1.0
LoadWindowSize is an alias for GetWindowSize, used for clarity in CAS loops.
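The CAS loops mentioned here usually take the shape below: load the window, compute the new value, and retry if another goroutine changed it in between. The window type and its reserve method are hypothetical and use sync/atomic directly. With the package's own API the same loop would presumably be written with LoadWindowSize and CompareAndSwapWindowSize.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// window is a hypothetical stand-in for a stream's send window.
type window struct{ size atomic.Int32 }

// reserve atomically takes up to want bytes from the window and returns
// how many were actually taken (0 if the window is exhausted).
func (w *window) reserve(want int32) int32 {
	for {
		cur := w.size.Load()
		if cur <= 0 || want <= 0 {
			return 0
		}
		take := want
		if cur < take {
			take = cur
		}
		// Retry if another goroutine modified the window since Load.
		if w.size.CompareAndSwap(cur, cur-take) {
			return take
		}
	}
}

func main() {
	w := &window{}
	w.size.Store(100)
	fmt.Println(w.reserve(30), w.reserve(100), w.reserve(1)) // prints "30 70 0"
}
```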
func (*Stream) MarkBuffered ¶
func (s *Stream) MarkBuffered()
MarkBuffered marks the stream as having buffered data.
func (*Stream) MarkEmpty ¶
func (s *Stream) MarkEmpty()
MarkEmpty marks the stream as having no buffered data.
func (*Stream) ProtoMajor ¶ added in v1.2.3
ProtoMajor returns the HTTP major version (1 or 2).
func (*Stream) Release ¶ added in v0.3.2
func (s *Stream) Release()
Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.
func (*Stream) SetHandlerStarted ¶ added in v1.1.0
func (s *Stream) SetHandlerStarted()
SetHandlerStarted marks the handler as started.
func (*Stream) SetHeadersSent ¶ added in v1.1.0
func (s *Stream) SetHeadersSent()
SetHeadersSent marks headers as sent.
func (*Stream) SetProtoMajor ¶ added in v1.2.3
SetProtoMajor sets the HTTP major version explicitly.
func (*Stream) SetState ¶
SetState sets the stream state and atomically updates the manager's active count.
func (*Stream) SetWindowSize ¶ added in v1.1.0
SetWindowSize stores a new window size value. Used during initialization.
func (*Stream) StoreState ¶ added in v1.1.0
StoreState is a lightweight SetState for H1 streams (no manager, no Swap). Avoids the atomic.Swap + updateActiveCount overhead on the H1 hot path.
func (*Stream) WriteLock ¶
func (s *Stream) WriteLock()
WriteLock acquires the per-stream write lock.
func (*Stream) WriteUnlock ¶
func (s *Stream) WriteUnlock()
WriteUnlock releases the per-stream write lock.
type Streamer ¶ added in v1.1.0
type Streamer interface {
// WriteHeader sends the status line and headers. Must be called once before Write.
WriteHeader(stream *Stream, status int, headers [][2]string) error
// Write sends a chunk of the response body. May be called multiple times.
Write(stream *Stream, data []byte) error
// Flush ensures buffered data is sent to the network.
Flush(stream *Stream) error
// Close signals end of the response body.
Close(stream *Stream) error
}
Streamer supports incremental response writing. Engines that support streaming implement this interface on their ResponseWriter. The existing WriteResponse path is preserved for non-streaming responses (hot path).
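The call order implied by the interface comments (WriteHeader once, then Write and Flush as needed, then Close) can be sketched with local stand-ins. Stream is reduced to one field, and memoryStreamer is a hypothetical in-memory implementation used only to make the example self-contained.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Stream is a local stand-in for the package's Stream type.
type Stream struct{ ID uint32 }

// Streamer mirrors the interface documented above.
type Streamer interface {
	WriteHeader(stream *Stream, status int, headers [][2]string) error
	Write(stream *Stream, data []byte) error
	Flush(stream *Stream) error
	Close(stream *Stream) error
}

// memoryStreamer is a hypothetical recorder that keeps the response in memory.
type memoryStreamer struct {
	status int
	body   bytes.Buffer
	closed bool
}

func (m *memoryStreamer) WriteHeader(_ *Stream, status int, _ [][2]string) error {
	m.status = status
	return nil
}

func (m *memoryStreamer) Write(_ *Stream, data []byte) error {
	if m.status == 0 {
		return errors.New("WriteHeader must be called before Write")
	}
	_, err := m.body.Write(data)
	return err
}

func (m *memoryStreamer) Flush(_ *Stream) error { return nil }

func (m *memoryStreamer) Close(_ *Stream) error {
	m.closed = true
	return nil
}

// streamChunks sends headers once, then each chunk followed by a flush.
func streamChunks(w Streamer, s *Stream, chunks []string) error {
	if err := w.WriteHeader(s, 200, [][2]string{{"content-type", "text/plain"}}); err != nil {
		return err
	}
	for _, c := range chunks {
		if err := w.Write(s, []byte(c)); err != nil {
			return err
		}
		if err := w.Flush(s); err != nil {
			return err
		}
	}
	return w.Close(s)
}

func main() {
	m := &memoryStreamer{}
	if err := streamChunks(m, &Stream{ID: 1}, []string{"hello, ", "world"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.status, m.body.String(), m.closed) // prints "200 hello, world true"
}
```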