server

package module
v0.2.0 (Latest)
Warning: This package is not in the latest version of its module.
Published: Mar 12, 2026 License: MIT Imports: 10 Imported by: 0

README

wspulse/server

A minimal, production-ready WebSocket server for Go with room-based routing, session resumption, and pluggable codecs.

Status: v0 — API is being stabilized. Module path: github.com/wspulse/server.


Design Goals

  • Thin transport layer: no business logic, no auth, no message history
  • Three public concepts: Server, Connection, Frame
  • Plug in any HTTP router via http.Handler
  • Swappable codecs: JSON (default) or binary (e.g. Protobuf)
  • Transparent session resumption with configurable grace window

Install

go get github.com/wspulse/server

Quick Start

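import (
    "log"
    "net/http"
    "time"

    "github.com/wspulse/server"
)

// Declare srv before constructing it so the OnMessage closure below can reference it.
var srv server.Server

srv = server.NewServer(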
    // ConnectFunc: authenticate and assign room + connection IDs
    func(r *http.Request) (roomID, connectionID string, err error) {
        token := r.URL.Query().Get("token")
        userID, err := myauth.Verify(token)
        if err != nil {
            return "", "", err // → HTTP 401
        }
        return r.URL.Query().Get("room"), userID, nil
    },
    server.WithOnMessage(func(connection server.Connection, f server.Frame) {
        // echo back to the whole room
        srv.Broadcast(connection.RoomID(), f)
    }),
    server.WithOnDisconnect(func(connection server.Connection, err error) {
        log.Printf("disconnected: %s", connection.ID())
    }),
    server.WithHeartbeat(10*time.Second, 30*time.Second),
    server.WithResumeWindow(30),
)

// Standard library
http.Handle("/ws", srv)

// Gin
router.GET("/ws", func(c *gin.Context) {
    srv.ServeHTTP(c.Writer, c.Request)
})

Server-initiated push

// Send to a specific connection
srv.Send(connectionID, server.Frame{Event: "sys", Payload: []byte(`{"event":"welcome"}`)})

// Broadcast to a room
payload, _ := json.Marshal(myMessage)
srv.Broadcast(roomID, server.Frame{Event: "msg", Payload: payload})

// Kick a connection
srv.Kick(connectionID)

// List connections in a room
connections := srv.GetConnections(roomID)

Public API Surface

Symbol       Description
Server       Manages sessions, heartbeats, and room routing
Connection   A logical WebSocket session (ID, RoomID, Send, Close, Done)
Frame        Transport unit (ID, Event, Payload []byte); re-exported from core
ConnectFunc  func(*http.Request) (roomID, connectionID string, err error)
Codec        Interface: Encode(Frame), Decode([]byte), FrameType(); from core
JSONCodec    Default codec (text frames, JSON payload); re-exported from core

Server options

Option                     Default
WithOnConnect(fn)          (none)
WithOnMessage(fn)          (none)
WithOnDisconnect(fn)       (none)
WithResumeWindow(seconds)  0 (disabled)
WithHeartbeat(ping, pong)  10 s / 30 s
WithWriteWait(d)           10 s
WithMaxMessageSize(n)      512 B
WithSendBufferSize(n)      256 frames
WithCodec(c)               JSONCodec
WithCheckOrigin(fn)        allow all
WithLogger(l)              zap.NewNop(); accepts *zap.Logger

Features

  • Room-based routing: connections are partitioned into rooms; a broadcast targets a single room.
  • Pluggable auth: ConnectFunc runs during the HTTP Upgrade, before any WebSocket frames are exchanged.
  • Session resumption: opt-in via WithResumeWindow(seconds). When a transport drops, the session is suspended for up to that many seconds before OnDisconnect fires. If the client reconnects with the same connectionID within that window, the new WebSocket is swapped in transparently: no OnConnect / OnDisconnect callbacks fire, and buffered frames are replayed in order. Disabled by default.
  • Automatic heartbeat: server-side Ping / Pong with configurable intervals (WithHeartbeat).
  • Backpressure: bounded per-connection send buffer; the oldest frame is dropped on overflow during broadcast.
  • Swappable codec: JSON by default; implement the Codec interface to plug in any encoding (binary, Protobuf, MessagePack, etc.).
  • Kick: Server.Kick(connectionID) always destroys the session immediately, bypassing the resume window.
  • Graceful shutdown: Server.Close() sends close frames to all connected clients, drains in-flight registrations, and fires OnDisconnect for every session.
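
The drop-oldest backpressure policy in the list above can be illustrated with a bounded channel. This is a minimal, standard-library-only sketch of the general technique, not the package's implementation; enqueueDropOldest and the string frames are hypothetical stand-ins.

```go
package main

import "fmt"

// enqueueDropOldest puts v on a buffered channel. When the buffer is full it
// discards the oldest queued value and retries. It reports whether at least
// one value was discarded. ch must have capacity >= 1, and the sketch assumes
// a single producer per channel.
func enqueueDropOldest(ch chan string, v string) (dropped bool) {
	for {
		select {
		case ch <- v:
			return dropped
		default:
		}
		// Buffer full: evict the oldest entry, unless a reader has already
		// drained it in the meantime, then try again.
		select {
		case <-ch:
			dropped = true
		default:
		}
	}
}

func main() {
	buf := make(chan string, 2) // tiny capacity chosen for the demo
	for _, f := range []string{"a", "b", "c"} {
		if enqueueDropOldest(buf, f) {
			fmt.Println("dropped oldest to make room for", f)
		}
	}
	close(buf)
	for f := range buf {
		fmt.Println("queued:", f)
	}
}
```

With a capacity of 2, the third frame evicts "a", so a slow reader sees only the most recent frames. The trade-off is that delivery is lossy under load, which suits state-style updates better than messages that must all arrive.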

Frame Routing with core/router

The router from wspulse/core integrates directly with WithOnMessage. It dispatches each incoming frame to its handler based on the "event" field in the JSON message:

{
  "id": "msg-001",
  "event": "chat.message",
  "payload": { "text": "hello" }
}

The value of "event" is frame.Event on the Go side, and is the key used to select the handler.
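
To make the routing key concrete, here is a small standard-library sketch that decodes the JSON frame shown above and selects a handler by its event value. The wireFrame struct, its JSON tags, and the dispatch helper are assumptions made for illustration; the real Frame type and router live in wspulse/core.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wireFrame is a local stand-in for the frame shown above. The field tags are
// assumptions inferred from the JSON example, not the library's definition.
type wireFrame struct {
	ID      string          `json:"id"`
	Event   string          `json:"event"`
	Payload json.RawMessage `json:"payload"`
}

// dispatch decodes raw and calls the handler registered for its event.
// It returns the event name and whether a handler was found.
func dispatch(raw []byte, handlers map[string]func(payload []byte)) (string, bool, error) {
	var f wireFrame
	if err := json.Unmarshal(raw, &f); err != nil {
		return "", false, err
	}
	h, ok := handlers[f.Event]
	if ok {
		h(f.Payload)
	}
	return f.Event, ok, nil
}

func main() {
	handlers := map[string]func(payload []byte){
		"chat.message": func(p []byte) { fmt.Printf("chat payload: %s\n", p) },
	}
	raw := []byte(`{"id":"msg-001","event":"chat.message","payload":{"text":"hello"}}`)
	event, handled, err := dispatch(raw, handlers)
	fmt.Println(event, handled, err)
}
```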

import (
    "github.com/wspulse/server"
    "github.com/wspulse/core/router"
)

var srv server.Server

r := router.New()
r.Use(router.Recovery())
r.Use(func(c *router.Context) {
    // middleware runs before every handler
    c.Set("roomID", c.Connection.RoomID())
    c.Next()
})

// matches frames where "event" == "chat.message"
r.On("chat.message", func(c *router.Context) {
    srv.Broadcast(c.Connection.RoomID(), server.Frame{
        Event:   "chat.message",
        Payload: c.Frame.Payload,
    })
})

// matches frames where "event" == "ping"
r.On("ping", func(c *router.Context) {
    _ = c.Connection.Send(server.Frame{Event: "pong"})
})

srv = server.NewServer(
    connectFn,
    server.WithOnMessage(func(conn server.Connection, f server.Frame) {
        r.Dispatch(conn, f)
    }),
)

Wire Protocol

See doc/protocol.md for the JSON frame format.

Internals

See doc/internals.md for the goroutine model, heartbeat mechanism, backpressure, and session resumption implementation details.


Module             Description
wspulse/core       Shared types and codecs
wspulse/client-go  Go WebSocket client

Documentation

Constants

const (
	TextMessage   = wspulse.TextMessage
	BinaryMessage = wspulse.BinaryMessage
)

WebSocket message type constants.

Variables

var (
	// ErrConnectionNotFound is returned by Server.Send and Server.Kick
	// when connectionID has no active connection.
	ErrConnectionNotFound = errors.New("wspulse: connection not found")

	// ErrDuplicateConnectionID is passed to the OnDisconnect callback when
	// an existing connection is kicked because a new connection registered
	// with the same connectionID.
	ErrDuplicateConnectionID = errors.New("wspulse: kicked: duplicate connection ID")

	// ErrServerClosed is returned by Server.Broadcast (and potentially
	// other methods) when the Server has already been shut down via Close().
	ErrServerClosed = errors.New("wspulse: server is closed")
)

Server-only sentinel errors. Shared errors (ErrConnectionClosed, ErrSendBufferFull) live in github.com/wspulse/core and are re-exported in types.go for convenience.
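
Sentinel errors like these are usually matched with errors.Is, which also sees through wrapping. The sketch below uses local stand-in variables so it compiles without the module; errSendBufferFull's message text and the describeSendError helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinels documented above. In application code the
// exported server.ErrConnectionNotFound and server.ErrSendBufferFull would be
// used instead. The second message text is an assumption.
var (
	errConnectionNotFound = errors.New("wspulse: connection not found")
	errSendBufferFull     = errors.New("send buffer full")
)

// describeSendError maps the result of a Send call to an application decision.
func describeSendError(err error) string {
	switch {
	case err == nil:
		return "queued"
	case errors.Is(err, errConnectionNotFound):
		return "recipient offline"
	case errors.Is(err, errSendBufferFull):
		return "recipient too slow"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	// errors.Is still matches after the error has been wrapped with %w.
	wrapped := fmt.Errorf("notify user: %w", errConnectionNotFound)
	fmt.Println(describeSendError(wrapped))
	fmt.Println(describeSendError(nil))
}
```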

var (
	ErrConnectionClosed = wspulse.ErrConnectionClosed
	ErrSendBufferFull   = wspulse.ErrSendBufferFull
)

Re-exported sentinel errors from github.com/wspulse/core.

var JSONCodec = wspulse.JSONCodec

JSONCodec is the default Codec. Frames are encoded as JSON text frames.

Functions

This section is empty.

Types

type Codec

type Codec = wspulse.Codec

Codec encodes and decodes Frames for transmission.

type ConnectFunc

type ConnectFunc func(r *http.Request) (roomID, connectionID string, err error)

ConnectFunc authenticates an incoming HTTP upgrade request and provides the roomID and connectionID for the new connection. Returning a non-nil error rejects the upgrade with HTTP 401 and the error text as the body. If connectionID is empty the Server assigns a random UUID so that every connection has a unique, non-empty ID. Use a non-empty connectionID when the application needs deterministic IDs (e.g. for Server.Send and Server.Kick).
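
As an illustration of the contract described above, the following sketch implements a function with the ConnectFunc shape using only the standard library. The local connectFunc type, the in-memory tokens table, and the token and room query parameters are assumptions for the example, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// connectFunc mirrors the documented ConnectFunc signature so the sketch
// compiles without importing the module.
type connectFunc func(r *http.Request) (roomID, connectionID string, err error)

// tokens is a hypothetical in-memory table standing in for real authentication.
var tokens = map[string]string{"secret-alice": "alice"}

// connectByQuery reads ?token= and ?room= from the upgrade request. A non-nil
// error is what would cause the upgrade to be rejected.
func connectByQuery(r *http.Request) (string, string, error) {
	q := r.URL.Query()
	userID, ok := tokens[q.Get("token")]
	if !ok {
		return "", "", errors.New("invalid token")
	}
	room := q.Get("room")
	if room == "" {
		return "", "", errors.New("missing room")
	}
	return room, userID, nil
}

// newUpgradeRequest builds a test request for /ws with the given raw query.
func newUpgradeRequest(rawQuery string) *http.Request {
	return httptest.NewRequest(http.MethodGet, "/ws?"+rawQuery, nil)
}

func main() {
	var connect connectFunc = connectByQuery
	room, id, err := connect(newUpgradeRequest("token=secret-alice&room=lobby"))
	fmt.Println(room, id, err)
}
```

Returning the authenticated user ID as connectionID keeps IDs deterministic, which is what Server.Send and Server.Kick need to address a specific user.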

type Connection

type Connection interface {
	// ID returns the unique connection identifier provided by ConnectFunc.
	ID() string

	// RoomID returns the room this connection belongs to, as provided by ConnectFunc.
	RoomID() string

	// Send enqueues f for delivery to the remote peer.
	// Returns ErrConnectionClosed or ErrSendBufferFull on failure.
	Send(f Frame) error

	// Close initiates a graceful shutdown of the session.
	Close() error

	// Done returns a channel that is closed when the session is terminated.
	Done() <-chan struct{}
}

Connection represents a logical WebSocket session managed by the Server. The underlying physical WebSocket may be transparently swapped on reconnect when WithResumeWindow is configured. All exported methods are safe to call concurrently.

type Frame

type Frame = wspulse.Frame

Frame is the minimal transport unit for WebSocket communication.

type Server

type Server interface {
	http.Handler

	// Send enqueues a Frame for the connection identified by connectionID.
	// Returns ErrConnectionNotFound if connectionID has no active connection.
	Send(connectionID string, f Frame) error

	// Broadcast enqueues a Frame for every active connection in roomID.
	Broadcast(roomID string, f Frame) error

	// Kick forcefully closes the connection identified by connectionID.
	// Kick always bypasses the resume window — the session is destroyed
	// immediately without entering the suspended state.
	// Returns ErrConnectionNotFound if connectionID has no active connection.
	Kick(connectionID string) error

	// GetConnections returns a snapshot of all registered connections in roomID.
	// This includes suspended sessions (within the resume window) that have no
	// active WebSocket transport.
	GetConnections(roomID string) []Connection

	// Close gracefully shuts down the server, terminating all connections.
	Close()
}

Server is the public interface for the WebSocket server. Callers depend on this interface rather than the concrete implementation.

func NewServer

func NewServer(connect ConnectFunc, options ...ServerOption) Server

NewServer creates and starts a Server. connect must not be nil.

type ServerOption

type ServerOption func(*serverConfig) //nolint:revive

ServerOption configures a Server.

func WithCheckOrigin

func WithCheckOrigin(fn func(r *http.Request) bool) ServerOption

WithCheckOrigin sets the origin validation function for WebSocket upgrades. Defaults to accepting all origins (permissive — tighten this in production). Panics if fn is nil; pass the default (accept-all) explicitly if desired:

server.WithCheckOrigin(func(*http.Request) bool { return true })
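
For production use, a stricter check might compare the Origin header against an allowlist. The sketch below builds a function with the shape WithCheckOrigin expects; allowOrigins and requestFrom are hypothetical helpers, and rejecting requests that carry no Origin header is a policy choice of this example rather than documented behaviour.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// allowOrigins returns a func(*http.Request) bool that accepts only requests
// whose Origin header names one of the given hosts (host or host:port).
func allowOrigins(hosts ...string) func(r *http.Request) bool {
	allowed := make(map[string]bool, len(hosts))
	for _, h := range hosts {
		allowed[h] = true
	}
	return func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		if origin == "" {
			return false // this sketch rejects requests without an Origin header
		}
		u, err := url.Parse(origin)
		if err != nil {
			return false
		}
		return allowed[u.Host]
	}
}

// requestFrom builds a bare request carrying the given Origin header.
func requestFrom(origin string) *http.Request {
	r := &http.Request{Header: http.Header{}}
	if origin != "" {
		r.Header.Set("Origin", origin)
	}
	return r
}

func main() {
	check := allowOrigins("app.example.com")
	fmt.Println(check(requestFrom("https://app.example.com")))
	fmt.Println(check(requestFrom("https://evil.example.net")))
}
```

Non-browser clients often omit the Origin header, so an application serving them may prefer to accept empty origins and rely on ConnectFunc for authentication.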

func WithCodec

func WithCodec(codec Codec) ServerOption

WithCodec replaces the default JSONCodec with the provided Codec. Panics if codec is nil.

func WithHeartbeat

func WithHeartbeat(pingPeriod, pongWait time.Duration) ServerOption

WithHeartbeat configures Ping/Pong heartbeat intervals. Defaults: pingPeriod=10 s, pongWait=30 s. pingPeriod must be in (0, 5m] and pongWait must be in (pingPeriod, 10m].
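
The interval constraints above can be checked before building the option. This is a hypothetical pre-flight helper written against the stated ranges only; how the option itself reacts to out-of-range values is not described here.

```go
package main

import (
	"fmt"
	"time"
)

// validateHeartbeat checks the documented ranges:
// pingPeriod in (0, 5m] and pongWait in (pingPeriod, 10m].
func validateHeartbeat(pingPeriod, pongWait time.Duration) error {
	if pingPeriod <= 0 || pingPeriod > 5*time.Minute {
		return fmt.Errorf("pingPeriod %v is outside (0, 5m]", pingPeriod)
	}
	if pongWait <= pingPeriod || pongWait > 10*time.Minute {
		return fmt.Errorf("pongWait %v is outside (%v, 10m]", pongWait, pingPeriod)
	}
	return nil
}

// seconds converts a whole number of seconds to a time.Duration.
func seconds(n int) time.Duration { return time.Duration(n) * time.Second }

func main() {
	fmt.Println(validateHeartbeat(seconds(10), seconds(30))) // the documented defaults
	fmt.Println(validateHeartbeat(seconds(30), seconds(10))) // pongWait not above pingPeriod
}
```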

func WithLogger

func WithLogger(l *zap.Logger) ServerOption

WithLogger sets the zap logger used for internal diagnostics. Defaults to zap.NewNop() (silent). Pass the application logger to route wspulse transport logs through the same zap core (encoder, level, async writer). Panics if l is nil; pass zap.NewNop() explicitly if a no-op logger is desired.

func WithMaxMessageSize

func WithMaxMessageSize(n int64) ServerOption

WithMaxMessageSize sets the maximum size in bytes for inbound messages. n must be in [1, 67108864] (64 MiB).

func WithOnConnect

func WithOnConnect(fn func(Connection)) ServerOption

WithOnConnect registers a callback invoked after a connection is established and registered with the Server. The callback runs in a separate goroutine.

func WithOnDisconnect

func WithOnDisconnect(fn func(Connection, error)) ServerOption

WithOnDisconnect registers a callback invoked when a connection terminates. err is nil for a normal closure. The callback runs in a separate goroutine. When WithResumeWindow is configured, this fires only after the resume window expires without reconnection (not on every transport drop).

func WithOnMessage

func WithOnMessage(fn func(Connection, Frame)) ServerOption

WithOnMessage registers a callback invoked for every inbound Frame received from a connected client. The callback is called from the connection's readPump goroutine and must return quickly; use a goroutine for heavy work.

NOTE: fn is always called from a single readPump goroutine per Connection. On resume, the new readPump starts only after the old one has fully exited. Handlers should still be safe for concurrent use when application code accesses Connection from other goroutines (e.g. Send from an HTTP handler).
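
One way to keep the callback fast is to hand each frame to a bounded worker pool and return immediately. The sketch below shows that pattern with the standard library only; job, pool, and submit are hypothetical names, and an OnMessage callback would call submit with data taken from the Frame.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical unit of work extracted from an inbound frame.
type job struct {
	connectionID string
	payload      []byte
}

// pool runs jobs on a fixed number of workers so the read loop never waits
// for a slow handler.
type pool struct {
	jobs chan job
	wg   sync.WaitGroup
}

func newPool(workers, queue int, handle func(job)) *pool {
	p := &pool{jobs: make(chan job, queue)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for j := range p.jobs {
				handle(j)
			}
		}()
	}
	return p
}

// submit never blocks. It returns false when the queue is full so the caller
// can decide whether to drop the frame or report an error.
func (p *pool) submit(j job) bool {
	select {
	case p.jobs <- j:
		return true
	default:
		return false
	}
}

// close stops accepting jobs and waits for queued work to finish.
func (p *pool) close() {
	close(p.jobs)
	p.wg.Wait()
}

func main() {
	results := make(chan string, 8)
	p := newPool(2, 8, func(j job) { results <- j.connectionID })
	for i := 0; i < 5; i++ {
		p.submit(job{connectionID: "alice", payload: []byte("hi")})
	}
	p.close()
	fmt.Println("handled:", len(results))
}
```

With more than one worker, frames from the same connection may be processed out of order; a single worker per connection preserves ordering at the cost of parallelism.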

func WithResumeWindow

func WithResumeWindow(seconds int) ServerOption

WithResumeWindow configures the session resumption window. When a transport drops, the session is suspended for seconds before firing OnDisconnect. If the same connectionID reconnects within that period, the session resumes transparently. seconds is an integer number of seconds; passing 30 means a 30-second window. Valid range: 0 (disabled) … 180 (3 min). Default is 0 (OnDisconnect fires immediately).
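
The suspend-then-resume decision described above can be modelled as a small state machine. This sketch takes explicit timestamps to stay deterministic and holds no socket; the session type, its methods, and the choice to treat a reconnect exactly at the window boundary as a resume are assumptions, not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// session models only the suspend/resume decision.
type session struct {
	window      time.Duration // resume window; 0 disables resumption
	suspended   bool
	suspendedAt time.Time
}

// newSession builds a session with a window given in whole seconds.
func newSession(windowSec int) *session {
	return &session{window: time.Duration(windowSec) * time.Second}
}

// transportDropped records a drop. It returns true when the session should be
// torn down right away because resumption is disabled.
func (s *session) transportDropped(now time.Time) (disconnectNow bool) {
	if s.window <= 0 {
		return true
	}
	s.suspended, s.suspendedAt = true, now
	return false
}

// reconnect reports whether a reconnect at now resumes the suspended session.
func (s *session) reconnect(now time.Time) bool {
	if !s.suspended || now.Sub(s.suspendedAt) > s.window {
		return false
	}
	s.suspended = false
	return true
}

// at returns a fixed base time plus sec seconds.
func at(sec int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(sec) * time.Second)
}

func main() {
	s := newSession(30)
	s.transportDropped(at(0))
	fmt.Println("reconnect after 10s resumes:", s.reconnect(at(10)))
	s.transportDropped(at(100))
	fmt.Println("reconnect after 40s resumes:", s.reconnect(at(140)))
}
```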

func WithSendBufferSize

func WithSendBufferSize(n int) ServerOption

WithSendBufferSize sets the per-connection outbound channel capacity (number of frames). n must be in [1, 4096].

func WithWriteWait

func WithWriteWait(d time.Duration) ServerOption

WithWriteWait sets the deadline for a single write operation on a connection. d must be in (0, 30s].
