websockets

package module
v0.0.0-...-bafa5a9 Latest Latest
Published: May 22, 2026 License: MIT Imports: 11 Imported by: 0

README

WebSockets

A minimalist WebSocket transport layer (RFC 6455 & RFC 7692) for Go.

This package provides synchronous, state-free primitives for WebSocket communication. It does not manage network concurrency via hidden background goroutines, automated heartbeats, or internal connection maps; control over scheduling, buffer reuse, and I/O execution is left entirely to the calling application.


Features

  • Allocation-Free Framing: Frame assembly writes directly into a reusable, contiguous scratchpad allocated during connection setup, bypassing the runtime heap entirely on writes.
  • Lookahead Streaming: Both the raw and compressed streaming engines use a double-buffer lookahead strategy.
  • Direct Dispatch Cache: The connection extracts and caches concrete pointers for *net.TCPConn and *tls.Conn at initialization. This allows the compiler to inline writes and execute static method calls instead of generic interface lookups.
  • Pure Standard Library: Zero external dependencies. Features like permessage-deflate are decoupled as decorators, exposing a loose Compressor interface to allow alternative implementations.

Performance Baselines

The following metrics were captured on an Apple M1 Pro streaming 1 MB payloads over an in-memory net.Pipe connection:

Operation                          Throughput    Allocations    Heap Churn
StreamMessageExt (Raw TCP Path)    ~4.01 GB/s    0 allocs/op    0 B/op
StreamMessage (Deflate Path)       ~770 MB/s     0 allocs/op    0 B/op

Note: The 3 setup allocations recorded during full benchmark runs represent the one-time initialization overhead of the test pipeline or the compression engine, not the hot-path loop execution.


Installation

go get lowbit.dev/websockets


Quick Start

1. Connection Initialization
package main

import (
	"compress/flate"
	"net/http"
	"strings"

	"lowbit.dev/websockets"
)

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
	extensions := r.Header.Get("Sec-WebSocket-Extensions")
	negotiateDeflate := strings.Contains(strings.ToLower(extensions), "permessage-deflate")

	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "connection hijacking not supported", http.StatusInternalServerError)
		return
	}
	netConn, rw, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// NOTE: The returned 'rw' (*bufio.ReadWriter) may contain unread buffered bytes
	// if the client sent data immediately following the upgrade request headers.
	// Applications must inspect and drain rw.Reader, or use an upgrade coordinator
	// (like Cooper) that automatically handles buffered byte replay.
	_ = rw // placeholder so the example compiles; real code must drain rw.Reader

	// [Perform standard HTTP 101 Handshake validation and write response headers here]

	var maxReadLimit int64 = 1024 * 1024 // 1 MB -- ReadLimitStandard
	var maxChunkSize int64 = 4096        // 4 KB -- ChunkSizeLowMemory

	conn := websockets.NewConnection(netConn, maxReadLimit, maxChunkSize)
	defer conn.Close()

	if negotiateDeflate {
		// WrapDeflate takes an int level; the level constants come from compress/flate.
		deflateConn, err := websockets.WrapDeflate(conn, flate.BestSpeed)
		if err != nil {
			return // the deferred conn.Close() above tears the connection down
		}
		defer deflateConn.Close()
		runEchoLoop(deflateConn)
	} else {
		runEchoLoop(conn)
	}
}
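
The NOTE in the handler above asks applications to drain rw.Reader themselves. One way to do that, sketched here with the standard library only, is to wrap the hijacked connection so that any bytes already sitting in the bufio.Reader are served first. The names replayConn and withBuffered are hypothetical and not part of this package; the demo uses net.Pipe to stand in for a real socket.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"net"
)

// replayConn is a hypothetical wrapper (not part of this package) that serves
// previously buffered bytes before reading from the underlying connection.
type replayConn struct {
	net.Conn
	r io.Reader
}

func (c *replayConn) Read(p []byte) (int, error) { return c.r.Read(p) }

// withBuffered returns conn unchanged when br holds no unread bytes. Otherwise
// it copies the buffered bytes out and returns a wrapper that replays them first.
func withBuffered(conn net.Conn, br *bufio.Reader) (net.Conn, error) {
	n := br.Buffered()
	if n == 0 {
		return conn, nil
	}
	pending := make([]byte, n)
	if _, err := io.ReadFull(br, pending); err != nil {
		return nil, err
	}
	return &replayConn{Conn: conn, r: io.MultiReader(bytes.NewReader(pending), conn)}, nil
}

// demo simulates a client that sends "early" right behind its request headers.
func demo() string {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	go func() {
		client.Write([]byte("GET /ws HTTP/1.1\r\n\r\nearly"))
		client.Write([]byte("late"))
	}()

	// Consume the header lines the way an HTTP server would.
	br := bufio.NewReader(server)
	for {
		line, err := br.ReadString('\n')
		if err != nil {
			return "error: " + err.Error()
		}
		if line == "\r\n" {
			break
		}
	}

	conn, err := withBuffered(server, br)
	if err != nil {
		return "error: " + err.Error()
	}
	got := make([]byte, len("earlylate"))
	if _, err := io.ReadFull(conn, got); err != nil {
		return "error: " + err.Error()
	}
	return string(got)
}

func main() {
	fmt.Println(demo())
}
```

A wrapper like this hides the concrete *net.TCPConn or *tls.Conn from the connection, so it is probably best applied only when Buffered() is non-zero, as withBuffered does.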

2. Reading & Writing Messages

To maintain a flat memory profile during read loops, provide a reusable byte buffer to ReadMessage. Incoming payloads will slice directly into this buffer when possible.

type WebSocketPeer interface {
	ReadMessage(buf []byte) (payload []byte, op websockets.OpCode, err error)
	WriteMessage(op websockets.OpCode, payload []byte) error
}

func runEchoLoop(ws WebSocketPeer) {
	readBuf := make([]byte, 4096)

	for {
		payload, op, err := ws.ReadMessage(readBuf)
		if err != nil {
			return // Connection closed, dropped, or protocol error
		}

		switch op {
		case websockets.OpCodeText, websockets.OpCodeBinary:
			if err := ws.WriteMessage(op, payload); err != nil {
				return
			}
		case websockets.OpCodeClose:
			return
		}
	}
}

3. Chunked Streaming

The lookahead engine dynamically segments an io.Reader into fixed fragments. Memory consumption remains locked to the specified chunk size.

func streamLargePayload(ws *websockets.Conn, data io.Reader) error {
	// Fragments and flushes data in 4KB chunks with zero heap allocations.
	// Returns ErrChunkSizeExceeded if the chunk size parameters exceed the connection maxChunkSize limit.
	return ws.StreamMessageExt(websockets.OpCodeBinary, 0x00, 4096, data)
}
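
The lookahead idea can be illustrated without the package: read chunk N+1 before emitting chunk N, so the final flag can be set on the last chunk that actually carries data. The sketch below is a standard-library approximation under that assumption, not the package's implementation; streamChunks, emit, and fragments are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// streamChunks reads r in chunkSize pieces and calls emit for each one.
// It holds two buffers: the chunk about to be emitted and the one read ahead.
// The slice passed to emit is reused, so emit must not retain it.
func streamChunks(r io.Reader, chunkSize int, emit func(chunk []byte, final bool) error) error {
	if chunkSize <= 0 {
		return errors.New("chunk size must be positive")
	}
	cur := make([]byte, chunkSize)
	next := make([]byte, chunkSize)

	n, err := io.ReadFull(r, cur)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return err
	}
	for {
		if err != nil {
			// Short or empty read: the source is exhausted, so this is the last chunk.
			return emit(cur[:n], true)
		}
		// cur is full. Look ahead to learn whether anything follows it.
		m, nextErr := io.ReadFull(r, next)
		if nextErr == io.EOF {
			return emit(cur[:n], true)
		}
		if nextErr != nil && nextErr != io.ErrUnexpectedEOF {
			return nextErr
		}
		if e := emit(cur[:n], false); e != nil {
			return e
		}
		cur, next = next, cur
		n, err = m, nextErr
	}
}

// fragments renders the emitted chunks; "+" marks a non-final chunk, "." the final one.
func fragments(input string, chunkSize int) string {
	var sb strings.Builder
	err := streamChunks(strings.NewReader(input), chunkSize, func(chunk []byte, final bool) error {
		sb.Write(chunk)
		if final {
			sb.WriteByte('.')
		} else {
			sb.WriteByte('+')
		}
		return nil
	})
	if err != nil {
		return "error: " + err.Error()
	}
	return sb.String()
}

func main() {
	fmt.Println(fragments("abcdefgh", 4))
	fmt.Println(fragments("abcde", 4))
}
```

Without the lookahead, an input whose length is an exact multiple of the chunk size would need an extra empty final fragment; with it, memory stays at two chunk-sized buffers.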


Advanced: Alternative Compressors

The package uses Go's standard compress/flate implementation by default. If your environment requires higher compression throughput, you can swap out the standard library engine for an assembly-optimized or SIMD-accelerated alternative (such as Klauspost's implementation) via the WrapDeflateWithCompressor hook:

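import (
	"io"
	"net"

	"github.com/klauspost/compress/flate"
	"lowbit.dev/websockets"
)

func CustomCompressionUpgrade(netConn net.Conn) error {
	baseConn := websockets.NewConnection(netConn, websockets.ReadLimitStandard, websockets.ChunkSizeLowMemory)

	// Initialize the alternative assembly-accelerated compressor.
	klausCompressor, err := flate.NewWriter(io.Discard, flate.BestSpeed)
	if err != nil {
		baseConn.Close()
		return err
	}

	// Inject the alternative implementation into the deflation decorator layer.
	// The index below lists this function as WrapDeflateWithCompresor, with an
	// error return.
	deflateConn, err := websockets.WrapDeflateWithCompresor(baseConn, klausCompressor)
	if err != nil {
		baseConn.Close()
		return err
	}
	defer deflateConn.Close()

	// Run read/write loops on deflateConn...
	return nil
}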


Ecosystem: Handshake Management with Cooper

This library integrates with cooper, a zero-dependency handshake tool that manages HTTP/1.1 protocol negotiation, connection hijacking, and response header verification.

cooper.Hijack transparently wraps the underlying bufio read buffers, prepending any leftover bytes back onto the returned connection before passing it to your WebSocket initialization loops:

package main

import (
	"net"
	"net/http"
	"lowbit.dev/cooper"
	"lowbit.dev/websockets"
)

func main() {
	handler := cooper.Hijack(func(netConn net.Conn, proto string) {
		defer netConn.Close()

		// The connection returned by cooper is safe to use immediately; 
		// any early client data has already been prepended.
		conn := websockets.NewConnection(netConn, websockets.ReadLimitStandard, websockets.ChunkSizeLowMemory)
		defer conn.Close()

		// Run read/write loops on conn...
	}, 
		cooper.Protocols("websocket"),
		cooper.ResponseHeaders(func(r *http.Request, proto string) http.Header {
			h := http.Header{}
			h.Set("Sec-WebSocket-Accept", calculateAcceptKey(r.Header.Get("Sec-WebSocket-Key")))
			return h
		}),
	)

	http.Handle("/ws", handler)
	http.ListenAndServe(":8080", nil)
}
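
calculateAcceptKey is not defined in the example above. A minimal sketch of such a helper, following RFC 6455 Section 1.3 (SHA-1 of the client key concatenated with the fixed GUID, then base64), could look like this. The function name is taken from the example; whether cooper or this package ships an equivalent is not stated here.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed value defined by RFC 6455 for the handshake.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// calculateAcceptKey derives the Sec-WebSocket-Accept value from the
// client's Sec-WebSocket-Key header.
func calculateAcceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
	fmt.Println(calculateAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
}
```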


License

MIT License. See LICENSE for details.

Documentation

Overview

Package websockets implements a minimalist, zero-allocation WebSocket transport layer (RFC 6455 & RFC 7692) built entirely on the Go standard library.

Rather than managing network concurrency via hidden background goroutines, automated heartbeats, or internal connection maps, this package provides synchronous, state-free building blocks. Control over scheduling, buffer reuse, and I/O execution is left completely to the calling application.

Design Characteristics

  • Allocation-Free Framing: Frame assembly writes directly into a reusable, contiguous scratchpad allocated during connection setup, bypassing the runtime heap entirely on writes.

  • Lookahead Streaming: Both the raw and compressed streaming engines use a double-buffer lookahead strategy.

  • Direct Dispatch Cache: The connection extracts and caches concrete pointers for *net.TCPConn and *tls.Conn at initialization. This allows the compiler to inline writes and execute static method calls instead of generic interface lookups.

  • Abstract Extension Hooks: Features like permessage-deflate are decoupled as decorators. The package exposes a loose Compressor interface, letting you swap out the pure-Go stdlib flate engine for alternative hardware-accelerated or assembly-optimized implementations if needed.

Usage

Connections are established by wrapping an existing network socket along with explicit message and boundary size constraints:

conn := websockets.NewConnection(netConn, maxReadLimit, maxChunkSize)
defer conn.Close()

// Reuse a local slice to keep read loops allocation-free
readBuf := make([]byte, websockets.ChunkSizeLowMemory)
for {
    payload, op, err := conn.ReadMessage(readBuf)
    if err != nil {
        return // Physical drop, protocol error, or explicit close
    }

    if err := conn.WriteMessage(op, payload); err != nil {
        return
    }
}

Index

Constants

const (
	// CloseNormalClosure indicates a normal connection closure.
	CloseNormalClosure CloseCode = 1000

	// CloseGoingAway indicates that an endpoint is "going away",
	// such as a server shutdown or browser navigation.
	CloseGoingAway CloseCode = 1001

	// CloseProtocolError indicates a protocol error.
	CloseProtocolError CloseCode = 1002

	// CloseUnsupportedData indicates that the endpoint received
	// data of a type it cannot accept.
	CloseUnsupportedData CloseCode = 1003

	// CloseReserved is reserved and must not be used.
	CloseReserved CloseCode = 1004

	// CloseNoStatusReceived indicates that no status code was present.
	// This code is reserved and must not be sent in a Close frame.
	CloseNoStatusReceived CloseCode = 1005

	// CloseAbnormalClosure indicates that the connection closed
	// abnormally without sending or receiving a Close frame.
	// This code is reserved and must not be sent in a Close frame.
	CloseAbnormalClosure CloseCode = 1006

	// CloseInvalidFramePayloadData indicates that the endpoint received
	// inconsistent or invalid data within a message.
	CloseInvalidFramePayloadData CloseCode = 1007

	// ClosePolicyViolation indicates that the endpoint is terminating
	// the connection due to a policy violation.
	ClosePolicyViolation CloseCode = 1008

	// CloseMessageTooBig indicates that a message was too large to process.
	CloseMessageTooBig CloseCode = 1009

	// CloseMandatoryExtension indicates that the client expected one or
	// more extensions that were not negotiated by the server.
	CloseMandatoryExtension CloseCode = 1010

	// CloseInternalServerError indicates that the server encountered
	// an unexpected condition that prevented it from fulfilling the request.
	CloseInternalServerError CloseCode = 1011

	// CloseTLSHandshake indicates that the connection was closed due
	// to a failure during the TLS handshake.
	// This code is reserved and must not be sent in a Close frame.
	CloseTLSHandshake CloseCode = 1015

	// ReadLimitSmall (64 KB) restricts incoming messages to light updates,
	// individual events, or text command signals.
	ReadLimitSmall = 64 * 1024

	// ReadLimitStandard (1 MB) provides an industry-standard ceiling suitable
	// for typical JSON API payloads, application state syncs, and average documents.
	ReadLimitStandard = 1024 * 1024

	// ReadLimitLarge (16 MB) accommodates heavy incoming data pipelines, such as
	// raw image uploads, file attachments, or dense data arrays.
	ReadLimitLarge = 16 * 1024 * 1024

	// ChunkSizeLowMemory (4 KB) minimizes the per-connection RAM footprint.
	// Aligns with standard OS virtual memory pages and fits well within common
	// network MTU boundaries. Best for high-concurrency systems like chat,
	// notifications, or IoT gateways.
	ChunkSizeLowMemory = 4096

	// ChunkSizeBalanced (8 KB) provides a middle ground for typical web applications
	// transferring medium-sized text or JSON payloads.
	ChunkSizeBalanced = 8192

	// ChunkSizeStreaming (32 KB) maximizes processing throughput for heavy file
	// transfers or compressed data lines. This matches the internal sliding
	// history window of the DEFLATE algorithm and standard io.Copy buffers.
	ChunkSizeStreaming = 32768
)

Variables

var (
	// ErrMessageTooBig is returned when an incoming message's total payload size
	// exceeds the configured maxReadLimit boundary to prevent out-of-memory vectors.
	ErrMessageTooBig = errors.New("message exceeds max read limit")

	// ErrProtocolReservedBits is returned when a frame arrives with its RSV1, RSV2,
	// or RSV3 bits set without an explicit extension (like permessage-deflate)
	// having negotiated their use during the handshake.
	ErrProtocolReservedBits = errors.New("protocol error, reserved bits must be 0")

	// ErrWebSocketClosed is returned when a read or write operation is attempted
	// on a connection that has already completed its closure handshake or has
	// been physically severed.
	ErrWebSocketClosed = errors.New("websocket closed")

	// ErrUnexpectedContinuation is returned when a continuation frame is received
	// on the wire but no fragmented message sequence was currently in progress.
	ErrUnexpectedContinuation = errors.New("protocol error, unexpected continuation frame")

	// ErrExpectedContinuation is returned when a new message initiator frame is
	// received before the prior fragmented frame sequence was properly concluded
	// with a final (FIN=true) continuation frame.
	ErrExpectedContinuation = errors.New("protocol error, expected continuation frame")

	// ErrChunkSizeExceeded is returned when a streaming operation requests a chunk
	// size that is larger than the pre-allocated maxChunkSize limit defined
	// during connection initialization.
	ErrChunkSizeExceeded = errors.New("requested chunk size exceeds connection limit")
)
var (
	// ErrInvalidOpCode is returned when a frame arrives with an unrecognized
	// or unallocated opcode bit configuration, violating RFC 6455 framing definitions.
	ErrInvalidOpCode = errors.New("invalid opcode")

	// ErrControlFramePayloadTooLarge is returned when a control frame (such as Ping,
	// Pong, or Close) arrives carrying a payload greater than 125 bytes, violating
	// the strict length boundary defined in RFC 6455 Section 5.5.
	ErrControlFramePayloadTooLarge = errors.New("control frame payload exceeds 125 bytes")

	// ErrFrameBufferTooSmall is returned when a slice provided to a low-level frame
	// reader function lacks the capacity or length necessary to ingest the incoming
	// frame's payload body without overflowing.
	ErrFrameBufferTooSmall = errors.New("provided buffer capacity too small for frame payload")
)

Functions

This section is empty.

Types

type CloseCode

type CloseCode uint16

CloseCode represents a WebSocket close status code as defined by RFC 6455.

type CloseHandler

type CloseHandler func(CloseCode, []byte)
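
CloseHandler carries no description here. Its arguments presumably correspond to the two parts of a Close frame body as laid out in RFC 6455 Section 5.5.1: a 2-byte big-endian status code followed by an optional UTF-8 reason. The sketch below shows that wire layout using only the standard library; encodeClosePayload and decodeClosePayload are hypothetical helpers, not package functions.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeClosePayload builds a Close frame body: status code, then reason.
// Control frames are limited to 125 bytes, so reason should not exceed 123.
func encodeClosePayload(code uint16, reason string) []byte {
	buf := make([]byte, 2, 2+len(reason))
	binary.BigEndian.PutUint16(buf, code)
	return append(buf, reason...)
}

// decodeClosePayload splits a Close frame body. An empty body means no status
// was sent, which RFC 6455 reports locally as 1005. A 1-byte body is malformed.
func decodeClosePayload(p []byte) (code uint16, reason []byte, ok bool) {
	switch len(p) {
	case 0:
		return 1005, nil, true
	case 1:
		return 0, nil, false
	}
	return binary.BigEndian.Uint16(p), p[2:], true
}

func main() {
	body := encodeClosePayload(1000, "bye")
	fmt.Printf("% x\n", body)

	code, reason, ok := decodeClosePayload(body)
	fmt.Println(code, string(reason), ok)
}
```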

type Compressor

type Compressor interface {
	io.Writer
	Flush() error
	Reset(w io.Writer)
}

Compressor defines the structural lifecycle operations required by the deflation loop. It directly matches the method signature of the standard library's *flate.Writer, allowing seamless interoperability with optimized assembly-accelerated engines.
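
The claim that *flate.Writer matches the interface can be checked at compile time. The sketch below copies the Compressor definition from above so it builds with the standard library alone, then reuses one writer across two messages through Reset. compressInto, inflateN, and reuseDemo are hypothetical helpers for illustration.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// Compressor is copied from the documentation above so this sketch is self-contained.
type Compressor interface {
	io.Writer
	Flush() error
	Reset(w io.Writer)
}

// Compile-time proof that the standard library writer satisfies the interface.
var _ Compressor = (*flate.Writer)(nil)

// compressInto points c at dst, writes msg, and flushes so dst holds decodable data.
func compressInto(c Compressor, dst *bytes.Buffer, msg []byte) error {
	c.Reset(dst)
	if _, err := c.Write(msg); err != nil {
		return err
	}
	return c.Flush()
}

// inflateN decodes exactly n bytes from a flushed (but unterminated) DEFLATE stream.
func inflateN(data []byte, n int) (string, error) {
	fr := flate.NewReader(bytes.NewReader(data))
	defer fr.Close()
	out := make([]byte, n)
	if _, err := io.ReadFull(fr, out); err != nil {
		return "", err
	}
	return string(out), nil
}

// reuseDemo compresses two messages with one writer and decodes both.
func reuseDemo() (first, second string, err error) {
	fw, err := flate.NewWriter(io.Discard, flate.BestSpeed)
	if err != nil {
		return "", "", err
	}
	var a, b bytes.Buffer
	if err = compressInto(fw, &a, []byte("first message")); err != nil {
		return "", "", err
	}
	if err = compressInto(fw, &b, []byte("second message")); err != nil {
		return "", "", err
	}
	if first, err = inflateN(a.Bytes(), len("first message")); err != nil {
		return "", "", err
	}
	second, err = inflateN(b.Bytes(), len("second message"))
	return first, second, err
}

func main() {
	fmt.Println(reuseDemo())
}
```

Any third-party writer that exposes the same three methods should pass the same assertion, which is the property WrapDeflate's alternative-compressor hook relies on.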

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents an active RFC 6455 WebSocket connection.

func NewConnection

func NewConnection(conn net.Conn, maxReadLimit int64, maxChunkSize int64) *Conn

NewConnection takes an already-upgraded HTTP connection and wraps it in a WebSocket Conn. It requires a maxReadLimit to strictly bound memory allocations during frame assembly, protecting the runtime from out-of-memory vulnerabilities. If maxReadLimit is 0, the default of ReadLimitStandard (1 MB) is applied. If maxChunkSize is 0, the default of ChunkSizeLowMemory (4 KB) is applied.

func (*Conn) Close

func (c *Conn) Close() error

Close satisfies the io.Closer interface. It initiates a standard normal closure (status 1000) and terminates the socket, allowing it to compose naturally with standard Go defer patterns.

func (*Conn) CloseWithCode

func (c *Conn) CloseWithCode(code CloseCode) error

CloseWithCode sends a specific RFC 6455 closure status code before severing the underlying network connection.

func (*Conn) KeepAlive

func (c *Conn) KeepAlive(ctx context.Context, interval time.Duration) error

KeepAlive blocks, sending a Ping frame at the specified interval. Because it blocks, the caller is expected to run it in its own goroutine. This is deliberate: hidden background state tends to fail silently, so requiring an explicit call keeps concurrency management visible and under the application's control.
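
A blocking heartbeat loop of this shape is typically a ticker plus a context check. The sketch below is an assumption about the general pattern rather than the package's code: keepAlive and its ping callback are hypothetical stand-ins, and the real method's behavior on cancellation is not documented above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAlive calls ping every interval until ctx is done or ping fails.
// It blocks, so callers would normally start it with the go statement.
func keepAlive(ctx context.Context, interval time.Duration, ping func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Re-check the context so a tick that races with cancellation
			// never produces one extra ping.
			if err := ctx.Err(); err != nil {
				return err
			}
			if err := ping(); err != nil {
				return err
			}
		}
	}
}

// demo cancels the context from inside the third ping and reports what happened.
func demo() (int, string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pings := 0
	err := keepAlive(ctx, time.Millisecond, func() error {
		pings++
		if pings == 3 {
			cancel()
		}
		return nil
	})
	return pings, err.Error()
}

func main() {
	fmt.Println(demo())
}
```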

func (*Conn) ReadFrame

func (c *Conn) ReadFrame(p []byte) (Frame, error)

ReadFrame reads a single raw frame from the connection buffer. It populates and returns the data inside the provided 'p' slice to minimize allocations.

func (*Conn) ReadHeader

func (c *Conn) ReadHeader() (Header, error)

ReadHeader reads and parses a frame header directly from a raw connection. It performs no payload reading or heap allocations.

func (*Conn) ReadMessage

func (c *Conn) ReadMessage(buf []byte) ([]byte, OpCode, error)

func (*Conn) ReadMessageExt

func (c *Conn) ReadMessageExt(buf []byte) ([]byte, OpCode, byte, error)

func (*Conn) SetCloseHandler

func (c *Conn) SetCloseHandler(h CloseHandler)

func (*Conn) SetPingHandler

func (c *Conn) SetPingHandler(h PingHandler)

SetPingHandler allows callers to inject application logic (e.g., heartbeats).

func (*Conn) SetPongHandler

func (c *Conn) SetPongHandler(h PongHandler)

func (*Conn) StreamMessage

func (c *Conn) StreamMessage(op OpCode, chunkSize int, r io.Reader) error

func (*Conn) StreamMessageExt

func (c *Conn) StreamMessageExt(op OpCode, rsv byte, chunkSize int, r io.Reader) error

StreamMessageExt reads from r chunk-by-chunk and streams it over the network as fragmented frames. It utilizes a double-buffer lookahead to eliminate empty trailing closure fragments. Returns ErrInvalidOpCode if the initial opcode is not OpCodeText or OpCodeBinary.

func (*Conn) WriteFrame

func (c *Conn) WriteFrame(isFinal bool, rsv byte, op OpCode, payload []byte) error

WriteFrame is the lowest-level write primitive. It allows callers to manually construct fragmented messages by controlling the FIN bit. It uses the underlying buffered writer to minimize system calls and flushes immediately. It exposes the rsv byte (e.g., 0x40 for RSV1) so callers can implement extensions.
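
The arguments map onto the first bytes of an RFC 6455 frame. As a rough illustration (an unmasked, server-to-client header only, not the package's internal encoder), the hypothetical appendFrameHeader below shows where isFinal, rsv, op, and the payload length end up on the wire.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendFrameHeader appends an unmasked RFC 6455 frame header to dst.
// rsv uses the in-place bit positions: 0x40 (RSV1), 0x20 (RSV2), 0x10 (RSV3).
func appendFrameHeader(dst []byte, final bool, rsv byte, op byte, payloadLen int) []byte {
	b0 := op&0x0F | rsv&0x70
	if final {
		b0 |= 0x80 // FIN bit
	}
	dst = append(dst, b0)

	switch {
	case payloadLen <= 125:
		dst = append(dst, byte(payloadLen))
	case payloadLen <= 0xFFFF:
		dst = append(dst, 126) // 16-bit extended length follows
		dst = binary.BigEndian.AppendUint16(dst, uint16(payloadLen))
	default:
		dst = append(dst, 127) // 64-bit extended length follows
		dst = binary.BigEndian.AppendUint64(dst, uint64(payloadLen))
	}
	return dst
}

func main() {
	// Final text frame carrying 5 bytes, e.g. "Hello".
	fmt.Printf("% x\n", appendFrameHeader(nil, true, 0x00, 0x1, 5))
	// Final binary frame with RSV1 set and a 256-byte payload.
	fmt.Printf("% x\n", appendFrameHeader(nil, true, 0x40, 0x2, 256))
}
```

Appending into a caller-supplied slice mirrors the scratchpad approach described earlier: if dst has enough capacity, no allocation is needed.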

func (*Conn) WriteMessage

func (c *Conn) WriteMessage(op OpCode, payload []byte) error

WriteMessage sends a single, unfragmented WebSocket frame. It is a predictable convenience wrapper around WriteFrame for the vast majority of use cases.

type Frame

type Frame struct {
	IsFinal bool
	RSV     byte
	Op      OpCode
	Payload []byte
}

Frame represents the raw metadata of an individual RFC 6455 frame.

type Header

type Header struct {
	IsFinal    bool
	RSV        byte
	Op         OpCode
	IsMasked   bool
	PayloadLen int64
	MaskKey    [4]byte
}

Header represents the parsed metadata of an RFC 6455 frame header.
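
To show how these fields relate to the bytes on the wire, here is a standard-library sketch of header parsing per RFC 6455 Section 5.2. It is an illustration under stated assumptions, not the package's ReadHeader: the lowercase header type, parseHeader, and unmask are hypothetical names. The sample bytes are the masked "Hello" frame from the RFC.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// header mirrors the documented Header fields for this sketch.
type header struct {
	final      bool
	rsv        byte
	op         byte
	masked     bool
	payloadLen int64
	maskKey    [4]byte
}

// parseHeader reads one frame header from r without touching the payload.
func parseHeader(r io.Reader) (header, error) {
	var h header
	var b [8]byte
	if _, err := io.ReadFull(r, b[:2]); err != nil {
		return h, err
	}
	h.final = b[0]&0x80 != 0
	h.rsv = b[0] & 0x70
	h.op = b[0] & 0x0F
	h.masked = b[1]&0x80 != 0

	switch n := b[1] & 0x7F; n {
	case 126: // 16-bit extended length
		if _, err := io.ReadFull(r, b[:2]); err != nil {
			return h, err
		}
		h.payloadLen = int64(binary.BigEndian.Uint16(b[:2]))
	case 127: // 64-bit extended length, most significant bit must be 0
		if _, err := io.ReadFull(r, b[:8]); err != nil {
			return h, err
		}
		v := binary.BigEndian.Uint64(b[:8])
		if v>>63 != 0 {
			return h, errors.New("invalid 64-bit payload length")
		}
		h.payloadLen = int64(v)
	default:
		h.payloadLen = int64(n)
	}

	if h.masked {
		if _, err := io.ReadFull(r, h.maskKey[:]); err != nil {
			return h, err
		}
	}
	return h, nil
}

// unmask applies the client masking key to p in place.
func unmask(p []byte, key [4]byte) {
	for i := range p {
		p[i] ^= key[i%4]
	}
}

// decodeSample parses the RFC 6455 masked "Hello" frame and summarizes it.
func decodeSample() string {
	frame := []byte{0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58}
	r := bytes.NewReader(frame)
	h, err := parseHeader(r)
	if err != nil {
		return "error: " + err.Error()
	}
	payload := make([]byte, h.payloadLen)
	if _, err := io.ReadFull(r, payload); err != nil {
		return "error: " + err.Error()
	}
	unmask(payload, h.maskKey)
	return fmt.Sprintf("%v %d %v %d %s", h.final, h.op, h.masked, h.payloadLen, payload)
}

func main() {
	fmt.Println(decodeSample())
}
```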

type OpCode

type OpCode byte

OpCode represents the type of a WebSocket frame as defined by RFC 6455.

const (

	// OpCodeContinuation identifies a continuation frame.
	// It is used to continue a fragmented message started by a
	// preceding text or binary frame.
	OpCodeContinuation OpCode = 0

	// OpCodeText identifies a text frame containing UTF-8 encoded data.
	OpCodeText OpCode = 1

	// OpCodeBinary identifies a binary frame containing arbitrary
	// application-defined binary data.
	OpCodeBinary OpCode = 2

	// OpCodeClose identifies a close control frame used to initiate
	// or acknowledge the closing handshake.
	OpCodeClose OpCode = 8

	// OpCodePing identifies a ping control frame used to check whether
	// the peer is responsive. The recipient should reply with a Pong frame.
	OpCodePing OpCode = 9

	// OpCodePong identifies a pong control frame. It is sent in response
	// to a Ping frame or may be sent unsolicited as a heartbeat.
	OpCodePong OpCode = 10
)

type PerMessageDeflateConn

type PerMessageDeflateConn struct {
	// contains filtered or unexported fields
}

PerMessageDeflateConn wraps a base Conn to handle transparent RFC 7692 compression. It recycles an internal scratch buffer to keep reading paths allocation-efficient.

func WrapDeflate

func WrapDeflate(conn *Conn, compressionLevel int) (*PerMessageDeflateConn, error)

WrapDeflate decorates a base connection with a DEFLATE compression layer. It initializes the flate compression engine with the specified compression level.

func WrapDeflateWithCompresor

func WrapDeflateWithCompresor(conn *Conn, compressor Compressor) (*PerMessageDeflateConn, error)

WrapDeflateWithCompressor allows for injecting an alternative hardware-accelerated compressor conforming to the Compressor interface.

func (*PerMessageDeflateConn) Close

func (dc *PerMessageDeflateConn) Close() error

Close terminates the active decompressor stream and shuts down the underlying connection.

func (*PerMessageDeflateConn) ReadMessage

func (dc *PerMessageDeflateConn) ReadMessage(buf []byte) ([]byte, OpCode, error)

ReadMessage reads an assembled message and inflates (decompresses) it.

func (*PerMessageDeflateConn) StreamMessage

func (dc *PerMessageDeflateConn) StreamMessage(op OpCode, chunkSize int, r io.Reader) error

StreamMessage reads raw data from r chunk-by-chunk, compresses it on the fly, and streams it over the wire as spec-compliant WebSocket fragments using a single-chunk lookahead.

func (*PerMessageDeflateConn) WriteMessage

func (dc *PerMessageDeflateConn) WriteMessage(op OpCode, payload []byte) error

WriteMessage compresses the payload, and writes it as an unfragmented frame with RSV1 set. Control frames bypass compression.
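
RFC 7692 Section 7.2.1 describes the per-message transform: compress, flush with an empty stored block, then drop the trailing 0x00 0x00 0xff 0xff. The receiver appends those bytes back before inflating. The sketch below shows that round trip with compress/flate, using a fresh compressor per message (no context takeover); it is an approximation of the technique, and compressMessage and decompressMessage are hypothetical names rather than package functions.

```go
package main

import (
	"bytes"
	"compress/flate"
	"errors"
	"fmt"
	"io"
)

// deflateTail is the sync-flush marker that RFC 7692 removes from each message.
var deflateTail = []byte{0x00, 0x00, 0xff, 0xff}

// finalBlock is an empty final stored block. Appending it lets the standard
// library reader finish with a clean io.EOF.
var finalBlock = []byte{0x01, 0x00, 0x00, 0xff, 0xff}

// compressMessage deflates msg and strips the 4-byte tail.
func compressMessage(msg []byte) ([]byte, error) {
	var buf bytes.Buffer
	fw, err := flate.NewWriter(&buf, flate.BestSpeed)
	if err != nil {
		return nil, err
	}
	if _, err := fw.Write(msg); err != nil {
		return nil, err
	}
	if err := fw.Flush(); err != nil {
		return nil, err
	}
	out := buf.Bytes()
	if !bytes.HasSuffix(out, deflateTail) {
		return nil, errors.New("flushed stream does not end with the sync marker")
	}
	return out[:len(out)-len(deflateTail)], nil
}

// decompressMessage restores the tail and inflates the payload.
func decompressMessage(payload []byte) ([]byte, error) {
	src := io.MultiReader(
		bytes.NewReader(payload),
		bytes.NewReader(deflateTail),
		bytes.NewReader(finalBlock),
	)
	fr := flate.NewReader(src)
	defer fr.Close()
	return io.ReadAll(fr)
}

// roundTrip compresses and decompresses s.
func roundTrip(s string) (string, error) {
	wire, err := compressMessage([]byte(s))
	if err != nil {
		return "", err
	}
	plain, err := decompressMessage(wire)
	return string(plain), err
}

func main() {
	fmt.Println(roundTrip("hello, websocket"))
}
```

The payload produced this way is what would travel in a frame with RSV1 (0x40) set. A production reader would also cap the inflated size, for example against maxReadLimit, to guard against decompression bombs.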

type PingHandler

type PingHandler func([]byte) error

type PongHandler

type PongHandler func([]byte) error
