wspulse

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: MIT Imports: 10 Imported by: 0

README

wspulse/server

CI Go Reference Go

A minimal, production-ready WebSocket server for Go with room-based routing, session resumption, and pluggable codecs.

Status: v0 — API is being stabilized. Module path: github.com/wspulse/server. Package name: wspulse.


Design Goals

  • Thin transport layer: no business logic, no auth, no message history
  • Three public concepts: Server, Connection, Frame
  • Plug in any HTTP router via http.Handler
  • Swappable codecs: JSON (default) or binary (e.g. Protobuf)
  • Transparent session resumption with configurable grace window

Install

go get github.com/wspulse/server

Quick Start

import "github.com/wspulse/server" // package name: wspulse

srv := wspulse.NewServer(
    // ConnectFunc: authenticate and assign room + connection IDs
    func(r *http.Request) (roomID, connectionID string, err error) {
        token := r.URL.Query().Get("token")
        userID, err := myauth.Verify(token)
        if err != nil {
            return "", "", err // → HTTP 401
        }
        return r.URL.Query().Get("room"), userID, nil
    },
    wspulse.WithOnMessage(func(connection wspulse.Connection, f wspulse.Frame) {
        // echo back to the whole room
        srv.Broadcast(connection.RoomID(), f)
    }),
    wspulse.WithOnDisconnect(func(connection wspulse.Connection, err error) {
        log.Printf("disconnected: %s", connection.ID())
    }),
    wspulse.WithHeartbeat(10*time.Second, 30*time.Second),
    wspulse.WithResumeWindow(30*time.Second),
)

// Standard library
http.Handle("/ws", srv)

// Gin
router.GET("/ws", func(c *gin.Context) {
    srv.ServeHTTP(c.Writer, c.Request)
})
Server-initiated push
// Send to a specific connection
srv.Send(connectionID, wspulse.Frame{Event: "sys", Payload: []byte(`{"event":"welcome"}`)})

// Broadcast to a room
payload, _ := json.Marshal(myMessage)
srv.Broadcast(roomID, wspulse.Frame{Event: "msg", Payload: payload})

// Kick a connection
srv.Kick(connectionID)

// List connections in a room
connections := srv.GetConnections(roomID)
Event Routing with core/router

core/router provides Gin-style middleware and per-event handler dispatch. wspulse.Connection satisfies router.Connection via structural subtyping — no adaptor needed.

import (
    "github.com/wspulse/server" // package name: wspulse
    "github.com/wspulse/core/router"
)

rtr := router.New()
rtr.Use(router.Recovery()) // recover from panics in handlers

rtr.On("chat.message", func(c *router.Context) {
    srv.Broadcast(c.Connection.RoomID(), c.Frame)
})
rtr.On("chat.join", func(c *router.Context) {
    welcome := wspulse.Frame{Event: "chat.welcome", Payload: []byte(`"hello"`)}
    _ = c.Connection.Send(welcome)
})

var srv wspulse.Server
srv = wspulse.NewServer(
    connectFunc,
    wspulse.WithOnMessage(func(conn wspulse.Connection, f wspulse.Frame) {
        rtr.Dispatch(conn, f)
    }),
)

See wspulse/core for the full router API.


Public API Surface

Symbol Description
Server Manages sessions, heartbeats, and room routing
Connection A logical WebSocket session (ID, RoomID, Send, Close, Done)
Frame Transport unit (ID, Event, Payload []byte) — re-exported from core
ConnectFunc func(*http.Request) (roomID, connectionID string, err error)
Codec Interface: Encode(Frame), Decode([]byte), FrameType() — from core
JSONCodec Default codec — text frames, JSON payload — re-exported from core
Server options
Option Default
WithOnConnect(fn)
WithOnMessage(fn)
WithOnDisconnect(fn)
WithOnTransportDrop(fn)
WithOnTransportRestore(fn)
WithResumeWindow(d) 0 (disabled)
WithHeartbeat(ping, pong) 10 s / 30 s
WithWriteWait(d) 10 s
WithMaxMessageSize(n) 512 B
WithSendBufferSize(n) 256 frames
WithCodec(c) JSONCodec
WithCheckOrigin(fn) allow all
WithLogger(l) zap.NewNop() — accepts *zap.Logger

Features

  • Room-based routing — connections are partitioned into rooms; broadcast targets a single room.
  • Pluggable authConnectFunc runs during HTTP Upgrade, before any WebSocket frames are exchanged.
  • Session resumption — opt-in via WithResumeWindow(d). When a transport drops, the session is suspended for d before firing OnDisconnect. If the client reconnects with the same connectionID within that window, the new WebSocket is swapped in transparently — no OnConnect / OnDisconnect callbacks fire, and buffered frames are replayed in order. Disabled by default.
  • Automatic heartbeat — server-side Ping / Pong with configurable intervals (WithHeartbeat).
  • Backpressure — bounded per-connection send buffer; oldest frame is dropped on overflow during broadcast.
  • Swappable codec — JSON by default; implement the Codec interface to plug in any encoding (binary, Protobuf, MessagePack, etc.).
  • KickServer.Kick(connectionID) always destroys the session immediately, bypassing the resume window.
  • Graceful shutdownServer.Close() sends close frames to all connected clients, drains in-flight registrations, and fires OnDisconnect for every session.

Frame Routing with core/router

The router from wspulse/core integrates directly with WithOnMessage. It dispatches each incoming frame to its handler based on the "event" field in the JSON message:

{
  "id": "msg-001",
  "event": "chat.message",
  "payload": { "text": "hello" }
}

The value of "event" is frame.Event on the Go side, and is the key used to select the handler.

import (
    "github.com/wspulse/server" // package name: wspulse
    "github.com/wspulse/core/router"
)

var srv wspulse.Server

r := router.New()
r.Use(router.Recovery())
r.Use(func(c *router.Context) {
    // middleware runs before every handler
    c.Set("roomID", c.Connection.RoomID())
    c.Next()
})

// matches frames where "event" == "chat.message"
r.On("chat.message", func(c *router.Context) {
    srv.Broadcast(c.Connection.RoomID(), wspulse.Frame{
        Event:   "chat.message",
        Payload: c.Frame.Payload,
    })
})

// matches frames where "event" == "ping"
r.On("ping", func(c *router.Context) {
    _ = c.Connection.Send(wspulse.Frame{Event: "pong"})
})

srv = wspulse.NewServer(
    connectFn,
    wspulse.WithOnMessage(func(conn wspulse.Connection, f wspulse.Frame) {
        r.Dispatch(conn, f)
    }),
)

Development

make fmt        # auto-format source files (gofmt + goimports)
make check      # validate format, lint, test with race detector (pre-commit gate)
make test       # go test -race -count=3 ./...
make test-cover # go test with coverage report → coverage.html
make bench      # run benchmarks with memory allocation stats
make tidy       # go mod tidy (GOWORK=off)
make clean      # remove build artifacts and test cache

| --------------------------------------------------------- | ----------------------- | | wspulse/core | Shared types and codecs | | wspulse/client-go | Go WebSocket client |

Documentation

Index

Constants

View Source
const (
	TextMessage   = core.TextMessage
	BinaryMessage = core.BinaryMessage
)

WebSocket message type constants.

Variables

View Source
var (
	// ErrConnectionNotFound is returned by Server.Send and Server.Kick
	// when connectionID has no active connection.
	ErrConnectionNotFound = errors.New("wspulse: connection not found")

	// ErrDuplicateConnectionID is passed to the OnDisconnect callback when
	// an existing connection is kicked because a new connection registered
	// with the same connectionID.
	ErrDuplicateConnectionID = errors.New("wspulse: kicked: duplicate connection ID")

	// ErrServerClosed is returned by Server.Broadcast (and potentially
	// other methods) when the Server has already been shut down via Close().
	ErrServerClosed = errors.New("wspulse: server is closed")
)

Server-only sentinel errors. Shared errors (ErrConnectionClosed, ErrSendBufferFull) live in github.com/wspulse/core and are re-exported in types.go for convenience.

View Source
var (
	ErrConnectionClosed = core.ErrConnectionClosed
	ErrSendBufferFull   = core.ErrSendBufferFull
)

Re-exported sentinel errors from github.com/wspulse/core.

View Source
var JSONCodec = core.JSONCodec

JSONCodec is the default Codec. Frames are encoded as JSON text frames.

Functions

This section is empty.

Types

type Codec

type Codec = core.Codec

Codec encodes and decodes Frames for transmission.

type ConnectFunc

type ConnectFunc func(r *http.Request) (roomID, connectionID string, err error)

ConnectFunc authenticates an incoming HTTP upgrade request and provides the roomID and connectionID for the new connection. Returning a non-nil error rejects the upgrade with HTTP 401 and the error text as the body. If connectionID is empty the Server assigns a random UUID so that every connection has a unique, non-empty ID. Use a non-empty connectionID when the application needs deterministic IDs (e.g. for Server.Send and Server.Kick).

type Connection

type Connection interface {
	// ID returns the unique connection identifier provided by ConnectFunc.
	ID() string

	// RoomID returns the room this connection belongs to, as provided by ConnectFunc.
	RoomID() string

	// Send enqueues f for delivery to the remote peer.
	// Returns ErrConnectionClosed or ErrSendBufferFull on failure.
	Send(f Frame) error

	// Close initiates a graceful shutdown of the session.
	Close() error

	// Done returns a channel that is closed when the session is terminated.
	Done() <-chan struct{}
}

Connection represents a logical WebSocket session managed by the Server. The underlying physical WebSocket may be transparently swapped on reconnect when WithResumeWindow is configured. All exported methods are safe to call concurrently.

type Frame

type Frame = core.Frame

Frame is the minimal transport unit for WebSocket communication.

type Server

type Server interface {
	http.Handler

	// Send enqueues a Frame for the connection identified by connectionID.
	// Returns ErrConnectionNotFound if connectionID has no active connection.
	Send(connectionID string, f Frame) error

	// Broadcast enqueues a Frame for every active connection in roomID.
	Broadcast(roomID string, f Frame) error

	// Kick forcefully closes the connection identified by connectionID.
	// Kick always bypasses the resume window — the session is destroyed
	// immediately without entering the suspended state.
	// Returns ErrConnectionNotFound if connectionID has no active connection.
	Kick(connectionID string) error

	// GetConnections returns a snapshot of all registered connections in roomID.
	// This includes suspended sessions (within the resume window) that have no
	// active WebSocket transport.
	GetConnections(roomID string) []Connection

	// Close gracefully shuts down the server, terminating all connections.
	Close()
}

Server is the public interface for the WebSocket server. Callers depend on this interface rather than the concrete implementation.

func NewServer

func NewServer(connect ConnectFunc, options ...ServerOption) Server

NewServer creates and starts a Server. connect must not be nil.

type ServerOption

type ServerOption func(*serverConfig) //nolint:revive

ServerOption configures a Server.

func WithCheckOrigin

func WithCheckOrigin(fn func(r *http.Request) bool) ServerOption

WithCheckOrigin sets the origin validation function for WebSocket upgrades. Defaults to accepting all origins (permissive — tighten this in production). Panics if fn is nil; pass the default (accept-all) explicitly if desired:

server.WithCheckOrigin(func(*http.Request) bool { return true })

func WithCodec

func WithCodec(codec Codec) ServerOption

WithCodec replaces the default JSONCodec with the provided Codec. Panics if codec is nil.

func WithHeartbeat

func WithHeartbeat(pingPeriod, pongWait time.Duration) ServerOption

WithHeartbeat configures Ping/Pong heartbeat intervals. Defaults: pingPeriod=10 s, pongWait=30 s. pingPeriod must be in (0, 5m] and pongWait must be in (pingPeriod, 10m].

func WithLogger

func WithLogger(l *zap.Logger) ServerOption

WithLogger sets the zap logger used for internal diagnostics. Defaults to zap.NewNop() (silent). Pass the application logger to route wspulse transport logs through the same zap core (encoder, level, async writer). Panics if l is nil; pass zap.NewNop() explicitly if a no-op logger is desired.

func WithMaxMessageSize

func WithMaxMessageSize(n int64) ServerOption

WithMaxMessageSize sets the maximum size in bytes for inbound messages. n must be in [1, 67108864] (64 MiB).

func WithOnConnect

func WithOnConnect(fn func(Connection)) ServerOption

WithOnConnect registers a callback invoked after a connection is established and registered with the Server. The callback runs in a separate goroutine.

func WithOnDisconnect

func WithOnDisconnect(fn func(Connection, error)) ServerOption

WithOnDisconnect registers a callback invoked when a connection terminates. err is nil for a normal closure. The callback runs in a separate goroutine. When WithResumeWindow is configured, this fires only after the resume window expires without reconnection (not on every transport drop).

func WithOnMessage

func WithOnMessage(fn func(Connection, Frame)) ServerOption

WithOnMessage registers a callback invoked for every inbound Frame received from a connected client. The callback is called from the connection's readPump goroutine and must return quickly; use a goroutine for heavy work.

NOTE: fn is always called from a single readPump goroutine per Connection. On resume, the new readPump starts only after the old one has fully exited. Handlers should still be safe for concurrent use when application code accesses Connection from other goroutines (e.g. Send from an HTTP handler).

func WithOnTransportDrop added in v0.4.0

func WithOnTransportDrop(fn func(Connection, error)) ServerOption

WithOnTransportDrop registers a callback invoked when a connection's underlying WebSocket transport dies (network drop, read timeout, or peer close) and the session enters the suspended state because resumeWindow > 0.

The error parameter carries the cause of the transport failure when available (e.g. an i/o timeout from a missed Pong, or a close frame from the peer). For a normal or expected close, err may be nil, so callback implementations must not assume it is always non-nil.

This callback does NOT fire when:

  • resumeWindow is 0 (OnDisconnect fires directly instead).
  • the connection is removed via Kick() or Connection.Close() (OnDisconnect fires directly instead).

The callback runs in a separate goroutine; it must be safe for concurrent use.

func WithOnTransportRestore added in v0.4.0

func WithOnTransportRestore(fn func(Connection)) ServerOption

WithOnTransportRestore registers a callback invoked when a suspended session resumes after a client reconnects with the same connectionID within the resume window.

When this fires, OnConnect and OnDisconnect are NOT called — the session continues as if the transport had never dropped. Buffered frames are replayed to the new transport before the callback is invoked.

This callback does NOT fire when:

  • resumeWindow is 0 (session resumption is disabled).
  • the resume window expires before the client reconnects (OnDisconnect fires instead).

The callback runs in a separate goroutine; it must be safe for concurrent use.

func WithResumeWindow

func WithResumeWindow(d time.Duration) ServerOption

WithResumeWindow configures the session resumption window. When a transport drops, the session is suspended for d before firing OnDisconnect. If the same connectionID reconnects within that period, the session resumes transparently. Valid range: 0 (disabled) … no upper limit. Default is 0 (OnDisconnect fires immediately).

func WithSendBufferSize

func WithSendBufferSize(n int) ServerOption

WithSendBufferSize sets the per-connection outbound channel capacity (number of frames). n must be in [1, 4096].

func WithWriteWait

func WithWriteWait(d time.Duration) ServerOption

WithWriteWait sets the deadline for a single write operation on a connection. d must be in (0, 30s].

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL