ws

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2026 License: MIT Imports: 5 Imported by: 0

README

sup-ws

Go Reference Test License

sup-ws is a high-reliability WebSocket client implementation for Go, built on top of the sup actor library. It provides a thread-safe, supervised, and observable way to interact with WebSocket endpoints.

Why this exists?

WebSocket connections are long-lived and inherently stateful. If two goroutines try to write to the same connection simultaneously, the result is a data race and a broken stream.

This library solves that by treating the WebSocket connection as an Actor. All outbound messages are queued in a mailbox and written sequentially by the actor loop, ensuring writes are perfectly serialized. Inbound messages are delivered to a handler function as they arrive.

Features

  • Actor-Based Concurrency: Thread-safe outbound writes via Send. Multiple goroutines can call Send safely; the actor serializes all writes.
  • Supervised Lifecycle: Designed to run under a sup.Supervisor. Any connection failure causes the actor to return a fatal error, letting the supervisor handle reconnection.
  • Binary and Text Support: Exposes the WebSocket message type (MessageText / MessageBinary) alongside the payload.
  • Idle Timeout: Configurable timeout that triggers a reconnect if no message is received within the window.
  • Keepalive Pings: Configurable ping interval to detect silent connection drops before the idle timeout fires.

Quick start

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/webermarci/sup"
	ws "github.com/webermarci/sup-ws"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handler := func(message ws.Message) {
		fmt.Println(message)
	}

	actor := ws.NewActor("actor", "wss://example.com/stream", handler,
		ws.WithTimeout(30*time.Second),
		ws.WithPingInterval(15*time.Second),
	)

	supervisor := sup.NewSupervisor(
		sup.WithActor(actor),
		sup.WithPolicy(sup.Permanent),
		sup.WithRestartDelay(time.Second),
		sup.WithRestartLimit(5, 10*time.Second),
	)

	go supervisor.Run(ctx)

	_ = client.Send(ws.MessageText, []byte(`{"action":"subscribe","channel":"updates"}`))
	_ = client.Send(ws.MessageBinary, []byte{0x01, 0x02, 0x03})

	supervisor.Wait()
}

Using it with pubsub

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/webermarci/pubsub"
  "github.com/webermarci/sup"
  ws "github.com/webermarci/sup-ws"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pubsub := pubsub.New[string, ws.Message](10)
  
	handler := func(message ws.Message) {
		pubsub.Publish("ws", message)
	}
	
	actor := ws.NewActor("actor", "wss://example.com/stream", handler,
		ws.WithTimeout(30*time.Second),
		ws.WithPingInterval(15*time.Second),
	)
	
	supervisor := sup.NewSupervisor(
		sup.WithActor(actor),
		sup.WithPolicy(sup.Permanent),
		sup.WithRestartDelay(time.Second),
		sup.WithRestartLimit(5, 10 * time.Second),
	)
	
	go supervisor.Run(ctx)
	
	messages := pubsub.Subscribe(ctx, "ws")
	
	go func() {
		for message := range messages {
			fmt.Println(message)
		}
	}()
	
	supervisor.Wait()
	pubsub.Close()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Actor

type Actor struct {
	*sup.BaseActor
	// contains filtered or unexported fields
}

Actor connects to a WebSocket endpoint, delivers inbound messages to a handler, and exposes a thread-safe Send method for outbound messages. It is designed to run under a sup.Supervisor, which handles reconnection on failure.

func NewActor

func NewActor(name string, url string, handler func(Message), opts ...ActorOption) *Actor

NewActor creates a new Actor with the specified URL, inbound message handler, and optional configuration options.

func (*Actor) Run

func (a *Actor) Run(ctx context.Context) error

Run establishes the WebSocket connection and drives concurrent concerns: reading inbound frames, writing outbound frames, and maintaining keep-alive pings. Any failure causes Run to return an error, triggering a supervisor restart.

func (*Actor) Send

func (a *Actor) Send(msgType MessageType, data []byte) error

Send enqueues an outbound message to be written by the actor's run loop. It is safe to call from any goroutine.

type ActorOption

type ActorOption func(*Actor)

ActorOption defines a function type for configuring the Actor.

func WithHTTPClient

func WithHTTPClient(c *http.Client) ActorOption

WithHTTPClient allows providing a custom http.Client for the WebSocket dial.

func WithMailboxSize

func WithMailboxSize(size int) ActorOption

WithMailboxSize sets the size of the actor's outbound mailbox. Default is 10.

func WithOnConnect added in v0.0.3

func WithOnConnect(handler func(url string)) ActorOption

WithOnConnect sets a callback that is invoked with the URL whenever a connection is successfully established.

func WithOnError added in v0.0.3

func WithOnError(handler func(err error)) ActorOption

WithOnError sets a callback that is invoked with any error that causes the actor to fail and trigger a supervisor restart.

func WithOnMessage added in v0.0.3

func WithOnMessage(handler func(msg Message, duration time.Duration)) ActorOption

WithOnMessage sets a callback that is invoked with each received message and the duration since the last message was processed.

func WithPingInterval

func WithPingInterval(d time.Duration) ActorOption

WithPingInterval sets how often the actor sends pings to keep the connection alive and detect silent drops. Default is 15 seconds.

type Message

type Message struct {
	Type MessageType
	Data []byte
}

Message represents a single WebSocket message with its type and payload.

type MessageType

type MessageType int

MessageType represents the type of a WebSocket message frame.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL