stream

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.

Index

Constants

This section is empty.

Variables

View Source
var ErrHijackNotSupported = errors.New("celeris: hijack not supported by this engine")

ErrHijackNotSupported is returned when the engine does not support connection takeover.

Functions

func ParsePriorityFromHeaders

func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)

ParsePriorityFromHeaders extracts priority information from a HEADERS frame.

func ResetForPool added in v1.2.2

func ResetForPool(s *Stream)

ResetForPool returns pooled buffers and returns the stream to its pool WITHOUT cancelling the context. Use this in test harnesses where derived contexts (e.g. from context.WithTimeout) may have propagation goroutines that race with cancellation flag clearing.

func ResetH1Stream added in v1.1.0

func ResetH1Stream(s *Stream)

ResetH1Stream performs a lightweight per-request reset for H1 stream reuse.

func ResetH2StreamInline added in v1.1.0

func ResetH2StreamInline(s *Stream, id uint32)

ResetH2StreamInline performs a lightweight reset for inline H2 stream reuse. Similar to ResetH1Stream but for H2 inline handler path. Resets per-request fields while preserving the stream's pool buffers.

Types

type ContinuationState

type ContinuationState struct {
	// contains filtered or unexported fields
}

ContinuationState tracks the state of CONTINUATION frames.

type FrameWriter

type FrameWriter interface {
	WriteSettings(settings ...http2.Setting) error
	WriteSettingsAck() error
	WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
	WriteData(streamID uint32, endStream bool, data []byte) error
	WriteWindowUpdate(streamID uint32, increment uint32) error
	WriteRSTStream(streamID uint32, code http2.ErrCode) error
	WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
	WritePing(ack bool, data [8]byte) error
	WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}

FrameWriter is an interface for writing HTTP/2 frames.

type H2Controller added in v1.3.0

type H2Controller interface {
	SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
	MarkStreamClosed(streamID uint32)
	IsStreamClosed(streamID uint32) bool
	WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
	CloseConn() error
}

H2Controller provides HTTP/2 connection-level control operations. Only H2-capable engines implement this; H1 adapters, test recorders, and stdlib bridges do not.

type Handler

type Handler interface {
	HandleStream(ctx context.Context, stream *Stream) error
}

Handler interface for processing streams.

type HandlerFunc

type HandlerFunc func(ctx context.Context, stream *Stream) error

HandlerFunc is an adapter to use functions as stream handlers.

func (HandlerFunc) HandleStream

func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error

HandleStream calls the handler function.

type Hijacker added in v1.1.0

type Hijacker interface {
	Hijack(stream *Stream) (net.Conn, error)
}

Hijacker is implemented by ResponseWriters that support connection takeover. This enables protocols like WebSocket that require raw TCP access.

type Manager

type Manager struct {
	RemoteAddr string
	// contains filtered or unexported fields
}

Manager manages multiple HTTP/2 streams.

func NewManager

func NewManager() *Manager

NewManager creates a new stream manager.

func (*Manager) AccumulateWindowUpdate

func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)

AccumulateWindowUpdate accumulates window credits without sending immediately.

func (*Manager) Close added in v1.1.0

func (m *Manager) Close()

Close releases all streams still held by the manager. Called when the H2 connection is closed to prevent stream objects from leaking in the map.

func (*Manager) ConsumeSendWindow

func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)

ConsumeSendWindow decrements connection and stream windows after sending DATA.

func (*Manager) ConsumeSendWindowFast

func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)

ConsumeSendWindowFast decrements connection and stream windows after sending DATA. Avoids Manager lock.

func (*Manager) CountActiveStreams

func (m *Manager) CountActiveStreams() int

CountActiveStreams returns number of streams considered active for concurrency limits.

func (*Manager) CreateStream

func (m *Manager) CreateStream(id uint32) *Stream

CreateStream creates a new stream with the given ID.

func (*Manager) DeleteStream

func (m *Manager) DeleteStream(id uint32)

DeleteStream removes a stream and releases its pooled buffers. If the stream has an async handler goroutine running (asyncRunning=true), the stream is removed from the map but NOT released — the goroutine will release it upon completion via ReleaseAsyncStream.

func (*Manager) FlushWindowUpdates

func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool

FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if threshold is met. Returns true if updates were sent.

func (*Manager) GetConnectionWindow

func (m *Manager) GetConnectionWindow() int32

GetConnectionWindow atomically returns the current connection window size.

func (*Manager) GetLastClientStreamID

func (m *Manager) GetLastClientStreamID() uint32

GetLastClientStreamID returns the highest client-initiated stream ID observed.

func (*Manager) GetLastStreamID

func (m *Manager) GetLastStreamID() uint32

GetLastStreamID returns the highest stream ID.

func (*Manager) GetMaxConcurrentStreams

func (m *Manager) GetMaxConcurrentStreams() uint32

GetMaxConcurrentStreams returns the currently configured max concurrent streams value.

func (*Manager) GetMaxFrameSize added in v1.1.0

func (m *Manager) GetMaxFrameSize() uint32

GetMaxFrameSize returns the current max frame size (atomic).

func (*Manager) GetOrCreateStream

func (m *Manager) GetOrCreateStream(id uint32) *Stream

GetOrCreateStream gets an existing stream or creates a new one.

func (*Manager) GetSendWindowsAndMaxFrame

func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.

func (*Manager) GetSendWindowsAndMaxFrameFast

func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrameFast returns current connection window, stream window, and max frame size. It avoids Manager lock by using atomics and direct stream access.

func (*Manager) GetStream

func (m *Manager) GetStream(id uint32) (*Stream, bool)

GetStream gets a stream by ID.

func (*Manager) MarkStreamBuffered

func (m *Manager) MarkStreamBuffered(id uint32)

MarkStreamBuffered adds a stream to the set of streams with buffered data.

func (*Manager) MarkStreamEmpty

func (m *Manager) MarkStreamEmpty(id uint32)

MarkStreamEmpty removes a stream from the set of streams with buffered data.

func (*Manager) RemoveStreamFromMap added in v1.1.0

func (m *Manager) RemoveStreamFromMap(id uint32)

RemoveStreamFromMap removes a stream from the manager's map without releasing it. Used by async handler goroutines that manage their own stream lifecycle.

func (*Manager) SetMaxConcurrentStreams

func (m *Manager) SetMaxConcurrentStreams(n uint32)

SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.

func (*Manager) StreamCount

func (m *Manager) StreamCount() int

StreamCount returns the number of streams in the manager.

func (*Manager) TryOpenStream

func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)

TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.

func (*Manager) UpdateConnectionWindow

func (m *Manager) UpdateConnectionWindow(delta int32)

UpdateConnectionWindow atomically updates the connection-level flow control window.

type Phase

type Phase int

Phase represents the response phase for a stream to ensure proper write ordering.

const (
	PhaseInit Phase = iota
	PhaseHeadersSent
	PhaseBody
)

Response phase constants for ensuring correct write ordering.

func (Phase) String

func (p Phase) String() string

type Priority

type Priority struct {
	StreamDependency uint32
	Weight           uint8
	Exclusive        bool
}

Priority defines stream dependency and weight for HTTP/2 prioritization.

type PriorityTree

type PriorityTree struct {
	// contains filtered or unexported fields
}

PriorityTree manages stream priorities and dependencies.

func NewPriorityTree

func NewPriorityTree() *PriorityTree

NewPriorityTree creates a new priority tree.

func (*PriorityTree) CalculateStreamPriority

func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int

CalculateStreamPriority computes a priority score for stream scheduling.

func (*PriorityTree) GetChildren

func (pt *PriorityTree) GetChildren(streamID uint32) []uint32

GetChildren returns the streams that depend on the given stream.

func (*PriorityTree) GetPriority

func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)

GetPriority retrieves priority information for a stream.

func (*PriorityTree) GetWeight

func (pt *PriorityTree) GetWeight(streamID uint32) uint8

GetWeight returns the weight of a stream, defaulting to 16.

func (*PriorityTree) RemoveStream

func (pt *PriorityTree) RemoveStream(streamID uint32)

RemoveStream removes a stream and reorganizes its dependencies.

func (*PriorityTree) SetPriority

func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)

SetPriority assigns or updates priority information for a stream.

func (*PriorityTree) UpdateFromFrame

func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)

UpdateFromFrame updates stream priority from frame parameters.

type Processor

type Processor struct {
	InlineCachedCtx any // per-connection cached app context for inline handlers (avoids sync.Pool)

	InlineWriter       h2Conn // direct-to-outBuf writer for inline handlers (set by conn layer)
	InlineCount        uint64 // number of requests handled inline (for metrics)
	MaxRequestBodySize int64  // 0 = use default (100 MB)
	// contains filtered or unexported fields
}

Processor processes incoming HTTP/2 frames and manages streams.

func NewProcessor

func NewProcessor(handler Handler, writer FrameWriter, conn h2Conn) *Processor

NewProcessor creates a new stream processor. The conn parameter must implement both ResponseWriter and H2Controller (all H2 engine adapters do).

func (*Processor) FlushBufferedData

func (p *Processor) FlushBufferedData(_ uint32)

FlushBufferedData exposes flushBufferedData for external callers.

func (*Processor) FlushInlineCleanup added in v1.1.0

func (p *Processor) FlushInlineCleanup()

FlushInlineCleanup transitions and removes streams that completed inline during the frame loop but had cleanup deferred (pending outbound data or more frames to process). Called after the frame loop, under H2State.mu.

func (*Processor) GetConnection

func (p *Processor) GetConnection() ResponseWriter

GetConnection returns the permanent connection writer.

func (*Processor) GetCurrentConn

func (p *Processor) GetCurrentConn() ResponseWriter

GetCurrentConn returns the current connection.

func (*Processor) GetExpectedContinuationStreamID

func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)

GetExpectedContinuationStreamID returns the stream ID we're expecting CONTINUATION frames on.

func (*Processor) GetManager

func (p *Processor) GetManager() *Manager

GetManager returns the stream manager.

func (*Processor) GetStreamPriority

func (p *Processor) GetStreamPriority(streamID uint32) int

GetStreamPriority returns the priority score for a stream.

func (*Processor) GoAwayErr added in v1.1.0

func (p *Processor) GoAwayErr(lastStreamID uint32, code http2.ErrCode, debug []byte, err error) error

GoAwayErr sends a GOAWAY frame and returns the given error. SendGoAway handles flushing internally.

func (*Processor) HandleRawPing added in v1.1.0

func (p *Processor) HandleRawPing(flags byte, payload []byte) error

HandleRawPing processes a PING frame directly from raw bytes. Payload must be exactly 8 bytes per RFC 7540 §6.7.

func (*Processor) HandleRawWindowUpdate added in v1.1.0

func (p *Processor) HandleRawWindowUpdate(streamID uint32, payload []byte) error

HandleRawWindowUpdate processes a WINDOW_UPDATE frame directly from raw bytes. Payload must be exactly 4 bytes per RFC 7540 §6.9.

func (*Processor) IsExpectingContinuation

func (p *Processor) IsExpectingContinuation() bool

IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.

func (*Processor) ProcessFrame

func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error

ProcessFrame processes an incoming HTTP/2 frame.

func (*Processor) ProcessFrameWithConn

func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error

ProcessFrameWithConn processes a frame with a connection context.

func (*Processor) ProcessRawHeaders added in v1.1.0

func (p *Processor) ProcessRawHeaders(streamID uint32, endStream bool, headerBlock []byte) error

ProcessRawHeaders handles a HEADERS frame from raw bytes, bypassing the x/net framer's *HeadersFrame allocation. Only valid for simple HEADERS (END_HEADERS set, no PADDED, no PRIORITY, not during CONTINUATION). All RFC 7540 validations are preserved.

func (*Processor) SendGoAway

func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error

SendGoAway sends a GOAWAY frame.

func (*Processor) SetHasMoreFrames added in v1.1.0

func (p *Processor) SetHasMoreFrames(v bool)

SetHasMoreFrames tells the processor whether more frames follow the current one in the recv buffer. Used to suppress inline handler execution when the frame loop has more frames to process.

type ResponseWriter

type ResponseWriter interface {
	WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
}

ResponseWriter is the minimal interface for writing HTTP responses. All engine adapters, test recorders, and stdlib bridges implement this.

type State

type State int

State represents the state of an HTTP/2 stream as defined in RFC 7540.

const (
	StateIdle State = iota
	StateOpen
	StateHalfClosedLocal
	StateHalfClosedRemote
	StateClosed
)

HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.

func (State) String

func (s State) String() string

type Stream

type Stream struct {
	ID uint32

	Headers           [][2]string
	Trailers          [][2]string
	Data              *bytes.Buffer
	OutboundBuffer    *bytes.Buffer
	OutboundEndStream bool

	EndStream   bool
	IsStreaming bool

	DeferResponse bool

	ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes

	ResponseWriter         ResponseWriter
	RemoteAddr             string
	ReceivedDataLen        int
	ReceivedInitialHeaders bool
	ClosedByReset          bool
	IsHEAD                 bool

	CachedCtx any    // per-connection cached context (avoids pool Get/Put per request)
	OnDetach  func() // called by Context.Detach to install write-thread safety
	// contains filtered or unexported fields
}

Stream represents an HTTP/2 stream with its associated state and data.

func NewH1Stream added in v1.1.0

func NewH1Stream(id uint32) *Stream

NewH1Stream creates a lightweight stream optimized for H1 requests.

func NewStream

func NewStream(id uint32) *Stream

NewStream creates a new stream with full H2 initialization.

func (*Stream) AddData

func (s *Stream) AddData(data []byte) error

AddData adds data to the stream buffer.

func (*Stream) AddHeader

func (s *Stream) AddHeader(name, value string)

AddHeader adds a header to the stream.

func (*Stream) AddHeadersBatch added in v1.1.0

func (s *Stream) AddHeadersBatch(headers [][2]string)

AddHeadersBatch adds multiple headers under a single lock acquisition.

func (*Stream) AddWindowSize added in v1.1.0

func (s *Stream) AddWindowSize(delta int32) int32

AddWindowSize atomically adds delta to the window size and returns the new value.

func (*Stream) BufferOutbound added in v0.2.0

func (s *Stream) BufferOutbound(data []byte, endStream bool)

BufferOutbound stores data that couldn't be sent due to flow control.

func (*Stream) Cancel

func (s *Stream) Cancel()

Cancel cancels the stream's context.

func (*Stream) CompareAndSwapWindowSize added in v1.1.0

func (s *Stream) CompareAndSwapWindowSize(old, val int32) bool

CompareAndSwapWindowSize atomically compares and swaps the window size.

func (*Stream) Context

func (s *Stream) Context() context.Context

Context returns the stream's context.

func (*Stream) DeductWindow added in v0.2.0

func (s *Stream) DeductWindow(n int32)

DeductWindow subtracts n from the flow control window.

func (*Stream) ForEachHeader

func (s *Stream) ForEachHeader(fn func(name, value string))

ForEachHeader calls fn for each header under a read lock.

func (*Stream) GetBuf added in v1.1.0

func (s *Stream) GetBuf() *bytes.Buffer

GetBuf lazily allocates the Data buffer and returns it.

func (*Stream) GetData

func (s *Stream) GetData() []byte

GetData returns the buffered data.

func (*Stream) GetHandlerStarted added in v1.1.0

func (s *Stream) GetHandlerStarted() bool

GetHandlerStarted reports whether the handler has started.

func (*Stream) GetHeaders

func (s *Stream) GetHeaders() [][2]string

GetHeaders returns the headers. For single-threaded H1 streams, returns the slice directly (no lock, no copy). For H2 streams, returns a safe copy under lock.

func (*Stream) GetHeadersSent added in v1.1.0

func (s *Stream) GetHeadersSent() bool

GetHeadersSent reports whether headers have been sent.

func (*Stream) GetOrCreateWindowUpdateChan added in v1.1.0

func (s *Stream) GetOrCreateWindowUpdateChan() chan int32

GetOrCreateWindowUpdateChan lazily allocates the ReceivedWindowUpd channel. Used by streaming handlers that need to wait for flow control updates.

func (*Stream) GetPhase

func (s *Stream) GetPhase() Phase

GetPhase returns the current stream phase.

func (*Stream) GetState

func (s *Stream) GetState() State

GetState returns the current stream state.

func (*Stream) GetWindowSize

func (s *Stream) GetWindowSize() int32

GetWindowSize returns the current flow control window size.

func (*Stream) HasDoneCh added in v1.2.2

func (s *Stream) HasDoneCh() bool

HasDoneCh reports whether a Done channel was created, indicating a derived context (e.g. context.WithTimeout) is watching this stream.

func (*Stream) HeadersLen

func (s *Stream) HeadersLen() int

HeadersLen returns the number of headers on the stream.

func (*Stream) IsCancelled added in v1.1.0

func (s *Stream) IsCancelled() bool

IsCancelled reports whether the stream has been cancelled.

func (*Stream) IsH1 added in v1.1.0

func (s *Stream) IsH1() bool

IsH1 returns true if this is an H1 stream (single-threaded, persistent per connection).

func (*Stream) LoadWindowSize added in v1.1.0

func (s *Stream) LoadWindowSize() int32

LoadWindowSize is an alias for GetWindowSize, used for clarity in CAS loops.

func (*Stream) MarkBuffered

func (s *Stream) MarkBuffered()

MarkBuffered marks the stream as having buffered data.

func (*Stream) MarkEmpty

func (s *Stream) MarkEmpty()

MarkEmpty marks the stream as having no buffered data.

func (*Stream) ProtoMajor added in v1.2.3

func (s *Stream) ProtoMajor() uint8

ProtoMajor returns the HTTP major version (1 or 2).

func (*Stream) Release added in v0.3.2

func (s *Stream) Release()

Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.

func (*Stream) SetHandlerStarted added in v1.1.0

func (s *Stream) SetHandlerStarted()

SetHandlerStarted marks the handler as started.

func (*Stream) SetHeadersSent added in v1.1.0

func (s *Stream) SetHeadersSent()

SetHeadersSent marks headers as sent.

func (*Stream) SetPhase

func (s *Stream) SetPhase(p Phase)

SetPhase sets the stream phase.

func (*Stream) SetProtoMajor added in v1.2.3

func (s *Stream) SetProtoMajor(v uint8)

SetProtoMajor sets the HTTP major version explicitly.

func (*Stream) SetState

func (s *Stream) SetState(state State)

SetState sets the stream state and atomically updates the manager's active count.

func (*Stream) SetWindowSize added in v1.1.0

func (s *Stream) SetWindowSize(v int32)

SetWindowSize stores a new window size value. Used during initialization.

func (*Stream) StoreState added in v1.1.0

func (s *Stream) StoreState(state State)

StoreState is a lightweight SetState for H1 streams (no manager, no Swap). Avoids the atomic.Swap + updateActiveCount overhead on the H1 hot path.

func (*Stream) WriteLock

func (s *Stream) WriteLock()

WriteLock acquires the per-stream write lock.

func (*Stream) WriteUnlock

func (s *Stream) WriteUnlock()

WriteUnlock releases the per-stream write lock.

type Streamer added in v1.1.0

type Streamer interface {
	// WriteHeader sends the status line and headers. Must be called once before Write.
	WriteHeader(stream *Stream, status int, headers [][2]string) error
	// Write sends a chunk of the response body. May be called multiple times.
	Write(stream *Stream, data []byte) error
	// Flush ensures buffered data is sent to the network.
	Flush(stream *Stream) error
	// Close signals end of the response body.
	Close(stream *Stream) error
}

Streamer supports incremental response writing. Engines that support streaming implement this interface on their ResponseWriter. The existing WriteResponse path is preserved for non-streaming responses (hot path).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL