servicekit

v0.1.0
Published: May 13, 2026 License: Apache-2.0

ServiceKit

A comprehensive guide to using servicekit for production-grade real-time applications.

Overview

ServiceKit provides a robust, production-ready framework for WebSocket connections and Server-Sent Events (SSE) with automatic connection management, heartbeats, and lifecycle hooks. It's built on top of the Gorilla WebSocket library with additional abstractions for concurrent message handling, and includes SSE support for server-push scenarios.

Key Features

  • Production-grade connection management with automatic heartbeat detection
  • Lifecycle hooks for connection start, close, timeout, and error handling
  • Thread-safe message broadcasting to multiple clients
  • Automatic ping-pong mechanism to prevent connection timeouts
  • Type-safe message handling with generics
  • Pluggable Codec system for flexible message encoding (JSON, Protobuf, custom)
  • Generic BaseConn[I, O] for type-safe input/output message handling
  • Server-Sent Events (SSE) via SSEConn[O] and SSEHub[O] for server-push scenarios
  • Graceful shutdown via ListenAndServeGraceful with signal handling and connection draining
  • Streamable HTTP via StreamableServe for the POST-that-optionally-streams pattern (MCP 2025-03-26)
  • gRPC-over-WebSocket support via the grpcws package for all streaming modes
  • Configurable timeouts and intervals for different deployment scenarios

Architecture

Core Interfaces
WSConn[I any]

The main connection interface that applications must implement:

type WSConn[I any] interface {
    BiDirStreamConn[I]
    ReadMessage(w *websocket.Conn) (I, error)
    OnStart(conn *websocket.Conn) error
}
WSHandler[I any, S WSConn[I]]

Validates HTTP requests and creates WebSocket connections:

type WSHandler[I any, S WSConn[I]] interface {
    Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}
BiDirStreamConn[I any]

Provides lifecycle and message handling methods:

type BiDirStreamConn[I any] interface {
    SendPing() error
    Name() string
    ConnId() string
    HandleMessage(msg I) error
    OnError(err error) error
    OnClose()
    OnTimeout() bool
}
Codec System

The Codec interface decouples message encoding from transport, allowing you to use different serialization formats:

type Codec[I any, O any] interface {
    Decode(data []byte, msgType MessageType) (I, error)
    Encode(msg O) ([]byte, MessageType, error)
}

Note: Ping/pong messages are handled at the transport layer (always JSON), not by the Codec. This separation ensures control messages work consistently even with binary codecs.
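As a rough illustration of the interface above, a custom codec could look like the following sketch. It repeats the Codec interface locally so that it compiles on its own; MessageType and its constants, ChatIn, ChatOut, chatCodec, and roundTrip are hypothetical names invented for this example, not part of servicekit.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageType is a local stand-in for the library's frame-type enum (assumed values).
type MessageType int

const (
	TextMessage MessageType = iota + 1
	BinaryMessage
)

// Codec repeats the interface from the text so the sketch compiles on its own.
type Codec[I any, O any] interface {
	Decode(data []byte, msgType MessageType) (I, error)
	Encode(msg O) ([]byte, MessageType, error)
}

// ChatIn and ChatOut are hypothetical application message types.
type ChatIn struct {
	Text string `json:"text"`
}

type ChatOut struct {
	Echo string `json:"echo"`
}

// chatCodec decodes text frames into ChatIn and encodes ChatOut as JSON text.
type chatCodec struct{}

func (chatCodec) Decode(data []byte, msgType MessageType) (ChatIn, error) {
	var in ChatIn
	if msgType != TextMessage {
		return in, fmt.Errorf("unsupported message type %d", msgType)
	}
	err := json.Unmarshal(data, &in)
	return in, err
}

func (chatCodec) Encode(msg ChatOut) ([]byte, MessageType, error) {
	data, err := json.Marshal(msg)
	return data, TextMessage, err
}

// Compile-time check that chatCodec satisfies the interface.
var _ Codec[ChatIn, ChatOut] = chatCodec{}

// roundTrip decodes an incoming frame and encodes the echoed reply.
func roundTrip(raw string) (string, error) {
	var c Codec[ChatIn, ChatOut] = chatCodec{}
	in, err := c.Decode([]byte(raw), TextMessage)
	if err != nil {
		return "", err
	}
	out, _, err := c.Encode(ChatOut{Echo: in.Text})
	return string(out), err
}

func main() {
	out, err := roundTrip(`{"text":"hi"}`)
	fmt.Println(out, err)
}
```

Because input and output types are separate type parameters, the same shape works when a server accepts one message type and replies with another.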

Built-in Codecs
Codec | Input/Output Types | Wire Format | Use Case
JSONCodec | any | JSON text | Dynamic messages, debugging
TypedJSONCodec[I, O] | I, O | JSON text | Typed Go structs
ProtoJSONCodec[I, O] | proto.Message | JSON text | Proto messages, human-readable
BinaryProtoCodec[I, O] | proto.Message | Binary | Proto messages, max efficiency
BaseConn[I, O]

The generic BaseConn[I, O] is the foundation for all WebSocket connections:

type BaseConn[I any, O any] struct {
    Codec     Codec[I, O]
    Writer    *conc.Writer[OutgoingMessage[O]]
    NameStr   string
    ConnIdStr string
    PingId    int64
}

All writes go through OutgoingMessage[O], a union type that handles data messages (via codec), ping messages, and error messages (always JSON) through a single serialized writer, preventing concurrent write panics.
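The single-writer idea can be sketched with the standard library alone. This is not how conc.Writer is implemented; it only illustrates the pattern under the assumption that one goroutine owns the destination and every other goroutine hands it messages over a channel. The names outgoing, serialWriter, and writeConcurrently are made up for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// outgoing is a simplified union: either a data payload or a ping.
type outgoing struct {
	data string
	ping bool
}

// serialWriter funnels all writes through one goroutine.
type serialWriter struct {
	ch   chan outgoing
	done chan struct{}
}

func newSerialWriter(dst io.Writer) *serialWriter {
	w := &serialWriter{ch: make(chan outgoing, 16), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		// Only this goroutine touches dst, so writes never interleave.
		for msg := range w.ch {
			if msg.ping {
				fmt.Fprintln(dst, "ping")
			} else {
				fmt.Fprintln(dst, "data:"+msg.data)
			}
		}
	}()
	return w
}

// Send may be called from any goroutine.
func (w *serialWriter) Send(msg outgoing) { w.ch <- msg }

// Close stops the writer and waits until queued messages are written.
func (w *serialWriter) Close() {
	close(w.ch)
	<-w.done
}

// writeConcurrently sends n messages from n goroutines and counts written lines.
func writeConcurrently(n int) int {
	var buf bytes.Buffer
	w := newSerialWriter(&buf)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			w.Send(outgoing{data: fmt.Sprint(i), ping: i%2 == 0})
		}(i)
	}
	wg.Wait()
	w.Close()
	return bytes.Count(buf.Bytes(), []byte("\n"))
}

func main() {
	fmt.Println("frames written:", writeConcurrently(50))
}
```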

For simple JSON use cases, JSONConn is an alias for BaseConn[any, any]:

type JSONConn = BaseConn[any, any]

Basic Usage

1. Simple Echo Server
package main

import (
    "log"
    "net/http"
    "github.com/gorilla/mux"
    gohttp "github.com/panyam/servicekit/http"
)

// Use the built-in JSONConn for simple JSON message handling
type EchoConn struct {
    gohttp.JSONConn
}

func (e *EchoConn) HandleMessage(msg any) error {
    log.Printf("Received: %v", msg)
    // Echo the message back
    e.Writer.Send(gohttp.OutgoingMessage[any]{Data: &msg})
    return nil
}

type EchoHandler struct{}

func (h *EchoHandler) Validate(w http.ResponseWriter, r *http.Request) (*EchoConn, bool) {
    // Accept all connections - add authentication here in production
    return &EchoConn{
        JSONConn: gohttp.JSONConn{
            Codec:   &gohttp.JSONCodec{},
            NameStr: "EchoConn",
        },
    }, true
}

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/echo", gohttp.WSServe(&EchoHandler{}, nil))
    
    log.Println("Echo server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", r))
}
2. Custom Message Types
// Define your message structure
type GameMessage struct {
    Type      string `json:"type"`
    PlayerID  string `json:"playerId"`
    Data      any    `json:"data,omitempty"`
    Timestamp int64  `json:"timestamp"`
}

type GameConn struct {
    gohttp.JSONConn
    playerID string
    gameRoom string
}

func (g *GameConn) HandleMessage(msg any) error {
    // Parse the generic message into our GameMessage struct
    msgMap, ok := msg.(map[string]any)
    if !ok {
        return fmt.Errorf("invalid message format")
    }

    // Checked assertion: a missing or non-string "type" must not panic
    msgType, ok := msgMap["type"].(string)
    if !ok {
        return fmt.Errorf("message has no string \"type\" field")
    }

    gameMsg := GameMessage{
        Type:      msgType,
        PlayerID:  g.playerID,
        Timestamp: time.Now().Unix(),
    }

    switch gameMsg.Type {
    case "move":
        return g.handleMove(msgMap["data"])
    case "chat":
        return g.handleChat(msgMap["data"])
    default:
        log.Printf("Unknown message type: %s", gameMsg.Type)
    }

    return nil
}

func (g *GameConn) OnStart(conn *websocket.Conn) error {
    // Call parent OnStart first
    if err := g.JSONConn.OnStart(conn); err != nil {
        return err
    }
    
    // Send welcome message
    welcome := GameMessage{
        Type: "welcome",
        Data: map[string]any{
            "playerId": g.playerID,
            "gameRoom": g.gameRoom,
        },
        Timestamp: time.Now().Unix(),
    }
    
    g.Writer.Send(gohttp.OutgoingMessage[any]{Data: &welcome})
    return nil
}
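Reading fields out of a map one at a time gets tedious as messages grow. One alternative, sketched here under the assumption that the incoming value was decoded from JSON, is to round-trip it through encoding/json into the typed struct. The helper name toGameMessage is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GameMessage repeats the struct from the section above.
type GameMessage struct {
	Type      string `json:"type"`
	PlayerID  string `json:"playerId"`
	Data      any    `json:"data,omitempty"`
	Timestamp int64  `json:"timestamp"`
}

// toGameMessage converts a generically decoded value into a GameMessage
// by re-encoding it as JSON and decoding into the struct.
func toGameMessage(msg any) (GameMessage, error) {
	var out GameMessage
	raw, err := json.Marshal(msg)
	if err != nil {
		return out, fmt.Errorf("re-encode message: %w", err)
	}
	if err := json.Unmarshal(raw, &out); err != nil {
		return out, fmt.Errorf("decode GameMessage: %w", err)
	}
	if out.Type == "" {
		return out, fmt.Errorf("message has no type")
	}
	return out, nil
}

func main() {
	msg := map[string]any{"type": "move", "playerId": "p1", "data": map[string]any{"x": 1.0}}
	gm, err := toGameMessage(msg)
	fmt.Println(gm.Type, gm.PlayerID, err)
}
```

The extra encode/decode costs some CPU per message; a typed codec avoids it by decoding into the struct directly.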

Advanced Features

1. Connection Management and Broadcasting
type ChatServer struct {
    clients map[string]*ChatConn
    mu      sync.RWMutex
}

type ChatConn struct {
    gohttp.JSONConn
    server   *ChatServer
    username string
    id       string
}

func (c *ChatConn) OnStart(conn *websocket.Conn) error {
    if err := c.JSONConn.OnStart(conn); err != nil {
        return err
    }
    
    c.id = fmt.Sprintf("user_%s", conn.RemoteAddr().String())
    
    // Register this connection
    c.server.mu.Lock()
    c.server.clients[c.id] = c
    c.server.mu.Unlock()
    
    // Notify others of new user
    c.server.broadcast("user_joined", map[string]any{
        "username": c.username,
        "userId":   c.id,
    }, c.id) // Exclude self
    
    return nil
}

func (c *ChatConn) OnClose() {
    // Unregister this connection
    c.server.mu.Lock()
    delete(c.server.clients, c.id)
    c.server.mu.Unlock()
    
    // Notify others of user leaving
    c.server.broadcast("user_left", map[string]any{
        "username": c.username,
        "userId":   c.id,
    }, c.id)
    
    c.JSONConn.OnClose()
}

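func (c *ChatConn) HandleMessage(msg any) error {
    // Checked assertions: a malformed frame returns an error instead of panicking
    msgMap, ok := msg.(map[string]any)
    if !ok {
        return fmt.Errorf("invalid message format")
    }

    msgType, _ := msgMap["type"].(string)
    switch msgType {
    case "chat_message":
        c.server.broadcast("chat_message", map[string]any{
            "username":  c.username,
            "message":   msgMap["message"],
            "timestamp": time.Now().Unix(),
        }, "")
    }

    return nil
}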

func (s *ChatServer) broadcast(messageType string, data any, excludeId string) {
    message := map[string]any{
        "type": messageType,
        "data": data,
    }
    
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    for id, client := range s.clients {
        if id != excludeId {
            client.Writer.Send(gohttp.OutgoingMessage[any]{Data: &message})
        }
    }
}
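The broadcast above sends while holding the read lock. A possible variation, shown here with stand-in types rather than the real connection type, copies the recipients under the lock and sends after releasing it, skipping any client whose queue is full. The names client, registry, and inbox are assumptions for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for a connection with a bounded send queue.
type client struct {
	id    string
	inbox chan string
}

type registry struct {
	mu      sync.RWMutex
	clients map[string]*client
}

func (r *registry) add(c *client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients[c.id] = c
}

// broadcast copies the recipients under the read lock, then sends outside it.
// It returns how many clients accepted the message.
func (r *registry) broadcast(msg, excludeID string) int {
	r.mu.RLock()
	targets := make([]*client, 0, len(r.clients))
	for id, c := range r.clients {
		if id != excludeID {
			targets = append(targets, c)
		}
	}
	r.mu.RUnlock()

	delivered := 0
	for _, c := range targets {
		select {
		case c.inbox <- msg:
			delivered++
		default: // inbox full: skip this client rather than block the others
		}
	}
	return delivered
}

func main() {
	r := &registry{clients: map[string]*client{}}
	for _, id := range []string{"a", "b", "c"} {
		r.add(&client{id: id, inbox: make(chan string, 1)})
	}
	fmt.Println("delivered:", r.broadcast("hello", "a"))
}
```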
2. Custom Ping-Pong Handling
type CustomConn struct {
    gohttp.JSONConn
    lastPingTime time.Time
    pingCount    int64
}

func (c *CustomConn) SendPing() error {
    c.pingCount++
    c.lastPingTime = time.Now()
    
    pingMsg := map[string]any{
        "type":      "ping",
        "pingId":    c.pingCount,
        "timestamp": c.lastPingTime.Unix(),
        "connId":    c.ConnId(),
    }
    
    c.Writer.Send(gohttp.OutgoingMessage[any]{Data: &pingMsg})
    return nil
}

func (c *CustomConn) HandleMessage(msg any) error {
    msgMap, ok := msg.(map[string]any)
    if !ok {
        return fmt.Errorf("invalid message format")
    }

    // Checked assertion: a missing or non-string "type" falls through to default
    msgType, _ := msgMap["type"].(string)
    switch msgType {
    case "pong":
        // Handle pong response (JSON numbers decode as float64)
        pingId, ok := msgMap["pingId"].(float64)
        if !ok {
            return fmt.Errorf("pong has no numeric pingId")
        }
        latency := time.Since(c.lastPingTime)
        log.Printf("Received pong for ping %d, latency: %v", int64(pingId), latency)
        return nil
    case "ping":
        // Respond to client ping
        pongMsg := map[string]any{
            "type":      "pong",
            "pingId":    msgMap["pingId"],
            "timestamp": time.Now().Unix(),
        }
        c.Writer.Send(gohttp.OutgoingMessage[any]{Data: &pongMsg})
        return nil
    default:
        // Handle other message types
        return c.handleBusinessLogic(msgMap)
    }
}

func (c *CustomConn) OnTimeout() bool {
    log.Printf("Connection %s timed out after %d pings", c.ConnId(), c.pingCount)
    return true // Close the connection
}
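The example above measures latency against the most recent ping, which is only accurate when pongs arrive in order. A sketch of one way to match pongs to pings by ID follows; pingTracker and its methods are hypothetical helpers, and timestamps are passed in explicitly so the behaviour is easy to check.

```go
package main

import (
	"fmt"
	"time"
)

// pingTracker remembers when each ping was sent so pongs can be matched by id.
// It is not safe for concurrent use; guard it with a mutex if pings and pongs
// are handled on different goroutines.
type pingTracker struct {
	nextID  int64
	pending map[int64]time.Time
}

func newPingTracker() *pingTracker {
	return &pingTracker{pending: map[int64]time.Time{}}
}

// sent records a new ping at time now and returns its id.
func (p *pingTracker) sent(now time.Time) int64 {
	p.nextID++
	p.pending[p.nextID] = now
	return p.nextID
}

// received returns the round-trip time for id, or false if the id is unknown.
func (p *pingTracker) received(id int64, now time.Time) (time.Duration, bool) {
	start, ok := p.pending[id]
	if !ok {
		return 0, false
	}
	delete(p.pending, id)
	return now.Sub(start), true
}

// demoRTTMillis runs two overlapping pings whose pongs arrive out of order.
func demoRTTMillis() (first, second int64) {
	t0 := time.Unix(0, 0)
	p := newPingTracker()
	a := p.sent(t0)
	b := p.sent(t0.Add(100 * time.Millisecond))
	rttB, _ := p.received(b, t0.Add(150*time.Millisecond))
	rttA, _ := p.received(a, t0.Add(180*time.Millisecond))
	return rttA.Milliseconds(), rttB.Milliseconds()
}

func main() {
	first, second := demoRTTMillis()
	fmt.Printf("ping 1: %dms, ping 2: %dms\n", first, second)
}
```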
3. Authentication and Authorization
type AuthenticatedHandler struct {
    jwtSecret []byte
}

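func (h *AuthenticatedHandler) Validate(w http.ResponseWriter, r *http.Request) (*SecureConn, bool) {
    // Extract token from header or query parameter.
    // The header usually carries a "Bearer " prefix (requires the "strings" import).
    token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
    if token == "" {
        token = r.URL.Query().Get("token")
    }

    if token == "" {
        http.Error(w, "Missing authentication token", http.StatusUnauthorized)
        return nil, false
    }

    // Validate JWT token
    claims, err := h.validateJWT(token)
    if err != nil {
        http.Error(w, "Invalid token", http.StatusUnauthorized)
        return nil, false
    }

    // Checked assertions: a malformed claim must not panic the server.
    // Assuming the claims were decoded from JSON, arrays arrive as []any, not []string.
    userID, okID := claims["userId"].(string)
    username, okName := claims["username"].(string)
    rawRoles, okRoles := claims["roles"].([]any)
    if !okID || !okName || !okRoles {
        http.Error(w, "Invalid token claims", http.StatusUnauthorized)
        return nil, false
    }
    roles := make([]string, 0, len(rawRoles))
    for _, role := range rawRoles {
        if s, ok := role.(string); ok {
            roles = append(roles, s)
        }
    }

    // Create authenticated connection (codec set as in the echo example)
    conn := &SecureConn{
        JSONConn: gohttp.JSONConn{Codec: &gohttp.JSONCodec{}, NameStr: "SecureConn"},
        userID:   userID,
        username: username,
        roles:    roles,
    }

    return conn, true
}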

type SecureConn struct {
    gohttp.JSONConn
    userID   string
    username string
    roles    []string
}

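func (s *SecureConn) HandleMessage(msg any) error {
    // Checked assertions: a malformed frame returns an error instead of panicking
    msgMap, ok := msg.(map[string]any)
    if !ok {
        return fmt.Errorf("invalid message format")
    }
    msgType, _ := msgMap["type"].(string)

    // Check permissions for the message type
    if !s.hasPermission(msgType) {
        return fmt.Errorf("insufficient permissions")
    }

    // Process authorized message
    return s.processMessage(msgMap)
}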

func (s *SecureConn) hasPermission(messageType string) bool {
    // Implement role-based access control
    requiredRoles := map[string][]string{
        "admin_command": {"admin"},
        "moderate":      {"admin", "moderator"},
        "chat":          {"user", "moderator", "admin"},
    }
    
    required, exists := requiredRoles[messageType]
    if !exists {
        return false
    }
    
    for _, role := range s.roles {
        for _, req := range required {
            if role == req {
                return true
            }
        }
    }
    
    return false
}
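Two details of the authentication flow are easy to get wrong: stripping the Bearer scheme from the Authorization header, and converting JSON-decoded role arrays. The sketch below uses only the standard library; bearerToken, stringSlice, and demoToken are hypothetical helper names, and it assumes claims were decoded from JSON so that arrays arrive as []any.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// bearerToken returns the token from "Authorization: Bearer <token>",
// falling back to the ?token= query parameter.
func bearerToken(r *http.Request) string {
	auth := r.Header.Get("Authorization")
	if len(auth) > 7 && strings.EqualFold(auth[:7], "Bearer ") {
		return strings.TrimSpace(auth[7:])
	}
	return r.URL.Query().Get("token")
}

// stringSlice converts a JSON-decoded array ([]any) into []string.
func stringSlice(v any) ([]string, error) {
	items, ok := v.([]any)
	if !ok {
		return nil, fmt.Errorf("expected an array, got %T", v)
	}
	out := make([]string, 0, len(items))
	for _, item := range items {
		s, ok := item.(string)
		if !ok {
			return nil, fmt.Errorf("expected a string element, got %T", item)
		}
		out = append(out, s)
	}
	return out, nil
}

// demoToken builds a request with the given header and query, then extracts its token.
func demoToken(authHeader, rawQuery string) string {
	r := httptest.NewRequest(http.MethodGet, "/ws?"+rawQuery, nil)
	if authHeader != "" {
		r.Header.Set("Authorization", authHeader)
	}
	return bearerToken(r)
}

func main() {
	fmt.Println(demoToken("Bearer abc", ""))
	fmt.Println(demoToken("", "token=xyz"))
	roles, err := stringSlice([]any{"admin", "user"})
	fmt.Println(roles, err)
}
```

Browsers cannot set custom headers on a WebSocket handshake, which is why the query-parameter fallback is common; tokens in URLs tend to end up in access logs, so short-lived tokens are the safer choice there.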

Configuration

Custom Configuration
config := &gohttp.WSConnConfig{
    BiDirStreamConfig: &gohttp.BiDirStreamConfig{
        PingPeriod: time.Second * 25,  // Send ping every 25 seconds
        PongPeriod: time.Second * 300, // Timeout after 5 minutes of no activity
    },
    Upgrader: websocket.Upgrader{
        ReadBufferSize:  4096,
        WriteBufferSize: 4096,
        CheckOrigin: func(r *http.Request) bool {
            // Allow connections from specific origins
            origin := r.Header.Get("Origin")
            return origin == "https://yourdomain.com"
        },
    },
}

r.HandleFunc("/ws", gohttp.WSServe(handler, config))
Environment-specific Configurations
func getWSConfig() *gohttp.WSConnConfig {
    config := gohttp.DefaultWSConnConfig()
    
    if os.Getenv("ENV") == "production" {
        // More conservative timeouts for production
        config.PingPeriod = time.Second * 30
        config.PongPeriod = time.Second * 60
        config.Upgrader.CheckOrigin = func(r *http.Request) bool {
            // Strict origin checking in production
            origin := r.Header.Get("Origin")
            allowedOrigins := []string{
                "https://yourdomain.com",
                "https://app.yourdomain.com",
            }
            for _, allowed := range allowedOrigins {
                if origin == allowed {
                    return true
                }
            }
            return false
        }
    } else {
        // More lenient settings for development
        config.PingPeriod = time.Second * 10
        config.PongPeriod = time.Second * 30
    }
    
    return config
}
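The origin loop above can be replaced with a set lookup when the allowlist comes from configuration. The following is a minimal sketch; parseOrigins and originAllowed are hypothetical helpers, and rejecting requests without an Origin header is a choice made for this example, not a statement about the library's default.

```go
package main

import (
	"fmt"
	"strings"
)

// parseOrigins turns a comma-separated list into a lookup set,
// trimming whitespace and a trailing slash from each entry.
func parseOrigins(list string) map[string]bool {
	set := make(map[string]bool)
	for _, o := range strings.Split(list, ",") {
		o = strings.TrimSuffix(strings.TrimSpace(o), "/")
		if o != "" {
			set[o] = true
		}
	}
	return set
}

// originAllowed reports whether origin is in the set. An empty origin is rejected.
func originAllowed(set map[string]bool, origin string) bool {
	return origin != "" && set[origin]
}

func main() {
	allowed := parseOrigins("https://yourdomain.com, https://app.yourdomain.com/")
	fmt.Println(originAllowed(allowed, "https://app.yourdomain.com"))
	fmt.Println(originAllowed(allowed, "https://evil.example"))
}
```

A CheckOrigin function would then reduce to a single call such as originAllowed(allowed, r.Header.Get("Origin")).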

Frontend Integration

JavaScript Client with Ping-Pong
class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.pingInterval = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }
    
    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('WebSocket connected');
            this.reconnectAttempts = 0;
            this.startPingInterval();
        };
        
        this.ws.onmessage = (event) => {
            const message = JSON.parse(event.data);
            this.handleMessage(message);
        };
        
        this.ws.onclose = () => {
            console.log('WebSocket disconnected');
            this.stopPingInterval();
            this.attemptReconnect();
        };
        
        this.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };
    }
    
    handleMessage(message) {
        switch (message.type) {
            case 'ping':
                // Respond to server ping
                this.send({
                    type: 'pong',
                    pingId: message.pingId,
                    timestamp: Date.now()
                });
                break;
            case 'pong':
                // Handle server pong response
                console.log('Received pong from server');
                break;
            default:
                // Handle application-specific messages
                this.onMessage(message);
        }
    }
    
    startPingInterval() {
        // Send ping every 25 seconds to keep connection alive
        this.pingInterval = setInterval(() => {
            if (this.ws && this.ws.readyState === WebSocket.OPEN) {
                this.send({
                    type: 'ping',
                    timestamp: Date.now()
                });
            }
        }, 25000);
    }
    
    stopPingInterval() {
        if (this.pingInterval) {
            clearInterval(this.pingInterval);
            this.pingInterval = null;
        }
    }
    
    send(message) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(message));
        }
    }
    
    attemptReconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            console.log(`Attempting reconnect ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
            setTimeout(() => this.connect(), 3000 * this.reconnectAttempts);
        }
    }
    
    // Override this method to handle application messages
    onMessage(message) {
        console.log('Received message:', message);
    }
    
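    disconnect() {
        this.stopPingInterval();
        if (this.ws) {
            // Detach the close handler so a deliberate disconnect does not trigger a reconnect
            this.ws.onclose = null;
            this.ws.close();
            this.ws = null;
        }
    }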
}

// Usage
const client = new WebSocketClient('ws://localhost:8080/ws');
client.onMessage = (message) => {
    // Handle your application-specific messages
    console.log('Application message:', message);
};
client.connect();

Error Handling and Resilience

Graceful Error Handling
type ResilientConn struct {
    gohttp.JSONConn
    errorCount    int
    maxErrors     int
    lastErrorTime time.Time
}

func (r *ResilientConn) OnError(err error) error {
    // Reset the error count after a quiet period. This check must run before
    // lastErrorTime is updated, otherwise the elapsed time is always near zero.
    if !r.lastErrorTime.IsZero() && time.Since(r.lastErrorTime) > time.Minute*5 {
        r.errorCount = 0
    }

    r.errorCount++
    r.lastErrorTime = time.Now()

    log.Printf("WebSocket error #%d: %v", r.errorCount, err)

    // Close connection after too many errors
    if r.errorCount > r.maxErrors {
        log.Printf("Too many errors (%d), closing connection", r.errorCount)
        return err // This will close the connection
    }

    return nil // Continue with connection
}

func (r *ResilientConn) OnTimeout() bool {
    log.Printf("Connection timeout for %s", r.ConnId())
    
    // Try to send a final message before closing
    r.Writer.Send(gohttp.OutgoingMessage[any]{Data: &map[string]any{
        "type": "timeout_warning",
        "message": "Connection will be closed due to inactivity",
    }})
    
    return true // Close the connection
}
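The error-counting rule can be isolated into a small type that takes the current time as an argument, which makes it straightforward to test. This is a sketch of the idea only; errorBudget and simulate are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// errorBudget allows up to max errors; the count resets after a quiet period.
type errorBudget struct {
	max   int
	quiet time.Duration
	count int
	last  time.Time
}

// record notes an error at time now and reports whether the connection should close.
func (b *errorBudget) record(now time.Time) (closeConn bool) {
	if !b.last.IsZero() && now.Sub(b.last) > b.quiet {
		b.count = 0 // the previous errors are old enough to forget
	}
	b.count++
	b.last = now
	return b.count > b.max
}

// simulate feeds errors at the given second offsets and returns each decision.
func simulate(max, quietSeconds int, offsets ...int) []bool {
	b := &errorBudget{max: max, quiet: time.Duration(quietSeconds) * time.Second}
	start := time.Unix(1000, 0)
	out := make([]bool, 0, len(offsets))
	for _, s := range offsets {
		out = append(out, b.record(start.Add(time.Duration(s)*time.Second)))
	}
	return out
}

func main() {
	fmt.Println(simulate(2, 300, 0, 1, 2))        // three errors in quick succession
	fmt.Println(simulate(2, 300, 0, 1, 400, 401)) // a long gap resets the count
}
```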

Performance Optimization

Connection Pooling and Resource Management
type ConnectionManager struct {
    connections map[string]*ManagedConn
    mu          sync.RWMutex
    maxConns    int
    connCount   int64
}

func (cm *ConnectionManager) AddConnection(conn *ManagedConn) bool {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    
    if len(cm.connections) >= cm.maxConns {
        log.Printf("Connection limit reached (%d), rejecting new connection", cm.maxConns)
        return false
    }
    
    cm.connections[conn.ConnId()] = conn
    atomic.AddInt64(&cm.connCount, 1)
    return true
}

func (cm *ConnectionManager) RemoveConnection(connId string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    
    if _, exists := cm.connections[connId]; exists {
        delete(cm.connections, connId)
        atomic.AddInt64(&cm.connCount, -1)
    }
}

func (cm *ConnectionManager) BroadcastToRoom(roomId string, message any) {
    cm.mu.RLock()
    defer cm.mu.RUnlock()
    
    for _, conn := range cm.connections {
        if conn.roomId == roomId {
            conn.Writer.Send(gohttp.OutgoingMessage[any]{Data: &message})
        }
    }
}

func (cm *ConnectionManager) GetStats() map[string]any {
    return map[string]any{
        "total_connections": atomic.LoadInt64(&cm.connCount),
        "max_connections":   cm.maxConns,
    }
}
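ConnectionManager tracks its count both under the mutex and with an atomic counter. If only the limit matters, a lock-free slot counter may be enough. The sketch below is one possible shape; connLimiter, tryAcquire, and release are hypothetical names, and it requires Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// connLimiter hands out at most max slots without taking a lock.
type connLimiter struct {
	max     int64
	current atomic.Int64
}

// tryAcquire claims a slot, or returns false when the limit is reached.
func (l *connLimiter) tryAcquire() bool {
	for {
		n := l.current.Load()
		if n >= l.max {
			return false
		}
		// Retry if another goroutine changed the count in between.
		if l.current.CompareAndSwap(n, n+1) {
			return true
		}
	}
}

// release returns a slot; call it exactly once per successful tryAcquire.
func (l *connLimiter) release() { l.current.Add(-1) }

func main() {
	l := &connLimiter{max: 2}
	fmt.Println(l.tryAcquire(), l.tryAcquire(), l.tryAcquire())
	l.release()
	fmt.Println(l.tryAcquire())
}
```

In a connection type, tryAcquire would naturally sit in Validate or OnStart and release in OnClose.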

Server-Sent Events (SSE)

The http package provides SSEConn[O] and SSEHub[O] for server-sent events — the write-only counterpart to WebSocket connections. SSE is ideal for server-push scenarios (notifications, live updates, streaming responses).

Basic SSE Endpoint
import gohttp "github.com/panyam/servicekit/http"

// Define your handler
type MySSEHandler struct{}

func (h *MySSEHandler) Validate(w http.ResponseWriter, r *http.Request) (*gohttp.BaseSSEConn[any], bool) {
    return &gohttp.BaseSSEConn[any]{
        Codec:   &gohttp.JSONCodec{},
        NameStr: "MySSE",
    }, true
}

// Mount the endpoint
router.HandleFunc("/events", gohttp.SSEServe[any](&MySSEHandler{}, nil))
Sending Events
// Plain data event (SSE type defaults to "message")
conn.SendOutput(map[string]any{"status": "updated"})

// Named event (clients use addEventListener("alert", ...))
conn.SendEvent("alert", map[string]any{"level": "warning"})

// Named event with ID (enables reconnection via Last-Event-ID)
conn.SendEventWithID("update", "42", map[string]any{"version": 3})
Session Management with SSEHub
hub := gohttp.NewSSEHub[any]()

// Register/unregister in lifecycle hooks
hub.Register(conn)
hub.Unregister(conn.ConnId())

// Targeted and broadcast delivery
hub.Send(sessionId, msg)          // one connection
hub.Broadcast(msg)                // all connections
hub.BroadcastEvent("alert", msg)  // all, with event type

// Graceful shutdown
hub.CloseAll()
Important Notes
  • Set WriteTimeout = 0 on http.Server for SSE endpoints (long-lived connections)
  • Keepalive comments (: keepalive) are sent automatically at a configurable interval (default 30s) to prevent proxy timeouts
  • SSE follows the WHATWG Server-Sent Events spec
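For reference, the wire format behind these calls is plain text. The sketch below renders an event by hand following the WHATWG event-stream format; formatSSE and the keepalive constant are illustrative and are not the library's own encoder.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE renders one event in the text/event-stream wire format.
// Empty event and id fields are omitted; multi-line data becomes one
// "data:" line per input line.
func formatSSE(event, id, data string) string {
	var b strings.Builder
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	if id != "" {
		b.WriteString("id: " + id + "\n")
	}
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	b.WriteString("\n") // a blank line tells the client to dispatch the event
	return b.String()
}

// keepalive is a comment line: clients ignore it, but it keeps proxies from timing out.
const keepalive = ": keepalive\n\n"

func main() {
	fmt.Print(formatSSE("update", "42", `{"version":3}`))
	fmt.Print(keepalive)
}
```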

gRPC-over-WebSocket (grpcws Package)

The grpcws package provides WebSocket transport for gRPC streaming RPCs, supporting all three streaming patterns with full lifecycle management.

Streaming Patterns
Pattern | gRPC Definition | Use Case
Server Streaming | rpc Subscribe(Req) returns (stream Resp) | Real-time updates, notifications
Client Streaming | rpc SendBatch(stream Req) returns (Resp) | Bulk uploads, aggregations
Bidirectional | rpc Chat(stream Req) returns (stream Resp) | Real-time collaboration, gaming
Server Streaming Example
import (
    "github.com/panyam/servicekit/grpcws"
    gohttp "github.com/panyam/servicekit/http"
)

// Server streaming: Subscribe to game events.
// The route declares {game_id} so that mux.Vars(r) has a value to return;
// without a path variable the lookup below would always yield "".
router.HandleFunc("/ws/v1/subscribe/{game_id}", gohttp.WSServe(
    grpcws.NewServerStreamHandler(
        func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
            return grpcClient.Subscribe(ctx, req)
        },
        func(r *http.Request) (*pb.SubscribeRequest, error) {
            return &pb.SubscribeRequest{
                GameId: mux.Vars(r)["game_id"],
            }, nil
        },
    ),
    nil,
))
Client Streaming Example
// Client streaming: Send multiple commands, get summary
router.HandleFunc("/ws/v1/commands", gohttp.WSServe(
    grpcws.NewClientStreamHandler(
        func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
            return grpcClient.SendCommands(ctx)
        },
        func() *pb.GameCommand { return &pb.GameCommand{} },
    ),
    nil,
))
Bidirectional Streaming Example
// Bidirectional streaming: Real-time game sync
router.HandleFunc("/ws/v1/sync", gohttp.WSServe(
    grpcws.NewBidiStreamHandler(
        func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
            return grpcClient.SyncGame(ctx)
        },
        func() *pb.PlayerAction { return &pb.PlayerAction{} },
    ),
    nil,
))
Message Protocol

All gRPC-WS messages use a JSON envelope:

// Server → Client
{"type": "data", "data": <proto-as-json>}
{"type": "error", "error": "message"}
{"type": "stream_end"}
{"type": "ping", "pingId": 123}

// Client → Server
{"type": "data", "data": <proto-as-json>}
{"type": "pong", "pingId": 123}
{"type": "cancel"}
{"type": "end_send"}  // Half-close for client/bidi streaming
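On the Go side, an envelope like this can be parsed in two steps: decode the outer fields first, then handle the payload once the type is known. The following sketch covers the client-to-server messages listed above; the envelope struct and describe function are hypothetical and not the grpcws package's internal types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the JSON fields listed above. Data stays raw until the
// message type is known.
type envelope struct {
	Type   string          `json:"type"`
	Data   json.RawMessage `json:"data,omitempty"`
	Error  string          `json:"error,omitempty"`
	PingID int64           `json:"pingId,omitempty"`
}

// describe parses one client frame and reports what a dispatcher would do with it.
func describe(frame string) (string, error) {
	var env envelope
	if err := json.Unmarshal([]byte(frame), &env); err != nil {
		return "", fmt.Errorf("bad envelope: %w", err)
	}
	switch env.Type {
	case "data":
		return "forward payload " + string(env.Data), nil
	case "pong":
		return fmt.Sprintf("pong for ping %d", env.PingID), nil
	case "cancel":
		return "cancel stream", nil
	case "end_send":
		return "half-close send side", nil
	default:
		return "", fmt.Errorf("unknown message type %q", env.Type)
	}
}

func main() {
	for _, frame := range []string{
		`{"type":"data","data":{"x":1}}`,
		`{"type":"pong","pingId":123}`,
		`{"type":"end_send"}`,
	} {
		fmt.Println(describe(frame))
	}
}
```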
Running the Demo
# Run the grpcws demo server
go run ./cmd/grpcws-demo

# Open http://localhost:8080 in browser to test all streaming patterns

Graceful Shutdown

ListenAndServeGraceful handles signal registration, connection draining, and cleanup callbacks for production HTTP servers.

hub := gohttp.NewSSEHub[any]()

err := gohttp.ListenAndServeGraceful(srv,
    gohttp.WithDrainTimeout(10*time.Second),
    gohttp.WithOnShutdown(hub.CloseAll),  // notify SSE clients before drain
)

Options:

  • WithDrainTimeout(d) — max time to wait for in-flight requests (default 30s)
  • WithOnShutdown(fn) — callbacks invoked before drain (SSEHub.CloseAll, flush logs, etc.)
  • WithSignals(sigs...) — OS signals to catch (default SIGTERM, SIGINT)
  • WithContext(ctx) — parent context; shutdown on cancellation

OnShutdown callbacks run before srv.Shutdown() so they can send goodbye events while connections are still open.
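The ordering described above (callbacks first, then drain) can be sketched with the standard library. This is not the implementation of ListenAndServeGraceful; serveGraceful and demoShutdown are hypothetical names, and a cancelled context stands in for an OS signal. In a real program the context would typically come from signal.NotifyContext.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveGraceful serves on ln until ctx is cancelled, runs the callbacks,
// then drains in-flight requests for at most drain.
func serveGraceful(ctx context.Context, srv *http.Server, ln net.Listener, drain time.Duration, onShutdown ...func()) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		return err // the server stopped before shutdown was requested
	case <-ctx.Done():
	}

	// Callbacks run while connections are still open.
	for _, fn := range onShutdown {
		fn()
	}

	drainCtx, cancel := context.WithTimeout(context.Background(), drain)
	defer cancel()
	if err := srv.Shutdown(drainCtx); err != nil {
		return err
	}
	// Serve returns http.ErrServerClosed after a clean shutdown.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// demoShutdown starts a server on a loopback port and shuts it down immediately.
func demoShutdown() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // stand-in for SIGTERM arriving
	ran := false
	err = serveGraceful(ctx, &http.Server{}, ln, time.Second, func() { ran = true })
	return ran, err
}

func main() {
	ran, err := demoShutdown()
	fmt.Println("callback ran:", ran, "error:", err)
}
```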

Streamable HTTP

StreamableServe implements the "POST-that-optionally-streams" pattern from MCP 2025-03-26 Streamable HTTP. A single endpoint returns either a JSON response or an SSE event stream.

router.HandleFunc("/rpc", gohttp.StreamableServe(
    func(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
        if wantsStreaming(r) {
            ch := make(chan gohttp.SSEEvent)
            go func() {
                defer close(ch)
                // Stop producing if the request is cancelled, so the
                // goroutine cannot block forever on an unread channel
                select {
                case ch <- gohttp.SSEEvent{Data: result}:
                case <-ctx.Done():
                }
            }()
            return gohttp.StreamResponse{Events: ch}
        }
        return gohttp.SingleResponse{Body: result}
    },
    nil,
))
  • SingleResponse: Content-Type: application/json, custom status codes
  • StreamResponse: Content-Type: text/event-stream, channel-based for backpressure
  • Request-scoped streams (simpler than SSEConn for one-shot streaming)
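To make the two response modes concrete, here is a standard-library handler that answers the same endpoint with either a JSON body or an event stream. It is a sketch of the pattern, not of StreamableServe itself; the ?stream=1 query flag is a hypothetical trigger chosen for the example, and demoContentType exists only to exercise the handler.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamable answers with one JSON body, or with an SSE stream when ?stream=1 is set.
func streamable(w http.ResponseWriter, r *http.Request) {
	result := map[string]any{"ok": true}
	if r.URL.Query().Get("stream") != "1" {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(result)
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	payload, _ := json.Marshal(result)
	for i := 0; i < 2; i++ {
		select {
		case <-r.Context().Done():
			return // client went away
		default:
		}
		fmt.Fprintf(w, "data: %s\n\n", payload)
		flusher.Flush() // push each event immediately
	}
}

// demoContentType calls the handler in-process and returns the response content type.
func demoContentType(query string) string {
	rec := httptest.NewRecorder()
	streamable(rec, httptest.NewRequest(http.MethodPost, "/rpc"+query, nil))
	return rec.Header().Get("Content-Type")
}

func main() {
	fmt.Println(demoContentType(""))
	fmt.Println(demoContentType("?stream=1"))
}
```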

Upgrading

See UPGRADING.md for migration guides (e.g., JSONConn to typed BaseConn).

Middleware Package (middleware/)

Production-grade HTTP/WebSocket middleware with zero app-specific imports. All components are nil-safe.

Middleware | Purpose
ClientIPExtractor | Trusted proxy IP extraction (X-Forwarded-For / X-Real-IP)
RateLimiter | Token-bucket rate limiting (global + per-key with KeyFunc)
ConnLimiter | Concurrent connection limiting (503 when full)
BodyLimiter | Request body size limiting via http.MaxBytesReader (413 on exceed)
OriginChecker | WebSocket origin allowlist
CORS | Origin-aware CORS headers
RequestID | X-Request-Id generation/propagation + context injection
RequestLogger | Structured HTTP request logging (includes request ID when available)
Recovery | Panic recovery with structured logging
HealthCheck | Health/readiness endpoint (http.Handler)
Guard | Composable middleware chain
ApplyDefaults | http.Server timeout defaults (helper function)
Quick Start
import "github.com/panyam/servicekit/middleware"

// Compose middleware chain
guard := &middleware.Guard{}
guard.Use(
    middleware.NewRequestID().Middleware,
    middleware.RequestLogger("/healthz"),
    middleware.CORS(middleware.NewOriginChecker([]string{"*.example.com"})),
    middleware.NewBodyLimiter(1 << 20).Middleware, // 1MB
    middleware.NewRateLimiter(middleware.RateLimitConfig{PerKeyPerSec: 10}).Middleware(nil),
    middleware.NewConnLimiter(1000).Middleware,
    middleware.Recovery,
)

// Mount health check directly (bypasses Guard)
mux.Handle(middleware.NewHealthCheck().Path(), middleware.NewHealthCheck())

// Apply Guard to your routes
mux.Handle("/api/", guard.Wrap(apiHandler))

// Apply server timeout defaults
srv := &http.Server{Addr: ":8080", Handler: mux}
middleware.ApplyDefaults(srv)
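Middleware order matters, because each layer only sees what the layers outside it let through. The sketch below shows a generic chain in which the first middleware listed is the outermost; whether Guard applies its middleware in this order is an assumption to verify against the package, and chain, tag, and demoOrder are hypothetical names.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

type middlewareFunc func(http.Handler) http.Handler

// chain wraps h so that the first middleware listed is the outermost.
func chain(h http.Handler, mws ...middlewareFunc) http.Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling the next handler.
func tag(name string, trace *[]string) middlewareFunc {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, name)
			next.ServeHTTP(w, r)
		})
	}
}

// demoOrder sends one request through a three-layer chain and returns the call order.
func demoOrder() string {
	var trace []string
	final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "handler")
	})
	h := chain(final, tag("request-id", &trace), tag("logger", &trace), tag("recovery", &trace))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/api/", nil))
	return strings.Join(trace, " > ")
}

func main() {
	fmt.Println(demoOrder())
}
```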

Testing

See the comprehensive test file ws2_test.go for examples of:

  • Unit testing WebSocket handlers
  • Integration testing with real WebSocket connections
  • Load testing with multiple concurrent connections
  • Ping-pong mechanism testing
  • Error scenario testing

Best Practices

  1. Always call parent OnStart/OnClose: When embedding JSONConn, call the parent OnStart before your own setup, and call the parent OnClose once your own cleanup is done
  2. Handle ping-pong appropriately: Implement custom ping-pong logic if needed for your use case
  3. Implement proper authentication: Never trust client-side data, validate everything server-side
  4. Use connection limits: Prevent resource exhaustion with maximum connection limits
  5. Graceful error handling: Don't crash on client errors, log and continue
  6. Resource cleanup: Always clean up resources in OnClose methods
  7. Thread safety: Use mutexes when managing shared state across connections
  8. Monitor connection health: Track metrics like connection count, error rates, and latency

Common Patterns

Hub Pattern for Broadcasting

Use a central hub to manage connections and broadcasting:

type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}

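func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            delete(h.clients, client)
        case message := <-h.broadcast:
            for client := range h.clients {
                // Non-blocking send: one slow client must not stall the hub
                select {
                case client.send <- message:
                default:
                    // Send buffer is full; drop the client
                    close(client.send)
                    delete(h.clients, client)
                }
            }
        }
    }
}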
Room-based Messaging

Organize connections into rooms for targeted messaging:

type Room struct {
    name    string
    clients map[string]*Client
    mu      sync.RWMutex
}

func (r *Room) AddClient(client *Client) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.clients[client.id] = client
}

func (r *Room) Broadcast(message any, excludeId string) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    
    for id, client := range r.clients {
        if id != excludeId {
            client.Send(message)
        }
    }
}

This guide covers the building blocks for production WebSocket applications with servicekit. Rely on the built-in lifecycle hooks and connection management, and keep application-specific logic in the message handlers.

Directories

Path | Synopsis
cmd/grpcws-demo (command) | grpcws-demo demonstrates the grpcws package for gRPC-over-WebSocket streaming.
cmd/timews (command) |
grpcws | Package grpcws provides WebSocket transport for gRPC streaming RPCs.
http | Package http provides utilities for HTTP request handling and production-grade WebSocket connections.
middleware | Package middleware provides reusable HTTP middleware for production hardening.
