package stream
v0.3.2
Published: Mar 9, 2026 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Overview

Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParsePriorityFromHeaders

func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)

ParsePriorityFromHeaders extracts priority information from a HEADERS frame.
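
As an illustration of the fields involved, the sketch below decodes the 5-byte priority block that RFC 7540 Section 6.2 places in a HEADERS frame when the PRIORITY flag is set. The helper name `parsePriorityFields` is hypothetical and works on raw bytes rather than on `*http2.HeadersFrame`; whether this package reports the wire weight or the effective weight (wire value plus one) is not stated above, so the sketch returns the wire value.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// parsePriorityFields is a hypothetical helper, not part of this package.
// It decodes the priority block of a HEADERS frame: one exclusive bit,
// a 31-bit stream dependency, and an 8-bit weight.
func parsePriorityFields(b []byte) (dependency uint32, weight uint8, exclusive bool, ok bool) {
	if len(b) < 5 {
		return 0, 0, false, false
	}
	raw := binary.BigEndian.Uint32(b[:4])
	exclusive = raw&0x80000000 != 0 // highest bit is the E flag
	dependency = raw & 0x7fffffff   // remaining 31 bits are the stream ID
	weight = b[4]                   // wire value; the effective weight is this plus one
	return dependency, weight, exclusive, true
}

func main() {
	dep, w, excl, ok := parsePriorityFields([]byte{0x80, 0x00, 0x00, 0x03, 0x0f})
	fmt.Println(dep, w, excl, ok) // prints "3 15 true true"
}
```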

Types

type ContinuationState

type ContinuationState struct {
	// contains filtered or unexported fields
}

ContinuationState tracks the state of CONTINUATION frames.

type FrameWriter

type FrameWriter interface {
	WriteSettings(settings ...http2.Setting) error
	WriteSettingsAck() error
	WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
	WriteData(streamID uint32, endStream bool, data []byte) error
	WriteWindowUpdate(streamID uint32, increment uint32) error
	WriteRSTStream(streamID uint32, code http2.ErrCode) error
	WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
	WritePing(ack bool, data [8]byte) error
	WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}

FrameWriter is an interface for writing HTTP/2 frames.

type Handler

type Handler interface {
	HandleStream(ctx context.Context, stream *Stream) error
}

Handler is the interface for processing streams.

type HandlerFunc

type HandlerFunc func(ctx context.Context, stream *Stream) error

HandlerFunc is an adapter to use functions as stream handlers.

func (HandlerFunc) HandleStream

func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error

HandleStream calls the handler function.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages multiple HTTP/2 streams.

func NewManager

func NewManager() *Manager

NewManager creates a new stream manager.

func (*Manager) AccumulateWindowUpdate

func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)

AccumulateWindowUpdate accumulates window credits without sending immediately.

func (*Manager) ConsumeSendWindow

func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)

ConsumeSendWindow decrements connection and stream windows after sending DATA.

func (*Manager) ConsumeSendWindowFast

func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)

ConsumeSendWindowFast decrements connection and stream windows after sending DATA. Avoids Manager lock.
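
One way to decrement both windows without a shared lock is to keep them in atomic counters. The sketch below assumes that approach; the `sendWindows` type and its `consume` method are hypothetical and only illustrate that sending n bytes of DATA reduces the connection window and the stream window by the same amount.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sendWindows is a hypothetical stand-in that holds one connection-level
// and one stream-level send window as atomic counters.
type sendWindows struct {
	conn   atomic.Int32
	stream atomic.Int32
}

// consume subtracts n from both windows and returns what is left in each.
// No mutex is needed because each counter is updated atomically.
func (w *sendWindows) consume(n int32) (connLeft, streamLeft int32) {
	connLeft = w.conn.Add(-n)
	streamLeft = w.stream.Add(-n)
	return connLeft, streamLeft
}

func main() {
	w := &sendWindows{}
	w.conn.Store(65535)
	w.stream.Store(65535)
	c, s := w.consume(16384)
	fmt.Println(c, s) // prints "49151 49151"
}
```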

func (*Manager) CountActiveStreams

func (m *Manager) CountActiveStreams() int

CountActiveStreams returns number of streams considered active for concurrency limits.

func (*Manager) CreateStream

func (m *Manager) CreateStream(id uint32) *Stream

CreateStream creates a new stream with the given ID.

func (*Manager) DeleteStream

func (m *Manager) DeleteStream(id uint32)

DeleteStream removes a stream and releases its pooled buffers.

func (*Manager) FlushWindowUpdates

func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool

FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if threshold is met. Returns true if updates were sent.
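
The accumulate-then-flush pattern can be sketched as follows. The `updateBatcher` type, the per-stream threshold, and the `send` callback are assumptions made for illustration; the package's actual threshold and bookkeeping are not documented above.

```go
package main

import "fmt"

// updateBatcher is a hypothetical stand-in for the manager's accounting.
type updateBatcher struct {
	threshold uint32
	pending   map[uint32]uint32 // stream ID -> accumulated credit
}

func newUpdateBatcher(threshold uint32) *updateBatcher {
	return &updateBatcher{threshold: threshold, pending: make(map[uint32]uint32)}
}

// accumulate records credit without sending anything.
func (b *updateBatcher) accumulate(streamID, increment uint32) {
	b.pending[streamID] += increment
}

// flush calls send for every entry that reached the threshold, or for every
// non-zero entry when force is set. It reports whether anything was sent.
func (b *updateBatcher) flush(send func(streamID, increment uint32) error, force bool) bool {
	sent := false
	for id, credit := range b.pending {
		if credit == 0 || (!force && credit < b.threshold) {
			continue
		}
		if err := send(id, credit); err != nil {
			continue // keep the credit so a later flush can retry
		}
		delete(b.pending, id)
		sent = true
	}
	return sent
}

func main() {
	b := newUpdateBatcher(4096)
	send := func(id, inc uint32) error {
		fmt.Printf("WINDOW_UPDATE stream=%d increment=%d\n", id, inc)
		return nil
	}
	b.accumulate(1, 1000)
	fmt.Println(b.flush(send, false)) // below the threshold, nothing is sent
	b.accumulate(1, 4000)
	fmt.Println(b.flush(send, false)) // 5000 >= 4096, one update is sent
}
```

Batching trades a little latency in returning credit for fewer small WINDOW_UPDATE frames on the wire.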

func (*Manager) GetConnectionWindow

func (m *Manager) GetConnectionWindow() int32

GetConnectionWindow returns the current connection window size.

func (*Manager) GetLastClientStreamID

func (m *Manager) GetLastClientStreamID() uint32

GetLastClientStreamID returns the highest client-initiated stream ID observed.

func (*Manager) GetLastStreamID

func (m *Manager) GetLastStreamID() uint32

GetLastStreamID returns the highest stream ID.

func (*Manager) GetMaxConcurrentStreams

func (m *Manager) GetMaxConcurrentStreams() uint32

GetMaxConcurrentStreams returns the currently configured max concurrent streams value.

func (*Manager) GetOrCreateStream

func (m *Manager) GetOrCreateStream(id uint32) *Stream

GetOrCreateStream gets an existing stream or creates a new one.

func (*Manager) GetSendWindowsAndMaxFrame

func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.
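
A caller would typically combine the three returned values to size the next DATA frame. The sketch below shows that calculation under the usual HTTP/2 rule that a frame may not exceed either send window or the peer's maximum frame size; `nextChunkSize` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// nextChunkSize is a hypothetical helper. It returns how many bytes of a
// pending payload may go into the next DATA frame: the smallest of the
// pending length, both send windows, and the maximum frame size.
func nextChunkSize(pending int, connWindow, streamWindow int32, maxFrame uint32) int {
	if pending <= 0 || connWindow <= 0 || streamWindow <= 0 {
		return 0 // blocked by flow control, or nothing to send
	}
	n := int64(pending)
	for _, limit := range []int64{int64(connWindow), int64(streamWindow), int64(maxFrame)} {
		if limit < n {
			n = limit
		}
	}
	return int(n)
}

func main() {
	fmt.Println(nextChunkSize(100000, 65535, 20000, 16384)) // prints "16384"
	fmt.Println(nextChunkSize(100000, 65535, 5000, 16384))  // prints "5000"
	fmt.Println(nextChunkSize(100000, 65535, 0, 16384))     // prints "0"
}
```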

func (*Manager) GetSendWindowsAndMaxFrameFast

func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrameFast returns current connection window, stream window, and max frame size. It avoids Manager lock by using atomics and direct stream access.

func (*Manager) GetStream

func (m *Manager) GetStream(id uint32) (*Stream, bool)

GetStream gets a stream by ID.

func (*Manager) MarkStreamBuffered

func (m *Manager) MarkStreamBuffered(id uint32)

MarkStreamBuffered adds a stream to the set of streams with buffered data.

func (*Manager) MarkStreamEmpty

func (m *Manager) MarkStreamEmpty(id uint32)

MarkStreamEmpty removes a stream from the set of streams with buffered data.

func (*Manager) SetMaxConcurrentStreams

func (m *Manager) SetMaxConcurrentStreams(n uint32)

SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.

func (*Manager) StreamCount

func (m *Manager) StreamCount() int

StreamCount returns the number of streams in the manager.

func (*Manager) TryOpenStream

func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)

TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.
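
The point of an atomic open is that the limit check and the registration happen under one lock, so two concurrent HEADERS frames cannot both slip past the limit. A minimal sketch of that idea, assuming a mutex-guarded set of active stream IDs (the `registry` type is hypothetical, and rejecting an already-open ID is an assumption):

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the manager's active-stream set.
type registry struct {
	mu     sync.Mutex
	max    int
	active map[uint32]struct{}
}

func newRegistry(max int) *registry {
	return &registry{max: max, active: make(map[uint32]struct{})}
}

// tryOpen checks the limit and registers the stream in one critical section.
func (r *registry) tryOpen(id uint32) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.active[id]; exists {
		return false // assumption: reopening an active ID is refused
	}
	if len(r.active) >= r.max {
		return false // limit would be exceeded
	}
	r.active[id] = struct{}{}
	return true
}

// closeStream frees a slot so that a later tryOpen can succeed.
func (r *registry) closeStream(id uint32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.active, id)
}

func main() {
	r := newRegistry(2)
	fmt.Println(r.tryOpen(1), r.tryOpen(3), r.tryOpen(5)) // prints "true true false"
	r.closeStream(1)
	fmt.Println(r.tryOpen(5)) // prints "true"
}
```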

func (*Manager) UpdateConnectionWindow

func (m *Manager) UpdateConnectionWindow(delta int32)

UpdateConnectionWindow updates the connection-level flow control window.

type Phase

type Phase int

Phase represents the response phase for a stream to ensure proper write ordering.

const (
	PhaseInit Phase = iota
	PhaseHeadersSent
	PhaseBody
)

Response phase constants for ensuring correct write ordering.
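
To show how such phases can enforce write ordering, the sketch below re-declares the constants and adds a hypothetical `advance` helper that refuses to move backwards, for example sending headers after body data. The strings returned by `String` here are illustrative; the package's actual output is not documented above.

```go
package main

import "fmt"

// Phase and its constants mirror the declarations documented above.
type Phase int

const (
	PhaseInit Phase = iota
	PhaseHeadersSent
	PhaseBody
)

// String returns illustrative names; the real package may format differently.
func (p Phase) String() string {
	switch p {
	case PhaseInit:
		return "Init"
	case PhaseHeadersSent:
		return "HeadersSent"
	case PhaseBody:
		return "Body"
	}
	return fmt.Sprintf("Phase(%d)", int(p))
}

// advance is a hypothetical helper: a stream may stay in its phase or move
// forward, but never go back to an earlier one.
func advance(cur, next Phase) (Phase, error) {
	if next < cur {
		return cur, fmt.Errorf("cannot move from %s back to %s", cur, next)
	}
	return next, nil
}

func main() {
	p, err := advance(PhaseInit, PhaseHeadersSent)
	fmt.Println(p, err)
	p, err = advance(PhaseBody, PhaseHeadersSent)
	fmt.Println(p, err)
}
```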

func (Phase) String

func (p Phase) String() string

type Priority

type Priority struct {
	StreamDependency uint32
	Weight           uint8
	Exclusive        bool
}

Priority defines stream dependency and weight for HTTP/2 prioritization.

type PriorityTree

type PriorityTree struct {
	// contains filtered or unexported fields
}

PriorityTree manages stream priorities and dependencies.

func NewPriorityTree

func NewPriorityTree() *PriorityTree

NewPriorityTree creates a new priority tree.

func (*PriorityTree) CalculateStreamPriority

func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int

CalculateStreamPriority computes a priority score for stream scheduling.

func (*PriorityTree) GetChildren

func (pt *PriorityTree) GetChildren(streamID uint32) []uint32

GetChildren returns the streams that depend on the given stream.

func (*PriorityTree) GetPriority

func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)

GetPriority retrieves priority information for a stream.

func (*PriorityTree) GetWeight

func (pt *PriorityTree) GetWeight(streamID uint32) uint8

GetWeight returns the weight of a stream, defaulting to 16.

func (*PriorityTree) RemoveStream

func (pt *PriorityTree) RemoveStream(streamID uint32)

RemoveStream removes a stream and reorganizes its dependencies.

func (*PriorityTree) SetPriority

func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)

SetPriority assigns or updates priority information for a stream.

func (*PriorityTree) UpdateFromFrame

func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)

UpdateFromFrame updates stream priority from frame parameters.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor processes incoming HTTP/2 frames and manages streams.

func NewProcessor

func NewProcessor(handler Handler, writer FrameWriter, conn ResponseWriter) *Processor

NewProcessor creates a new stream processor.

func (*Processor) FlushBufferedData

func (p *Processor) FlushBufferedData(_ uint32)

FlushBufferedData exposes flushBufferedData for external callers.

func (*Processor) GetConnection

func (p *Processor) GetConnection() ResponseWriter

GetConnection returns the permanent connection writer.

func (*Processor) GetCurrentConn

func (p *Processor) GetCurrentConn() ResponseWriter

GetCurrentConn returns the current connection.

func (*Processor) GetExpectedContinuationStreamID

func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)

GetExpectedContinuationStreamID returns the stream ID we're expecting CONTINUATION frames on.

func (*Processor) GetManager

func (p *Processor) GetManager() *Manager

GetManager returns the stream manager.

func (*Processor) GetStreamPriority

func (p *Processor) GetStreamPriority(streamID uint32) int

GetStreamPriority returns the priority score for a stream.

func (*Processor) IsExpectingContinuation

func (p *Processor) IsExpectingContinuation() bool

IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.

func (*Processor) ProcessFrame

func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error

ProcessFrame processes an incoming HTTP/2 frame.

func (*Processor) ProcessFrameWithConn

func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error

ProcessFrameWithConn processes a frame with a connection context.

func (*Processor) SendGoAway

func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error

SendGoAway sends a GOAWAY frame.

type ResponseWriter

type ResponseWriter interface {
	WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
	SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
	MarkStreamClosed(streamID uint32)
	IsStreamClosed(streamID uint32) bool
	WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
	CloseConn() error
}

ResponseWriter is an interface for writing responses.

type State

type State int

State represents the state of an HTTP/2 stream as defined in RFC 7540.

const (
	StateIdle State = iota
	StateOpen
	StateHalfClosedLocal
	StateHalfClosedRemote
	StateClosed
)

HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.
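
The transitions driven by the END_STREAM flag in RFC 7540 Section 5.1 can be sketched as two small functions over these constants. The function names are hypothetical, and the reserved states and RST_STREAM handling are left out.

```go
package main

import "fmt"

// State and its constants mirror the declarations documented above.
type State int

const (
	StateIdle State = iota
	StateOpen
	StateHalfClosedLocal
	StateHalfClosedRemote
	StateClosed
)

// afterEndStreamReceived returns the state after the peer sets END_STREAM.
func afterEndStreamReceived(s State) State {
	switch s {
	case StateOpen:
		return StateHalfClosedRemote
	case StateHalfClosedLocal:
		return StateClosed
	}
	return s
}

// afterEndStreamSent returns the state after this endpoint sets END_STREAM.
func afterEndStreamSent(s State) State {
	switch s {
	case StateOpen:
		return StateHalfClosedLocal
	case StateHalfClosedRemote:
		return StateClosed
	}
	return s
}

func main() {
	s := StateOpen
	s = afterEndStreamReceived(s) // request fully received
	fmt.Println(s == StateHalfClosedRemote)
	s = afterEndStreamSent(s) // response fully sent
	fmt.Println(s == StateClosed)
}
```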

func (State) String

func (s State) String() string

type Stream

type Stream struct {
	ID    uint32
	State State

	Headers           [][2]string
	Trailers          [][2]string
	Data              *bytes.Buffer
	OutboundBuffer    *bytes.Buffer
	OutboundEndStream bool
	HeadersSent       bool
	EndStream         bool
	IsStreaming       bool
	HandlerStarted    bool
	DeferResponse     bool
	WindowSize        int32
	ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes

	ResponseWriter         ResponseWriter
	ReceivedDataLen        int
	ReceivedInitialHeaders bool
	ClosedByReset          bool
	// contains filtered or unexported fields
}

Stream represents an HTTP/2 stream with its associated state and data.

func NewStream

func NewStream(id uint32) *Stream

NewStream creates a new stream.

func (*Stream) AddData

func (s *Stream) AddData(data []byte) error

AddData adds data to the stream buffer.

func (*Stream) AddHeader

func (s *Stream) AddHeader(name, value string)

AddHeader adds a header to the stream.

func (*Stream) BufferOutbound (added in v0.2.0)

func (s *Stream) BufferOutbound(data []byte, endStream bool)

BufferOutbound stores data that couldn't be sent due to flow control.
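
The sketch below illustrates the general idea of holding back bytes that exceed the send window and releasing them when a WINDOW_UPDATE arrives. The `pendingStream` type and its `write` and `drain` methods are hypothetical; they model the behavior with a plain `bytes.Buffer` and do not reproduce this package's pooling or locking.

```go
package main

import (
	"bytes"
	"fmt"
)

// pendingStream is a hypothetical stand-in for a stream's outbound state.
type pendingStream struct {
	window    int32
	buffered  bytes.Buffer
	endStream bool
}

// write returns the bytes that may be sent now and buffers the remainder.
func (p *pendingStream) write(data []byte, endStream bool) []byte {
	if p.buffered.Len() > 0 {
		// Earlier data is still waiting, so queue behind it to keep order.
		p.buffered.Write(data)
		p.endStream = endStream
		return nil
	}
	n := len(data)
	if int(p.window) < n {
		n = int(p.window)
	}
	if n < 0 {
		n = 0
	}
	p.window -= int32(n)
	if n < len(data) {
		p.buffered.Write(data[n:])
		p.endStream = endStream
	}
	return data[:n]
}

// drain applies a window increment and returns the bytes now sendable.
func (p *pendingStream) drain(increment int32) []byte {
	p.window += increment
	n := p.buffered.Len()
	if int(p.window) < n {
		n = int(p.window)
	}
	if n <= 0 {
		return nil
	}
	out := make([]byte, n)
	p.buffered.Read(out) // the buffer holds at least n bytes
	p.window -= int32(n)
	return out
}

func main() {
	p := &pendingStream{window: 5}
	fmt.Printf("%q\n", p.write([]byte("hello world"), true)) // prints "hello"
	fmt.Printf("%q\n", p.drain(3))                           // prints " wo"
	fmt.Printf("%q\n", p.drain(10))                          // prints "rld"
}
```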

func (*Stream) Cancel

func (s *Stream) Cancel()

Cancel cancels the stream's context.

func (*Stream) Context

func (s *Stream) Context() context.Context

Context returns the stream's context.

func (*Stream) DeductWindow (added in v0.2.0)

func (s *Stream) DeductWindow(n int32)

DeductWindow subtracts n from the flow control window.

func (*Stream) ForEachHeader

func (s *Stream) ForEachHeader(fn func(name, value string))

ForEachHeader calls fn for each header under a read lock.

func (*Stream) GetData

func (s *Stream) GetData() []byte

GetData returns the buffered data.

func (*Stream) GetHeaders

func (s *Stream) GetHeaders() [][2]string

GetHeaders returns a copy of the headers.
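
ForEachHeader and GetHeaders offer two ways to read headers safely: iterate in place under a read lock, or take a copy and iterate without holding any lock. The sketch below shows both on a hypothetical `headerSet` type guarded by a `sync.RWMutex`; it assumes that is how the stream protects its headers.

```go
package main

import (
	"fmt"
	"sync"
)

// headerSet is a hypothetical stand-in for the stream's header storage.
type headerSet struct {
	mu      sync.RWMutex
	headers [][2]string
}

func (h *headerSet) add(name, value string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.headers = append(h.headers, [2]string{name, value})
}

// forEach visits headers in place; fn must not call add, or it would deadlock.
func (h *headerSet) forEach(fn func(name, value string)) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, kv := range h.headers {
		fn(kv[0], kv[1])
	}
}

// snapshot returns a copy, so callers can modify it or hold it indefinitely.
func (h *headerSet) snapshot() [][2]string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make([][2]string, len(h.headers))
	copy(out, h.headers)
	return out
}

func main() {
	h := &headerSet{}
	h.add(":method", "GET")
	h.add(":path", "/")
	h.forEach(func(name, value string) { fmt.Println(name, value) })

	snap := h.snapshot()
	snap[0][1] = "POST"                      // changes only the copy
	fmt.Println(h.snapshot()[0][1], snap[0][1]) // prints "GET POST"
}
```

Iterating in place avoids an allocation per call, while the copy is the safer choice when the caller needs the headers beyond the callback.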

func (*Stream) GetPhase

func (s *Stream) GetPhase() Phase

GetPhase returns the current stream phase.

func (*Stream) GetState

func (s *Stream) GetState() State

GetState returns the current stream state.

func (*Stream) GetWindowSize

func (s *Stream) GetWindowSize() int32

GetWindowSize returns the current flow control window size.

func (*Stream) HeadersLen

func (s *Stream) HeadersLen() int

HeadersLen returns the number of headers on the stream.

func (*Stream) MarkBuffered

func (s *Stream) MarkBuffered()

MarkBuffered marks the stream as having buffered data.

func (*Stream) MarkEmpty

func (s *Stream) MarkEmpty()

MarkEmpty marks the stream as having no buffered data.

func (*Stream) Release (added in v0.3.2)

func (s *Stream) Release()

Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.
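
A common way to make such a release safe to call more than once is an atomic flag that only the first caller can flip. The sketch below assumes that technique; `pooledStream` and `bufPool` are hypothetical, and for brevity the sketch pools only the buffer, not the stream object itself.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// bufPool is a hypothetical buffer pool used only for this illustration.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// pooledStream is a hypothetical stand-in for a stream with pooled buffers.
type pooledStream struct {
	data     *bytes.Buffer
	ctx      context.Context
	cancel   context.CancelFunc
	released atomic.Bool
}

func newPooledStream() *pooledStream {
	ctx, cancel := context.WithCancel(context.Background())
	return &pooledStream{data: bufPool.Get().(*bytes.Buffer), ctx: ctx, cancel: cancel}
}

// release cancels the context and returns the buffer to the pool.
// It reports whether this call did the work; later calls are no-ops.
func (s *pooledStream) release() bool {
	if !s.released.CompareAndSwap(false, true) {
		return false
	}
	s.cancel()
	s.data.Reset()
	bufPool.Put(s.data)
	s.data = nil // drop the reference so the stream cannot reuse the buffer
	return true
}

func main() {
	s := newPooledStream()
	s.data.WriteString("payload")
	fmt.Println(s.release(), s.ctx.Err()) // prints "true context canceled"
	fmt.Println(s.release())              // prints "false"
}
```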

func (*Stream) SetPhase

func (s *Stream) SetPhase(p Phase)

SetPhase sets the stream phase.

func (*Stream) SetState

func (s *Stream) SetState(state State)

SetState sets the stream state.

func (*Stream) WriteLock

func (s *Stream) WriteLock()

WriteLock acquires the per-stream write lock.

func (*Stream) WriteUnlock

func (s *Stream) WriteUnlock()

WriteUnlock releases the per-stream write lock.
