Documentation ¶
Index ¶
- Variables
- type Connection
- func (sc *Connection) Close() error
- func (sc *Connection) Error(message string) error
- func (sc *Connection) ErrorMessage(ctx context.Context, code code.Code, connErr error)
- func (sc *Connection) ReadWebsocketRequest(ctx context.Context) (*streamv1.WebsocketRequest, error)
- func (sc *Connection) ReadWebsocketResponse(ctx context.Context) (*streamv1.WebsocketResponse, error)
- func (sc *Connection) WriteMessage(messageType int, data []byte) error
- func (sc *Connection) WriteWebsocketResponse(ctx context.Context, resp *streamv1.WebsocketResponse) error
- type Multiplexer
- type Processor
- func (p *Processor) Context() context.Context
- func (p *Processor) Recv() (*v2.ExecuteRequest, error)
- func (p *Processor) RecvMsg(m any) error
- func (p *Processor) Send(res *v2.ExecuteResponse) error
- func (p *Processor) SendHeader(md metadata.MD) error
- func (p *Processor) SendMsg(m any) error
- func (p *Processor) SetHeader(md metadata.MD) error
- func (p *Processor) SetTrailer(md metadata.MD)
- type StreamTap
- type Streams
- type TapFactory
- type WebSocketHandler
Constants ¶
This section is empty.
Variables ¶
var (
	MultiplexerTimeout  = 20 * time.Minute
	MultiplexerInterval = 30 * time.Second
)
Multiplexer timeout and interval for inactivity (package-level, for testability).
var ClientGracePeriod = 30 * time.Second
Stream client grace period (package-level, for testability).
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a thin wrapper around *websocket.Conn for common WebsocketRequest/WebsocketResponse operations.
func NewConnection ¶
func NewConnection(conn *websocket.Conn) *Connection
NewConnection creates a new Connection from a websocket connection.
func (*Connection) Close ¶
func (sc *Connection) Close() error
Close closes the websocket connection.
func (*Connection) Error ¶
func (sc *Connection) Error(message string) error
Error closes the connection with a protocol error; details are in the message.
func (*Connection) ErrorMessage ¶
func (sc *Connection) ErrorMessage(ctx context.Context, code code.Code, connErr error)
ErrorMessage sends an error to the websocket client before closing the connection.
func (*Connection) ReadWebsocketRequest ¶
func (sc *Connection) ReadWebsocketRequest(ctx context.Context) (*streamv1.WebsocketRequest, error)
ReadWebsocketRequest reads a WebsocketRequest from the websocket connection.
func (*Connection) ReadWebsocketResponse ¶
func (sc *Connection) ReadWebsocketResponse(ctx context.Context) (*streamv1.WebsocketResponse, error)
ReadWebsocketResponse reads a WebsocketResponse from the websocket connection.
func (*Connection) WriteMessage ¶
func (sc *Connection) WriteMessage(messageType int, data []byte) error
WriteMessage writes a message to the websocket connection.
func (*Connection) WriteWebsocketResponse ¶
func (sc *Connection) WriteWebsocketResponse(ctx context.Context, resp *streamv1.WebsocketResponse) error
WriteWebsocketResponse writes a WebsocketResponse to the websocket connection as a TextMessage.
type Multiplexer ¶
type Multiplexer struct {
// contains filtered or unexported fields
}
Multiplexer manages websocket connections, bidirectional processing of runme.Runner executions, and request/response multiplexing for a given runID. It handles multiple streams and clients, coordinating authenticated requests and responses between them and the Runme runner. The same multiplexer bridges the v2.ExecuteRequest and v2.ExecuteResponse of a run in runme.Runner for one or more Console DOM elements that share the same runID.
TODO(sebastian): Deduplicate Cell ID to the runID to peg the run to a specific cell.
func NewMultiplexer ¶
func NewMultiplexer(ctx context.Context, runID string, auth *iam.AuthContext, runner *runme.Runner, tap StreamTap) *Multiplexer
NewMultiplexer creates a new Multiplexer (see description above). tap may be nil, in which case recording is disabled.
type Processor ¶
type Processor struct {
Ctx context.Context
RunID string
// ActiveRequests is used to signal to the multiplexer that the processor is actively executing requests.
ActiveRequests bool
ExecuteRequests chan *v2.ExecuteRequest
ExecuteResponses chan *v2.ExecuteResponse
// StopReading is used to signal to the readMessages goroutine that it should stop reading messages
StopReading chan bool
Runner *runme.Runner
}
Processor handles the v2.ExecuteRequest and v2.ExecuteResponse for a run in runme.Runner.
func NewProcessor ¶
NewProcessor creates a new Processor that handles the execution requests and responses for a run.
func (*Processor) Recv ¶
func (p *Processor) Recv() (*v2.ExecuteRequest, error)
Recv reads the next v2.ExecuteRequest for the Execute operation in runme.Runner from the request channel. It keeps returning requests until the channel is closed.
func (*Processor) Send ¶
func (p *Processor) Send(res *v2.ExecuteResponse) error
Send sends a response message to the client. The server handler may call Send multiple times to send multiple messages to the client. An error is returned if the stream was terminated unexpectedly, and the handler method should return, as the stream is no longer usable.
func (*Processor) SetTrailer ¶
func (p *Processor) SetTrailer(md metadata.MD)
type StreamTap ¶ added in v3.16.6
type StreamTap interface {
// RunStart is called when a new multiplexer run begins processing.
RunStart(runID string)
// RunEnd is called when the multiplexer run completes.
// exitCode is the final process exit code (or -1 if unknown).
RunEnd(exitCode int)
// Output records terminal stdout data from an ExecuteResponse.
Output(data []byte)
// Stderr records terminal stderr data from an ExecuteResponse.
// Implementations may also merge this into the output stream for
// replay compatibility.
Stderr(data []byte)
// Input records terminal input data from a WebsocketRequest.
Input(data []byte)
// Resize records a terminal resize event.
Resize(cols, rows uint32)
// CommandStart is called when an ExecuteRequest with a Config is received,
// signaling the start of a command execution.
CommandStart(program, cwd string)
// CommandEnd is called when command execution finishes.
CommandEnd(exitCode int)
// ClientConnect is called when a WebSocket client connects to the multiplexer.
ClientConnect(streamID string)
// ClientDisconnect is called when a WebSocket client disconnects.
ClientDisconnect(streamID string)
// Close finalizes the recording and releases resources.
Close() error
}
StreamTap receives lifecycle and data events from the multiplexer. Implementations must be safe for concurrent use.
The multiplexer calls these methods at the corresponding points in the request/response flow. All methods are best-effort: errors are logged but do not interrupt the session.
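To make the concurrency requirement concrete, here is a minimal sketch of an implementation that only counts bytes. The interface is copied from the declaration above so the example compiles on its own; byteCountTap and its locked helper are hypothetical names, and folding stderr into the output count is just one way to follow the "may also merge" note.

```go
package main

import (
	"fmt"
	"sync"
)

// StreamTap is copied from the interface above so the sketch is self-contained.
type StreamTap interface {
	RunStart(runID string)
	RunEnd(exitCode int)
	Output(data []byte)
	Stderr(data []byte)
	Input(data []byte)
	Resize(cols, rows uint32)
	CommandStart(program, cwd string)
	CommandEnd(exitCode int)
	ClientConnect(streamID string)
	ClientDisconnect(streamID string)
	Close() error
}

// byteCountTap is a hypothetical tap that tallies bytes per direction.
// A mutex guards every field so the methods are safe for concurrent use.
type byteCountTap struct {
	mu       sync.Mutex
	runID    string
	in, out  int
	clients  int
	exitCode int
}

// locked runs f while holding the mutex.
func (t *byteCountTap) locked(f func()) {
	t.mu.Lock()
	defer t.mu.Unlock()
	f()
}

func (t *byteCountTap) RunStart(runID string) { t.locked(func() { t.runID = runID }) }
func (t *byteCountTap) RunEnd(exitCode int)   { t.locked(func() { t.exitCode = exitCode }) }
func (t *byteCountTap) Output(data []byte)    { t.locked(func() { t.out += len(data) }) }

// Stderr is merged into the output count in this sketch.
func (t *byteCountTap) Stderr(data []byte) { t.locked(func() { t.out += len(data) }) }
func (t *byteCountTap) Input(data []byte)  { t.locked(func() { t.in += len(data) }) }

// Events this sketch does not record are accepted and ignored.
func (t *byteCountTap) Resize(cols, rows uint32)         {}
func (t *byteCountTap) CommandStart(program, cwd string) {}
func (t *byteCountTap) CommandEnd(exitCode int)          {}

func (t *byteCountTap) ClientConnect(streamID string)    { t.locked(func() { t.clients++ }) }
func (t *byteCountTap) ClientDisconnect(streamID string) { t.locked(func() { t.clients-- }) }
func (t *byteCountTap) Close() error                     { return nil }

// Compile-time check that byteCountTap satisfies the interface.
var _ StreamTap = (*byteCountTap)(nil)

func main() {
	counter := &byteCountTap{}
	var tap StreamTap = counter

	tap.RunStart("run-1")
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tap.Output([]byte("hello"))
			tap.Input([]byte("x"))
		}()
	}
	wg.Wait()
	tap.RunEnd(0)
	_ = tap.Close()

	fmt.Println(counter.runID, "in:", counter.in, "out:", counter.out)
}
```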
type Streams ¶
type Streams struct {
// contains filtered or unexported fields
}
Streams manages multiple websocket connections for a Runme execution (aka "run"). Each connection represents either:
- A single Console DOM element
- A client reconnection (e.g. when the client is disconnected and reconnects/resumes)
These connections are multiplexed bidirectionally together to handle the execution flow.
func NewStreams ¶
func NewStreams(ctx context.Context, auth *iam.AuthContext, socketRequests chan *streamv1.WebsocketRequest) *Streams
NewStreams creates an instance of Streams that manages multiple websocket connections attached to a multiplexed Runme execution.
type TapFactory ¶ added in v3.16.6
TapFactory creates a StreamTap for a given runID. If nil is returned, recording is disabled for that run.
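The declaration of TapFactory is not shown here, so the sketch below assumes it is a function type from a runID to a StreamTap; treat that signature as an assumption rather than the package's API. StreamTap is trimmed to a single method to keep the example short, and recordingTap, recordOnlyPrefixed, and buggyFactory are hypothetical. The point it illustrates is a general Go pitfall: to signal "no recording", a factory should return a literal nil, because a nil pointer stored in an interface value does not compare equal to nil.

```go
package main

import (
	"fmt"
	"strings"
)

// StreamTap is trimmed to one method for brevity; the real interface has more.
type StreamTap interface {
	Close() error
}

// TapFactory is ASSUMED to have this shape; the source does not show it.
type TapFactory func(runID string) StreamTap

// recordingTap is a hypothetical tap implementation.
type recordingTap struct{ runID string }

func (t *recordingTap) Close() error { return nil }

// recordOnlyPrefixed returns a factory that records only runs whose ID
// starts with prefix and disables recording for all others.
func recordOnlyPrefixed(prefix string) TapFactory {
	return func(runID string) StreamTap {
		if !strings.HasPrefix(runID, prefix) {
			return nil // literal nil: the interface value itself is nil
		}
		return &recordingTap{runID: runID}
	}
}

// buggyFactory returns a typed nil pointer. Wrapped in the interface it is
// NOT equal to nil, so a caller checking "tap == nil" would miss it.
func buggyFactory(runID string) StreamTap {
	var t *recordingTap
	return t
}

func main() {
	factory := recordOnlyPrefixed("rec-")
	fmt.Println("rec-42 recorded:", factory("rec-42") != nil)
	fmt.Println("tmp-7 recorded: ", factory("tmp-7") != nil)
	fmt.Println("buggy non-nil:  ", buggyFactory("tmp-7") != nil)
}
```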
type WebSocketHandler ¶
type WebSocketHandler struct {
// contains filtered or unexported fields
}
WebSocketHandler is a handler for websockets. A single instance is registered with the http server to connect websocket requests to RunmeHandlers.
func NewWebSocketHandler ¶
func NewWebSocketHandler(runner *runme.Runner, auth *iam.AuthContext) *WebSocketHandler
func (*WebSocketHandler) Handler ¶
func (h *WebSocketHandler) Handler(w http.ResponseWriter, r *http.Request)
Handler is the main handler mounted in a mux to handle websocket connection upgrades.
func (*WebSocketHandler) SetTapFactory ¶ added in v3.16.6
func (h *WebSocketHandler) SetTapFactory(factory TapFactory)
SetTapFactory configures a factory that creates a StreamTap for each new run. If factory is nil or returns nil, recording is disabled for that run.
func (*WebSocketHandler) Shutdown ¶ added in v3.16.5
func (h *WebSocketHandler) Shutdown()
Shutdown cancels all active multiplexers, which propagates context cancellation to running commands.