Documentation ¶
Overview ¶
Package http provides utilities for HTTP request handling and production-grade WebSocket connections.
This package contains two main areas of functionality:
- HTTP utilities for simplified request/response handling
- WebSocket abstractions built on Gorilla WebSocket for real-time applications
WebSocket Framework ¶
The WebSocket framework provides a production-ready abstraction over Gorilla WebSocket with automatic connection management, heartbeat detection, and lifecycle hooks.
## Key Features
- Production-grade connection management with automatic heartbeat detection
- Lifecycle hooks for connection start, close, timeout, and error handling
- Thread-safe message broadcasting to multiple clients
- Automatic ping-pong mechanism to prevent connection timeouts
- Type-safe message handling with Go generics
- Pluggable Codec system for flexible message encoding (JSON, Protobuf)
- Generic BaseConn[I, O] for typed input/output messages
- Built-in JSON message support with JSONConn
- Configurable timeouts and intervals for different deployment scenarios
## Quick Start
The simplest way to create a WebSocket endpoint is using the built-in JSONConn:
type EchoConn struct {
gohttp.JSONConn
}
func (e *EchoConn) HandleMessage(msg any) error {
// Echo the message back to the client
e.Writer.Send(conc.Message[any]{Value: msg})
return nil
}
type EchoHandler struct{}
func (h *EchoHandler) Validate(w http.ResponseWriter, r *http.Request) (*EchoConn, bool) {
return &EchoConn{}, true // Accept all connections
}
// Register with HTTP router
router.HandleFunc("/echo", gohttp.WSServe(&EchoHandler{}, nil))
## Architecture
The framework uses three main interfaces:
### WSConn[I any]
Represents a WebSocket connection that can handle typed messages:
type WSConn[I any] interface {
BiDirStreamConn[I]
ReadMessage(w *websocket.Conn) (I, error)
OnStart(conn *websocket.Conn) error
}
### WSHandler[I any, S WSConn[I]]
Validates HTTP requests and creates WebSocket connections:
type WSHandler[I any, S WSConn[I]] interface {
Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}
### BiDirStreamConn[I any]
Provides lifecycle and message handling methods:
type BiDirStreamConn[I any] interface {
SendPing() error
Name() string
ConnId() string
HandleMessage(msg I) error
OnError(err error) error
OnClose()
OnTimeout() bool
}
## Advanced Usage
### Multi-user Chat Server
type ChatServer struct {
clients map[string]*ChatConn
rooms map[string]*Room
mu sync.RWMutex
}
type ChatConn struct {
gohttp.JSONConn
server *ChatServer
username string
roomName string
}
func (c *ChatConn) OnStart(conn *websocket.Conn) error {
if err := c.JSONConn.OnStart(conn); err != nil {
return err
}
// Register with server
c.server.mu.Lock()
c.server.clients[c.ConnId()] = c
c.server.mu.Unlock()
return nil
}
func (c *ChatConn) HandleMessage(msg any) error {
msgMap := msg.(map[string]any)
switch msgMap["type"].(string) {
case "chat":
// Broadcast to all clients in room
c.server.broadcastToRoom(c.roomName, msgMap)
case "join_room":
// Switch rooms
c.joinRoom(msgMap["room"].(string))
}
return nil
}
### Authentication and Authorization
type SecureConn struct {
gohttp.JSONConn
userID string
roles []string
}
func (s *SecureConn) HandleMessage(msg any) error {
msgMap := msg.(map[string]any)
messageType := msgMap["type"].(string)
if !s.hasPermission(messageType) {
errorMsg := map[string]any{
"type": "error",
"error": "Insufficient permissions",
}
s.Writer.Send(conc.Message[any]{Value: errorMsg})
return nil
}
// Process authorized message
return s.processMessage(msgMap)
}
type AuthHandler struct{}
func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) (*SecureConn, bool) {
token := r.Header.Get("Authorization")
// Validate JWT token
claims, err := validateJWT(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return nil, false
}
return &SecureConn{
userID: claims["userID"].(string),
roles: claims["roles"].([]string),
}, true
}
### Custom Ping-Pong with Latency Tracking
type MonitoredConn struct {
gohttp.JSONConn
lastPingTime time.Time
pingCount int64
}
func (m *MonitoredConn) SendPing() error {
m.pingCount++
m.lastPingTime = time.Now()
pingMsg := map[string]any{
"type": "ping",
"pingId": m.pingCount,
"timestamp": m.lastPingTime.Unix(),
}
m.Writer.Send(conc.Message[any]{Value: pingMsg})
return nil
}
func (m *MonitoredConn) HandleMessage(msg any) error {
msgMap := msg.(map[string]any)
if msgMap["type"].(string) == "pong" {
latency := time.Since(m.lastPingTime)
log.Printf("Ping-pong latency: %v", latency)
}
return nil
}
## Configuration
### Production Configuration
config := &gohttp.WSConnConfig{
BiDirStreamConfig: &gohttp.BiDirStreamConfig{
PingPeriod: time.Second * 30, // Send ping every 30 seconds
PongPeriod: time.Second * 300, // Timeout after 5 minutes
},
Upgrader: websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
// Implement proper origin checking
return isValidOrigin(r.Header.Get("Origin"))
},
},
}
router.HandleFunc("/ws", gohttp.WSServe(handler, config))
### Development Configuration
config := gohttp.DefaultWSConnConfig()
config.PingPeriod = time.Second * 10 // Faster pings for development
config.PongPeriod = time.Second * 30 // Shorter timeout
## Frontend Integration
The framework is designed to work seamlessly with JavaScript WebSocket clients:
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.pingInterval = null;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this.startPingInterval();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
}
handleMessage(message) {
switch (message.type) {
case 'ping':
// Respond to server ping
this.send({type: 'pong', pingId: message.pingId});
break;
default:
this.onMessage(message);
}
}
startPingInterval() {
this.pingInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.send({type: 'ping', timestamp: Date.now()});
}
}, 25000);
}
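// Serialize a payload as JSON and send it if the socket is open.
send(payload) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(payload));
}
}
// Application hook for non-ping messages; override or reassign as needed.
onMessage(message) {}
}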
## Error Handling and Resilience
The framework provides robust error handling:
type ResilientConn struct {
gohttp.JSONConn
errorCount int
maxErrors int
}
func (r *ResilientConn) OnError(err error) error {
r.errorCount++
log.Printf("WebSocket error #%d: %v", r.errorCount, err)
if r.errorCount > r.maxErrors {
return err // Close connection after too many errors
}
return nil // Continue with connection
}
func (r *ResilientConn) OnTimeout() bool {
log.Printf("Connection timeout for %s", r.ConnId())
return true // Close the connection
}
## Best Practices
- Always call parent OnStart/OnClose when embedding JSONConn
- Implement proper authentication in the Validate method
- Use connection limits to prevent resource exhaustion
- Handle errors gracefully without crashing the server
- Clean up resources properly in OnClose methods
- Use mutexes for thread-safe operations on shared state
- Monitor connection health with metrics
## Common Patterns
### Hub Pattern for Broadcasting
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
delete(h.clients, client)
case message := <-h.broadcast:
for client := range h.clients {
client.send <- message
}
}
}
}
### Room-based Messaging
type Room struct {
name string
clients map[string]*Client
mu sync.RWMutex
}
func (r *Room) Broadcast(message any, excludeId string) {
r.mu.RLock()
defer r.mu.RUnlock()
for id, client := range r.clients {
if id != excludeId {
client.Send(message)
}
}
}
## Testing
The package includes comprehensive test utilities for WebSocket testing:
// Create test server
server := httptest.NewServer(router)
defer server.Close()
// Create WebSocket client
wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/endpoint"
conn, err := createTestClient(t, wsURL, nil)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
See the comprehensive test suite in ws2_test.go for complete examples of testing WebSocket applications including authentication, load testing, and error scenarios.
HTTP Utilities ¶
The package also provides utilities for HTTP request/response handling:
- JsonToQueryString: Convert maps to URL query strings
- SendJsonResponse: Send JSON responses with proper error handling
- ErrorToHttpCode: Convert Go errors to appropriate HTTP status codes
- WSConnWriteMessage/WSConnWriteError: WebSocket message writing utilities
- NormalizeWsUrl: Convert HTTP URLs to WebSocket URLs
Example HTTP utility usage:
func handleAPI(w http.ResponseWriter, r *http.Request) {
data, err := processRequest(r)
gohttp.SendJsonResponse(w, data, err)
}
This comprehensive framework enables building production-ready real-time applications with WebSocket communication, from simple echo servers to complex multi-user systems with authentication, rooms, and advanced connection management.
Example ¶
Example demonstrating complete WebSocket workflow
// Create router and server
router := mux.NewRouter()
// Chat server setup
chatServer := &ChatServer{
clients: make(map[string]*ChatConn),
rooms: make(map[string]*Room),
}
// Add handlers
router.HandleFunc("/echo", WSServe(&EchoHandler{}, nil))
router.HandleFunc("/chat", WSServe(&ChatHandler{server: chatServer}, nil))
router.HandleFunc("/secure", WSServe(&AuthHandler{}, nil))
// Custom configuration for production
config := &WSConnConfig{
BiDirStreamConfig: &BiDirStreamConfig{
PingPeriod: time.Second * 30,
PongPeriod: time.Second * 300,
},
Upgrader: websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
// Add proper origin checking in production
return true
},
},
}
router.HandleFunc("/production", WSServe(&EchoHandler{}, config))
// In a real application, you would start the server:
// log.Fatal(http.ListenAndServe(":8080", router))
fmt.Println("WebSocket server configured with multiple endpoints:")
fmt.Println("- /echo: Simple echo server")
fmt.Println("- /chat: Multi-user chat with rooms")
fmt.Println("- /secure: Authenticated connections")
fmt.Println("- /production: Production-ready config")
Output:
WebSocket server configured with multiple endpoints:
- /echo: Simple echo server
- /chat: Multi-user chat with rooms
- /secure: Authenticated connections
- /production: Production-ready config
Index ¶
- Constants
- Variables
- func CORS(next http.Handler) http.Handler (deprecated)
- func Call(req *http.Request, client *http.Client) (response any, err error)
- func DetectBatch(body []byte) bool
- func DoWithAuthRetry(cfg *AuthRetryConfig, buildReq func() (*http.Request, error), ...) (*http.Response, error)
- func ErrorToHttpCode(err error) int
- func GenerateSessionID() string
- func HTTPErrorCode(err error) int
- func IsHTTPTransient(statusCode int) bool
- func JsonGet(url string, onReq func(req *http.Request)) (any, *http.Response, error)
- func JsonToQueryString(json map[string]any) string
- func ListenAndServeGraceful(srv *http.Server, opts ...GracefulOption) error
- func MakeUrl(host, path string, args string) (url string)
- func NewBytesRequest(method string, endpoint string, body []byte) (req *http.Request, err error)
- func NewJsonRequest(method string, endpoint string, body map[string]any) (req *http.Request, err error)
- func NewRequest(method string, endpoint string, bodyReader io.Reader) (req *http.Request, err error)
- func NormalizeWsUrl(httpOrWsUrl string) string
- func ParseAcceptTypes(accept string) (acceptsJSON, acceptsSSE bool)
- func ParseWWWAuthenticate(header string) (resourceMetadata string, scopes []string, err error)
- func ReadFrame(r *bufio.Reader) ([]byte, error)
- func ResolveURL(baseURL, ref string) (string, error)
- func SSEServe[O any, S SSEConn[O]](handler SSEHandler[O, S], config *SSEConnConfig) http.HandlerFunc
- func SendJsonResponse(writer http.ResponseWriter, resp any, err error)
- func SplitBatch(body []byte) ([]json.RawMessage, error)
- func StreamableServe(handler StreamableHandlerFunc, config *StreamableConfig) http.HandlerFunc
- func WSConnJSONReaderWriter(conn *websocket.Conn) (reader *conc.Reader[gut.StrMap], writer *conc.Writer[conc.Message[gut.StrMap]])
- func WSConnWriteError(wsConn *websocket.Conn, err error) error
- func WSConnWriteMessage(wsConn *websocket.Conn, msg any) error
- func WSHandleConn[I any, S WSConn[I]](conn *websocket.Conn, ctx S, config *WSConnConfig)
- func WSServe[I any, S WSConn[I]](handler WSHandler[I, S], config *WSConnConfig) http.HandlerFunc
- func WriteFrame(w io.Writer, data []byte) error
- type AtomicIDGen
- type AuthRetryConfig
- type AuthRetryError
- type BaseConn
- func (b *BaseConn[I, O]) ConnId() string
- func (b *BaseConn[I, O]) DebugInfo() any
- func (b *BaseConn[I, O]) HandleMessage(msg I) error
- func (b *BaseConn[I, O]) InputChan() chan<- OutgoingMessage[O]
- func (b *BaseConn[I, O]) Name() string
- func (b *BaseConn[I, O]) OnClose()
- func (b *BaseConn[I, O]) OnError(err error) error
- func (b *BaseConn[I, O]) OnStart(conn *websocket.Conn) error
- func (b *BaseConn[I, O]) OnTimeout() bool
- func (b *BaseConn[I, O]) ReadMessage(conn *websocket.Conn) (I, error)
- func (b *BaseConn[I, O]) SendError(err error)
- func (b *BaseConn[I, O]) SendOutput(msg O)
- func (b *BaseConn[I, O]) SendPing() error
- type BaseHandler
- type BaseSSEConn
- func (b *BaseSSEConn[O]) Close()
- func (b *BaseSSEConn[O]) ConnId() string
- func (b *BaseSSEConn[O]) Done() <-chan struct{}
- func (b *BaseSSEConn[O]) InputChan() chan<- SSEOutgoingMessage[O]
- func (b *BaseSSEConn[O]) Name() string
- func (b *BaseSSEConn[O]) OnClose()
- func (b *BaseSSEConn[O]) OnStart(w http.ResponseWriter, r *http.Request) error
- func (b *BaseSSEConn[O]) Ready() <-chan struct{}
- func (b *BaseSSEConn[O]) SendEvent(event string, msg O)
- func (b *BaseSSEConn[O]) SendEventWithID(event string, id string, msg O)
- func (b *BaseSSEConn[O]) SendKeepalive()
- func (b *BaseSSEConn[O]) SendOutput(msg O)
- func (b *BaseSSEConn[O]) SendRetry(ms int)
- type BiDirStreamConfig
- type BiDirStreamConn
- type BinaryProtoCodec
- type Codec
- type EventStore
- type GracefulOption
- type HTTPError
- type HTTPStatusError
- type IDGen
- type JSONCodec
- type JSONConn
- type JSONHandler
- type JSONSSEConn
- type JSONSSEHandler
- type MemoryEventStore
- type MessageType
- type OutgoingMessage
- type PingData
- type ProtoJSONCodec
- type SSEConn
- type SSEConnConfig
- type SSEEvent
- type SSEEventReader
- type SSEHandler
- type SSEHub
- func (h *SSEHub[O]) Broadcast(msg O)
- func (h *SSEHub[O]) BroadcastEvent(event string, msg O)
- func (h *SSEHub[O]) BroadcastEventWithID(event, id string, msg O)
- func (h *SSEHub[O]) CloseAll()
- func (h *SSEHub[O]) Count() int
- func (h *SSEHub[O]) Register(conn *BaseSSEConn[O])
- func (h *SSEHub[O]) Send(connId string, msg O) bool
- func (h *SSEHub[O]) SendEvent(connId string, event string, msg O) bool
- func (h *SSEHub[O]) SendEventWithID(connId, event, id string, msg O) bool
- func (h *SSEHub[O]) Unregister(connId string)
- type SSEOutgoingMessage
- type SSEReadEvent
- type SingleResponse
- type StoredEvent
- type StreamResponse
- type StreamableConfig
- type StreamableHandlerFunc
- type StreamableResponse
- type TypedJSONCodec
- type URLWaiter
- type WSConn
- type WSConnConfig
- type WSHandler
Constants ¶
const MaxErrorBodySize = 16 << 10 // 16 KB
MaxErrorBodySize is the maximum number of bytes read from an HTTP error response body. Prevents memory exhaustion from malicious or misconfigured servers returning oversized error payloads. Shared by DoWithAuthRetry and any other error-body readers in this package.
Variables ¶
var DefaultHttpClient *http.Client
var HighQPSHttpClient *http.Client
var LowQPSHttpClient *http.Client
var MediumQPSHttpClient *http.Client
Functions ¶
func CORS
deprecated
CORS is a convenience wrapper for local development that allows all origins. For production use, prefer middleware.CORS with an OriginChecker.
Deprecated: Use middleware.CORS(nil) for allow-all, or middleware.CORS(middleware.NewOriginChecker(origins)) for production.
func Call ¶
Call makes an HTTP request with the given request and HTTP client. It is a wrapper over the standard library client that creates a Client if none is provided, performs the request, reads the entire body, and optionally converts the payload to an appropriate type based on the response's Content-Type header (for now only application/json is supported).
func DetectBatch ¶ added in v0.0.20
DetectBatch returns true if body is a JSON array, indicating a JSON-RPC 2.0 batch request. Skips leading whitespace before checking the first non-whitespace byte for '['.
Per JSON-RPC 2.0 spec (Section 6): "To send several Request objects at the same time, the Client MAY send an Array filled with Request objects."
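The check described above can be sketched in a few lines. The function name `detectBatchSketch` is hypothetical and this is not the package's source; it assumes "whitespace" means the four JSON whitespace bytes.

```go
package main

import "fmt"

// detectBatchSketch reports whether the first non-whitespace byte of body is '['.
func detectBatchSketch(body []byte) bool {
	for _, b := range body {
		switch b {
		case ' ', '\t', '\r', '\n':
			continue // skip JSON insignificant whitespace
		default:
			return b == '['
		}
	}
	return false // empty or all-whitespace input
}

func main() {
	fmt.Println(detectBatchSketch([]byte(`  [{"jsonrpc":"2.0","id":1}]`))) // true
	fmt.Println(detectBatchSketch([]byte(`{"jsonrpc":"2.0","id":1}`)))     // false
}
```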
func DoWithAuthRetry ¶ added in v0.0.18
func DoWithAuthRetry( cfg *AuthRetryConfig, buildReq func() (*http.Request, error), do func(*http.Request) (*http.Response, error), ) (*http.Response, error)
DoWithAuthRetry executes an HTTP request with automatic retry on 401/403.
Retry budget: max 1 retry for 401 (token refresh), max 1 retry for 403 (scope step-up). Total max 2 retries per request.
If cfg is nil, no auth handling is performed — 401/403 responses are returned as AuthRetryError.
buildReq must create a new *http.Request each call (the body may be consumed on the previous attempt). do is typically httpClient.Do.
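To illustrate the retry budget, here is a simplified standalone sketch that uses only net/http. `retryAuthSketch` and `demo` are hypothetical names, the transport is faked with a closure, and unlike the documented function this sketch returns the last response when the budget is exhausted instead of an AuthRetryError.

```go
package main

import (
	"fmt"
	"net/http"
)

// retryAuthSketch allows at most one retry after a 401 and one after a 403.
// It is an illustrative stand-in, not the package's implementation.
func retryAuthSketch(
	onUnauthorized, onForbidden func(*http.Response) error,
	buildReq func() (*http.Request, error),
	do func(*http.Request) (*http.Response, error),
) (*http.Response, error) {
	used401, used403 := false, false
	for {
		req, err := buildReq() // fresh request for every attempt
		if err != nil {
			return nil, err
		}
		resp, err := do(req)
		if err != nil {
			return nil, err
		}
		var callback func(*http.Response) error
		switch {
		case resp.StatusCode == http.StatusUnauthorized && !used401 && onUnauthorized != nil:
			used401, callback = true, onUnauthorized
		case resp.StatusCode == http.StatusForbidden && !used403 && onForbidden != nil:
			used403, callback = true, onForbidden
		default:
			return resp, nil // success, other status, or budget exhausted
		}
		err = callback(resp)
		resp.Body.Close()
		if err != nil {
			return nil, err // callback chose to give up
		}
	}
}

// demo simulates a server that rejects a stale token once.
func demo() (status, attempts int, err error) {
	token := "stale"
	buildReq := func() (*http.Request, error) {
		req, err := http.NewRequest(http.MethodGet, "http://example.invalid/resource", nil)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Authorization", "Bearer "+token)
		return req, nil
	}
	do := func(req *http.Request) (*http.Response, error) { // fake transport
		attempts++
		code := http.StatusOK
		if req.Header.Get("Authorization") != "Bearer fresh" {
			code = http.StatusUnauthorized
		}
		return &http.Response{StatusCode: code, Body: http.NoBody}, nil
	}
	refresh := func(*http.Response) error { token = "fresh"; return nil }
	resp, err := retryAuthSketch(refresh, nil, buildReq, do)
	if err != nil {
		return 0, attempts, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, attempts, nil
}

func main() {
	fmt.Println(demo()) // prints "200 2 <nil>"
}
```

Rebuilding the request inside the loop is what lets the refreshed token reach the second attempt, which is the reason buildReq is a function rather than a prepared request.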
func ErrorToHttpCode ¶
ErrorToHttpCode converts a Go error to an appropriate HTTP status code. If err is nil, returns http.StatusOK (200). If err contains a gRPC status, maps it to the corresponding HTTP code:
- codes.PermissionDenied → 403 Forbidden
- codes.NotFound → 404 Not Found
- codes.AlreadyExists → 409 Conflict
- codes.InvalidArgument → 400 Bad Request
- Other errors → 500 Internal Server Error
func GenerateSessionID ¶ added in v0.0.17
func GenerateSessionID() string
GenerateSessionID returns a cryptographically random 32-character hex string (128 bits of entropy) suitable for HTTP session identifiers.
Uses crypto/rand for security — the output is unpredictable and safe for use as session tokens, CSRF tokens, or any context where an attacker should not be able to guess the value.
Panics if the system's cryptographic random number generator fails, which indicates a serious system-level problem.
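The description above corresponds to a pattern like the following sketch: 16 random bytes from crypto/rand, hex-encoded to 32 characters. The name `newSessionID` is hypothetical; the package's actual implementation may differ in detail.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newSessionID returns 128 bits of randomness as a 32-character hex string.
func newSessionID() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		// A failing system CSPRNG is treated as unrecoverable.
		panic("crypto/rand failed: " + err.Error())
	}
	return hex.EncodeToString(b[:])
}

func main() {
	id := newSessionID()
	fmt.Println(len(id)) // prints "32"
}
```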
func HTTPErrorCode ¶
func IsHTTPTransient ¶ added in v0.0.17
IsHTTPTransient returns true if the HTTP status code indicates a transient server error (5xx) that may succeed on retry. Client errors (4xx) and successes (2xx) return false.
Use this to decide whether to retry a failed HTTP request:
- 500, 502, 503, 504: transient (server overload, gateway timeout)
- 400, 401, 403, 404: terminal (client error, won't change on retry)
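The classification above amounts to a range check on the status code. A minimal sketch, assuming "transient" means exactly the 5xx range (the name `isTransientSketch` is hypothetical):

```go
package main

import (
	"fmt"
	"net/http"
)

// isTransientSketch reports whether statusCode is a 5xx server error.
func isTransientSketch(statusCode int) bool {
	return statusCode >= 500 && statusCode <= 599
}

func main() {
	for _, code := range []int{http.StatusOK, http.StatusNotFound, http.StatusServiceUnavailable} {
		fmt.Println(code, isTransientSketch(code))
	}
}
```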
func JsonGet ¶
JsonGet is a simple wrapper for performing JSON GET requests. The url is the full URL once all query parameters have been added. The onReq callback allows customization of the HTTP request before it is sent.
func JsonToQueryString ¶
JsonToQueryString converts a map to a URL query string. Keys are sorted alphabetically for deterministic output. Values are converted to strings using fmt.Sprintf("%v", value).
Example:
params := map[string]any{"name": "John", "age": 30}
qs := JsonToQueryString(params) // "age=30&name=John"
Example ¶
input := map[string]any{"a": 1, "b": 2}
queryStr := JsonToQueryString(input)
fmt.Println(queryStr)
Output: a=1&b=2
func ListenAndServeGraceful ¶ added in v0.0.9
func ListenAndServeGraceful(srv *http.Server, opts ...GracefulOption) error
ListenAndServeGraceful starts the HTTP server and handles graceful shutdown. It blocks until a shutdown signal is received (OS signal or context cancellation), then drains active connections within the configured timeout.
Shutdown lifecycle:
- Start srv.ListenAndServe() in a goroutine
- Block until: OS signal, parent context cancellation, or server error
- Call OnShutdown callbacks (e.g. SSEHub.CloseAll()) — connections still open
- Call srv.Shutdown() with drain timeout — waits for in-flight handlers
- Return nil on clean shutdown, or the error
Example:
srv := &http.Server{Addr: ":8080", Handler: mux}
middleware.ApplyDefaults(srv)
srv.WriteTimeout = 0 // required for SSE
hub := gohttp.NewSSEHub[any]()
err := gohttp.ListenAndServeGraceful(srv,
gohttp.WithDrainTimeout(10*time.Second),
gohttp.WithOnShutdown(hub.CloseAll),
)
Example ¶
ExampleListenAndServeGraceful demonstrates graceful shutdown with signal handling and SSEHub integration. OnShutdown callbacks run before drain, so they can send goodbye events while connections are still open.
package main
import (
"log"
"net/http"
"time"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
hub := gohttp.NewSSEHub[any]()
srv := &http.Server{
Addr: ":8080",
Handler: http.DefaultServeMux,
}
// Blocks until SIGTERM/SIGINT, then:
// 1. Calls hub.CloseAll() (closes SSE connections)
// 2. Drains HTTP requests for up to 10s
// 3. Returns nil
err := gohttp.ListenAndServeGraceful(srv,
gohttp.WithDrainTimeout(10*time.Second),
gohttp.WithOnShutdown(hub.CloseAll),
)
if err != nil {
log.Fatal(err)
}
}
Output:
func NewBytesRequest ¶
Wraps the NewRequest helper to create a request whose body is set from a byte slice.
func NewJsonRequest ¶
func NewJsonRequest(method string, endpoint string, body map[string]any) (req *http.Request, err error)
Wraps the NewRequest helper to create a request with the payload marshalled as JSON.
func NewRequest ¶
func NewRequest(method string, endpoint string, bodyReader io.Reader) (req *http.Request, err error)
Creates a new HTTP request with the given method, endpoint, and a bodyReader that provides the content of the request body.
func NormalizeWsUrl ¶
NormalizeWsUrl converts an HTTP(S) URL to its WebSocket equivalent. It performs the following transformations:
- Removes trailing slashes
- Converts "http:" to "ws:"
- Converts "https:" to "wss:"
URLs that are already WebSocket URLs (ws: or wss:) are returned unchanged after removing any trailing slash.
Example:
NormalizeWsUrl("https://example.com/ws/") // "wss://example.com/ws"
Example ¶
fmt.Println(NormalizeWsUrl("http://google.com"))
fmt.Println(NormalizeWsUrl("https://github.com"))
Output:
ws://google.com
wss://github.com
func ParseAcceptTypes ¶ added in v0.0.17
ParseAcceptTypes parses an HTTP Accept header and returns whether application/json and text/event-stream are present.
Handles quality values (;q=N) and whitespace per RFC 7231 §5.3.2. The wildcard type */* matches both JSON and SSE.
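A simplified sketch of this kind of Accept parsing follows. The name `parseAcceptSketch` is hypothetical, and the sketch assumes parameters such as `;q=0.9` are dropped rather than interpreted, which may not match how the package treats `q=0`.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAcceptSketch reports whether an Accept header admits JSON and/or SSE.
func parseAcceptSketch(accept string) (acceptsJSON, acceptsSSE bool) {
	for _, part := range strings.Split(accept, ",") {
		mediaType, _, _ := strings.Cut(part, ";") // drop ;q=... and other parameters
		switch strings.ToLower(strings.TrimSpace(mediaType)) {
		case "application/json":
			acceptsJSON = true
		case "text/event-stream":
			acceptsSSE = true
		case "*/*":
			acceptsJSON, acceptsSSE = true, true
		}
	}
	return acceptsJSON, acceptsSSE
}

func main() {
	fmt.Println(parseAcceptSketch("application/json, text/event-stream;q=0.9")) // true true
	fmt.Println(parseAcceptSketch("text/html"))                                 // false false
}
```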
func ParseWWWAuthenticate ¶ added in v0.0.18
ParseWWWAuthenticate extracts the resource_metadata URL and scopes from a WWW-Authenticate: Bearer header value per RFC 6750 §3.
Used by HTTP clients to:
- Discover the Protected Resource Metadata (PRM) endpoint from a 401 response
- Parse required scopes from a 403 insufficient_scope response
Handles both quoted ("value") and unquoted (value) parameter formats.
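As a rough illustration of the parsing involved, the sketch below extracts `resource_metadata` and `scope` from a Bearer challenge using a regular expression. `parseBearerChallengeSketch` is a hypothetical name, and the sketch assumes quoted values contain no escaped quotes; the package's error behavior may differ.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// paramRe matches name="quoted value" or name=token pairs.
var paramRe = regexp.MustCompile(`([A-Za-z_]+)=(?:"([^"]*)"|([^\s,]+))`)

// parseBearerChallengeSketch pulls resource_metadata and scope out of a
// WWW-Authenticate: Bearer header value.
func parseBearerChallengeSketch(header string) (resourceMetadata string, scopes []string, err error) {
	scheme, rest, _ := strings.Cut(strings.TrimSpace(header), " ")
	if !strings.EqualFold(scheme, "Bearer") {
		return "", nil, fmt.Errorf("not a Bearer challenge: %q", header)
	}
	for _, m := range paramRe.FindAllStringSubmatch(rest, -1) {
		value := m[2] // quoted form
		if value == "" {
			value = m[3] // unquoted form
		}
		switch m[1] {
		case "resource_metadata":
			resourceMetadata = value
		case "scope":
			scopes = strings.Fields(value) // scopes are space-delimited
		}
	}
	return resourceMetadata, scopes, nil
}

func main() {
	rm, scopes, err := parseBearerChallengeSketch(
		`Bearer resource_metadata="https://example.com/.well-known/oauth-protected-resource", scope="read write"`)
	fmt.Println(rm, scopes, err)
}
```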
func ReadFrame ¶ added in v0.0.17
ReadFrame reads a Content-Length framed message from a buffered reader. It parses HTTP-style headers until an empty line (\r\n\r\n), extracts the Content-Length value, and reads exactly that many bytes as the message body.
Headers other than Content-Length are accepted but ignored, matching the LSP spec which allows Content-Type as an optional header.
Returns io.EOF if the reader is empty (clean stream shutdown).
func ResolveURL ¶ added in v0.0.17
ResolveURL resolves a URL reference against a base URL per RFC 3986 §5.2.2.
This handles three cases:
- Absolute URL ("http://host/path"): returned unchanged
- Absolute path ("/path?q=1"): inherits scheme+host from base
- Relative path ("path?q=1"): inherits scheme+host+directory from base
Used by SSE clients to resolve endpoint event URLs against the SSE connection URL, and generally useful for any URL reference resolution.
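The three cases can be reproduced with the standard library's net/url package, whose `ResolveReference` follows RFC 3986. The sketch below is an illustration under that assumption (the name `resolveSketch` and the example URLs are hypothetical), not the package's source.

```go
package main

import (
	"fmt"
	"net/url"
)

// resolveSketch resolves ref against baseURL using net/url.
func resolveSketch(baseURL, ref string) (string, error) {
	base, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	r, err := url.Parse(ref)
	if err != nil {
		return "", err
	}
	return base.ResolveReference(r).String(), nil
}

func main() {
	base := "http://host/sse/stream"
	for _, ref := range []string{"http://other/path", "/path?q=1", "path?q=1"} {
		out, err := resolveSketch(base, ref)
		fmt.Println(out, err)
	}
	// Prints:
	// http://other/path <nil>
	// http://host/path?q=1 <nil>
	// http://host/sse/path?q=1 <nil>
}
```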
func SSEServe ¶ added in v0.0.8
func SSEServe[O any, S SSEConn[O]](handler SSEHandler[O, S], config *SSEConnConfig) http.HandlerFunc
SSEServe creates an http.HandlerFunc that establishes SSE connections and manages their lifecycle. This is the primary entry point for creating SSE endpoints, analogous to WSServe for WebSocket endpoints.
The handler validates incoming requests and creates connection instances. The config controls keepalive behavior; if nil, DefaultSSEConnConfig is used.
Example:
router.HandleFunc("/events", gohttp.SSEServe[MyEvent](&MySSEHandler{}, nil))
The lifecycle is:
- handler.Validate() is called to check the request
- SSE headers are set (Content-Type, Cache-Control, etc.)
- conn.OnStart() is called to initialize the connection
- Keepalive comments are sent at the configured interval
- On client disconnect (context cancellation), conn.OnClose() is called
Important: Set http.Server.WriteTimeout = 0 for SSE endpoints to prevent the server from closing long-lived connections. See middleware.ApplyDefaults.
Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html
Example ¶
ExampleSSEServe demonstrates setting up an SSE endpoint using SSEServe. The handler creates a connection that pushes events to the client. SSE endpoints require http.Server.WriteTimeout = 0 for long-lived streams.
package main
import (
"log"
"net/http"
"github.com/gorilla/mux"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
router := mux.NewRouter()
router.HandleFunc("/events", gohttp.SSEServe[any](&gohttp.JSONSSEHandler{}, nil))
srv := &http.Server{
Addr: ":8080",
Handler: router,
WriteTimeout: 0, // Required for SSE!
}
log.Fatal(srv.ListenAndServe())
}
Output:
func SendJsonResponse ¶
func SendJsonResponse(writer http.ResponseWriter, resp any, err error)
SendJsonResponse writes a JSON response to the http.ResponseWriter. If err is nil, resp is marshaled to JSON and written with status 200 OK. If err is non-nil, an appropriate HTTP error code is set based on the gRPC status code (if present), and an error object is returned in the response body.
The function handles gRPC status errors by extracting the code and message, and maps them to appropriate HTTP status codes via ErrorToHttpCode.
func SplitBatch ¶ added in v0.0.20
func SplitBatch(body []byte) ([]json.RawMessage, error)
SplitBatch splits a JSON array into individual raw JSON messages. Returns an error if the body is not a valid JSON array.
Each element in the returned slice is a complete JSON value (object or otherwise) that can be independently unmarshaled as a JSON-RPC request.
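One way to obtain this behavior is to unmarshal into a slice of `json.RawMessage`, as in the sketch below. The name `splitBatchSketch` is hypothetical, and edge cases (for example a body of `null`) may be handled differently by the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitBatchSketch splits a JSON array into its raw elements.
func splitBatchSketch(body []byte) ([]json.RawMessage, error) {
	var msgs []json.RawMessage
	if err := json.Unmarshal(body, &msgs); err != nil {
		return nil, err // not a valid JSON array
	}
	return msgs, nil
}

func main() {
	msgs, err := splitBatchSketch([]byte(`[{"jsonrpc":"2.0","id":1},{"jsonrpc":"2.0","id":2}]`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Println(string(m)) // each element can be unmarshaled independently
	}
}
```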
func StreamableServe ¶ added in v0.0.9
func StreamableServe(handler StreamableHandlerFunc, config *StreamableConfig) http.HandlerFunc
StreamableServe creates an http.HandlerFunc that supports both synchronous JSON responses and SSE event streaming from a single endpoint. This is the "POST-that-optionally-streams" pattern used by MCP 2025-03-26 Streamable HTTP.
The handler function decides per-request whether to return a SingleResponse (JSON) or StreamResponse (SSE stream). StreamableServe handles content-type negotiation, SSE formatting, flushing, and client disconnect detection.
Example:
router.HandleFunc("/rpc", gohttp.StreamableServe(myHandler, nil))
For SSE streaming, set http.Server.WriteTimeout = 0 to prevent the server from closing long-lived connections.
Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html
Example ¶
ExampleStreamableServe demonstrates the POST-that-optionally-streams pattern from MCP 2025-03-26. The handler decides per-request whether to return a JSON response or an SSE event stream.
package main
import (
"context"
"fmt"
"net/http"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
handler := gohttp.StreamableServe(
func(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
// Check if client wants streaming
if r.Header.Get("Accept") == "text/event-stream" {
ch := make(chan gohttp.SSEEvent)
go func() {
defer close(ch)
ch <- gohttp.SSEEvent{Event: "progress", Data: map[string]any{"pct": 50}}
ch <- gohttp.SSEEvent{Event: "result", Data: map[string]any{"done": true}}
}()
return gohttp.StreamResponse{Events: ch}
}
// Default: synchronous JSON response
return gohttp.SingleResponse{Body: map[string]any{"result": "ok"}}
},
nil, // default config (JSONCodec)
)
http.HandleFunc("/rpc", handler)
fmt.Println("Streamable handler registered")
}
Output: Streamable handler registered
func WSConnJSONReaderWriter ¶
func WSConnJSONReaderWriter(conn *websocket.Conn) (reader *conc.Reader[gut.StrMap], writer *conc.Writer[conc.Message[gut.StrMap]])
WSConnJSONReaderWriter creates concurrent reader and writer for JSON messages over a WebSocket connection. The reader decodes incoming JSON messages into StrMap (map[string]any), and the writer sends outgoing StrMap messages as JSON.
The reader handles connection close errors gracefully by converting them to net.ErrClosed. The writer handles io.EOF as a normal stream end and uses WSConnWriteError for error messages.
This is useful for creating bidirectional JSON message streams over WebSocket.
func WSConnWriteError ¶
WSConnWriteError writes an error message to a WebSocket connection. If err is nil or io.EOF, no message is sent and nil is returned. For gRPC status errors, the error code is extracted and sent as JSON. The message is sent as a text frame containing JSON: {"error": <code>}
func WSConnWriteMessage ¶
WSConnWriteMessage writes a JSON message to a WebSocket connection. The message is marshaled to JSON and sent as a text frame. Returns any error from marshaling or writing.
func WSHandleConn ¶
func WSHandleConn[I any, S WSConn[I]](conn *websocket.Conn, ctx S, config *WSConnConfig)
WSHandleConn manages the lifecycle of an established WebSocket connection. It handles:
- Periodic ping messages for connection health checks
- Timeout detection when no data is received within PongPeriod
- Message reading and dispatching to ctx.HandleMessage()
- Error handling via ctx.OnError()
- Clean shutdown via ctx.OnClose()
This function is called automatically by WSServe, but can also be used directly when you have an established WebSocket connection from another source.
The function blocks until the connection is closed or an unrecoverable error occurs.
func WSServe ¶
func WSServe[I any, S WSConn[I]](handler WSHandler[I, S], config *WSConnConfig) http.HandlerFunc
WSServe creates an http.HandlerFunc that upgrades HTTP requests to WebSocket connections and manages their lifecycle. This is the primary entry point for creating WebSocket endpoints.
The handler validates incoming requests and creates connection instances. The config controls upgrade behavior and timing; if nil, DefaultWSConnConfig is used.
Example:
router.HandleFunc("/ws", gohttp.WSServe(&MyHandler{}, nil))
The lifecycle is:
- handler.Validate() is called to check the request
- If valid, the connection is upgraded to WebSocket
- conn.OnStart() is called to initialize the connection
- Messages are read and passed to conn.HandleMessage()
- On close, conn.OnClose() is called for cleanup
Example ¶
ExampleWSServe demonstrates the basic pattern for creating a WebSocket endpoint. The handler validates connections (auth, etc.) and WSServe manages the lifecycle (upgrade, heartbeats, message dispatch, cleanup).
r := mux.NewRouter()
r.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Publishing Custom Message")
})
r.HandleFunc("/subscribe", WSServe(&JSONHandler{}, nil))
srv := http.Server{Handler: r}
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
func WriteFrame ¶ added in v0.0.17
WriteFrame writes a Content-Length framed message in LSP format:
Content-Length: <n>\r\n\r\n<body>
This framing is used by the Language Server Protocol (LSP) and MCP's stdio transport to delimit JSON-RPC messages over byte streams (pipes, stdin/stdout).
Types ¶
type AtomicIDGen ¶ added in v0.0.19
type AtomicIDGen struct {
// contains filtered or unexported fields
}
AtomicIDGen is an IDGen backed by an atomic counter. It produces decimal string IDs ("1", "2", "3", ...). Suitable for per-session or per-stream ID generation where global uniqueness is not required.
Zero value is ready to use.
func (*AtomicIDGen) Next ¶ added in v0.0.19
func (g *AtomicIDGen) Next() string
Next returns the next unique ID as a decimal string.
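As a rough illustration of the behavior described for AtomicIDGen, an atomic-counter ID generator might look like the sketch below. The type counterIDGen is hypothetical and only mirrors the documented behavior (decimal IDs starting at "1", usable zero value); it is not the package's implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// counterIDGen is a hypothetical generator backed by an atomic counter.
// The zero value is ready to use: the counter starts at 0, so the first
// call to Next returns "1".
type counterIDGen struct {
	n atomic.Int64
}

// Next atomically increments the counter and returns it as a decimal string.
func (g *counterIDGen) Next() string {
	return strconv.FormatInt(g.n.Add(1), 10)
}

func main() {
	var g counterIDGen
	fmt.Println(g.Next(), g.Next(), g.Next())
}
```

IDs produced this way are unique only within one generator instance, which matches the per-session or per-stream scope described above.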
type AuthRetryConfig ¶ added in v0.0.18
type AuthRetryConfig struct {
// SetAuth injects authentication credentials into a request.
// Typically sets the Authorization: Bearer header.
// Called before each request attempt (including retries).
SetAuth func(req *http.Request) error
// OnUnauthorized is called when the server returns 401.
// Should refresh the token so the next SetAuth call uses fresh credentials.
// Return nil to retry, non-nil error to give up.
OnUnauthorized func(resp *http.Response) error
// OnForbidden is called when the server returns 403.
// The response is provided so the handler can parse WWW-Authenticate
// for required scopes. Return nil to retry, non-nil error to give up.
OnForbidden func(resp *http.Response) error
}
AuthRetryConfig configures automatic retry on HTTP 401 (Unauthorized) and 403 (Forbidden) responses. This implements the standard OAuth 2.0 Bearer token retry pattern: refresh on 401, scope step-up on 403.
All fields are optional. If nil, the corresponding retry is skipped.
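The refresh-on-401 flow can be sketched as follows. Everything here is an assumption made for illustration: retryConfig is a local type that mirrors only the SetAuth and OnUnauthorized fields, doWithRetry is a hypothetical helper limited to one retry, and a fake RoundTripper stands in for a real server. The package's own retry entry point is not shown in this section.

```go
package main

import (
	"fmt"
	"net/http"
)

// retryConfig mirrors two of the documented AuthRetryConfig fields.
type retryConfig struct {
	SetAuth        func(req *http.Request) error
	OnUnauthorized func(resp *http.Response) error
}

// doWithRetry is a hypothetical helper: it sends a GET and retries once
// after a 401 if OnUnauthorized returns nil.
func doWithRetry(client *http.Client, url string, cfg retryConfig) (*http.Response, error) {
	for attempt := 0; ; attempt++ {
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			return nil, err
		}
		if cfg.SetAuth != nil { // called before every attempt, including retries
			if err := cfg.SetAuth(req); err != nil {
				return nil, err
			}
		}
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusUnauthorized || cfg.OnUnauthorized == nil || attempt >= 1 {
			return resp, nil
		}
		err = cfg.OnUnauthorized(resp) // non-nil error means give up
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
	}
}

// roundTripFunc lets a plain function act as an http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// demo runs the flow against a fake transport that accepts only "Bearer fresh".
func demo() (status int, attempts int) {
	token := "stale"
	transport := roundTripFunc(func(r *http.Request) (*http.Response, error) {
		attempts++
		code := http.StatusUnauthorized
		if r.Header.Get("Authorization") == "Bearer fresh" {
			code = http.StatusOK
		}
		return &http.Response{StatusCode: code, Header: make(http.Header), Body: http.NoBody, Request: r}, nil
	})
	cfg := retryConfig{
		SetAuth: func(req *http.Request) error {
			req.Header.Set("Authorization", "Bearer "+token)
			return nil
		},
		OnUnauthorized: func(*http.Response) error {
			token = "fresh" // pretend a token refresh happened
			return nil
		},
	}
	resp, err := doWithRetry(&http.Client{Transport: transport}, "http://example.invalid/", cfg)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	return resp.StatusCode, attempts
}

func main() {
	status, attempts := demo()
	fmt.Println(status, attempts)
}
```

Calling SetAuth inside the loop is what lets the retry pick up the refreshed token without the caller rebuilding the request.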
type AuthRetryError ¶ added in v0.0.18
type AuthRetryError struct {
// StatusCode is the HTTP status (401 or 403).
StatusCode int
// Message describes the failure (response body or callback error message).
Message string
// WWWAuthenticate is the raw WWW-Authenticate header from the server response.
WWWAuthenticate string
// RequiredScopes are the scopes parsed from WWW-Authenticate per RFC 6750.
// Populated automatically from the "scope" parameter. Empty if not present.
RequiredScopes []string
// Cause is the underlying error from the callback, if the failure was due
// to a callback error rather than retry exhaustion. Nil when retries were
// simply exhausted.
Cause error
}
AuthRetryError is returned when authentication retry fails (401/403 exhausted or callback returned an error). Captures the HTTP response metadata for diagnostic and programmatic use.
RequiredScopes is automatically parsed from the WWW-Authenticate header's "scope" parameter per RFC 6750 §3, so callers don't need to parse it themselves.
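For readers curious what extracting the "scope" parameter involves, here is a deliberately simplified sketch. The function parseScopes is hypothetical and handles only a quoted scope="..." parameter; the package's actual parser is not shown here and may be stricter.

```go
package main

import (
	"fmt"
	"strings"
)

// parseScopes is a hypothetical, simplified extractor: it finds the first
// quoted scope="..." parameter in a WWW-Authenticate header value and
// splits it on whitespace. It returns nil if no such parameter exists.
func parseScopes(header string) []string {
	const key = `scope="`
	i := strings.Index(header, key)
	if i < 0 {
		return nil
	}
	rest := header[i+len(key):]
	j := strings.IndexByte(rest, '"')
	if j < 0 {
		return nil // unterminated quoted string
	}
	return strings.Fields(rest[:j])
}

func main() {
	h := `Bearer error="insufficient_scope", scope="read write"`
	fmt.Println(parseScopes(h))
}
```

Note that error="insufficient_scope" does not confuse the search, because the key being matched includes the trailing `="`.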
func (*AuthRetryError) Error ¶ added in v0.0.18
func (e *AuthRetryError) Error() string
func (*AuthRetryError) Unwrap ¶ added in v0.0.18
func (e *AuthRetryError) Unwrap() error
type BaseConn ¶ added in v0.0.4
type BaseConn[I any, O any] struct {
// Codec handles message encoding/decoding.
// Must be set before the connection is used.
Codec Codec[I, O]
// Writer is the output channel for sending messages.
// Handles all outgoing messages: data, pings, and errors.
// Initialized in OnStart.
Writer *conc.Writer[OutgoingMessage[O]]
// NameStr is an optional human-readable name for this connection.
NameStr string
// ConnIdStr is a unique identifier for this connection.
// Auto-generated if not set.
ConnIdStr string
// PingId tracks the current ping sequence number.
PingId int64
// contains filtered or unexported fields
}
BaseConn is a generic WebSocket connection that separates transport from encoding. It uses a Codec to handle message serialization/deserialization.
Type parameters:
- I: Input message type (received from client)
- O: Output message type (sent to client)
Usage:
type MyConn struct {
gohttp.BaseConn[MyInput, MyOutput]
}
func (c *MyConn) HandleMessage(msg MyInput) error {
// msg is already typed!
return nil
}
func (*BaseConn[I, O]) ConnId ¶ added in v0.0.4
ConnId returns the connection ID, generating one if not set.
func (*BaseConn[I, O]) DebugInfo ¶ added in v0.0.4
DebugInfo returns debug information about the connection.
func (*BaseConn[I, O]) HandleMessage ¶ added in v0.0.4
HandleMessage processes an incoming message. The default implementation only logs the message; override it in the embedding struct.
func (*BaseConn[I, O]) InputChan ¶ added in v0.0.4
func (b *BaseConn[I, O]) InputChan() chan<- OutgoingMessage[O]
InputChan returns the Writer's input channel for use with FanOut.
func (*BaseConn[I, O]) OnClose ¶ added in v0.0.4
func (b *BaseConn[I, O]) OnClose()
OnClose cleans up when the connection closes.
func (*BaseConn[I, O]) OnError ¶ added in v0.0.4
OnError handles connection errors. Return nil to suppress the error and continue, or return the error to close.
func (*BaseConn[I, O]) OnStart ¶ added in v0.0.4
OnStart initializes the connection after WebSocket upgrade. Creates the Writer with codec-aware encoding.
func (*BaseConn[I, O]) OnTimeout ¶ added in v0.0.4
OnTimeout handles read timeout. Return true to close the connection, false to keep it alive.
func (*BaseConn[I, O]) ReadMessage ¶ added in v0.0.4
ReadMessage reads and decodes the next message from the WebSocket connection. Uses the configured Codec to decode the raw bytes.
func (*BaseConn[I, O]) SendOutput ¶ added in v0.0.4
func (b *BaseConn[I, O]) SendOutput(msg O)
SendOutput sends a typed output message to the client. This is a convenience method that wraps the Writer.Send call.
type BaseHandler ¶ added in v0.0.4
type BaseHandler[I any, O any] struct {
// Codec is used for all connections created by this handler.
Codec Codec[I, O]
// Name is the optional name for connections.
Name string
}
BaseHandler is a simple handler that creates BaseConn instances. Useful for quick prototyping or when you don't need custom validation.
func (*BaseHandler[I, O]) Validate ¶ added in v0.0.4
func (h *BaseHandler[I, O]) Validate(w http.ResponseWriter, r *http.Request) (*BaseConn[I, O], bool)
Validate implements WSHandler. Creates a new BaseConn with the configured codec.
type BaseSSEConn ¶ added in v0.0.8
type BaseSSEConn[O any] struct {
// Codec handles output message serialization.
// Only Encode() is used (SSE has no input decoding).
// Must be set before the connection is used.
Codec Codec[any, O]
// Writer serializes all outgoing messages through a single goroutine.
// Initialized in OnStart. Use SendOutput/SendEvent instead of direct access.
Writer *conc.Writer[SSEOutgoingMessage[O]]
// NameStr is an optional human-readable name for this connection.
NameStr string
// ConnIdStr is a unique identifier for this connection.
// Auto-generated if not set.
ConnIdStr string
// contains filtered or unexported fields
}
BaseSSEConn is a generic SSE connection that handles message serialization and thread-safe writes. It mirrors BaseConn[I, O] but for write-only SSE streams.
Type parameter O is the output message type sent to clients.
Usage:
type MySSEConn struct {
gohttp.BaseSSEConn[MyEvent]
}
func (c *MySSEConn) OnStart(w http.ResponseWriter, r *http.Request) error {
if err := c.BaseSSEConn.OnStart(w, r); err != nil {
return err
}
// custom initialization
return nil
}
Important: For SSE endpoints, set http.Server.WriteTimeout = 0 to prevent the server from closing long-lived SSE connections. See middleware.ApplyDefaults documentation for details.
func (*BaseSSEConn[O]) Close ¶ added in v0.0.8
func (b *BaseSSEConn[O]) Close()
Close signals the connection to terminate. SSEServe will detect this via the Done channel and call OnClose.
func (*BaseSSEConn[O]) ConnId ¶ added in v0.0.8
func (b *BaseSSEConn[O]) ConnId() string
ConnId returns the connection ID, generating one if not set.
func (*BaseSSEConn[O]) Done ¶ added in v0.0.8
func (b *BaseSSEConn[O]) Done() <-chan struct{}
Done returns a channel that is closed when the connection should terminate. Use this to signal SSEServe to exit the event loop from application code.
func (*BaseSSEConn[O]) InputChan ¶ added in v0.0.8
func (b *BaseSSEConn[O]) InputChan() chan<- SSEOutgoingMessage[O]
InputChan returns the Writer's input channel for use with FanOut.
func (*BaseSSEConn[O]) Name ¶ added in v0.0.8
func (b *BaseSSEConn[O]) Name() string
Name returns the connection name.
func (*BaseSSEConn[O]) OnClose ¶ added in v0.0.8
func (b *BaseSSEConn[O]) OnClose()
OnClose cleans up the SSE connection by stopping the Writer and closing the done channel. Always call the parent OnClose when overriding:
func (c *MySSEConn) OnClose() {
// custom cleanup
c.BaseSSEConn.OnClose()
}
func (*BaseSSEConn[O]) OnStart ¶ added in v0.0.8
func (b *BaseSSEConn[O]) OnStart(w http.ResponseWriter, r *http.Request) error
OnStart initializes the SSE connection. It asserts that the ResponseWriter supports http.Flusher (required for streaming), then creates the Writer with SSE-format dispatch.
The Writer callback formats each message according to the SSE wire protocol:
- Comments: ": {text}\n\n"
- Data events: "event: {type}\nid: {id}\ndata: {json}\n\n"
Per WHATWG SSE spec, multi-line data is split into separate "data:" lines.
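The wire format listed above can be sketched in a few lines. The helpers formatEvent and formatComment are hypothetical and operate on plain strings; they are not the Writer callback itself, only an illustration of the output it is described as producing.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one SSE event. Empty event and id fields are
// omitted, and multi-line data is split into one "data:" line per line.
func formatEvent(event, id, data string) string {
	var b strings.Builder
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	if id != "" {
		b.WriteString("id: " + id + "\n")
	}
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n") // a blank line terminates the event
	return b.String()
}

// formatComment renders an SSE comment, such as a keepalive.
func formatComment(text string) string {
	return ": " + text + "\n\n"
}

func main() {
	fmt.Printf("%q\n", formatEvent("update", "7", "line one\nline two"))
	fmt.Printf("%q\n", formatComment("keepalive"))
}
```

Splitting on newlines matters because a raw newline inside a single "data:" line would otherwise end the field early and corrupt the event.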
func (*BaseSSEConn[O]) Ready ¶ added in v0.0.10
func (b *BaseSSEConn[O]) Ready() <-chan struct{}
Ready returns a channel that is closed when OnStart completes and the Writer is initialized. Safe to call before OnStart (the channel is created lazily). Use this to wait before sending messages:
<-conn.Ready()
conn.SendOutput(msg)
func (*BaseSSEConn[O]) SendEvent ¶ added in v0.0.8
func (b *BaseSSEConn[O]) SendEvent(event string, msg O)
SendEvent sends a named event to the client. The event type is set via the SSE "event:" field, allowing clients to listen with addEventListener.
Per WHATWG SSE spec, if no event type is specified, clients receive the event via the "message" event handler.
func (*BaseSSEConn[O]) SendEventWithID ¶ added in v0.0.8
func (b *BaseSSEConn[O]) SendEventWithID(event string, id string, msg O)
SendEventWithID sends a named event with an ID. The ID is set via the SSE "id:" field, enabling client-side reconnection via the Last-Event-ID header.
Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-string): on reconnection, the browser includes "Last-Event-ID: {id}" so the server can resume the event stream from the correct position.
func (*BaseSSEConn[O]) SendKeepalive ¶ added in v0.0.8
func (b *BaseSSEConn[O]) SendKeepalive()
SendKeepalive sends an SSE comment as a keepalive signal. The comment format (": keepalive\n\n") is ignored by EventSource clients but prevents intermediate proxies from closing idle connections.
func (*BaseSSEConn[O]) SendOutput ¶ added in v0.0.8
func (b *BaseSSEConn[O]) SendOutput(msg O)
SendOutput sends a data message to the client. The message is serialized using the configured Codec and formatted as an SSE data event.
This is a convenience method for sending events without a named type. For named events, use SendEvent or SendEventWithID.
func (*BaseSSEConn[O]) SendRetry ¶ added in v0.0.23
func (b *BaseSSEConn[O]) SendRetry(ms int)
SendRetry emits a bare SSE "retry:" field to change the client's reconnection delay. Used for server-initiated disconnect hints — e.g., a long-running tool handler tells the client to back off for N milliseconds before reconnecting. Has no effect if ms <= 0 or the Writer is nil.
Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream): the retry field sets the client's reconnection time in integer milliseconds. Clients that do not support the field ignore it.
To combine a retry hint with a data delivery in one event, set Retry on an SSEOutgoingMessage that also has Data set, and pass it to the Writer directly.
type BiDirStreamConfig ¶
type BiDirStreamConfig struct {
// PingPeriod specifies how often to send ping messages to the remote peer.
// Pings are used as heartbeat messages to verify the connection is alive.
// Default: 30 seconds.
PingPeriod time.Duration
// PongPeriod specifies the maximum time to wait for any data (ping, pong, or
// regular messages) from the remote peer before considering the connection dead.
// If no data is received within this duration, OnTimeout() is called.
// Default: 300 seconds (5 minutes).
PongPeriod time.Duration
}
BiDirStreamConfig provides configuration for bidirectional stream connections. It controls the timing of health checks and connection timeout detection.
func DefaultBiDirStreamConfig ¶
func DefaultBiDirStreamConfig() *BiDirStreamConfig
DefaultBiDirStreamConfig returns a BiDirStreamConfig with sensible defaults:
- PingPeriod: 30 seconds
- PongPeriod: 300 seconds (5 minutes)
These defaults are suitable for most production deployments. For development or testing, you may want shorter periods for faster feedback.
type BiDirStreamConn ¶
type BiDirStreamConn[I any] interface {
// SendPing sends a heartbeat ping message to the remote peer.
// Called periodically based on BiDirStreamConfig.PingPeriod.
// The implementation should encode and send an appropriate ping message.
SendPing() error
// Name returns an optional human-readable name for this connection type.
// Used for logging and debugging purposes.
Name() string
// ConnId returns a unique identifier for this specific connection instance.
// Useful for tracking individual connections in logs and metrics.
ConnId() string
// HandleMessage processes an incoming message of type I.
// Called for each message received from the remote peer.
// Return an error to signal that the connection should be closed.
HandleMessage(msg I) error
// OnError is called when an error occurs during connection operation.
// Return nil to suppress the error and continue the connection.
// Return the error (or a different error) to close the connection.
OnError(err error) error
// OnClose is called when the connection is closing for any reason.
// Use this hook to clean up resources (timers, goroutines, etc.)
// and perform any final actions (logging, metrics, etc.).
OnClose()
// OnTimeout is called when no data has been received within the PongPeriod.
// Return true to close the connection, false to continue waiting.
// Typically returns true, but may return false for connections that
// expect long periods of silence.
OnTimeout() bool
}
BiDirStreamConn defines the lifecycle and message handling interface for bidirectional stream connections. Implementations handle messages of type I and manage connection state through lifecycle hooks.
The typical lifecycle is:
- Connection established, OnStart() called (not part of this interface)
- Messages received and processed via HandleMessage()
- Periodic SendPing() calls to maintain connection health
- On errors, OnError() is called to determine if connection should continue
- On timeout (no data received within PongPeriod), OnTimeout() is called
- Connection ends, OnClose() is called for cleanup
type BinaryProtoCodec ¶ added in v0.0.4
type BinaryProtoCodec[I proto.Message, O proto.Message] struct {
// NewInput is an optional factory function to create new input instances.
// When provided, this is used for optimal performance.
NewInput func() I
// Input is an exemplar instance used as fallback when NewInput is nil.
Input I
}
BinaryProtoCodec handles encoding/decoding of protobuf messages using binary format. This provides maximum efficiency for high-throughput scenarios.
For best performance, provide a NewInput factory function. If it is not provided, the codec falls back to reflection using the Input exemplar (slower).
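The difference between the two instantiation paths can be sketched without protobuf. In the sketch below, point is a plain struct standing in for a generated message type and newInput is a hypothetical helper; it assumes the exemplar is a non-nil pointer, as proto messages usually are. The codec's real fallback logic is not shown in this section.

```go
package main

import (
	"fmt"
	"reflect"
)

// point stands in for a generated protobuf message type.
type point struct{ X, Y int }

// newInput is a hypothetical helper: it prefers the factory and otherwise
// allocates a fresh zero value of the exemplar's pointed-to type via
// reflection. It assumes I is a pointer type and exemplar is non-nil.
func newInput[I any](factory func() I, exemplar I) I {
	if factory != nil {
		return factory() // fast path: direct allocation, no reflection
	}
	elem := reflect.TypeOf(exemplar).Elem() // *point -> point
	return reflect.New(elem).Interface().(I)
}

func main() {
	viaFactory := newInput[*point](func() *point { return &point{} }, nil)
	viaReflect := newInput[*point](nil, &point{X: 1})
	// Both are fresh zero-valued instances; the exemplar's fields are not copied.
	fmt.Println(*viaFactory, *viaReflect)
}
```

The reflection path performs a type lookup and an interface conversion on every decode, which is why a factory is the better choice on hot paths.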
func (*BinaryProtoCodec[I, O]) Decode ¶ added in v0.0.4
func (c *BinaryProtoCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)
Decode unmarshals binary protobuf data into a new message instance.
func (*BinaryProtoCodec[I, O]) Encode ¶ added in v0.0.4
func (c *BinaryProtoCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)
Encode marshals a protobuf message to binary bytes.
type Codec ¶ added in v0.0.4
type Codec[I any, O any] interface {
// Decode converts raw WebSocket data into a typed input message.
// msgType indicates whether the data was received as text or binary.
Decode(data []byte, msgType MessageType) (I, error)
// Encode converts a typed output message to raw bytes for sending.
// Returns the encoded bytes and the appropriate message type (text/binary).
Encode(msg O) ([]byte, MessageType, error)
}
Codec handles encoding/decoding of messages over WebSocket. The type parameters I and O represent input (received) and output (sent) message types. Note: Pings are handled at the transport layer (BaseConn), not by codecs.
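A custom codec only needs the two methods above. The sketch below redeclares MessageType and Codec locally so that it compiles on its own; typedJSONCodec, chatIn, chatOut, and roundTrip are hypothetical names introduced for illustration and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented shapes, so the sketch is self-contained.
type MessageType int

const (
	TextMessage   MessageType = 1
	BinaryMessage MessageType = 2
)

type Codec[I any, O any] interface {
	Decode(data []byte, msgType MessageType) (I, error)
	Encode(msg O) ([]byte, MessageType, error)
}

// Hypothetical message types for a chat-style endpoint.
type chatIn struct {
	Text string `json:"text"`
}
type chatOut struct {
	Echo string `json:"echo"`
}

// typedJSONCodec is a hypothetical codec that maps JSON to typed structs.
type typedJSONCodec[I any, O any] struct{}

// Decode unmarshals JSON into a new I; the frame type is ignored.
func (typedJSONCodec[I, O]) Decode(data []byte, _ MessageType) (I, error) {
	var in I
	err := json.Unmarshal(data, &in)
	return in, err
}

// Encode marshals msg to JSON and asks for a text frame.
func (typedJSONCodec[I, O]) Encode(msg O) ([]byte, MessageType, error) {
	data, err := json.Marshal(msg)
	return data, TextMessage, err
}

// roundTrip decodes raw input and encodes an echo reply through the interface.
func roundTrip(raw string) (string, error) {
	var codec Codec[chatIn, chatOut] = typedJSONCodec[chatIn, chatOut]{}
	in, err := codec.Decode([]byte(raw), TextMessage)
	if err != nil {
		return "", err
	}
	data, _, err := codec.Encode(chatOut{Echo: in.Text})
	return string(data), err
}

func main() {
	out, err := roundTrip(`{"text":"hi"}`)
	fmt.Println(out, err)
}
```

Keeping input and output as separate type parameters lets a connection accept one message shape and emit another without any runtime type switches.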
type EventStore ¶ added in v0.0.19
type EventStore interface {
// Store persists an event for the given stream. Events are ordered by
// insertion time within a stream.
Store(streamID string, event StoredEvent) error
// Replay returns all events stored after the event with the given
// lastEventID. If lastEventID is not found in the stream, all stored
// events are returned (conservative fallback — the client may have
// been disconnected long enough for the anchor event to be evicted).
//
// Returns an empty slice if the stream does not exist or if
// lastEventID is the most recent event.
Replay(streamID string, lastEventID string) ([]StoredEvent, error)
// Trim removes all stored events for the given stream. Call this
// when a session is destroyed to prevent unbounded memory growth.
Trim(streamID string) error
}
EventStore persists SSE events for stream resumption. When a client reconnects with a Last-Event-ID header, the server replays missed events from the store.
Implementations must be safe for concurrent use by multiple goroutines.
The streamID parameter groups events by logical stream (e.g., session ID). Event IDs are opaque strings — ordering is determined by insertion order, not by parsing the ID value.
type GracefulOption ¶ added in v0.0.9
type GracefulOption func(*gracefulConfig)
GracefulOption configures ListenAndServeGraceful behavior.
func WithContext ¶ added in v0.0.9
func WithContext(ctx context.Context) GracefulOption
WithContext sets a parent context. Shutdown is triggered when this context is cancelled, in addition to OS signals. This is useful for programmatic shutdown in tests or when embedding the server in a larger application.
func WithDrainTimeout ¶ added in v0.0.9
func WithDrainTimeout(d time.Duration) GracefulOption
WithDrainTimeout sets the maximum time to wait for in-flight requests to complete during shutdown. Default: 30 seconds.
After this timeout expires, remaining connections are forcefully closed.
func WithOnShutdown ¶ added in v0.0.9
func WithOnShutdown(fn func()) GracefulOption
WithOnShutdown registers a callback to be invoked when shutdown is triggered, BEFORE the server begins draining connections. Multiple callbacks are called in registration order.
Use this to notify long-lived connections before they are closed:
- SSEHub.CloseAll() — closes SSE Done channels, unblocking SSEServe handlers
- Custom WebSocket goodbye messages
- Flush logs, metrics, or other state
Callbacks run while connections are still open, so they can send final messages to clients.
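The ordering described here (callbacks first, then a bounded drain, then a forced close) can be approximated with the standard library. The helper shutdownGracefully below is a hypothetical sketch and is not ListenAndServeGraceful; signal handling and the parent context are left out to keep it short.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"
)

// shutdownGracefully is a hypothetical helper: it runs callbacks in
// registration order while connections are still open, then drains the
// server for at most drain before force-closing what remains.
func shutdownGracefully(srv *http.Server, drain time.Duration, callbacks ...func()) error {
	for _, cb := range callbacks {
		cb()
	}
	ctx, cancel := context.WithTimeout(context.Background(), drain)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		srv.Close() // drain timed out: close remaining connections
		return err
	}
	return nil
}

// demo records the callback order against a server that was never started,
// which Shutdown handles without error.
func demo() string {
	var order []string
	srv := &http.Server{}
	err := shutdownGracefully(srv, time.Second,
		func() { order = append(order, "notify-sse") },
		func() { order = append(order, "flush-logs") },
	)
	if err != nil {
		panic(err)
	}
	return strings.Join(order, ",")
}

func main() {
	fmt.Println(demo())
}
```

Running the callbacks before Shutdown is the important part: long-lived SSE handlers never become idle on their own, so they must be told to exit or the drain will always hit its timeout.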
func WithSignals ¶ added in v0.0.9
func WithSignals(sigs ...os.Signal) GracefulOption
WithSignals sets the OS signals that trigger shutdown. Default: SIGTERM, SIGINT.
type HTTPStatusError ¶ added in v0.0.17
HTTPStatusError represents an HTTP response with a non-2xx status code. It captures the status code, response headers, and response body for error reporting, classification (e.g., 5xx errors are typically transient/retriable), and programmatic inspection (e.g., reading WWW-Authenticate or Retry-After).
func (*HTTPStatusError) Error ¶ added in v0.0.17
func (e *HTTPStatusError) Error() string
type IDGen ¶ added in v0.0.19
type IDGen interface {
Next() string
}
IDGen generates unique string IDs. Implementations must be safe for concurrent use. IDs are opaque — callers must not assume any ordering, format, or structure beyond uniqueness within the generator's scope.
type JSONCodec ¶ added in v0.0.4
type JSONCodec struct{}
JSONCodec handles encoding/decoding of arbitrary JSON messages. This is useful for dynamic message handling where the structure isn't known at compile time.
Example ¶
ExampleJSONCodec demonstrates the default untyped JSON codec. Use this for dynamic messages where the structure isn't known at compile time. This is what JSONConn uses internally.
package main
import (
"fmt"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
codec := &gohttp.JSONCodec{}
// Encode any value
data, _, err := codec.Encode(map[string]any{"action": "join", "room": "lobby"})
if err != nil {
panic(err)
}
fmt.Printf("Encoded: %s\n", data)
// Decode to any
msg, err := codec.Decode(data, gohttp.TextMessage)
if err != nil {
panic(err)
}
m := msg.(map[string]any)
fmt.Printf("Action: %s\n", m["action"])
}
Output:
Encoded: {"action":"join","room":"lobby"}
Action: join
type JSONConn ¶
JSONConn is an alias for BaseConn with untyped JSON messages. This provides a simple way to handle dynamic JSON messages.
For typed messages, use BaseConn[YourInputType, YourOutputType] directly with an appropriate codec.
func NewJSONConn ¶ added in v0.0.4
func NewJSONConn() *JSONConn
NewJSONConn creates a new JSONConn with the default JSON codec.
type JSONHandler ¶
type JSONHandler struct{}
JSONHandler is a simple handler that creates JSONConn instances. All connections are accepted (no validation).
func (*JSONHandler) Validate ¶
func (j *JSONHandler) Validate(w http.ResponseWriter, r *http.Request) (*JSONConn, bool)
Validate implements WSHandler. Accepts all connections.
type JSONSSEConn ¶ added in v0.0.8
type JSONSSEConn = BaseSSEConn[any]
JSONSSEConn is an alias for BaseSSEConn with untyped JSON messages. This provides a simple way to send dynamic JSON data over SSE.
For typed messages, use BaseSSEConn[YourOutputType] directly with an appropriate codec.
type JSONSSEHandler ¶ added in v0.0.8
type JSONSSEHandler struct{}
JSONSSEHandler is a simple handler that creates JSONSSEConn instances. All connections are accepted (no validation).
func (*JSONSSEHandler) Validate ¶ added in v0.0.8
func (h *JSONSSEHandler) Validate(w http.ResponseWriter, r *http.Request) (*JSONSSEConn, bool)
Validate implements SSEHandler. Accepts all connections.
type MemoryEventStore ¶ added in v0.0.19
type MemoryEventStore struct {
// contains filtered or unexported fields
}
MemoryEventStore is an in-memory EventStore backed by bounded per-stream slices. When a stream exceeds maxPerStream events, the oldest events are dropped (FIFO eviction).
This implementation is suitable for single-process deployments. For multi-process or persistent resumption, use a Redis or database-backed EventStore.
Thread-safe: all methods are protected by a sync.RWMutex.
func NewMemoryEventStore ¶ added in v0.0.19
func NewMemoryEventStore(maxPerStream int) *MemoryEventStore
NewMemoryEventStore creates a MemoryEventStore with the given maximum events per stream. If maxPerStream <= 0, it defaults to 1000.
func (*MemoryEventStore) Replay ¶ added in v0.0.19
func (s *MemoryEventStore) Replay(streamID string, lastEventID string) ([]StoredEvent, error)
Replay returns all events after lastEventID by scanning for it in the stream's insertion-ordered slice. If lastEventID is not found (e.g., evicted), all stored events are returned as a conservative fallback.
func (*MemoryEventStore) Store ¶ added in v0.0.19
func (s *MemoryEventStore) Store(streamID string, event StoredEvent) error
Store appends an event to the stream. If the stream exceeds maxPerStream, the oldest event is dropped.
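The eviction and replay rules described for Store and Replay can be sketched for a single stream as follows. The types event and boundedStream are hypothetical, and locking is omitted for brevity; the real store is documented as guarding its state with a sync.RWMutex.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a minimal stand-in for a stored SSE event.
type event struct{ ID, Data string }

// boundedStream is a hypothetical single-stream store with FIFO eviction.
type boundedStream struct {
	max    int
	events []event
}

// store appends e and drops the oldest events once max is exceeded.
func (s *boundedStream) store(e event) {
	s.events = append(s.events, e)
	if len(s.events) > s.max {
		s.events = s.events[len(s.events)-s.max:]
	}
}

// replay returns a copy of the events after lastID. If lastID is unknown
// (for example, already evicted), every stored event is returned.
func (s *boundedStream) replay(lastID string) []event {
	for i, e := range s.events {
		if e.ID == lastID {
			return append([]event(nil), s.events[i+1:]...)
		}
	}
	return append([]event(nil), s.events...)
}

// ids joins event IDs for compact printing.
func ids(evs []event) string {
	out := make([]string, len(evs))
	for i, e := range evs {
		out[i] = e.ID
	}
	return strings.Join(out, ",")
}

func main() {
	s := &boundedStream{max: 3}
	for _, id := range []string{"1", "2", "3", "4"} {
		s.store(event{ID: id})
	}
	fmt.Println(ids(s.replay("2"))) // anchor still present
	fmt.Println(ids(s.replay("1"))) // anchor evicted: full replay
}
```

Returning everything when the anchor is missing trades possible duplicates for the guarantee that no event is silently skipped, so clients should tolerate seeing an event twice.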
func (*MemoryEventStore) Trim ¶ added in v0.0.19
func (s *MemoryEventStore) Trim(streamID string) error
Trim removes all stored events for the given stream.
type MessageType ¶ added in v0.0.4
type MessageType int
MessageType represents the WebSocket frame type.
const (
// TextMessage denotes a text data message (UTF-8 encoded)
TextMessage MessageType = websocket.TextMessage // 1
// BinaryMessage denotes a binary data message
BinaryMessage MessageType = websocket.BinaryMessage // 2
)
type OutgoingMessage ¶ added in v0.0.4
type OutgoingMessage[O any] struct {
// Data is a regular output message (mutually exclusive with Ping/Error)
Data *O
// Ping is a heartbeat message (mutually exclusive with Data/Error)
Ping *PingData
// Error is an error message (mutually exclusive with Data/Ping)
Error error
}
OutgoingMessage represents any message that can be sent over the WebSocket. This union type allows pings, errors, and data messages to all go through the same Writer, avoiding concurrent write issues.
type ProtoJSONCodec ¶ added in v0.0.4
type ProtoJSONCodec[I proto.Message, O proto.Message] struct {
// NewInput is an optional factory function to create new input instances.
// When provided, this is used for optimal performance.
// Example: func() *pb.PlayerAction { return &pb.PlayerAction{} }
NewInput func() I
// Input is an exemplar instance used as fallback when NewInput is nil.
// The codec uses reflection to create new instances of the same type.
// Example: &pb.PlayerAction{}
Input I
// MarshalOptions configures protojson marshaling behavior.
MarshalOptions protojson.MarshalOptions
// UnmarshalOptions configures protojson unmarshaling behavior.
UnmarshalOptions protojson.UnmarshalOptions
}
ProtoJSONCodec handles encoding/decoding of protobuf messages using JSON format. This provides human-readable wire format while maintaining proto type safety.
For best performance, provide a NewInput factory function. If it is not provided, the codec falls back to reflection using the Input exemplar (slower).
func (*ProtoJSONCodec[I, O]) Decode ¶ added in v0.0.4
func (c *ProtoJSONCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)
Decode unmarshals JSON data into a new protobuf message instance.
func (*ProtoJSONCodec[I, O]) Encode ¶ added in v0.0.4
func (c *ProtoJSONCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)
Encode marshals a protobuf message to JSON bytes.
type SSEConn ¶ added in v0.0.8
type SSEConn[O any] interface {
// Name returns a human-readable name for this connection type.
// Used for logging and debugging.
Name() string
// ConnId returns a unique identifier for this connection instance.
// Used for tracking in SSEHub and logging.
ConnId() string
// OnStart is called when the SSE connection is established.
// Receives the ResponseWriter (which must implement http.Flusher) and
// the Request (whose Context signals client disconnect).
// Return an error to reject the connection.
OnStart(w http.ResponseWriter, r *http.Request) error
// OnClose is called when the connection ends (client disconnect or server close).
// Use this to clean up resources.
OnClose()
// SendKeepalive sends an SSE comment as a keepalive signal.
// Called automatically by SSEServe at the configured interval.
SendKeepalive()
// Done returns a channel that is closed when the connection should terminate.
// SSEServe monitors this channel to detect programmatic close requests.
Done() <-chan struct{}
}
SSEConn represents a server-sent events connection. It is the SSE counterpart to WSConn — write-only (server to client), with lifecycle hooks for setup and teardown.
Implementations typically embed BaseSSEConn[O] and may override OnStart/OnClose for custom initialization and cleanup logic.
SSE connections are unidirectional (server → client) per the WHATWG spec: https://html.spec.whatwg.org/multipage/server-sent-events.html
type SSEConnConfig ¶ added in v0.0.8
type SSEConnConfig struct {
// KeepalivePeriod specifies how often to send SSE comment keepalives.
// SSE comments (lines starting with ":") are ignored by EventSource clients
// but keep the TCP connection alive through proxies (nginx default idle
// timeout: 60s, AWS ALB: 60s).
//
// Set to 0 to disable keepalives. Default: 30 seconds.
//
// See WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
KeepalivePeriod time.Duration
}
SSEConnConfig controls the behavior of SSE connections.
func DefaultSSEConnConfig ¶ added in v0.0.8
func DefaultSSEConnConfig() *SSEConnConfig
DefaultSSEConnConfig returns an SSEConnConfig with sensible defaults:
- KeepalivePeriod: 30 seconds
These defaults keep connections alive through most reverse proxies. For SSE endpoints behind proxies with shorter idle timeouts, reduce the keepalive period accordingly.
type SSEEvent ¶ added in v0.0.9
type SSEEvent struct {
// Event is the optional SSE event type ("event:" field).
// If empty, the event type defaults to "message" on the client side.
Event string
// Data is the event payload. JSON-marshaled into the "data:" field.
Data any
// ID is the optional SSE event ID ("id:" field).
// Enables client reconnection via the Last-Event-ID header.
ID string
}
SSEEvent represents a single Server-Sent Event to be streamed. Used by StreamResponse to describe each event in the stream.
Per WHATWG SSE spec:
- Event sets the "event:" field (clients listen via addEventListener)
- Data is JSON-marshaled into the "data:" field
- ID sets the "id:" field (enables reconnection via Last-Event-ID)
type SSEEventReader ¶ added in v0.0.16
type SSEEventReader struct {
// contains filtered or unexported fields
}
SSEEventReader reads Server-Sent Events from an io.Reader.
Usage:
reader := NewSSEEventReader(resp.Body)
for {
ev, err := reader.ReadEvent()
if err != nil {
break
}
// process ev
}
The reader handles all WHATWG SSE spec details: field parsing, multi-line data concatenation, comment skipping, retry parsing, and BOM stripping.
See: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
func NewSSEEventReader ¶ added in v0.0.16
func NewSSEEventReader(r io.Reader) *SSEEventReader
NewSSEEventReader creates an SSEEventReader that parses SSE events from r.
func (*SSEEventReader) ReadEvent ¶ added in v0.0.16
func (r *SSEEventReader) ReadEvent() (SSEReadEvent, error)
ReadEvent reads the next SSE event from the stream.
It blocks until a complete event (terminated by a blank line) is available, or an error occurs. Empty events (no fields accumulated between blank lines) are skipped automatically per the WHATWG dispatch algorithm: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
On EOF mid-event (fields accumulated but no trailing blank line), the partial event is returned along with io.EOF. This supports servers that close the response body without a trailing blank line.
On EOF with no fields accumulated, returns a zero SSEReadEvent and io.EOF.
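The read loop described above can be approximated with bufio.Scanner. This is a simplified sketch under stated assumptions: sseEvent, readEvent, and parseAll are hypothetical names, only the event, id, and data fields are handled, and retry parsing, BOM stripping, and bare-CR line endings are left out.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseEvent is a minimal stand-in for a parsed event.
type sseEvent struct{ Event, ID, Data string }

// readEvent reads one event. A blank line dispatches the event, comments
// are skipped, and empty events are ignored. At EOF it returns whatever
// was accumulated (possibly the zero value) together with io.EOF.
func readEvent(sc *bufio.Scanner) (sseEvent, error) {
	var ev sseEvent
	var data []string
	seen := false
	for sc.Scan() {
		line := sc.Text()
		if line == "" {
			if !seen {
				continue // no fields accumulated: skip the empty event
			}
			ev.Data = strings.Join(data, "\n")
			return ev, nil
		}
		if strings.HasPrefix(line, ":") {
			continue // comment line, such as a keepalive
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ") // one leading space is dropped
		switch field {
		case "event":
			ev.Event, seen = value, true
		case "id":
			ev.ID, seen = value, true
		case "data":
			data, seen = append(data, value), true
		}
	}
	if err := sc.Err(); err != nil {
		return sseEvent{}, err
	}
	ev.Data = strings.Join(data, "\n")
	return ev, io.EOF
}

// parseAll collects every event, including a partial one at EOF.
func parseAll(stream string) []sseEvent {
	sc := bufio.NewScanner(strings.NewReader(stream))
	var out []sseEvent
	for {
		ev, err := readEvent(sc)
		if err != nil {
			if ev != (sseEvent{}) {
				out = append(out, ev)
			}
			return out
		}
		out = append(out, ev)
	}
}

func main() {
	evs := parseAll(": keepalive\n\nevent: update\nid: 7\ndata: a\ndata: b\n\ndata: tail")
	fmt.Printf("%+v\n", evs)
}
```

Callers that care about truncated streams should inspect the returned event before acting on io.EOF, since the two can arrive together.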
type SSEHandler ¶ added in v0.0.8
type SSEHandler[O any, S SSEConn[O]] interface {
// Validate checks if the HTTP request should establish an SSE stream.
// Return (connection, true) to proceed with the SSE connection.
// Return (nil, false) to reject (the handler should write the error response).
Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}
SSEHandler validates HTTP requests and creates SSE connections. It acts as a factory for SSEConn instances, analogous to WSHandler for WebSocket connections.
Type parameters:
- O: The output message type sent to the client
- S: The specific SSEConn implementation type
type SSEHub ¶ added in v0.0.8
type SSEHub[O any] struct {
// contains filtered or unexported fields
}
SSEHub manages a collection of SSE connections, providing session tracking, targeted delivery, and broadcast capabilities. It is the SSE counterpart to the WebSocket hub/room pattern used in the grpcws-demo's GameHub.
SSEHub is instance-based (not a package-level global) for testability and to support multiple independent hubs in the same process.
Type parameter O is the output message type sent to clients.
Usage:
hub := gohttp.NewSSEHub[MyEvent]()
// In your SSEConn.OnStart:
hub.Register(conn)
// In your SSEConn.OnClose:
hub.Unregister(conn.ConnId())
// From application code:
hub.Broadcast(MyEvent{Type: "update", Data: ...})
hub.Send(sessionId, MyEvent{Type: "direct", Data: ...})
// On graceful shutdown:
hub.CloseAll()
Example ¶
ExampleSSEHub demonstrates using SSEHub for broadcasting events to multiple SSE connections. The hub tracks connections by ID and supports targeted delivery and broadcast.
package main
import (
"fmt"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
hub := gohttp.NewSSEHub[any]()
// Broadcast to all connected clients
hub.Broadcast(map[string]any{"type": "alert", "msg": "hello"})
// Send to a specific client
hub.Send("session-123", map[string]any{"type": "direct"})
// Send a named event
hub.BroadcastEvent("notification", map[string]any{"text": "update"})
// On shutdown, close all connections
hub.CloseAll()
fmt.Println("Hub operations complete")
}
Output:
Hub operations complete
func (*SSEHub[O]) Broadcast ¶ added in v0.0.8
func (h *SSEHub[O]) Broadcast(msg O)
Broadcast sends a message to all registered connections. The message is queued to each connection's Writer independently, so a slow connection does not block others.
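The independent queueing described above can be pictured with a small self-contained sketch. It does not reproduce SSEHub's internals, which are unexported: the `miniHub` type, its per-connection buffered channels, and the drop-when-full policy are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// miniHub is a hypothetical stand-in for a hub: each connection ID maps to
// its own buffered queue, so one full queue never stalls the others.
type miniHub[O any] struct {
	mu     sync.RWMutex
	queues map[string]chan O
}

func newMiniHub[O any]() *miniHub[O] {
	return &miniHub[O]{queues: make(map[string]chan O)}
}

// register adds (or replaces) the queue for a connection ID.
func (h *miniHub[O]) register(id string, buf int) chan O {
	q := make(chan O, buf)
	h.mu.Lock()
	h.queues[id] = q
	h.mu.Unlock()
	return q
}

// broadcast offers msg to every queue without blocking and returns how many
// queues accepted it. Dropping on a full queue is an assumption of this sketch.
func (h *miniHub[O]) broadcast(msg O) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	delivered := 0
	for _, q := range h.queues {
		select {
		case q <- msg:
			delivered++
		default: // queue full: skip this connection rather than wait
		}
	}
	return delivered
}

func main() {
	hub := newMiniHub[string]()
	fast := hub.register("fast", 4)
	hub.register("slow", 1)

	fmt.Println(hub.broadcast("one")) // 2: both queues have room
	fmt.Println(hub.broadcast("two")) // 1: "slow" is full, only "fast" accepts
	fmt.Println(len(fast))            // 2: both messages are queued for "fast"
}
```

The point of the sketch is the `select` with a `default` branch: the sender never waits on any single receiver. Whether the real hub drops, buffers, or disconnects a slow client is not stated here.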
func (*SSEHub[O]) BroadcastEvent ¶ added in v0.0.8
BroadcastEvent sends a named event to all registered connections. The event type is set via the SSE "event:" field.
func (*SSEHub[O]) BroadcastEventWithID ¶ added in v0.0.19
BroadcastEventWithID sends a named event with an ID to all registered connections. The ID is set via the SSE "id:" field.
func (*SSEHub[O]) CloseAll ¶ added in v0.0.8
func (h *SSEHub[O]) CloseAll()
CloseAll closes all registered connections by calling OnClose on each, then clears the connection map. Use this for graceful shutdown.
After CloseAll, the hub is empty and can be reused.
func (*SSEHub[O]) Count ¶ added in v0.0.8
Count returns the number of currently registered connections.
func (*SSEHub[O]) Register ¶ added in v0.0.8
func (h *SSEHub[O]) Register(conn *BaseSSEConn[O])
Register adds an SSE connection to the hub, keyed by its ConnId. If a connection with the same ID already exists, it is replaced (the old connection is NOT closed — the caller is responsible for lifecycle management).
func (*SSEHub[O]) Send ¶ added in v0.0.8
Send delivers a message to a specific connection by ID. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist (no error, no panic).
func (*SSEHub[O]) SendEvent ¶ added in v0.0.8
SendEvent delivers a named event to a specific connection by ID. The event type is set via the SSE "event:" field. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist.
func (*SSEHub[O]) SendEventWithID ¶ added in v0.0.19
SendEventWithID delivers a named event with an ID to a specific connection. The ID is set via the SSE "id:" field, enabling client reconnection via the Last-Event-ID header. Returns true if the connection was found and the message was queued. Returns false if the connection ID does not exist.
func (*SSEHub[O]) Unregister ¶ added in v0.0.8
Unregister removes an SSE connection from the hub by its ConnId. The connection's OnClose is NOT called — the caller manages the connection lifecycle (typically SSEServe handles OnClose via defer).
Unregistering a nonexistent ID is a no-op.
type SSEOutgoingMessage ¶ added in v0.0.8
type SSEOutgoingMessage[O any] struct {
// Data is a regular output message. Encoded via Codec, sent as SSE "data:" field.
// Mutually exclusive with Comment.
Data *O
// Event is the SSE event type ("event:" field). Only used with Data.
// If empty, the event type defaults to "message" on the client side.
Event string
// ID is the SSE event ID ("id:" field). Only used with Data.
// Enables client reconnection via the Last-Event-ID header.
ID string
// Retry is the reconnection delay hint in milliseconds, emitted as the
// SSE "retry:" field. When non-zero, the client uses this value as its
// next reconnection delay if the connection drops. Zero or negative
// values are ignored (no retry line written).
//
// Can be sent alongside Data (as a combined event+hint) or as a bare
// hint via SendRetry (no Data, just the retry line).
Retry int
// Comment is an SSE comment line (for keepalive). Mutually exclusive with Data.
// Sent as ": {comment}\n\n".
Comment string
}
SSEOutgoingMessage represents a message to be sent over an SSE connection. This union type allows data messages and keepalive comments to go through the same Writer, avoiding concurrent write issues.
Per WHATWG SSE spec (https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream):
- "event:" sets the event type (clients listen via addEventListener)
- "data:" carries the payload (multiple lines joined with "\n")
- "id:" sets the last event ID (used for reconnection via Last-Event-ID header)
- "retry:" sets the client's reconnection delay in milliseconds
- Lines starting with ":" are comments (used for keepalive)
type SSEReadEvent ¶ added in v0.0.16
type SSEReadEvent struct {
Event string // "event:" field value
Data string // "data:" field(s), joined with "\n" for multi-line
ID string // "id:" field value
Retry int // "retry:" field value in ms (0 = not set)
Comment string // Comment text (lines starting with ":")
}
SSEReadEvent represents a single parsed event from an SSE stream.
Fields follow the WHATWG Server-Sent Events spec: https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-event-type
- Event: the "event:" field (empty = unnamed, defaults to "message" on client side)
- Data: the "data:" field(s), joined with "\n" for multi-line data
- ID: the "id:" field (empty = not set)
- Retry: the "retry:" field in milliseconds (0 = not set or invalid)
- Comment: text from comment lines (lines starting with ":"), last value kept
type SingleResponse ¶ added in v0.0.9
type SingleResponse struct {
StatusCode int // HTTP status code (default 200)
Body any // JSON-marshaled response body
}
SingleResponse sends a single JSON response. This is the synchronous path — the server marshals Body as JSON and returns it with Content-Type: application/json.
If StatusCode is 0, it defaults to 200.
type StoredEvent ¶ added in v0.0.19
type StoredEvent struct {
// ID is the SSE event ID ("id:" field). Used by clients to resume
// via the Last-Event-ID header on reconnection.
ID string
// Event is the SSE event type ("event:" field). If empty, clients
// receive the event via the default "message" handler.
Event string
// Data is the raw event payload (pre-serialized bytes). Stored as-is
// to avoid re-serialization on replay.
Data []byte
}
StoredEvent represents a single SSE event that has been persisted for potential replay. It captures the three fields needed to reconstruct the original SSE wire format: event type, event ID, and raw data.
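One way to picture replay is a lookup over an ordered history keyed by the client's Last-Event-ID. The sketch below is an assumption about how such a store could be used, not a description of this package's storage API: `storedEvent` is a local copy of the three documented fields, and `eventsAfter` is a hypothetical helper.

```go
package main

import "fmt"

// storedEvent copies the three documented fields for this sketch.
type storedEvent struct {
	ID    string
	Event string
	Data  []byte
}

// eventsAfter returns the events that follow lastID in history. The boolean
// reports whether lastID was found, leaving the fallback policy to the caller.
func eventsAfter(history []storedEvent, lastID string) ([]storedEvent, bool) {
	for i, ev := range history {
		if ev.ID == lastID {
			return history[i+1:], true
		}
	}
	return nil, false
}

func main() {
	history := []storedEvent{
		{ID: "1", Event: "update", Data: []byte(`{"n":1}`)},
		{ID: "2", Event: "update", Data: []byte(`{"n":2}`)},
		{ID: "3", Event: "update", Data: []byte(`{"n":3}`)},
	}
	missed, found := eventsAfter(history, "1")
	fmt.Println(found, len(missed)) // true 2
	for _, ev := range missed {
		// Data is written back as-is, with no re-serialization.
		fmt.Printf("id: %s\nevent: %s\ndata: %s\n\n", ev.ID, ev.Event, ev.Data)
	}
}
```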
type StreamResponse ¶ added in v0.0.9
type StreamResponse struct {
Events <-chan SSEEvent
}
StreamResponse streams SSE events from the Events channel. This is the asynchronous path — the server sets Content-Type: text/event-stream and writes each event from the channel until it is closed or the client disconnects.
The handler creates the channel, spawns a goroutine to write events, and closes the channel when done. StreamableServe handles SSE formatting, flushing, and client disconnect detection.
Per WHATWG SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html
type StreamableConfig ¶ added in v0.0.9
type StreamableConfig struct {
// Codec for serializing SSE event Data fields.
// Only Encode() is used. Default: JSONCodec.
Codec Codec[any, any]
}
StreamableConfig controls StreamableServe behavior.
func DefaultStreamableConfig ¶ added in v0.0.9
func DefaultStreamableConfig() *StreamableConfig
DefaultStreamableConfig returns a StreamableConfig with sensible defaults.
type StreamableHandlerFunc ¶ added in v0.0.9
type StreamableHandlerFunc func(ctx context.Context, r *http.Request) StreamableResponse
StreamableHandlerFunc processes an HTTP request and returns either a SingleResponse (synchronous JSON) or StreamResponse (SSE event stream).
The handler receives the request context and the original request. It decides whether to respond synchronously or stream based on the request content, Accept header, or application logic.
Example:
func myHandler(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
if wantsStreaming(r) {
ch := make(chan gohttp.SSEEvent)
go func() {
defer close(ch)
ch <- gohttp.SSEEvent{Data: map[string]any{"progress": 50}}
ch <- gohttp.SSEEvent{Data: map[string]any{"progress": 100}}
}()
return gohttp.StreamResponse{Events: ch}
}
return gohttp.SingleResponse{Body: map[string]any{"result": "ok"}}
}
type StreamableResponse ¶ added in v0.0.9
type StreamableResponse interface {
// contains filtered or unexported methods
}
StreamableResponse is the return type for StreamableHandlerFunc. It is a sealed interface — only SingleResponse and StreamResponse are valid implementations.
This pattern enables the "POST-that-optionally-streams" transport introduced by MCP 2025-03-26, where a single endpoint returns either a synchronous JSON response or an SSE event stream.
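For readers unfamiliar with sealed interfaces in Go, the idiom relies on an unexported method that only types in the declaring package can implement. The following sketch shows the general pattern; the names `response`, `single`, `stream`, and `isResponse` are hypothetical and are not the package's actual unexported identifiers.

```go
package main

import "fmt"

// response is sealed: isResponse is unexported, so types outside this
// package cannot satisfy the interface.
type response interface {
	isResponse()
}

type single struct{ Body any }
type stream struct{ Events <-chan string }

func (single) isResponse() {}
func (stream) isResponse() {}

// describe can switch exhaustively because the set of implementations is closed.
func describe(r response) string {
	switch v := r.(type) {
	case single:
		return fmt.Sprintf("single: %v", v.Body)
	case stream:
		n := 0
		for range v.Events {
			n++
		}
		return fmt.Sprintf("stream: %d events", n)
	default:
		return "unknown"
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "a"
	ch <- "b"
	close(ch)
	fmt.Println(describe(single{Body: "ok"})) // single: ok
	fmt.Println(describe(stream{Events: ch})) // stream: 2 events
}
```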
type TypedJSONCodec ¶ added in v0.0.4
TypedJSONCodec handles encoding/decoding of strongly-typed JSON messages. Use this when you have known Go struct types for your messages.
Example ¶
ExampleTypedJSONCodec demonstrates using TypedJSONCodec for strongly-typed JSON message encoding/decoding. Use this when your message types are known Go structs, for compile-time type safety.
package main
import (
"fmt"
gohttp "github.com/panyam/servicekit/http"
)
func main() {
type ChatMessage struct {
User string `json:"user"`
Text string `json:"text"`
}
type ChatResponse struct {
Status string `json:"status"`
ID int `json:"id"`
}
codec := &gohttp.TypedJSONCodec[ChatMessage, ChatResponse]{}
// Encode a response
data, msgType, err := codec.Encode(ChatResponse{Status: "sent", ID: 42})
if err != nil {
panic(err)
}
fmt.Printf("Type: %d, Data: %s\n", msgType, data)
// Decode an incoming message
msg, err := codec.Decode([]byte(`{"user":"alice","text":"hello"}`), gohttp.TextMessage)
if err != nil {
panic(err)
}
fmt.Printf("User: %s, Text: %s\n", msg.User, msg.Text)
}
Output:
Type: 1, Data: {"status":"sent","id":42}
User: alice, Text: hello
func (*TypedJSONCodec[I, O]) Decode ¶ added in v0.0.4
func (c *TypedJSONCodec[I, O]) Decode(data []byte, msgType MessageType) (I, error)
Decode unmarshals JSON data into a typed value. Creates a new instance of I using Go's zero value mechanism.
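The zero-value approach can be illustrated with a small generic function. This is a sketch of the general technique under the assumption that decoding is a plain `json.Unmarshal` into a declared variable; `decodeInto` and `chatMessage` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type chatMessage struct {
	User string `json:"user"`
	Text string `json:"text"`
}

// decodeInto declares a zero value of I and lets encoding/json fill it in,
// so no constructor or reflection-based factory is needed.
func decodeInto[I any](data []byte) (I, error) {
	var v I
	err := json.Unmarshal(data, &v)
	return v, err
}

func main() {
	msg, err := decodeInto[chatMessage]([]byte(`{"user":"alice","text":"hello"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.User, msg.Text) // alice hello
}
```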
func (*TypedJSONCodec[I, O]) Encode ¶ added in v0.0.4
func (c *TypedJSONCodec[I, O]) Encode(msg O) ([]byte, MessageType, error)
Encode marshals a typed value to JSON bytes.
type URLWaiter ¶
type URLWaiter struct {
Method string
Url string
Headers map[string]string
Payload map[string]any
DelayBetweenChecks time.Duration
// Func versions of the above so we can do something dynamically on each iteration
RequestFunc func(iter int, prevError error) (*http.Request, error)
ValidateFunc func(req *http.Request, resp *http.Response) error
}
URLWaiter is a simple utility that waits for a URL to return a successful response before proceeding. It is useful for waiting until a database or another service becomes available before performing other work.
type WSConn ¶
type WSConn[I any] interface {
BiDirStreamConn[I]
// ReadMessage reads and decodes the next message from the WebSocket connection.
// This is called in a loop by WSHandleConn to process incoming messages.
// Returns the decoded message or an error (including io.EOF on close).
ReadMessage(w *websocket.Conn) (I, error)
// OnStart is called when the WebSocket connection is established.
// Use this to initialize the connection (e.g., set up writers, start goroutines).
// Return an error to reject and close the connection.
OnStart(conn *websocket.Conn) error
}
WSConn represents a bidirectional WebSocket connection that can handle typed messages of type I. It extends BiDirStreamConn with WebSocket-specific functionality for reading messages and connection initialization.
Implementations typically embed BaseConn[I, O] and override HandleMessage. The type parameter I represents the input message type received from clients.
type WSConnConfig ¶
type WSConnConfig struct {
*BiDirStreamConfig
// Upgrader handles the HTTP to WebSocket protocol upgrade.
// Configure ReadBufferSize, WriteBufferSize, and CheckOrigin as needed.
Upgrader websocket.Upgrader
}
WSConnConfig combines BiDirStreamConfig with WebSocket-specific settings. It controls connection upgrade behavior and lifecycle timing.
func DefaultWSConnConfig ¶
func DefaultWSConnConfig() *WSConnConfig
DefaultWSConnConfig returns a WSConnConfig with sensible defaults:
- ReadBufferSize: 1024 bytes
- WriteBufferSize: 1024 bytes
- CheckOrigin: allows all origins (configure for production!)
- PingPeriod: 30 seconds
- PongPeriod: 300 seconds (5 minutes)
type WSHandler ¶
type WSHandler[I any, S WSConn[I]] interface {
// Validate checks if the HTTP request should be upgraded to a WebSocket.
// Return (connection, true) to proceed with the upgrade.
// Return (nil, false) to reject (the handler should write the error response).
Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}
WSHandler validates HTTP requests and creates WebSocket connections. It acts as a factory for WSConn instances, typically performing authentication and authorization before allowing the upgrade.
Type parameters:
- I: The input message type that the connection will handle
- S: The specific WSConn implementation type (must implement WSConn[I])