Documentation
¶
Index ¶
- Constants
- Variables
- type Codec
- type ConnectFunc
- type Connection
- type DisconnectReason
- type Frame
- type MetricsCollector
- type NoopCollector
- func (NoopCollector) ConnectionClosed(_, _ string, _ time.Duration, _ DisconnectReason)
- func (NoopCollector) ConnectionOpened(_, _ string)
- func (NoopCollector) FrameDropped(_, _ string)
- func (NoopCollector) MessageBroadcast(_ string, _ int, _ int)
- func (NoopCollector) MessageReceived(_ string, _ int)
- func (NoopCollector) MessageSent(_, _ string, _ int)
- func (NoopCollector) PongTimeout(_, _ string)
- func (NoopCollector) ResumeAttempt(_, _ string)
- func (NoopCollector) RoomCreated(_ string)
- func (NoopCollector) RoomDestroyed(_ string)
- func (NoopCollector) SendBufferUtilization(_, _ string, _, _ int)
- type PanicError
- type Server
- type ServerOption
- func WithCheckOrigin(fn func(r *http.Request) bool) ServerOption
- func WithCodec(codec Codec) ServerOption
- func WithHeartbeat(pingPeriod, pongWait time.Duration) ServerOption
- func WithLogger(l *zap.Logger) ServerOption
- func WithMaxMessageSize(n int64) ServerOption
- func WithMetrics(collector MetricsCollector) ServerOption
- func WithOnConnect(fn func(Connection)) ServerOption
- func WithOnDisconnect(fn func(Connection, error)) ServerOption
- func WithOnMessage(fn func(Connection, Frame)) ServerOption
- func WithOnTransportDrop(fn func(Connection, error)) ServerOption
- func WithOnTransportRestore(fn func(Connection)) ServerOption
- func WithResumeWindow(d time.Duration) ServerOption
- func WithSendBufferSize(n int) ServerOption
- func WithUpgraderBufferSize(readSize, writeSize int) ServerOption
- func WithWriteWait(d time.Duration) ServerOption
Constants ¶
const ( TextMessage = core.TextMessage BinaryMessage = core.BinaryMessage )
WebSocket message type constants.
Variables ¶
var ( // ErrConnectionNotFound is returned by Server.Send and Server.Kick // when connectionID has no active connection. ErrConnectionNotFound = errors.New("wspulse: connection not found") // ErrDuplicateConnectionID is passed to the OnDisconnect callback when // an existing connection is kicked because a new connection registered // with the same connectionID. ErrDuplicateConnectionID = errors.New("wspulse: kicked: duplicate connection ID") // ErrServerClosed is returned by Server.Broadcast (and potentially // other methods) when the Server has already been shut down via Close(). ErrServerClosed = errors.New("wspulse: server is closed") )
Server-only sentinel errors. Shared errors (ErrConnectionClosed, ErrSendBufferFull) live in github.com/wspulse/core and are re-exported in types.go for convenience.
var ( ErrConnectionClosed = core.ErrConnectionClosed ErrSendBufferFull = core.ErrSendBufferFull )
Re-exported sentinel errors from github.com/wspulse/core.
var JSONCodec = core.JSONCodec
JSONCodec is the default Codec. Frames are encoded as JSON text frames.
Functions ¶
This section is empty.
Types ¶
type ConnectFunc ¶
ConnectFunc authenticates an incoming HTTP upgrade request and provides the roomID and connectionID for the new connection. Returning a non-nil error rejects the upgrade with HTTP 401. If connectionID is empty the Server assigns a random UUID so that every connection has a unique, non-empty ID. Use a non-empty connectionID when the application needs deterministic IDs (e.g. for Server.Send and Server.Kick).
On session resumption (reconnect of a suspended session within the resume window), the roomID returned by ConnectFunc is ignored — the session retains its original room assignment from the initial connection. For new sessions (including when a duplicate connectionID replaces an existing connected session), the roomID from ConnectFunc determines the room.
type Connection ¶
type Connection interface {
// ID returns the unique connection identifier provided by ConnectFunc.
ID() string
// RoomID returns the room this connection belongs to, as provided by ConnectFunc.
RoomID() string
// Send enqueues f for delivery to the remote peer.
// Returns ErrConnectionClosed or ErrSendBufferFull on failure.
Send(f Frame) error
// Close initiates a graceful shutdown of the session.
Close() error
// Done returns a channel that is closed when the session is terminated.
Done() <-chan struct{}
}
Connection represents a logical WebSocket session managed by the Server. The underlying physical WebSocket may be transparently swapped on reconnect when WithResumeWindow is configured. All exported methods are safe to call concurrently.
type DisconnectReason ¶ added in v0.5.0
type DisconnectReason string
DisconnectReason describes why a connection was closed. Used as a label-friendly parameter in ConnectionClosed to let metrics backends (Prometheus, OTel, etc.) distinguish disconnect causes.
const ( // DisconnectNormal indicates the connection was closed normally: // the transport died (no resume configured), or the application // called Connection.Close() on a suspended session. DisconnectNormal DisconnectReason = "normal" // DisconnectKick indicates the connection was terminated by // an explicit Server.Kick() call. DisconnectKick DisconnectReason = "kick" // DisconnectGraceExpired indicates the resume window elapsed // without the client reconnecting. DisconnectGraceExpired DisconnectReason = "grace_expired" // DisconnectServerClose indicates the connection was terminated // because Server.Close() shut down the server. DisconnectServerClose DisconnectReason = "server_close" // DisconnectDuplicate indicates the connection was replaced by // a new connection with the same connectionID. DisconnectDuplicate DisconnectReason = "duplicate" )
type MetricsCollector ¶ added in v0.5.0
type MetricsCollector interface {
// ConnectionOpened is called when a new session is created and registered.
ConnectionOpened(roomID, connectionID string)
// ConnectionClosed is called when a session is terminated. duration is the
// total logical session lifetime from creation to destruction, including
// any time spent in the suspended state. reason indicates why the session
// was closed (see DisconnectReason constants).
ConnectionClosed(roomID, connectionID string, duration time.Duration, reason DisconnectReason)
// ResumeAttempt is called when a suspended session is successfully resumed
// by a reconnecting client. Only fires on successful resume — a reconnect
// after grace expiry creates a new session (ConnectionOpened) and is not
// tracked as a failed resume.
ResumeAttempt(roomID, connectionID string)
// RoomCreated is called when the first connection joins a room,
// causing the room to be allocated.
RoomCreated(roomID string)
// RoomDestroyed is called when the last connection leaves a room,
// causing the room to be deallocated.
RoomDestroyed(roomID string)
// MessageReceived is called in the readPump when an inbound WebSocket
// message is read, before decoding. sizeBytes is the raw wire size.
MessageReceived(roomID string, sizeBytes int)
// MessageBroadcast is called after a broadcast frame is fanned out to
// all sessions in a room. sizeBytes is the pre-encoded frame size;
// fanOut is the number of recipient sessions (including suspended ones).
MessageBroadcast(roomID string, sizeBytes int, fanOut int)
// MessageSent is called in the writePump after a frame is successfully
// written to the WebSocket transport. sizeBytes is the encoded frame size.
MessageSent(roomID, connectionID string, sizeBytes int)
// FrameDropped is called whenever a frame is discarded due to send buffer
// backpressure. This covers drops from Send (buffer full), Broadcast
// (drop-oldest), and resume drain (buffer full during replay).
// In the drop-oldest path, two FrameDropped events may fire: one for the
// oldest frame evicted and one for the new frame if it still cannot be
// enqueued — both represent real frame loss.
FrameDropped(roomID, connectionID string)
// SendBufferUtilization is called in the writePump after every successful
// write. used and capacity report the current send channel occupancy.
//
// This method is called once per message write. For high-throughput
// connections (e.g. 10k msg/s), expect the same call rate per connection.
// Implementations should apply sampling, batching, or throttling as needed.
SendBufferUtilization(roomID, connectionID string, used, capacity int)
// PongTimeout is called in the readPump when a read deadline expires,
// indicating the remote peer failed to respond to a Ping in time.
PongTimeout(roomID, connectionID string)
}
MetricsCollector defines instrumentation hooks for wspulse server. Each method corresponds to a single lifecycle or throughput event.
Implementations must be safe for concurrent use. Methods are called from the hub goroutine, readPump goroutines, and writePump goroutines.
All methods are fire-and-forget: they do not return values. If the underlying metrics backend encounters an error, the implementation should handle it internally (e.g. log and skip).
Hooks are invoked synchronously on hot paths; implementations must return quickly and must not panic. Implementations must not call back into the same Server synchronously (e.g. Kick, Send, Broadcast) as this can deadlock the hub event loop.
For forward-compatible custom implementations, embed NoopCollector:
type MyCollector struct {
wspulse.NoopCollector // provides no-op defaults for future methods
}
func (c *MyCollector) ConnectionOpened(roomID, connectionID string) {
// custom implementation
}
This ensures new methods added to MetricsCollector in future versions are automatically satisfied by the embedded no-op defaults.
type NoopCollector ¶ added in v0.5.0
type NoopCollector struct{}
NoopCollector is the default MetricsCollector that discards all events. All methods are value-receiver no-ops with minimal overhead.
Embed NoopCollector in custom implementations for forward-compatible additions to the MetricsCollector interface:
type MyCollector struct {
wspulse.NoopCollector
}
func (NoopCollector) ConnectionClosed ¶ added in v0.5.0
func (NoopCollector) ConnectionClosed(_, _ string, _ time.Duration, _ DisconnectReason)
ConnectionClosed is a no-op. See MetricsCollector.ConnectionClosed.
func (NoopCollector) ConnectionOpened ¶ added in v0.5.0
func (NoopCollector) ConnectionOpened(_, _ string)
ConnectionOpened is a no-op. See MetricsCollector.ConnectionOpened.
func (NoopCollector) FrameDropped ¶ added in v0.5.0
func (NoopCollector) FrameDropped(_, _ string)
FrameDropped is a no-op. See MetricsCollector.FrameDropped.
func (NoopCollector) MessageBroadcast ¶ added in v0.5.0
func (NoopCollector) MessageBroadcast(_ string, _ int, _ int)
MessageBroadcast is a no-op. See MetricsCollector.MessageBroadcast.
func (NoopCollector) MessageReceived ¶ added in v0.5.0
func (NoopCollector) MessageReceived(_ string, _ int)
MessageReceived is a no-op. See MetricsCollector.MessageReceived.
func (NoopCollector) MessageSent ¶ added in v0.5.0
func (NoopCollector) MessageSent(_, _ string, _ int)
MessageSent is a no-op. See MetricsCollector.MessageSent.
func (NoopCollector) PongTimeout ¶ added in v0.5.0
func (NoopCollector) PongTimeout(_, _ string)
PongTimeout is a no-op. See MetricsCollector.PongTimeout.
func (NoopCollector) ResumeAttempt ¶ added in v0.5.0
func (NoopCollector) ResumeAttempt(_, _ string)
ResumeAttempt is a no-op. See MetricsCollector.ResumeAttempt.
func (NoopCollector) RoomCreated ¶ added in v0.5.0
func (NoopCollector) RoomCreated(_ string)
RoomCreated is a no-op. See MetricsCollector.RoomCreated.
func (NoopCollector) RoomDestroyed ¶ added in v0.5.0
func (NoopCollector) RoomDestroyed(_ string)
RoomDestroyed is a no-op. See MetricsCollector.RoomDestroyed.
func (NoopCollector) SendBufferUtilization ¶ added in v0.5.0
func (NoopCollector) SendBufferUtilization(_, _ string, _, _ int)
SendBufferUtilization is a no-op. See MetricsCollector.SendBufferUtilization.
type PanicError ¶ added in v0.5.0
type PanicError struct {
// Value is the value passed to panic().
Value any
// Stack is the goroutine stack trace captured at the point of recovery.
Stack []byte
}
PanicError wraps a panic recovered from an OnMessage callback. When an OnMessage handler panics, the readPump recovers the panic and terminates the connection (or suspends it when session resumption is enabled).
Delivery:
- When resumption is disabled (resumeWindow == 0, the default), the connection is closed immediately and the PanicError is delivered via the OnDisconnect callback's error parameter. OnTransportDrop is not invoked in this mode.
- When resumption is enabled (resumeWindow > 0), the transport error (including PanicError) is reported to the OnTransportDrop callback so applications can distinguish transport failures from handler panics using errors.As. OnDisconnect may later be invoked on grace expiry with a nil error; callers must not rely on PanicError always being delivered via OnDisconnect.
Behaviour: a panic in OnMessage always kills the connection. This is intentional — corrupted handler state should not process further messages.
func (*PanicError) Error ¶ added in v0.5.0
func (e *PanicError) Error() string
type Server ¶
type Server interface {
http.Handler
// Send enqueues a Frame for the connection identified by connectionID.
// Returns ErrConnectionNotFound if connectionID has no active connection.
Send(connectionID string, f Frame) error
// Broadcast enqueues a Frame for every active connection in roomID.
Broadcast(roomID string, f Frame) error
// Kick forcefully closes the connection identified by connectionID.
// Kick always bypasses the resume window — the session is destroyed
// immediately without entering the suspended state.
// Returns ErrConnectionNotFound if connectionID has no active connection.
Kick(connectionID string) error
// GetConnections returns a snapshot of all registered connections in roomID.
// This includes suspended sessions (within the resume window) that have no
// active WebSocket transport.
GetConnections(roomID string) []Connection
// Close gracefully shuts down the server, terminating all connections.
Close()
}
Server is the public interface for the WebSocket server. Callers depend on this interface rather than the concrete implementation.
func NewServer ¶
func NewServer(connect ConnectFunc, options ...ServerOption) Server
NewServer creates and starts a Server. connect must not be nil.
type ServerOption ¶
type ServerOption func(*serverConfig) //nolint:revive
ServerOption configures a Server.
func WithCheckOrigin ¶
func WithCheckOrigin(fn func(r *http.Request) bool) ServerOption
WithCheckOrigin sets the origin validation function for WebSocket upgrades. Defaults to accepting all origins (permissive — tighten this in production). Panics if fn is nil; pass the default (accept-all) explicitly if desired:
server.WithCheckOrigin(func(*http.Request) bool { return true })
func WithCodec ¶
func WithCodec(codec Codec) ServerOption
WithCodec replaces the default JSONCodec with the provided Codec. Panics if codec is nil.
func WithHeartbeat ¶
func WithHeartbeat(pingPeriod, pongWait time.Duration) ServerOption
WithHeartbeat configures Ping/Pong heartbeat intervals. Defaults: pingPeriod=10 s, pongWait=30 s. pingPeriod must be in (0, 5m] and pongWait must be in (pingPeriod, 10m].
func WithLogger ¶
func WithLogger(l *zap.Logger) ServerOption
WithLogger sets the zap logger used for internal diagnostics. Defaults to zap.NewNop() (silent). Pass the application logger to route wspulse transport logs through the same zap core (encoder, level, async writer). Panics if l is nil; pass zap.NewNop() explicitly if a no-op logger is desired.
func WithMaxMessageSize ¶
func WithMaxMessageSize(n int64) ServerOption
WithMaxMessageSize sets the maximum size in bytes for inbound messages. n must be in [1, 67108864] (64 MiB).
func WithMetrics ¶ added in v0.5.0
func WithMetrics(collector MetricsCollector) ServerOption
WithMetrics configures the MetricsCollector used by the Server. Defaults to NoopCollector{} if not set. Panics if collector is nil.
func WithOnConnect ¶
func WithOnConnect(fn func(Connection)) ServerOption
WithOnConnect registers a callback invoked after a connection is established and registered with the Server. The callback runs in a separate goroutine.
func WithOnDisconnect ¶
func WithOnDisconnect(fn func(Connection, error)) ServerOption
WithOnDisconnect registers a callback invoked when a connection terminates. err is nil for a normal closure. The callback runs in a separate goroutine. When WithResumeWindow is configured, this fires only after the resume window expires without reconnection (not on every transport drop).
func WithOnMessage ¶
func WithOnMessage(fn func(Connection, Frame)) ServerOption
WithOnMessage registers a callback invoked for every inbound Frame received from a connected client. The callback is called from the connection's readPump goroutine and must return quickly; use a goroutine for heavy work.
NOTE: fn is always called from a single readPump goroutine per Connection. On resume, the new readPump starts only after the old one has fully exited. Handlers should still be safe for concurrent use when application code accesses Connection from other goroutines (e.g. Send from an HTTP handler).
func WithOnTransportDrop ¶ added in v0.4.0
func WithOnTransportDrop(fn func(Connection, error)) ServerOption
WithOnTransportDrop registers a callback invoked when a connection's underlying WebSocket transport dies (network drop, read timeout, or peer close) and the session enters the suspended state because resumeWindow > 0.
The error parameter carries the cause of the transport failure when available (e.g. an i/o timeout from a missed Pong, or a close frame from the peer). For a normal or expected close, err may be nil, so callback implementations must not assume it is always non-nil.
This callback does NOT fire when:
- resumeWindow is 0 (OnDisconnect fires directly instead).
- the connection is removed via Kick() or Connection.Close() (OnDisconnect fires directly instead).
The callback runs in a separate goroutine; it must be safe for concurrent use.
func WithOnTransportRestore ¶ added in v0.4.0
func WithOnTransportRestore(fn func(Connection)) ServerOption
WithOnTransportRestore registers a callback invoked when a suspended session resumes after a client reconnects with the same connectionID within the resume window.
When this fires, OnConnect and OnDisconnect are NOT called — the session continues as if the transport had never dropped. Buffered frames are replayed to the new transport before the callback is invoked.
This callback does NOT fire when:
- resumeWindow is 0 (session resumption is disabled).
- the resume window expires before the client reconnects (OnDisconnect fires instead).
The callback runs in a separate goroutine; it must be safe for concurrent use.
func WithResumeWindow ¶
func WithResumeWindow(d time.Duration) ServerOption
WithResumeWindow configures the session resumption window. When a transport drops, the session is suspended for d before firing OnDisconnect. If the same connectionID reconnects within that period, the session resumes transparently. Valid range: 0 (disabled) … no upper limit. Default is 0 (OnDisconnect fires immediately).
func WithSendBufferSize ¶
func WithSendBufferSize(n int) ServerOption
WithSendBufferSize sets the per-connection outbound channel capacity (number of frames). n must be in [1, 4096].
func WithUpgraderBufferSize ¶ added in v0.5.0
func WithUpgraderBufferSize(readSize, writeSize int) ServerOption
WithUpgraderBufferSize sets the I/O buffer sizes for the WebSocket upgrader. Larger buffers reduce per-write allocations for applications that send large messages. Default: 1024 bytes for both read and write. Panics if either size is not positive.
func WithWriteWait ¶
func WithWriteWait(d time.Duration) ServerOption
WithWriteWait sets the deadline for a single write operation on a connection. d must be in (0, 30s].