stream

package
v3.16.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	MultiplexerTimeout  = 20 * time.Minute
	MultiplexerInterval = 30 * time.Second
)

Multiplexer timeout and interval for inactivity (package-level, for testability).

View Source
var ClientGracePeriod = 30 * time.Second

Stream client grace period (package-level, for testability).

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a thin wrapper around *websocket.Conn for common WebsocketRequest/WebsocketResponse operations.

func NewConnection

func NewConnection(conn *websocket.Conn) *Connection

NewConnection creates a new Connection from a websocket connection.

func (*Connection) Close

func (sc *Connection) Close() error

Close closes the websocket connection.

func (*Connection) Error

func (sc *Connection) Error(message string) error

Error closes the connection with a protocol error; details are in the message.

func (*Connection) ErrorMessage

func (sc *Connection) ErrorMessage(ctx context.Context, code code.Code, connErr error)

ErrorMessage sends an error to the websocket client before closing the connection.

func (*Connection) ReadWebsocketRequest

func (sc *Connection) ReadWebsocketRequest(ctx context.Context) (*streamv1.WebsocketRequest, error)

ReadWebsocketRequest reads a WebsocketRequest from the websocket connection.

func (*Connection) ReadWebsocketResponse

func (sc *Connection) ReadWebsocketResponse(ctx context.Context) (*streamv1.WebsocketResponse, error)

ReadWebsocketResponse reads a WebsocketResponse from the websocket connection.

func (*Connection) WriteMessage

func (sc *Connection) WriteMessage(messageType int, data []byte) error

WriteMessage writes a message to the websocket connection.

func (*Connection) WriteWebsocketResponse

func (sc *Connection) WriteWebsocketResponse(ctx context.Context, resp *streamv1.WebsocketResponse) error

WriteWebsocketResponse writes a WebsocketResponse to the websocket connection as a TextMessage.

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

Multiplexer manages websocket connections, runme.Runner Execution bidirectional processing, and request/response multiplexing for a given runID. It handles multiple streams and clients, coordinating authenticated requests and responses between them and the Runme runner. The same multiplexer bridges the v2.ExecuteRequest and v2.ExecuteResponse for a run in runme.Runner for one or many Console DOM element with the same runID. Todo(sebastian): Deduplicate Cell ID to the runID to peg the run to a specific cell.

func NewMultiplexer

func NewMultiplexer(ctx context.Context, runID string, auth *iam.AuthContext, runner *runme.Runner, tap StreamTap) *Multiplexer

NewMultiplexer creates a new Multiplexer (see description above). tap may be nil, in which case recording is disabled.

type Processor

type Processor struct {
	Ctx   context.Context
	RunID string
	// ActiveRequests is used to signal to the multiplexer that the processor is actively executing requests.
	ActiveRequests   bool
	ExecuteRequests  chan *v2.ExecuteRequest
	ExecuteResponses chan *v2.ExecuteResponse
	// StopReading is used to signal to the readMessages goroutine that it should stop reading messages
	StopReading chan bool

	Runner *runme.Runner
}

Processor handles the v2.ExecuteRequest and v2.ExecuteResponse for a run in runme.Runner.

func NewProcessor

func NewProcessor(ctx context.Context, runID string) *Processor

NewProcessor creates a new Processor that handles the execution requests and responses for a run.

func (*Processor) Context

func (p *Processor) Context() context.Context

func (*Processor) Recv

func (p *Processor) Recv() (*v2.ExecuteRequest, error)

Recv reads a v2.ExecuteRequest inside Runme.Runner for the Execute operation from the channel until it is closed.

func (*Processor) RecvMsg

func (p *Processor) RecvMsg(m any) error

func (*Processor) Send

func (p *Processor) Send(res *v2.ExecuteResponse) error

Send sends a response message to the client. The server handler may call Send multiple times to send multiple messages to the client. An error is returned if the stream was terminated unexpectedly, and the handler method should return, as the stream is no longer usable.

func (*Processor) SendHeader

func (p *Processor) SendHeader(md metadata.MD) error

func (*Processor) SendMsg

func (p *Processor) SendMsg(m any) error

func (*Processor) SetHeader

func (p *Processor) SetHeader(md metadata.MD) error

func (*Processor) SetTrailer

func (p *Processor) SetTrailer(md metadata.MD)

type StreamTap added in v3.16.6

type StreamTap interface {
	// RunStart is called when a new multiplexer run begins processing.
	RunStart(runID string)

	// RunEnd is called when the multiplexer run completes.
	// exitCode is the final process exit code (or -1 if unknown).
	RunEnd(exitCode int)

	// Output records terminal stdout data from an ExecuteResponse.
	Output(data []byte)

	// Stderr records terminal stderr data from an ExecuteResponse.
	// Implementations may also merge this into the output stream for
	// replay compatibility.
	Stderr(data []byte)

	// Input records terminal input data from a WebsocketRequest.
	Input(data []byte)

	// Resize records a terminal resize event.
	Resize(cols, rows uint32)

	// CommandStart is called when an ExecuteRequest with a Config is received,
	// signaling the start of a command execution.
	CommandStart(program, cwd string)

	// CommandEnd is called when command execution finishes.
	CommandEnd(exitCode int)

	// ClientConnect is called when a WebSocket client connects to the multiplexer.
	ClientConnect(streamID string)

	// ClientDisconnect is called when a WebSocket client disconnects.
	ClientDisconnect(streamID string)

	// Close finalizes the recording and releases resources.
	Close() error
}

StreamTap receives lifecycle and data events from the multiplexer. Implementations must be safe for concurrent use.

The multiplexer calls these methods at the corresponding points in the request/response flow. All methods are best-effort: errors are logged but do not interrupt the session.

type Streams

type Streams struct {
	// contains filtered or unexported fields
}

Streams manages multiple websocket connections for a Runme execution (aka "run"). Each connection represents either: - A single Console DOM element - A client reconnection (e.g. when the client is disconnected and reconnects/resumes) These connections are multiplexed bidirectionally together to handle the execution flow.

func NewStreams

func NewStreams(ctx context.Context, auth *iam.AuthContext, socketRequests chan *streamv1.WebsocketRequest) *Streams

NewStreams creates a instance of Streams that manages multiple websocket connections attached to a muliplexed Runme execution.

type TapFactory added in v3.16.6

type TapFactory func(runID string) StreamTap

TapFactory creates a StreamTap for a given runID. If nil is returned, recording is disabled for that run.

type WebSocketHandler

type WebSocketHandler struct {
	// contains filtered or unexported fields
}

WebSocketHandler is a handler for websockets. A single instance is registered with the http server to connect websocket requests to RunmeHandlers.

func NewWebSocketHandler

func NewWebSocketHandler(runner *runme.Runner, auth *iam.AuthContext) *WebSocketHandler

func (*WebSocketHandler) Handler

func (h *WebSocketHandler) Handler(w http.ResponseWriter, r *http.Request)

Handler is the main handler mounted in a mux to handle websocket connection upgrades.

func (*WebSocketHandler) SetTapFactory added in v3.16.6

func (h *WebSocketHandler) SetTapFactory(factory TapFactory)

SetTapFactory configures a factory that creates a StreamTap for each new run. If factory is nil or returns nil, recording is disabled for that run.

func (*WebSocketHandler) Shutdown added in v3.16.5

func (h *WebSocketHandler) Shutdown()

Shutdown cancels all active multiplexers, which propagates context cancellation to running commands.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL