http

package
v0.0.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2026 License: Apache-2.0 Imports: 31 Imported by: 6

Documentation

Overview

Package http provides utilities for HTTP request handling and production-grade WebSocket connections.

This package contains two main areas of functionality:

  1. HTTP utilities for simplified request/response handling
  2. WebSocket abstractions built on Gorilla WebSocket for real-time applications

WebSocket Framework

The WebSocket framework provides a production-ready abstraction over Gorilla WebSocket with automatic connection management, heartbeat detection, and lifecycle hooks.

## Key Features

  • Production-grade connection management with automatic heartbeat detection
  • Lifecycle hooks for connection start, close, timeout, and error handling
  • Thread-safe message broadcasting to multiple clients
  • Automatic ping-pong mechanism to prevent connection timeouts
  • Type-safe message handling with Go generics
  • Pluggable Codec system for flexible message encoding (JSON, Protobuf)
  • Generic BaseConn[I, O] for typed input/output messages
  • Built-in JSON message support with JSONConn
  • Configurable timeouts and intervals for different deployment scenarios

## Quick Start

The simplest way to create a WebSocket endpoint is using the built-in JSONConn:

type EchoConn struct {
	gohttp.JSONConn
}

func (e *EchoConn) HandleMessage(msg any) error {
	// Echo the message back to the client
	e.Writer.Send(conc.Message[any]{Value: msg})
	return nil
}

type EchoHandler struct{}

func (h *EchoHandler) Validate(w http.ResponseWriter, r *http.Request) (*EchoConn, bool) {
	return &EchoConn{}, true // Accept all connections
}

// Register with HTTP router
router.HandleFunc("/echo", gohttp.WSServe(&EchoHandler{}, nil))

## Architecture

The framework uses three main interfaces:

### WSConn[I any] Represents a WebSocket connection that can handle typed messages:

type WSConn[I any] interface {
	BiDirStreamConn[I]
	ReadMessage(w *websocket.Conn) (I, error)
	OnStart(conn *websocket.Conn) error
}

### WSHandler[I any, S WSConn[I]] Validates HTTP requests and creates WebSocket connections:

type WSHandler[I any, S WSConn[I]] interface {
	Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}

### BiDirStreamConn[I any] Provides lifecycle and message handling methods:

type BiDirStreamConn[I any] interface {
	SendPing() error
	Name() string
	ConnId() string
	HandleMessage(msg I) error
	OnError(err error) error
	OnClose()
	OnTimeout() bool
}

## Advanced Usage

### Multi-user Chat Server

type ChatServer struct {
	clients map[string]*ChatConn
	rooms   map[string]*Room
	mu      sync.RWMutex
}

type ChatConn struct {
	gohttp.JSONConn
	server   *ChatServer
	username string
	roomName string
}

func (c *ChatConn) OnStart(conn *websocket.Conn) error {
	if err := c.JSONConn.OnStart(conn); err != nil {
		return err
	}

	// Register with server
	c.server.mu.Lock()
	c.server.clients[c.ConnId()] = c
	c.server.mu.Unlock()

	return nil
}

func (c *ChatConn) HandleMessage(msg any) error {
	msgMap := msg.(map[string]any)

	switch msgMap["type"].(string) {
	case "chat":
		// Broadcast to all clients in room
		c.server.broadcastToRoom(c.roomName, msgMap)
	case "join_room":
		// Switch rooms
		c.joinRoom(msgMap["room"].(string))
	}

	return nil
}

### Authentication and Authorization

type SecureConn struct {
	gohttp.JSONConn
	userID string
	roles  []string
}

func (s *SecureConn) HandleMessage(msg any) error {
	msgMap := msg.(map[string]any)
	messageType := msgMap["type"].(string)

	if !s.hasPermission(messageType) {
		errorMsg := map[string]any{
			"type":  "error",
			"error": "Insufficient permissions",
		}
		s.Writer.Send(conc.Message[any]{Value: errorMsg})
		return nil
	}

	// Process authorized message
	return s.processMessage(msgMap)
}

type AuthHandler struct{}

func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) (*SecureConn, bool) {
	token := r.Header.Get("Authorization")

	// Validate JWT token
	claims, err := validateJWT(token)
	if err != nil {
		http.Error(w, "Invalid token", http.StatusUnauthorized)
		return nil, false
	}

	return &SecureConn{
		userID: claims["userID"].(string),
		roles:  claims["roles"].([]string),
	}, true
}

### Custom Ping-Pong with Latency Tracking

type MonitoredConn struct {
	gohttp.JSONConn
	lastPingTime time.Time
	pingCount    int64
}

func (m *MonitoredConn) SendPing() error {
	m.pingCount++
	m.lastPingTime = time.Now()

	pingMsg := map[string]any{
		"type":      "ping",
		"pingId":    m.pingCount,
		"timestamp": m.lastPingTime.Unix(),
	}

	m.Writer.Send(conc.Message[any]{Value: pingMsg})
	return nil
}

func (m *MonitoredConn) HandleMessage(msg any) error {
	msgMap := msg.(map[string]any)

	if msgMap["type"].(string) == "pong" {
		latency := time.Since(m.lastPingTime)
		log.Printf("Ping-pong latency: %v", latency)
	}

	return nil
}

## Configuration

### Production Configuration

config := &gohttp.WSConnConfig{
	BiDirStreamConfig: &gohttp.BiDirStreamConfig{
		PingPeriod: time.Second * 30,  // Send ping every 30 seconds
		PongPeriod: time.Second * 300, // Timeout after 5 minutes
	},
	Upgrader: websocket.Upgrader{
		ReadBufferSize:  4096,
		WriteBufferSize: 4096,
		CheckOrigin: func(r *http.Request) bool {
			// Implement proper origin checking
			return isValidOrigin(r.Header.Get("Origin"))
		},
	},
}

router.HandleFunc("/ws", gohttp.WSServe(handler, config))

### Development Configuration

config := gohttp.DefaultWSConnConfig()
config.PingPeriod = time.Second * 10  // Faster pings for development
config.PongPeriod = time.Second * 30  // Shorter timeout

## Frontend Integration

The framework is designed to work seamlessly with JavaScript WebSocket clients:

class WebSocketClient {
	constructor(url) {
		this.url = url;
		this.ws = null;
		this.pingInterval = null;
	}

	connect() {
		this.ws = new WebSocket(this.url);

		this.ws.onopen = () => {
			this.startPingInterval();
		};

		this.ws.onmessage = (event) => {
			const message = JSON.parse(event.data);
			this.handleMessage(message);
		};
	}

	handleMessage(message) {
		switch (message.type) {
			case 'ping':
				// Respond to server ping
				this.send({type: 'pong', pingId: message.pingId});
				break;
			default:
				this.onMessage(message);
		}
	}

	startPingInterval() {
		this.pingInterval = setInterval(() => {
			if (this.ws?.readyState === WebSocket.OPEN) {
				this.send({type: 'ping', timestamp: Date.now()});
			}
		}, 25000);
	}
}

## Error Handling and Resilience

The framework provides robust error handling:

type ResilientConn struct {
	gohttp.JSONConn
	errorCount int
	maxErrors  int
}

func (r *ResilientConn) OnError(err error) error {
	r.errorCount++
	log.Printf("WebSocket error #%d: %v", r.errorCount, err)

	if r.errorCount > r.maxErrors {
		return err // Close connection after too many errors
	}

	return nil // Continue with connection
}

func (r *ResilientConn) OnTimeout() bool {
	log.Printf("Connection timeout for %s", r.ConnId())
	return true // Close the connection
}

## Best Practices

  • Always call parent OnStart/OnClose when embedding JSONConn
  • Implement proper authentication in the Validate method
  • Use connection limits to prevent resource exhaustion
  • Handle errors gracefully without crashing the server
  • Clean up resources properly in OnClose methods
  • Use mutexes for thread-safe operations on shared state
  • Monitor connection health with metrics

## Common Patterns

### Hub Pattern for Broadcasting

type Hub struct {
	clients    map[*Client]bool
	broadcast  chan []byte
	register   chan *Client
	unregister chan *Client
}

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.clients[client] = true
		case client := <-h.unregister:
			delete(h.clients, client)
		case message := <-h.broadcast:
			for client := range h.clients {
				client.send <- message
			}
		}
	}
}

### Room-based Messaging

type Room struct {
	name    string
	clients map[string]*Client
	mu      sync.RWMutex
}

func (r *Room) Broadcast(message any, excludeId string) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	for id, client := range r.clients {
		if id != excludeId {
			client.Send(message)
		}
	}
}

## Testing

The package includes comprehensive test utilities for WebSocket testing:

// Create test server
server := httptest.NewServer(router)
defer server.Close()

// Create WebSocket client
wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/endpoint"
conn, err := createTestClient(t, wsURL, nil)
if err != nil {
	t.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()

See the comprehensive test suite in ws2_test.go for complete examples of testing WebSocket applications including authentication, load testing, and error scenarios.

HTTP Utilities

The package also provides utilities for HTTP request/response handling:

  • JsonToQueryString: Convert maps to URL query strings
  • SendJsonResponse: Send JSON responses with proper error handling
  • ErrorToHttpCode: Convert Go errors to appropriate HTTP status codes
  • WSConnWriteMessage/WSConnWriteError: WebSocket message writing utilities
  • NormalizeWsUrl: Convert HTTP URLs to WebSocket URLs

Example HTTP utility usage:

func handleAPI(w http.ResponseWriter, r *http.Request) {
	data, err := processRequest(r)
	gohttp.SendJsonResponse(w, data, err)
}

This comprehensive framework enables building production-ready real-time applications with WebSocket communication, from simple echo servers to complex multi-user systems with authentication, rooms, and advanced connection management.

Example

Example demonstrating complete WebSocket workflow

// Create router and server
router := mux.NewRouter()

// Chat server setup
chatServer := &ChatServer{
	clients: make(map[string]*ChatConn),
	rooms:   make(map[string]*Room),
}

// Add handlers
router.HandleFunc("/echo", WSServe(&EchoHandler{}, nil))
router.HandleFunc("/chat", WSServe(&ChatHandler{server: chatServer}, nil))
router.HandleFunc("/secure", WSServe(&AuthHandler{}, nil))

// Custom configuration for production
config := &WSConnConfig{
	BiDirStreamConfig: &BiDirStreamConfig{
		PingPeriod: time.Second * 30,
		PongPeriod: time.Second * 300,
	},
	Upgrader: websocket.Upgrader{
		ReadBufferSize:  4096,
		WriteBufferSize: 4096,
		CheckOrigin: func(r *http.Request) bool {
			// Add proper origin checking in production
			return true
		},
	},
}

router.HandleFunc("/production", WSServe(&EchoHandler{}, config))

// In a real application, you would start the server:
// log.Fatal(http.ListenAndServe(":8080", router))

fmt.Println("WebSocket server configured with multiple endpoints:")
fmt.Println("- /echo: Simple echo server")
fmt.Println("- /chat: Multi-user chat with rooms")
fmt.Println("- /secure: Authenticated connections")
fmt.Println("- /production: Production-ready config")
Output:
WebSocket server configured with multiple endpoints:
- /echo: Simple echo server
- /chat: Multi-user chat with rooms
- /secure: Authenticated connections
- /production: Production-ready config

Index

Examples

Constants

View Source
const MaxErrorBodySize = 16 << 10 // 16 KB

MaxErrorBodySize is the maximum number of bytes read from an HTTP error response body. Prevents memory exhaustion from malicious or misconfigured servers returning oversized error payloads. Shared by DoWithAuthRetry and any other error-body readers in this package.

Variables

View Source
var DefaultHttpClient *http.Client
View Source
var HighQPSHttpClient *http.Client
View Source
var LowQPSHttpClient *http.Client
View Source
var MediumQPSHttpClient *http.Client

Functions

func CORS deprecated

func CORS(next http.Handler) http.Handler

CORS is a convenience wrapper for local development that allows all origins. For production use, prefer middleware.CORS with an OriginChecker.

Deprecated: Use middleware.CORS(nil) for allow-all, or middleware.CORS(middleware.NewOriginChecker(origins)) for production.

func Call

func Call(req *http.Request, client *http.Client) (response any, err error)

Makes a http with the tiven request and the http client. This is a wrapper over the standard library caller that creates a Client (if not provided), performs the request reads the entire body adn optionally converts the payload to an appropriate type based on the response' Content-Type header (for now only application/json is supported.

func DetectBatch added in v0.0.20

func DetectBatch(body []byte) bool

DetectBatch returns true if body is a JSON array, indicating a JSON-RPC 2.0 batch request. Skips leading whitespace before checking the first non-whitespace byte for '['.

Per JSON-RPC 2.0 spec (Section 6): "To send several Request objects at the same time, the Client MAY send an Array filled with Request objects."

func DoWithAuthRetry added in v0.0.18

func DoWithAuthRetry(
	cfg *AuthRetryConfig,
	buildReq func() (*http.Request, error),
	do func(*http.Request) (*http.Response, error),
) (*http.Response, error)

DoWithAuthRetry executes an HTTP request with automatic retry on 401/403.

Retry budget: max 1 retry for 401 (token refresh), max 1 retry for 403 (scope step-up). Total max 2 retries per request.

If cfg is nil, no auth handling is performed — 401/403 responses are returned as AuthRetryError.

buildReq must create a new *http.Request each call (the body may be consumed on the previous attempt). do is typically httpClient.Do.

func ErrorToHttpCode

func ErrorToHttpCode(err error) int

ErrorToHttpCode converts a Go error to an appropriate HTTP status code. If err is nil, returns http.StatusOK (200). If err contains a gRPC status, maps it to the corresponding HTTP code:

  • codes.PermissionDenied → 403 Forbidden
  • codes.NotFound → 404 Not Found
  • codes.AlreadyExists → 409 Conflict
  • codes.InvalidArgument → 400 Bad Request
  • Other errors → 500 Internal Server Error

func GenerateSessionID added in v0.0.17

func GenerateSessionID() string

GenerateSessionID returns a cryptographically random 32-character hex string (128 bits of entropy) suitable for HTTP session identifiers.

Uses crypto/rand for security — the output is unpredictable and safe for use as session tokens, CSRF tokens, or any context where an attacker should not be able to guess the value.

Panics if the system's cryptographic random number generator fails, which indicates a serious system-level problem.

func HTTPErrorCode

func HTTPErrorCode(err error) int

func IsHTTPTransient added in v0.0.17

func IsHTTPTransient(statusCode int) bool

IsHTTPTransient returns true if the HTTP status code indicates a transient server error (5xx) that may succeed on retry. Client errors (4xx) and successes (2xx) return false.

Use this to decide whether to retry a failed HTTP request:

  • 500, 502, 503, 504: transient (server overload, gateway timeout)
  • 400, 401, 403, 404: terminal (client error, won't change on retry)

func JsonGet

func JsonGet(url string, onReq func(req *http.Request)) (any, *http.Response, error)

A simple wrapper for performing JSON Get requests. The url is the full url once all query params have been added. The onReq callback allows customization of the http requests before it is sent.

func JsonToQueryString

func JsonToQueryString(json map[string]any) string

JsonToQueryString converts a map to a URL query string. Keys are sorted alphabetically for deterministic output. Values are converted to strings using fmt.Sprintf("%v", value).

Example:

params := map[string]any{"name": "John", "age": 30}
qs := JsonToQueryString(params) // "age=30&name=John"
Example
input := map[string]any{"a": 1, "b": 2}
queryStr := JsonToQueryString(input)
fmt.Println(queryStr)
Output:
a=1&b=2

func ListenAndServeGraceful added in v0.0.9

func ListenAndServeGraceful(srv *http.Server, opts ...GracefulOption) error

ListenAndServeGraceful starts the HTTP server and handles graceful shutdown. It blocks until a shutdown signal is received (OS signal or context cancellation), then drains active connections within the configured timeout.

Shutdown lifecycle:

  1. Start srv.ListenAndServe() in a goroutine
  2. Block until: OS signal, parent context cancellation, or server error
  3. Call OnShutdown callbacks (e.g. SSEHub.CloseAll()) — connections still open
  4. Call srv.Shutdown() with drain timeout — waits for in-flight handlers
  5. Return nil on clean shutdown, or the error

Example:

srv := &http.Server{Addr: ":8080", Handler: mux}
middleware.ApplyDefaults(srv)
srv.WriteTimeout = 0 // required for SSE

hub := gohttp.NewSSEHub[any]()
err := gohttp.ListenAndServeGraceful(srv,
    gohttp.WithDrainTimeout(10*time.Second),
    gohttp.WithOnShutdown(hub.CloseAll),
)
Example

ExampleListenAndServeGraceful demonstrates graceful shutdown with signal handling and SSEHub integration. OnShutdown callbacks run before drain, so they can send goodbye events while connections are still open.

package main

import (
	"log"
	"net/http"
	"time"

	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	hub := gohttp.NewSSEHub[any]()

	srv := &http.Server{
		Addr:    ":8080",
		Handler: http.DefaultServeMux,
	}

	// Blocks until SIGTERM/SIGINT, then:
	// 1. Calls hub.CloseAll() (closes SSE connections)
	// 2. Drains HTTP requests for up to 10s
	// 3. Returns nil
	err := gohttp.ListenAndServeGraceful(srv,
		gohttp.WithDrainTimeout(10*time.Second),
		gohttp.WithOnShutdown(hub.CloseAll),
	)
	if err != nil {
		log.Fatal(err)
	}
}

func MakeUrl

func MakeUrl(host, path string, args string) (url string)

Creates a URL on a host, path and with optional query parameters

func NewBytesRequest

func NewBytesRequest(method string, endpoint string, body []byte) (req *http.Request, err error)

Wraps the NewRequest helper to create request to set the body from a byte array.

func NewJsonRequest

func NewJsonRequest(method string, endpoint string, body map[string]any) (req *http.Request, err error)

Wraps the NewRequest helper to create a request with the payload marshalled as JSON.

func NewRequest

func NewRequest(method string, endpoint string, bodyReader io.Reader) (req *http.Request, err error)

Creates a new http request with the given method, endpoint and a bodyready that provides the content the request body.

func NormalizeWsUrl

func NormalizeWsUrl(httpOrWsUrl string) string

NormalizeWsUrl converts an HTTP(S) URL to its WebSocket equivalent. It performs the following transformations:

  • Removes trailing slashes
  • Converts "http:" to "ws:"
  • Converts "https:" to "wss:"

URLs that are already WebSocket URLs (ws: or wss:) are returned unchanged after removing any trailing slash.

Example:

NormalizeWsUrl("https://example.com/ws/") // "wss://example.com/ws"
Example
fmt.Println(NormalizeWsUrl("http://google.com"))
fmt.Println(NormalizeWsUrl("https://github.com"))
Output:
ws://google.com
wss://github.com

func ParseAcceptTypes added in v0.0.17

func ParseAcceptTypes(accept string) (acceptsJSON, acceptsSSE bool)

ParseAcceptTypes parses an HTTP Accept header and returns whether application/json and text/event-stream are present.

Handles quality values (;q=N) and whitespace per RFC 7231 §5.3.2. The wildcard type */* matches both JSON and SSE.

See: https://www.rfc-editor.org/rfc/rfc7231#section-5.3.2

func ParseWWWAuthenticate added in v0.0.18

func ParseWWWAuthenticate(header string) (resourceMetadata string, scopes []string, err error)

ParseWWWAuthenticate extracts the resource_metadata URL and scopes from a WWW-Authenticate: Bearer header value per RFC 6750 §3.

Used by HTTP clients to:

  • Discover the Protected Resource Metadata (PRM) endpoint from a 401 response
  • Parse required scopes from a 403 insufficient_scope response

Handles both quoted ("value") and unquoted (value) parameter formats.

See: https://www.rfc-editor.org/rfc/rfc6750#section-3

func ReadFrame added in v0.0.17

func ReadFrame(r *bufio.Reader) ([]byte, error)

ReadFrame reads a Content-Length framed message from a buffered reader. It parses HTTP-style headers until an empty line (\r\n\r\n), extracts the Content-Length value, and reads exactly that many bytes as the message body.

Headers other than Content-Length are accepted but ignored, matching the LSP spec which allows Content-Type as an optional header.

Returns io.EOF if the reader is empty (clean stream shutdown).

See: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#headerPart

func ResolveURL added in v0.0.17

func ResolveURL(baseURL, ref string) (string, error)

ResolveURL resolves a URL reference against a base URL per RFC 3986 §5.2.2.

This handles three cases:

  • Absolute URL ("http://host/path"): returned unchanged
  • Absolute path ("/path?q=1"): inherits scheme+host from base
  • Relative path ("path?q=1"): inherits scheme+host+directory from base

Used by SSE clients to resolve endpoint event URLs against the SSE connection URL, and generally useful for any URL reference resolution.

See: https://www.rfc-editor.org/rfc/rfc3986#section-5.2.2

func SSEServe added in v0.0.8

func SSEServe[O any, S SSEConn[O]](handler SSEHandler[O, S], config *SSEConnConfig) http.HandlerFunc

SSEServe creates an http.HandlerFunc that establishes SSE connections and manages their lifecycle. This is the primary entry point for creating SSE endpoints, analogous to WSServe for WebSocket endpoints.

The handler validates incoming requests and creates connection instances. The config controls keepalive behavior; if nil, DefaultSSEConnConfig is used.

Example:

router.HandleFunc("/events", gohttp.SSEServe[MyEvent](&MySSEHandler{}, nil))

The lifecycle is:

  1. handler.Validate() is called to check the request
  2. SSE headers are set (Content-Type, Cache-Control, etc.)
  3. conn.OnStart() is called to initialize the connection
  4. Keepalive comments are sent at the configured interval
  5. On client disconnect (context cancellation), conn.OnClose() is called

Important: Set http.Server.WriteTimeout = 0 for SSE endpoints to prevent the server from closing long-lived connections. See middleware.ApplyDefaults.

Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html

Example

ExampleSSEServe demonstrates setting up an SSE endpoint using SSEServe. The handler creates a connection that pushes events to the client. SSE endpoints require http.Server.WriteTimeout = 0 for long-lived streams.

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	router := mux.NewRouter()
	router.HandleFunc("/events", gohttp.SSEServe[any](&gohttp.JSONSSEHandler{}, nil))

	srv := &http.Server{
		Addr:         ":8080",
		Handler:      router,
		WriteTimeout: 0, // Required for SSE!
	}
	log.Fatal(srv.ListenAndServe())
}

func SendJsonResponse

func SendJsonResponse(writer http.ResponseWriter, resp any, err error)

SendJsonResponse writes a JSON response to the http.ResponseWriter. If err is nil, resp is marshaled to JSON and written with status 200 OK. If err is non-nil, an appropriate HTTP error code is set based on the gRPC status code (if present), and an error object is returned in the response body.

The function handles gRPC status errors by extracting the code and message, and maps them to appropriate HTTP status codes via ErrorToHttpCode.

func SplitBatch added in v0.0.20

func SplitBatch(body []byte) ([]json.RawMessage, error)

SplitBatch splits a JSON array into individual raw JSON messages. Returns an error if the body is not a valid JSON array.

Each element in the returned slice is a complete JSON value (object or otherwise) that can be independently unmarshaled as a JSON-RPC request.

func StreamableServe added in v0.0.9

func StreamableServe(handler StreamableHandlerFunc, config *StreamableConfig) http.HandlerFunc

StreamableServe creates an http.HandlerFunc that supports both synchronous JSON responses and SSE event streaming from a single endpoint. This is the "POST-that-optionally-streams" pattern used by MCP 2025-03-26 Streamable HTTP.

The handler function decides per-request whether to return a SingleResponse (JSON) or StreamResponse (SSE stream). StreamableServe handles content-type negotiation, SSE formatting, flushing, and client disconnect detection.

Example:

router.HandleFunc("/rpc", gohttp.StreamableServe(myHandler, nil))

For SSE streaming, set http.Server.WriteTimeout = 0 to prevent the server from closing long-lived connections.

Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html

Example

ExampleStreamableServe demonstrates the POST-that-optionally-streams pattern from MCP 2025-03-26. The handler decides per-request whether to return a JSON response or an SSE event stream.

package main

import (
	"context"
	"fmt"
	"net/http"

	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	handler := gohttp.StreamableServe(
		func(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
			// Check if client wants streaming
			if r.Header.Get("Accept") == "text/event-stream" {
				ch := make(chan gohttp.SSEEvent)
				go func() {
					defer close(ch)
					ch <- gohttp.SSEEvent{Event: "progress", Data: map[string]any{"pct": 50}}
					ch <- gohttp.SSEEvent{Event: "result", Data: map[string]any{"done": true}}
				}()
				return gohttp.StreamResponse{Events: ch}
			}
			// Default: synchronous JSON response
			return gohttp.SingleResponse{Body: map[string]any{"result": "ok"}}
		},
		nil, // default config (JSONCodec)
	)

	http.HandleFunc("/rpc", handler)
	fmt.Println("Streamable handler registered")
}
Output:
Streamable handler registered

func WSConnJSONReaderWriter

func WSConnJSONReaderWriter(conn *websocket.Conn) (reader *conc.Reader[gut.StrMap], writer *conc.Writer[conc.Message[gut.StrMap]])

WSConnJSONReaderWriter creates concurrent reader and writer for JSON messages over a WebSocket connection. The reader decodes incoming JSON messages into StrMap (map[string]any), and the writer sends outgoing StrMap messages as JSON.

The reader handles connection close errors gracefully by converting them to net.ErrClosed. The writer handles io.EOF as a normal stream end and uses WSConnWriteError for error messages.

This is useful for creating bidirectional JSON message streams over WebSocket.

func WSConnWriteError

func WSConnWriteError(wsConn *websocket.Conn, err error) error

WSConnWriteError writes an error message to a WebSocket connection. If err is nil or io.EOF, no message is sent and nil is returned. For gRPC status errors, the error code is extracted and sent as JSON. The message is sent as a text frame containing JSON: {"error": <code>}

func WSConnWriteMessage

func WSConnWriteMessage(wsConn *websocket.Conn, msg any) error

WSConnWriteMessage writes a JSON message to a WebSocket connection. The message is marshaled to JSON and sent as a text frame. Returns any error from marshaling or writing.

func WSHandleConn

func WSHandleConn[I any, S WSConn[I]](conn *websocket.Conn, ctx S, config *WSConnConfig)

WSHandleConn manages the lifecycle of an established WebSocket connection. It handles:

  • Periodic ping messages for connection health checks
  • Timeout detection when no data is received within PongPeriod
  • Message reading and dispatching to ctx.HandleMessage()
  • Error handling via ctx.OnError()
  • Clean shutdown via ctx.OnClose()

This function is called automatically by WSServe, but can also be used directly when you have an established WebSocket connection from another source.

The function blocks until the connection is closed or an unrecoverable error occurs.

func WSServe

func WSServe[I any, S WSConn[I]](handler WSHandler[I, S], config *WSConnConfig) http.HandlerFunc

WSServe creates an http.HandlerFunc that upgrades HTTP requests to WebSocket connections and manages their lifecycle. This is the primary entry point for creating WebSocket endpoints.

The handler validates incoming requests and creates connection instances. The config controls upgrade behavior and timing; if nil, DefaultWSConnConfig is used.

Example:

router.HandleFunc("/ws", gohttp.WSServe(&MyHandler{}, nil))

The lifecycle is:

  1. handler.Validate() is called to check the request
  2. If valid, the connection is upgraded to WebSocket
  3. conn.OnStart() is called to initialize the connection
  4. Messages are read and passed to conn.HandleMessage()
  5. On close, conn.OnClose() is called for cleanup
Example

ExampleWSServe demonstrates the basic pattern for creating a WebSocket endpoint. The handler validates connections (auth, etc.) and WSServe manages the lifecycle (upgrade, heartbeats, message dispatch, cleanup).

r := mux.NewRouter()
r.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Publishing Custom Message")
})

r.HandleFunc("/subscribe", WSServe(&JSONHandler{}, nil))
srv := http.Server{Handler: r}
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
	log.Fatal(err)
}

func WriteFrame added in v0.0.17

func WriteFrame(w io.Writer, data []byte) error

WriteFrame writes a Content-Length framed message in LSP format:

Content-Length: <n>\r\n\r\n<body>

This framing is used by the Language Server Protocol (LSP) and MCP's stdio transport to delimit JSON-RPC messages over byte streams (pipes, stdin/stdout).

See: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#headerPart

Types

type AtomicIDGen added in v0.0.19

type AtomicIDGen struct {
	// contains filtered or unexported fields
}

AtomicIDGen is an IDGen backed by an atomic counter. It produces decimal string IDs ("1", "2", "3", ...). Suitable for per-session or per-stream ID generation where global uniqueness is not required.

Zero value is ready to use.

func (*AtomicIDGen) Next added in v0.0.19

func (g *AtomicIDGen) Next() string

Next returns the next unique ID as a decimal string.

type AuthRetryConfig added in v0.0.18

type AuthRetryConfig struct {
	// SetAuth injects authentication credentials into a request.
	// Typically sets the Authorization: Bearer header.
	// Called before each request attempt (including retries).
	SetAuth func(req *http.Request) error

	// OnUnauthorized is called when the server returns 401.
	// Should refresh the token so the next SetAuth call uses fresh credentials.
	// Return nil to retry, non-nil error to give up.
	OnUnauthorized func(resp *http.Response) error

	// OnForbidden is called when the server returns 403.
	// The response is provided so the handler can parse WWW-Authenticate
	// for required scopes. Return nil to retry, non-nil error to give up.
	OnForbidden func(resp *http.Response) error
}

AuthRetryConfig configures automatic retry on HTTP 401 (Unauthorized) and 403 (Forbidden) responses. This implements the standard OAuth 2.0 Bearer token retry pattern: refresh on 401, scope step-up on 403.

All fields are optional. If nil, the corresponding retry is skipped.

type AuthRetryError added in v0.0.18

type AuthRetryError struct {
	// StatusCode is the HTTP status (401 or 403).
	StatusCode int
	// Message describes the failure (response body or callback error message).
	Message string
	// WWWAuthenticate is the raw WWW-Authenticate header from the server response.
	WWWAuthenticate string
	// RequiredScopes are the scopes parsed from WWW-Authenticate per RFC 6750.
	// Populated automatically from the "scope" parameter. Empty if not present.
	RequiredScopes []string
	// Cause is the underlying error from the callback, if the failure was due
	// to a callback error rather than retry exhaustion. Nil when retries were
	// simply exhausted.
	Cause error
}

AuthRetryError is returned when authentication retry fails (401/403 exhausted or callback returned an error). Captures the HTTP response metadata for diagnostic and programmatic use.

RequiredScopes is automatically parsed from the WWW-Authenticate header's "scope" parameter per RFC 6750 §3, so callers don't need to parse it themselves.

func (*AuthRetryError) Error added in v0.0.18

func (e *AuthRetryError) Error() string

func (*AuthRetryError) Unwrap added in v0.0.18

func (e *AuthRetryError) Unwrap() error

type BaseConn added in v0.0.4

type BaseConn[I any, O any] struct {
	// Codec handles message encoding/decoding.
	// Must be set before the connection is used.
	Codec Codec[I, O]

	// Writer is the output channel for sending messages.
	// Handles all outgoing messages: data, pings, and errors.
	// Initialized in OnStart.
	Writer *conc.Writer[OutgoingMessage[O]]

	// NameStr is an optional human-readable name for this connection.
	NameStr string

	// ConnIdStr is a unique identifier for this connection.
	// Auto-generated if not set.
	ConnIdStr string

	// PingId tracks the current ping sequence number.
	PingId int64
	// contains filtered or unexported fields
}

BaseConn is a generic WebSocket connection that separates transport from encoding. It uses a Codec to handle message serialization/deserialization.

Type parameters:

  • I: Input message type (received from client)
  • O: Output message type (sent to client)

Usage:

type MyConn struct {
    gohttp.BaseConn[MyInput, MyOutput]
}

func (c *MyConn) HandleMessage(msg MyInput) error {
    // msg is already typed!
    return nil
}

func (*BaseConn[I, O]) ConnId added in v0.0.4

func (b *BaseConn[I, O]) ConnId() string

ConnId returns the connection ID, generating one if not set.

func (*BaseConn[I, O]) DebugInfo added in v0.0.4

func (b *BaseConn[I, O]) DebugInfo() any

DebugInfo returns debug information about the connection.

func (*BaseConn[I, O]) HandleMessage added in v0.0.4

func (b *BaseConn[I, O]) HandleMessage(msg I) error

HandleMessage processes an incoming message. Default implementation just logs; override in embedding struct.

func (*BaseConn[I, O]) InputChan added in v0.0.4

func (b *BaseConn[I, O]) InputChan() chan<- OutgoingMessage[O]

InputChan returns the Writer's input channel for use with FanOut.

func (*BaseConn[I, O]) Name added in v0.0.4

func (b *BaseConn[I, O]) Name() string

Name returns the connection name.

func (*BaseConn[I, O]) OnClose added in v0.0.4

func (b *BaseConn[I, O]) OnClose()

OnClose cleans up when the connection closes.

func (*BaseConn[I, O]) OnError added in v0.0.4

func (b *BaseConn[I, O]) OnError(err error) error

OnError handles connection errors. Return nil to suppress the error and continue, or return the error to close.

func (*BaseConn[I, O]) OnStart added in v0.0.4

func (b *BaseConn[I, O]) OnStart(conn *websocket.Conn) error

OnStart initializes the connection after WebSocket upgrade. Creates the Writer with codec-aware encoding.

func (*BaseConn[I, O]) OnTimeout added in v0.0.4

func (b *BaseConn[I, O]) OnTimeout() bool

OnTimeout handles read timeout. Return true to close the connection, false to keep it alive.

func (*BaseConn[I, O]) ReadMessage added in v0.0.4

func (b *BaseConn[I, O]) ReadMessage(conn *websocket.Conn) (I, error)

ReadMessage reads and decodes the next message from the WebSocket connection. Uses the configured Codec to decode the raw bytes.

func (*BaseConn[I, O]) SendError added in v0.0.4

func (b *BaseConn[I, O]) SendError(err error)

SendError sends an error to the client.

func (*BaseConn[I, O]) SendOutput added in v0.0.4

func (b *BaseConn[I, O]) SendOutput(msg O)

SendOutput sends a typed output message to the client. This is a convenience method that wraps the Writer.Send call.

func (*BaseConn[I, O]) SendPing added in v0.0.4

func (b *BaseConn[I, O]) SendPing() error

SendPing sends a ping message through the Writer. This ensures thread-safe writes by going through the serialized Writer.

type BaseHandler added in v0.0.4

type BaseHandler[I any, O any] struct {
	// Codec is used for all connections created by this handler.
	Codec Codec[I, O]

	// Name is the optional name for connections.
	Name string
}

BaseHandler is a simple handler that creates BaseConn instances. Useful for quick prototyping or when you don't need custom validation.

func (*BaseHandler[I, O]) Validate added in v0.0.4

func (h *BaseHandler[I, O]) Validate(w http.ResponseWriter, r *http.Request) (*BaseConn[I, O], bool)

Validate implements WSHandler. Creates a new BaseConn with the configured codec.

type BaseSSEConn added in v0.0.8

type BaseSSEConn[O any] struct {
	// Codec handles output message serialization.
	// Only Encode() is used (SSE has no input decoding).
	// Must be set before the connection is used.
	Codec Codec[any, O]

	// Writer serializes all outgoing messages through a single goroutine.
	// Initialized in OnStart. Use SendOutput/SendEvent instead of direct access.
	Writer *conc.Writer[SSEOutgoingMessage[O]]

	// NameStr is an optional human-readable name for this connection.
	NameStr string

	// ConnIdStr is a unique identifier for this connection.
	// Auto-generated if not set.
	ConnIdStr string
	// contains filtered or unexported fields
}

BaseSSEConn is a generic SSE connection that handles message serialization and thread-safe writes. It mirrors BaseConn[I, O] but for write-only SSE streams.

Type parameter O is the output message type sent to clients.

Usage:

type MySSEConn struct {
    gohttp.BaseSSEConn[MyEvent]
}

func (c *MySSEConn) OnStart(w http.ResponseWriter, r *http.Request) error {
    if err := c.BaseSSEConn.OnStart(w, r); err != nil {
        return err
    }
    // custom initialization
    return nil
}

Important: For SSE endpoints, set http.Server.WriteTimeout = 0 to prevent the server from closing long-lived SSE connections. See middleware.ApplyDefaults documentation for details.

func (*BaseSSEConn[O]) Close added in v0.0.8

func (b *BaseSSEConn[O]) Close()

Close signals the connection to terminate. SSEServe will detect this via the Done channel and call OnClose.

func (*BaseSSEConn[O]) ConnId added in v0.0.8

func (b *BaseSSEConn[O]) ConnId() string

ConnId returns the connection ID, generating one if not set.

func (*BaseSSEConn[O]) Done added in v0.0.8

func (b *BaseSSEConn[O]) Done() <-chan struct{}

Done returns a channel that is closed when the connection should terminate. Use this to signal SSEServe to exit the event loop from application code.

func (*BaseSSEConn[O]) InputChan added in v0.0.8

func (b *BaseSSEConn[O]) InputChan() chan<- SSEOutgoingMessage[O]

InputChan returns the Writer's input channel for use with FanOut.

func (*BaseSSEConn[O]) Name added in v0.0.8

func (b *BaseSSEConn[O]) Name() string

Name returns the connection name.

func (*BaseSSEConn[O]) OnClose added in v0.0.8

func (b *BaseSSEConn[O]) OnClose()

OnClose cleans up the SSE connection by stopping the Writer and closing the done channel. Always call the parent OnClose when overriding:

func (c *MySSEConn) OnClose() {
    // custom cleanup
    c.BaseSSEConn.OnClose()
}

func (*BaseSSEConn[O]) OnStart added in v0.0.8

func (b *BaseSSEConn[O]) OnStart(w http.ResponseWriter, r *http.Request) error

OnStart initializes the SSE connection. It asserts that the ResponseWriter supports http.Flusher (required for streaming), then creates the Writer with SSE-format dispatch.

The Writer callback formats each message according to the SSE wire protocol:

  • Comments: ": {text}\n\n"
  • Data events: "event: {type}\nid: {id}\ndata: {json}\n\n"

Per WHATWG SSE spec, multi-line data is split into separate "data:" lines.

func (*BaseSSEConn[O]) Ready added in v0.0.10

func (b *BaseSSEConn[O]) Ready() <-chan struct{}

Ready returns a channel that is closed when OnStart completes and the Writer is initialized. Safe to call before OnStart (the channel is created lazily). Use this to wait before sending messages:

<-conn.Ready()
conn.SendOutput(msg)

func (*BaseSSEConn[O]) SendEvent added in v0.0.8

func (b *BaseSSEConn[O]) SendEvent(event string, msg O)

SendEvent sends a named event to the client. The event type is set via the SSE "event:" field, allowing clients to listen with addEventListener.

Per WHATWG SSE spec, if no event type is specified, clients receive the event via the "message" event handler.

func (*BaseSSEConn[O]) SendEventWithID added in v0.0.8

func (b *BaseSSEConn[O]) SendEventWithID(event string, id string, msg O)

SendEventWithID sends a named event with an ID. The ID is set via the SSE "id:" field, enabling client-side reconnection via the Last-Event-ID header.

Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-string): on reconnection, the browser includes "Last-Event-ID: {id}" so the server can resume the event stream from the correct position.

func (*BaseSSEConn[O]) SendKeepalive added in v0.0.8

func (b *BaseSSEConn[O]) SendKeepalive()

SendKeepalive sends an SSE comment as a keepalive signal. The comment format (": keepalive\n\n") is ignored by EventSource clients but prevents intermediate proxies from closing idle connections.

func (*BaseSSEConn[O]) SendOutput added in v0.0.8

func (b *BaseSSEConn[O]) SendOutput(msg O)

SendOutput sends a data message to the client. The message is serialized using the configured Codec and formatted as an SSE data event.

This is a convenience method for sending events without a named type. For named events, use SendEvent or SendEventWithID.

func (*BaseSSEConn[O]) SendRetry added in v0.0.23

func (b *BaseSSEConn[O]) SendRetry(ms int)

SendRetry emits a bare SSE "retry:" field to change the client's reconnection delay. Used for server-initiated disconnect hints — e.g., a long-running tool handler tells the client to back off for N milliseconds before reconnecting. Has no effect if ms <= 0 or the Writer is nil.

Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream): the retry field sets the client's reconnection time in integer milliseconds. Clients that do not support the field ignore it.

To combine a retry hint with a data delivery in one event, set Retry on an SSEOutgoingMessage that also has Data set, and pass it to the Writer directly.

type BiDirStreamConfig

type BiDirStreamConfig struct {
	// PingPeriod specifies how often to send ping messages to the remote peer.
	// Pings are used as heartbeat messages to verify the connection is alive.
	// Default: 30 seconds.
	PingPeriod time.Duration

	// PongPeriod specifies the maximum time to wait for any data (ping, pong, or
	// regular messages) from the remote peer before considering the connection dead.
	// If no data is received within this duration, OnTimeout() is called.
	// Default: 300 seconds (5 minutes).
	PongPeriod time.Duration
}

BiDirStreamConfig provides configuration for bidirectional stream connections. It controls the timing of health checks and connection timeout detection.

func DefaultBiDirStreamConfig

func DefaultBiDirStreamConfig() *BiDirStreamConfig

DefaultBiDirStreamConfig returns a BiDirStreamConfig with sensible defaults:

  • PingPeriod: 30 seconds
  • PongPeriod: 300 seconds (5 minutes)

These defaults are suitable for most production deployments. For development or testing, you may want shorter periods for faster feedback.

type BiDirStreamConn

type BiDirStreamConn[I any] interface {
	// SendPing sends a heartbeat ping message to the remote peer.
	// Called periodically based on BiDirStreamConfig.PingPeriod.
	// The implementation should encode and send an appropriate ping message.
	SendPing() error

	// Name returns an optional human-readable name for this connection type.
	// Used for logging and debugging purposes.
	Name() string

	// ConnId returns a unique identifier for this specific connection instance.
	// Useful for tracking individual connections in logs and metrics.
	ConnId() string

	// HandleMessage processes an incoming message of type I.
	// Called for each message received from the remote peer.
	// Return an error to signal that the connection should be closed.
	HandleMessage(msg I) error

	// OnError is called when an error occurs during connection operation.
	// Return nil to suppress the error and continue the connection.
	// Return the error (or a different error) to close the connection.
	OnError(err error) error

	// OnClose is called when the connection is closing for any reason.
	// Use this hook to clean up resources (timers, goroutines, etc.)
	// and perform any final actions (logging, metrics, etc.).
	OnClose()

	// OnTimeout is called when no data has been received within the PongPeriod.
	// Return true to close the connection, false to continue waiting.
	// Typically returns true, but may return false for connections that
	// expect long periods of silence.
	OnTimeout() bool
}

BiDirStreamConn defines the lifecycle and message handling interface for bidirectional stream connections. Implementations handle messages of type I and manage connection state through lifecycle hooks.

The typical lifecycle is:

  1. Connection established, OnStart() called (not part of this interface)
  2. Messages received and processed via HandleMessage()
  3. Periodic SendPing() calls to maintain connection health
  4. On errors, OnError() is called to determine if connection should continue
  5. On timeout (no data received within PongPeriod), OnTimeout() is called
  6. Connection ends, OnClose() is called for cleanup

type BinaryProtoCodec added in v0.0.4

type BinaryProtoCodec[I proto.Message, O proto.Message] struct {
	// NewInput is an optional factory function to create new input instances.
	// When provided, this is used for optimal performance.
	NewInput func() I

	// Input is an exemplar instance used as fallback when NewInput is nil.
	Input I
}

BinaryProtoCodec handles encoding/decoding of protobuf messages using binary format. This provides maximum efficiency for high-throughput scenarios.

For best performance, provide NewInput factory function. If not provided, the codec falls back to reflection using the Input exemplar (slower).

func (*BinaryProtoCodec[I, O]) Decode added in v0.0.4

func (c *BinaryProtoCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)

Decode unmarshals binary protobuf data into a new message instance.

func (*BinaryProtoCodec[I, O]) Encode added in v0.0.4

func (c *BinaryProtoCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)

Encode marshals a protobuf message to binary bytes.

type Codec added in v0.0.4

type Codec[I any, O any] interface {
	// Decode converts raw WebSocket data into a typed input message.
	// msgType indicates whether the data was received as text or binary.
	Decode(data []byte, msgType MessageType) (I, error)

	// Encode converts a typed output message to raw bytes for sending.
	// Returns the encoded bytes and the appropriate message type (text/binary).
	Encode(msg O) ([]byte, MessageType, error)
}

Codec handles encoding/decoding of messages over WebSocket. The type parameters I and O represent input (received) and output (sent) message types. Note: Pings are handled at the transport layer (BaseConn), not by codecs.

type EventStore added in v0.0.19

type EventStore interface {
	// Store persists an event for the given stream. Events are ordered by
	// insertion time within a stream.
	Store(streamID string, event StoredEvent) error

	// Replay returns all events stored after the event with the given
	// lastEventID. If lastEventID is not found in the stream, all stored
	// events are returned (conservative fallback — the client may have
	// been disconnected long enough for the anchor event to be evicted).
	//
	// Returns an empty slice if the stream does not exist or if
	// lastEventID is the most recent event.
	Replay(streamID string, lastEventID string) ([]StoredEvent, error)

	// Trim removes all stored events for the given stream. Call this
	// when a session is destroyed to prevent unbounded memory growth.
	Trim(streamID string) error
}

EventStore persists SSE events for stream resumption. When a client reconnects with a Last-Event-ID header, the server replays missed events from the store.

Implementations must be safe for concurrent use by multiple goroutines.

The streamID parameter groups events by logical stream (e.g., session ID). Event IDs are opaque strings — ordering is determined by insertion order, not by parsing the ID value.

type GracefulOption added in v0.0.9

type GracefulOption func(*gracefulConfig)

GracefulOption configures ListenAndServeGraceful behavior.

func WithContext added in v0.0.9

func WithContext(ctx context.Context) GracefulOption

WithContext sets a parent context. Shutdown is triggered when this context is cancelled, in addition to OS signals. This is useful for programmatic shutdown in tests or when embedding the server in a larger application.

func WithDrainTimeout added in v0.0.9

func WithDrainTimeout(d time.Duration) GracefulOption

WithDrainTimeout sets the maximum time to wait for in-flight requests to complete during shutdown. Default: 30 seconds.

After this timeout expires, remaining connections are forcefully closed.

func WithOnShutdown added in v0.0.9

func WithOnShutdown(fn func()) GracefulOption

WithOnShutdown registers a callback to be invoked when shutdown is triggered, BEFORE the server begins draining connections. Multiple callbacks are called in registration order.

Use this to notify long-lived connections before they are closed:

  • SSEHub.CloseAll() — closes SSE Done channels, unblocking SSEServe handlers
  • Custom WebSocket goodbye messages
  • Flush logs, metrics, or other state

Callbacks run while connections are still open, so they can send final messages to clients.

func WithSignals added in v0.0.9

func WithSignals(sigs ...os.Signal) GracefulOption

WithSignals sets the OS signals that trigger shutdown. Default: SIGTERM, SIGINT.

type HTTPError

type HTTPError struct {
	Code    int
	Message string
}

Representing HTTP specific errors

func (*HTTPError) Error

func (t *HTTPError) Error() string

type HTTPStatusError added in v0.0.17

type HTTPStatusError struct {
	StatusCode int
	Header     http.Header
	Body       string
}

HTTPStatusError represents an HTTP response with a non-2xx status code. It captures the status code, response headers, and response body for error reporting, classification (e.g., 5xx errors are typically transient/retriable), and programmatic inspection (e.g., reading WWW-Authenticate or Retry-After).

func (*HTTPStatusError) Error added in v0.0.17

func (e *HTTPStatusError) Error() string

type IDGen added in v0.0.19

type IDGen interface {
	Next() string
}

IDGen generates unique string IDs. Implementations must be safe for concurrent use. IDs are opaque — callers must not assume any ordering, format, or structure beyond uniqueness within the generator's scope.

type JSONCodec added in v0.0.4

type JSONCodec struct{}

JSONCodec handles encoding/decoding of arbitrary JSON messages. This is useful for dynamic message handling where the structure isn't known at compile time.

Example

ExampleJSONCodec demonstrates the default untyped JSON codec. Use this for dynamic messages where the structure isn't known at compile time. This is what JSONConn uses internally.

package main

import (
	"fmt"

	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	codec := &gohttp.JSONCodec{}

	// Encode any value
	data, _, err := codec.Encode(map[string]any{"action": "join", "room": "lobby"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Encoded: %s\n", data)

	// Decode to any
	msg, err := codec.Decode(data, gohttp.TextMessage)
	if err != nil {
		panic(err)
	}
	m := msg.(map[string]any)
	fmt.Printf("Action: %s\n", m["action"])

}
Output:
Encoded: {"action":"join","room":"lobby"}
Action: join

func (*JSONCodec) Decode added in v0.0.4

func (c *JSONCodec) Decode(data []byte, msgType MessageType) (any, error)

Decode unmarshals JSON data into an untyped any value.

func (*JSONCodec) Encode added in v0.0.4

func (c *JSONCodec) Encode(msg any) ([]byte, MessageType, error)

Encode marshals any value to JSON bytes.

type JSONConn

type JSONConn = BaseConn[any, any]

JSONConn is an alias for BaseConn with untyped JSON messages. This provides a simple way to handle dynamic JSON messages.

For typed messages, use BaseConn[YourInputType, YourOutputType] directly with an appropriate codec.

func NewJSONConn added in v0.0.4

func NewJSONConn() *JSONConn

NewJSONConn creates a new JSONConn with the default JSON codec.

type JSONHandler

type JSONHandler struct{}

JSONHandler is a simple handler that creates JSONConn instances. All connections are accepted (no validation).

func (*JSONHandler) Validate

func (j *JSONHandler) Validate(w http.ResponseWriter, r *http.Request) (*JSONConn, bool)

Validate implements WSHandler. Accepts all connections.

type JSONSSEConn added in v0.0.8

type JSONSSEConn = BaseSSEConn[any]

JSONSSEConn is an alias for BaseSSEConn with untyped JSON messages. This provides a simple way to send dynamic JSON data over SSE.

For typed messages, use BaseSSEConn[YourOutputType] directly with an appropriate codec.

type JSONSSEHandler added in v0.0.8

type JSONSSEHandler struct{}

JSONSSEHandler is a simple handler that creates JSONSSEConn instances. All connections are accepted (no validation).

func (*JSONSSEHandler) Validate added in v0.0.8

Validate implements SSEHandler. Accepts all connections.

type MemoryEventStore added in v0.0.19

type MemoryEventStore struct {
	// contains filtered or unexported fields
}

MemoryEventStore is an in-memory EventStore backed by bounded per-stream slices. When a stream exceeds maxPerStream events, the oldest events are dropped (FIFO eviction).

This implementation is suitable for single-process deployments. For multi-process or persistent resumption, use a Redis or database-backed EventStore.

Thread-safe: all methods are protected by a sync.RWMutex.

func NewMemoryEventStore added in v0.0.19

func NewMemoryEventStore(maxPerStream int) *MemoryEventStore

NewMemoryEventStore creates a MemoryEventStore with the given maximum events per stream. If maxPerStream <= 0, it defaults to 1000.

func (*MemoryEventStore) Replay added in v0.0.19

func (s *MemoryEventStore) Replay(streamID string, lastEventID string) ([]StoredEvent, error)

Replay returns all events after lastEventID by scanning for it in the stream's insertion-ordered slice. If lastEventID is not found (e.g., evicted), all stored events are returned as a conservative fallback.

func (*MemoryEventStore) Store added in v0.0.19

func (s *MemoryEventStore) Store(streamID string, event StoredEvent) error

Store appends an event to the stream. If the stream exceeds maxPerStream, the oldest event is dropped.

func (*MemoryEventStore) Trim added in v0.0.19

func (s *MemoryEventStore) Trim(streamID string) error

Trim removes all stored events for the given stream.

type MessageType added in v0.0.4

type MessageType int

MessageType represents the WebSocket frame type

const (
	// TextMessage denotes a text data message (UTF-8 encoded)
	TextMessage MessageType = websocket.TextMessage // 1

	// BinaryMessage denotes a binary data message
	BinaryMessage MessageType = websocket.BinaryMessage // 2
)

type OutgoingMessage added in v0.0.4

type OutgoingMessage[O any] struct {
	// Data is a regular output message (mutually exclusive with Ping/Error)
	Data *O

	// Ping is a heartbeat message (mutually exclusive with Data/Error)
	Ping *PingData

	// Error is an error message (mutually exclusive with Data/Ping)
	Error error
}

OutgoingMessage represents any message that can be sent over the WebSocket. This union type allows pings, errors, and data messages to all go through the same Writer, avoiding concurrent write issues.

type PingData added in v0.0.4

type PingData struct {
	PingId int64
	ConnId string
	Name   string
}

PingData contains ping message metadata.

type ProtoJSONCodec added in v0.0.4

type ProtoJSONCodec[I proto.Message, O proto.Message] struct {
	// NewInput is an optional factory function to create new input instances.
	// When provided, this is used for optimal performance.
	// Example: func() *pb.PlayerAction { return &pb.PlayerAction{} }
	NewInput func() I

	// Input is an exemplar instance used as fallback when NewInput is nil.
	// The codec uses reflection to create new instances of the same type.
	// Example: &pb.PlayerAction{}
	Input I

	// MarshalOptions configures protojson marshaling behavior.
	MarshalOptions protojson.MarshalOptions

	// UnmarshalOptions configures protojson unmarshaling behavior.
	UnmarshalOptions protojson.UnmarshalOptions
}

ProtoJSONCodec handles encoding/decoding of protobuf messages using JSON format. This provides human-readable wire format while maintaining proto type safety.

For best performance, provide NewInput factory function. If not provided, the codec falls back to reflection using the Input exemplar (slower).

func (*ProtoJSONCodec[I, O]) Decode added in v0.0.4

func (c *ProtoJSONCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)

Decode unmarshals JSON data into a new protobuf message instance.

func (*ProtoJSONCodec[I, O]) Encode added in v0.0.4

func (c *ProtoJSONCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)

Encode marshals a protobuf message to JSON bytes.

type SSEConn added in v0.0.8

type SSEConn[O any] interface {
	// Name returns a human-readable name for this connection type.
	// Used for logging and debugging.
	Name() string

	// ConnId returns a unique identifier for this connection instance.
	// Used for tracking in SSEHub and logging.
	ConnId() string

	// OnStart is called when the SSE connection is established.
	// Receives the ResponseWriter (which must implement http.Flusher) and
	// the Request (whose Context signals client disconnect).
	// Return an error to reject the connection.
	OnStart(w http.ResponseWriter, r *http.Request) error

	// OnClose is called when the connection ends (client disconnect or server close).
	// Use this to clean up resources.
	OnClose()

	// SendKeepalive sends an SSE comment as a keepalive signal.
	// Called automatically by SSEServe at the configured interval.
	SendKeepalive()

	// Done returns a channel that is closed when the connection should terminate.
	// SSEServe monitors this channel to detect programmatic close requests.
	Done() <-chan struct{}
}

SSEConn represents a server-sent events connection. It is the SSE counterpart to WSConn — write-only (server to client), with lifecycle hooks for setup and teardown.

Implementations typically embed BaseSSEConn[O] and may override OnStart/OnClose for custom initialization and cleanup logic.

SSE connections are unidirectional (server → client) per the WHATWG spec: https://html.spec.whatwg.org/multipage/server-sent-events.html

type SSEConnConfig added in v0.0.8

type SSEConnConfig struct {
	// KeepalivePeriod specifies how often to send SSE comment keepalives.
	// SSE comments (lines starting with ":") are ignored by EventSource clients
	// but keep the TCP connection alive through proxies (nginx default idle
	// timeout: 60s, AWS ALB: 60s).
	//
	// Set to 0 to disable keepalives. Default: 30 seconds.
	//
	// See WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
	KeepalivePeriod time.Duration
}

SSEConnConfig controls the behavior of SSE connections.

func DefaultSSEConnConfig added in v0.0.8

func DefaultSSEConnConfig() *SSEConnConfig

DefaultSSEConnConfig returns an SSEConnConfig with sensible defaults:

  • KeepalivePeriod: 30 seconds

These defaults keep connections alive through most reverse proxies. For SSE endpoints behind proxies with shorter idle timeouts, reduce the keepalive period accordingly.

type SSEEvent added in v0.0.9

type SSEEvent struct {
	// Event is the optional SSE event type ("event:" field).
	// If empty, the event type defaults to "message" on the client side.
	Event string

	// Data is the event payload. JSON-marshaled into the "data:" field.
	Data any

	// ID is the optional SSE event ID ("id:" field).
	// Enables client reconnection via the Last-Event-ID header.
	ID string
}

SSEEvent represents a single Server-Sent Event to be streamed. Used by StreamResponse to describe each event in the stream.

Per WHATWG SSE spec:

  • Event sets the "event:" field (clients listen via addEventListener)
  • Data is JSON-marshaled into the "data:" field
  • ID sets the "id:" field (enables reconnection via Last-Event-ID)

type SSEEventReader added in v0.0.16

type SSEEventReader struct {
	// contains filtered or unexported fields
}

SSEEventReader reads Server-Sent Events from an io.Reader.

Usage:

reader := NewSSEEventReader(resp.Body)
for {
    ev, err := reader.ReadEvent()
    if err != nil {
        break
    }
    // process ev
}

The reader handles all WHATWG SSE spec details: field parsing, multi-line data concatenation, comment skipping, retry parsing, and BOM stripping.

See: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream

func NewSSEEventReader added in v0.0.16

func NewSSEEventReader(r io.Reader) *SSEEventReader

NewSSEEventReader creates an SSEEventReader that parses SSE events from r.

func (*SSEEventReader) ReadEvent added in v0.0.16

func (r *SSEEventReader) ReadEvent() (SSEReadEvent, error)

ReadEvent reads the next SSE event from the stream.

It blocks until a complete event (terminated by a blank line) is available, or an error occurs. Empty events (no fields accumulated between blank lines) are skipped automatically per the WHATWG dispatch algorithm: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage

On EOF mid-event (fields accumulated but no trailing blank line), the partial event is returned along with io.EOF. This supports servers that close the response body without a trailing blank line.

On EOF with no fields accumulated, returns a zero SSEReadEvent and io.EOF.

type SSEHandler added in v0.0.8

type SSEHandler[O any, S SSEConn[O]] interface {
	// Validate checks if the HTTP request should establish an SSE stream.
	// Return (connection, true) to proceed with the SSE connection.
	// Return (nil, false) to reject (the handler should write the error response).
	Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}

SSEHandler validates HTTP requests and creates SSE connections. It acts as a factory for SSEConn instances, analogous to WSHandler for WebSocket connections.

Type parameters:

  • O: The output message type sent to the client
  • S: The specific SSEConn implementation type

type SSEHub added in v0.0.8

type SSEHub[O any] struct {
	// contains filtered or unexported fields
}

SSEHub manages a collection of SSE connections, providing session tracking, targeted delivery, and broadcast capabilities. It is the SSE counterpart to the WebSocket hub/room pattern used in the grpcws-demo's GameHub.

SSEHub is instance-based (not a package-level global) for testability and to support multiple independent hubs in the same process.

Type parameter O is the output message type sent to clients.

Usage:

hub := gohttp.NewSSEHub[MyEvent]()

// In your SSEConn.OnStart:
hub.Register(conn)

// In your SSEConn.OnClose:
hub.Unregister(conn.ConnId())

// From application code:
hub.Broadcast(MyEvent{Type: "update", Data: ...})
hub.Send(sessionId, MyEvent{Type: "direct", Data: ...})

// On graceful shutdown:
hub.CloseAll()
Example

ExampleSSEHub demonstrates using SSEHub for broadcasting events to multiple SSE connections. The hub tracks connections by ID and supports targeted delivery and broadcast.

package main

import (
	"fmt"

	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	hub := gohttp.NewSSEHub[any]()

	// Broadcast to all connected clients
	hub.Broadcast(map[string]any{"type": "alert", "msg": "hello"})

	// Send to a specific client
	hub.Send("session-123", map[string]any{"type": "direct"})

	// Send a named event
	hub.BroadcastEvent("notification", map[string]any{"text": "update"})

	// On shutdown, close all connections
	hub.CloseAll()

	fmt.Println("Hub operations complete")
}
Output:
Hub operations complete

func NewSSEHub added in v0.0.8

func NewSSEHub[O any]() *SSEHub[O]

NewSSEHub creates a new SSEHub for managing SSE connections.

func (*SSEHub[O]) Broadcast added in v0.0.8

func (h *SSEHub[O]) Broadcast(msg O)

Broadcast sends a message to all registered connections. The message is queued to each connection's Writer independently, so a slow connection does not block others.

func (*SSEHub[O]) BroadcastEvent added in v0.0.8

func (h *SSEHub[O]) BroadcastEvent(event string, msg O)

BroadcastEvent sends a named event to all registered connections. The event type is set via the SSE "event:" field.

func (*SSEHub[O]) BroadcastEventWithID added in v0.0.19

func (h *SSEHub[O]) BroadcastEventWithID(event, id string, msg O)

BroadcastEventWithID sends a named event with an ID to all registered connections. The ID is set via the SSE "id:" field.

func (*SSEHub[O]) CloseAll added in v0.0.8

func (h *SSEHub[O]) CloseAll()

CloseAll closes all registered connections by calling OnClose on each, then clears the connection map. Use this for graceful shutdown.

After CloseAll, the hub is empty and can be reused.

func (*SSEHub[O]) Count added in v0.0.8

func (h *SSEHub[O]) Count() int

Count returns the number of currently registered connections.

func (*SSEHub[O]) Register added in v0.0.8

func (h *SSEHub[O]) Register(conn *BaseSSEConn[O])

Register adds an SSE connection to the hub, keyed by its ConnId. If a connection with the same ID already exists, it is replaced (the old connection is NOT closed — the caller is responsible for lifecycle management).

func (*SSEHub[O]) Send added in v0.0.8

func (h *SSEHub[O]) Send(connId string, msg O) bool

Send delivers a message to a specific connection by ID. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist (no error, no panic).

func (*SSEHub[O]) SendEvent added in v0.0.8

func (h *SSEHub[O]) SendEvent(connId string, event string, msg O) bool

SendEvent delivers a named event to a specific connection by ID. The event type is set via the SSE "event:" field. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist.

func (*SSEHub[O]) SendEventWithID added in v0.0.19

func (h *SSEHub[O]) SendEventWithID(connId, event, id string, msg O) bool

SendEventWithID delivers a named event with an ID to a specific connection. The ID is set via the SSE "id:" field, enabling client reconnection via the Last-Event-ID header. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist.

func (*SSEHub[O]) Unregister added in v0.0.8

func (h *SSEHub[O]) Unregister(connId string)

Unregister removes an SSE connection from the hub by its ConnId. The connection's OnClose is NOT called — the caller manages the connection lifecycle (typically SSEServe handles OnClose via defer).

Unregistering a nonexistent ID is a no-op.

type SSEOutgoingMessage added in v0.0.8

type SSEOutgoingMessage[O any] struct {
	// Data is a regular output message. Encoded via Codec, sent as SSE "data:" field.
	// Mutually exclusive with Comment.
	Data *O

	// Event is the SSE event type ("event:" field). Only used with Data.
	// If empty, the event type defaults to "message" on the client side.
	Event string

	// ID is the SSE event ID ("id:" field). Only used with Data.
	// Enables client reconnection via the Last-Event-ID header.
	ID string

	// Retry is the reconnection delay hint in milliseconds, emitted as the
	// SSE "retry:" field. When non-zero, the client uses this value as its
	// next reconnection delay if the connection drops. Zero or negative
	// values are ignored (no retry line written).
	//
	// Can be sent alongside Data (as a combined event+hint) or as a bare
	// hint via SendRetry (no Data, just the retry line).
	Retry int

	// Comment is an SSE comment line (for keepalive). Mutually exclusive with Data.
	// Sent as ": {comment}\n\n".
	Comment string
}

SSEOutgoingMessage represents a message to be sent over an SSE connection. This union type allows data messages and keepalive comments to go through the same Writer, avoiding concurrent write issues.

Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream):

  • "event:" sets the event type (clients listen via addEventListener)
  • "data:" carries the payload (multiple lines joined with "\n")
  • "id:" sets the last event ID (used for reconnection via Last-Event-ID header)
  • "retry:" sets the client's reconnection delay in milliseconds
  • Lines starting with ":" are comments (used for keepalive)

type SSEReadEvent added in v0.0.16

type SSEReadEvent struct {
	Event   string // "event:" field value
	Data    string // "data:" field(s), joined with "\n" for multi-line
	ID      string // "id:" field value
	Retry   int    // "retry:" field value in ms (0 = not set)
	Comment string // Comment text (lines starting with ":")
}

SSEReadEvent represents a single parsed event from an SSE stream.

Fields follow the WHATWG Server-Sent Events spec: https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-event-type

  • Event: the "event:" field (empty = unnamed, defaults to "message" on client side)
  • Data: the "data:" field(s), joined with "\n" for multi-line data
  • ID: the "id:" field (empty = not set)
  • Retry: the "retry:" field in milliseconds (0 = not set or invalid)
  • Comment: text from comment lines (lines starting with ":"), last value kept

type SingleResponse added in v0.0.9

type SingleResponse struct {
	StatusCode int // HTTP status code (default 200)
	Body       any // JSON-marshaled response body
}

SingleResponse sends a single JSON response. This is the synchronous path — the server marshals Body as JSON and returns it with Content-Type: application/json.

If StatusCode is 0, it defaults to 200.

type StoredEvent added in v0.0.19

type StoredEvent struct {
	// ID is the SSE event ID ("id:" field). Used by clients to resume
	// via the Last-Event-ID header on reconnection.
	ID string

	// Event is the SSE event type ("event:" field). If empty, clients
	// receive the event via the default "message" handler.
	Event string

	// Data is the raw event payload (pre-serialized bytes). Stored as-is
	// to avoid re-serialization on replay.
	Data []byte
}

StoredEvent represents a single SSE event that has been persisted for potential replay. It captures the three fields needed to reconstruct the original SSE wire format: event type, event ID, and raw data.

type StreamResponse added in v0.0.9

type StreamResponse struct {
	Events <-chan SSEEvent
}

StreamResponse streams SSE events from the Events channel. This is the asynchronous path — the server sets Content-Type: text/event-stream and writes each event from the channel until it is closed or the client disconnects.

The handler creates the channel, spawns a goroutine to write events, and closes the channel when done. StreamableServe handles SSE formatting, flushing, and client disconnect detection.

Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html

type StreamableConfig added in v0.0.9

type StreamableConfig struct {
	// Codec for serializing SSE event Data fields.
	// Only Encode() is used. Default: JSONCodec.
	Codec Codec[any, any]
}

StreamableConfig controls StreamableServe behavior.

func DefaultStreamableConfig added in v0.0.9

func DefaultStreamableConfig() *StreamableConfig

DefaultStreamableConfig returns a StreamableConfig with sensible defaults.

type StreamableHandlerFunc added in v0.0.9

type StreamableHandlerFunc func(ctx context.Context, r *http.Request) StreamableResponse

StreamableHandlerFunc processes an HTTP request and returns either a SingleResponse (synchronous JSON) or StreamResponse (SSE event stream).

The handler receives the request context and the original request. It decides whether to respond synchronously or stream based on the request content, Accept header, or application logic.

Example:

func myHandler(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
    if wantsStreaming(r) {
        ch := make(chan gohttp.SSEEvent)
        go func() {
            defer close(ch)
            ch <- gohttp.SSEEvent{Data: map[string]any{"progress": 50}}
            ch <- gohttp.SSEEvent{Data: map[string]any{"progress": 100}}
        }()
        return gohttp.StreamResponse{Events: ch}
    }
    return gohttp.SingleResponse{Body: map[string]any{"result": "ok"}}
}

type StreamableResponse added in v0.0.9

type StreamableResponse interface {
	// contains filtered or unexported methods
}

StreamableResponse is the return type for StreamableHandlerFunc. It is a sealed interface — only SingleResponse and StreamResponse are valid implementations.

This pattern enables the "POST-that-optionally-streams" transport introduced by MCP 2025-03-26, where a single endpoint returns either a synchronous JSON response or an SSE event stream.

type TypedJSONCodec added in v0.0.4

type TypedJSONCodec[I any, O any] struct{}

TypedJSONCodec handles encoding/decoding of strongly-typed JSON messages. Use this when you have known Go struct types for your messages.

Example

ExampleTypedJSONCodec demonstrates using TypedJSONCodec for strongly-typed JSON message encoding/decoding. Use this when your message types are known Go structs, for compile-time type safety.

package main

import (
	"fmt"

	gohttp "github.com/panyam/servicekit/http"
)

func main() {
	type ChatMessage struct {
		User string `json:"user"`
		Text string `json:"text"`
	}
	type ChatResponse struct {
		Status string `json:"status"`
		ID     int    `json:"id"`
	}

	codec := &gohttp.TypedJSONCodec[ChatMessage, ChatResponse]{}

	// Encode a response
	data, msgType, err := codec.Encode(ChatResponse{Status: "sent", ID: 42})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Type: %d, Data: %s\n", msgType, data)

	// Decode an incoming message
	msg, err := codec.Decode([]byte(`{"user":"alice","text":"hello"}`), gohttp.TextMessage)
	if err != nil {
		panic(err)
	}
	fmt.Printf("User: %s, Text: %s\n", msg.User, msg.Text)

}
Output:
Type: 1, Data: {"status":"sent","id":42}
User: alice, Text: hello

func (*TypedJSONCodec[I, O]) Decode added in v0.0.4

func (c *TypedJSONCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)

Decode unmarshals JSON data into a typed value. Creates a new instance of I using Go's zero value mechanism.

func (*TypedJSONCodec[I, O]) Encode added in v0.0.4

func (c *TypedJSONCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)

Encode marshals a typed value to JSON bytes.

type URLWaiter

type URLWaiter struct {
	Method             string
	Url                string
	Headers            map[string]string
	Payload            map[string]any
	DelayBetweenChecks time.Duration

	// Func versions of above so we can do something dynamcially on each iteration
	RequestFunc  func(iter int, prevError error) (*http.Request, error)
	ValidateFunc func(req *http.Request, resp *http.Response) error
}

A simple utility that waits for a url to return a successful response before proceeding. This can be used for things like waiting for a database or another service to become available before performing other activities.

func (*URLWaiter) Run

func (u *URLWaiter) Run() (success bool, iter int, err error)

Runs the "waiter".

Returns a tuple of:

	success - whether the waited-upon eventually became active.
	iter    - How many iterations where run before a success/failure
 	err     - Error encountered if the "wait" was a failure

type WSConn

type WSConn[I any] interface {
	BiDirStreamConn[I]

	// ReadMessage reads and decodes the next message from the WebSocket connection.
	// This is called in a loop by WSHandleConn to process incoming messages.
	// Returns the decoded message or an error (including io.EOF on close).
	ReadMessage(w *websocket.Conn) (I, error)

	// OnStart is called when the WebSocket connection is established.
	// Use this to initialize the connection (e.g., set up writers, start goroutines).
	// Return an error to reject and close the connection.
	OnStart(conn *websocket.Conn) error
}

WSConn represents a bidirectional WebSocket connection that can handle typed messages of type I. It extends BiDirStreamConn with WebSocket-specific functionality for reading messages and connection initialization.

Implementations typically embed BaseConn[I, O] and override HandleMessage. The type parameter I represents the input message type received from clients.

type WSConnConfig

type WSConnConfig struct {
	*BiDirStreamConfig
	// Upgrader handles the HTTP to WebSocket protocol upgrade.
	// Configure ReadBufferSize, WriteBufferSize, and CheckOrigin as needed.
	Upgrader websocket.Upgrader
}

WSConnConfig combines BiDirStreamConfig with WebSocket-specific settings. It controls connection upgrade behavior and lifecycle timing.

func DefaultWSConnConfig

func DefaultWSConnConfig() *WSConnConfig

DefaultWSConnConfig returns a WSConnConfig with sensible defaults:

  • ReadBufferSize: 1024 bytes
  • WriteBufferSize: 1024 bytes
  • CheckOrigin: allows all origins (configure for production!)
  • PingPeriod: 30 seconds
  • PongPeriod: 300 seconds (5 minutes)

type WSHandler

type WSHandler[I any, S WSConn[I]] interface {
	// Validate checks if the HTTP request should be upgraded to a WebSocket.
	// Return (connection, true) to proceed with the upgrade.
	// Return (nil, false) to reject (the handler should write the error response).
	Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}

WSHandler validates HTTP requests and creates WebSocket connections. It acts as a factory for WSConn instances, typically performing authentication and authorization before allowing the upgrade.

Type parameters:

  • I: The input message type that the connection will handle
  • S: The specific WSConn implementation type (must implement WSConn[I])

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL