README
¶
ServiceKit
A comprehensive guide to using servicekit for production-grade real-time applications.
Overview
ServiceKit provides a robust, production-ready framework for WebSocket connections and Server-Sent Events (SSE) with automatic connection management, heartbeats, and lifecycle hooks. It's built on top of the Gorilla WebSocket library with additional abstractions for concurrent message handling, and includes SSE support for server-push scenarios.
Key Features
- Production-grade connection management with automatic heartbeat detection
- Lifecycle hooks for connection start, close, timeout, and error handling
- Thread-safe message broadcasting to multiple clients
- Automatic ping-pong mechanism to prevent connection timeouts
- Type-safe message handling with generics
- Pluggable Codec system for flexible message encoding (JSON, Protobuf, custom)
- Generic BaseConn[I, O] for type-safe input/output message handling
- Server-Sent Events (SSE) via
SSEConn[O]andSSEHub[O]for server-push scenarios - Graceful shutdown via
ListenAndServeGracefulwith signal handling and connection draining - Streamable HTTP via
StreamableServefor the POST-that-optionally-streams pattern (MCP 2025-03-26) - gRPC-over-WebSocket support via the
grpcwspackage for all streaming modes - Configurable timeouts and intervals for different deployment scenarios
Architecture
Core Interfaces
WSConn[I any]
The main connection interface that applications must implement:
type WSConn[I any] interface {
BiDirStreamConn[I]
ReadMessage(w *websocket.Conn) (I, error)
OnStart(conn *websocket.Conn) error
}
WSHandler[I any, S WSConn[I]]
Validates HTTP requests and creates WebSocket connections:
type WSHandler[I any, S WSConn[I]] interface {
Validate(w http.ResponseWriter, r *http.Request) (S, bool)
}
BiDirStreamConn[I any]
Provides lifecycle and message handling methods:
type BiDirStreamConn[I any] interface {
SendPing() error
Name() string
ConnId() string
HandleMessage(msg I) error
OnError(err error) error
OnClose()
OnTimeout() bool
}
Codec System
The Codec interface decouples message encoding from transport, allowing you to use different serialization formats:
type Codec[I any, O any] interface {
Decode(data []byte, msgType MessageType) (I, error)
Encode(msg O) ([]byte, MessageType, error)
}
Note: Ping/pong messages are handled at the transport layer (always JSON), not by the Codec. This separation ensures control messages work consistently even with binary codecs.
Built-in Codecs
| Codec | Input/Output Types | Wire Format | Use Case |
|---|---|---|---|
JSONCodec |
any |
JSON text | Dynamic messages, debugging |
TypedJSONCodec[I, O] |
I, O |
JSON text | Typed Go structs |
ProtoJSONCodec[I, O] |
proto.Message |
JSON text | Proto messages, human-readable |
BinaryProtoCodec[I, O] |
proto.Message |
Binary | Proto messages, max efficiency |
BaseConn[I, O]
The generic BaseConn[I, O] is the foundation for all WebSocket connections:
type BaseConn[I any, O any] struct {
Codec Codec[I, O]
Writer *conc.Writer[OutgoingMessage[O]]
NameStr string
ConnIdStr string
PingId int64
}
All writes go through OutgoingMessage[O], a union type that handles data messages (via codec), ping messages, and error messages (always JSON) through a single serialized writer, preventing concurrent write panics.
For simple JSON use cases, JSONConn is an alias for BaseConn[any, any]:
type JSONConn = BaseConn[any, any]
Basic Usage
1. Simple Echo Server
package main
import (
"log"
"net/http"
"github.com/gorilla/mux"
gohttp "github.com/panyam/servicekit/http"
)
// Use the built-in JSONConn for simple JSON message handling
type EchoConn struct {
gohttp.JSONConn
}
func (e *EchoConn) HandleMessage(msg any) error {
log.Printf("Received: %v", msg)
// Echo the message back
e.Writer.Send(gohttp.OutgoingMessage[any]{Data: &msg})
return nil
}
type EchoHandler struct{}
func (h *EchoHandler) Validate(w http.ResponseWriter, r *http.Request) (*EchoConn, bool) {
// Accept all connections - add authentication here in production
return &EchoConn{
JSONConn: gohttp.JSONConn{
Codec: &gohttp.JSONCodec{},
NameStr: "EchoConn",
},
}, true
}
func main() {
r := mux.NewRouter()
r.HandleFunc("/echo", gohttp.WSServe(&EchoHandler{}, nil))
log.Println("Echo server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
2. Custom Message Types
// Define your message structure
type GameMessage struct {
Type string `json:"type"`
PlayerID string `json:"playerId"`
Data any `json:"data,omitempty"`
Timestamp int64 `json:"timestamp"`
}
type GameConn struct {
gohttp.JSONConn
playerID string
gameRoom string
}
func (g *GameConn) HandleMessage(msg any) error {
// Parse the generic message into our GameMessage struct
msgMap, ok := msg.(map[string]any)
if !ok {
return fmt.Errorf("invalid message format")
}
gameMsg := GameMessage{
Type: msgMap["type"].(string),
PlayerID: g.playerID,
Timestamp: time.Now().Unix(),
}
switch gameMsg.Type {
case "move":
return g.handleMove(msgMap["data"])
case "chat":
return g.handleChat(msgMap["data"])
default:
log.Printf("Unknown message type: %s", gameMsg.Type)
}
return nil
}
func (g *GameConn) OnStart(conn *websocket.Conn) error {
// Call parent OnStart first
if err := g.JSONConn.OnStart(conn); err != nil {
return err
}
// Send welcome message
welcome := GameMessage{
Type: "welcome",
Data: map[string]any{
"playerId": g.playerID,
"gameRoom": g.gameRoom,
},
Timestamp: time.Now().Unix(),
}
g.Writer.Send(gohttp.OutgoingMessage[any]{Data: &welcome})
return nil
}
Advanced Features
1. Connection Management and Broadcasting
type ChatServer struct {
clients map[string]*ChatConn
mu sync.RWMutex
}
type ChatConn struct {
gohttp.JSONConn
server *ChatServer
username string
id string
}
func (c *ChatConn) OnStart(conn *websocket.Conn) error {
if err := c.JSONConn.OnStart(conn); err != nil {
return err
}
c.id = fmt.Sprintf("user_%s", conn.RemoteAddr().String())
// Register this connection
c.server.mu.Lock()
c.server.clients[c.id] = c
c.server.mu.Unlock()
// Notify others of new user
c.server.broadcast("user_joined", map[string]any{
"username": c.username,
"userId": c.id,
}, c.id) // Exclude self
return nil
}
func (c *ChatConn) OnClose() {
// Unregister this connection
c.server.mu.Lock()
delete(c.server.clients, c.id)
c.server.mu.Unlock()
// Notify others of user leaving
c.server.broadcast("user_left", map[string]any{
"username": c.username,
"userId": c.id,
}, c.id)
c.JSONConn.OnClose()
}
func (c *ChatConn) HandleMessage(msg any) error {
msgMap := msg.(map[string]any)
switch msgMap["type"].(string) {
case "chat_message":
c.server.broadcast("chat_message", map[string]any{
"username": c.username,
"message": msgMap["message"],
"timestamp": time.Now().Unix(),
}, "")
}
return nil
}
func (s *ChatServer) broadcast(messageType string, data any, excludeId string) {
message := map[string]any{
"type": messageType,
"data": data,
}
s.mu.RLock()
defer s.mu.RUnlock()
for id, client := range s.clients {
if id != excludeId {
client.Writer.Send(gohttp.OutgoingMessage[any]{Data: &message})
}
}
}
2. Custom Ping-Pong Handling
type CustomConn struct {
gohttp.JSONConn
lastPingTime time.Time
pingCount int64
}
func (c *CustomConn) SendPing() error {
c.pingCount++
c.lastPingTime = time.Now()
pingMsg := map[string]any{
"type": "ping",
"pingId": c.pingCount,
"timestamp": c.lastPingTime.Unix(),
"connId": c.ConnId(),
}
c.Writer.Send(gohttp.OutgoingMessage[any]{Data: &pingMsg})
return nil
}
func (c *CustomConn) HandleMessage(msg any) error {
msgMap, ok := msg.(map[string]any)
if !ok {
return fmt.Errorf("invalid message format")
}
switch msgMap["type"].(string) {
case "pong":
// Handle pong response
pingId := int64(msgMap["pingId"].(float64))
latency := time.Now().Sub(c.lastPingTime)
log.Printf("Received pong for ping %d, latency: %v", pingId, latency)
return nil
case "ping":
// Respond to client ping
pongMsg := map[string]any{
"type": "pong",
"pingId": msgMap["pingId"],
"timestamp": time.Now().Unix(),
}
c.Writer.Send(gohttp.OutgoingMessage[any]{Data: &pongMsg})
return nil
default:
// Handle other message types
return c.handleBusinessLogic(msgMap)
}
}
func (c *CustomConn) OnTimeout() bool {
log.Printf("Connection %s timed out after %d pings", c.ConnId(), c.pingCount)
return true // Close the connection
}
3. Authentication and Authorization
type AuthenticatedHandler struct {
jwtSecret []byte
}
func (h *AuthenticatedHandler) Validate(w http.ResponseWriter, r *http.Request) (*SecureConn, bool) {
// Extract token from header or query parameter
token := r.Header.Get("Authorization")
if token == "" {
token = r.URL.Query().Get("token")
}
if token == "" {
http.Error(w, "Missing authentication token", http.StatusUnauthorized)
return nil, false
}
// Validate JWT token
claims, err := h.validateJWT(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return nil, false
}
// Create authenticated connection
conn := &SecureConn{
userID: claims["userId"].(string),
username: claims["username"].(string),
roles: claims["roles"].([]string),
}
return conn, true
}
type SecureConn struct {
gohttp.JSONConn
userID string
username string
roles []string
}
func (s *SecureConn) HandleMessage(msg any) error {
msgMap := msg.(map[string]any)
// Check permissions for the message type
if !s.hasPermission(msgMap["type"].(string)) {
return fmt.Errorf("insufficient permissions")
}
// Process authorized message
return s.processMessage(msgMap)
}
func (s *SecureConn) hasPermission(messageType string) bool {
// Implement role-based access control
requiredRoles := map[string][]string{
"admin_command": {"admin"},
"moderate": {"admin", "moderator"},
"chat": {"user", "moderator", "admin"},
}
required, exists := requiredRoles[messageType]
if !exists {
return false
}
for _, role := range s.roles {
for _, req := range required {
if role == req {
return true
}
}
}
return false
}
Configuration
Custom Configuration
config := &gohttp.WSConnConfig{
BiDirStreamConfig: &gohttp.BiDirStreamConfig{
PingPeriod: time.Second * 25, // Send ping every 25 seconds
PongPeriod: time.Second * 300, // Timeout after 5 minutes of no activity
},
Upgrader: websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
// Allow connections from specific origins
origin := r.Header.Get("Origin")
return origin == "https://yourdomain.com"
},
},
}
r.HandleFunc("/ws", gohttp.WSServe(handler, config))
Environment-specific Configurations
func getWSConfig() *gohttp.WSConnConfig {
config := gohttp.DefaultWSConnConfig()
if os.Getenv("ENV") == "production" {
// More conservative timeouts for production
config.PingPeriod = time.Second * 30
config.PongPeriod = time.Second * 60
config.Upgrader.CheckOrigin = func(r *http.Request) bool {
// Strict origin checking in production
origin := r.Header.Get("Origin")
allowedOrigins := []string{
"https://yourdomain.com",
"https://app.yourdomain.com",
}
for _, allowed := range allowedOrigins {
if origin == allowed {
return true
}
}
return false
}
} else {
// More lenient settings for development
config.PingPeriod = time.Second * 10
config.PongPeriod = time.Second * 30
}
return config
}
Frontend Integration
JavaScript Client with Ping-Pong
class WebSocketClient {
constructor(url) {
this.url = url;
this.ws = null;
this.pingInterval = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.startPingInterval();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onclose = () => {
console.log('WebSocket disconnected');
this.stopPingInterval();
this.attemptReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
handleMessage(message) {
switch (message.type) {
case 'ping':
// Respond to server ping
this.send({
type: 'pong',
pingId: message.pingId,
timestamp: Date.now()
});
break;
case 'pong':
// Handle server pong response
console.log('Received pong from server');
break;
default:
// Handle application-specific messages
this.onMessage(message);
}
}
startPingInterval() {
// Send ping every 25 seconds to keep connection alive
this.pingInterval = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.send({
type: 'ping',
timestamp: Date.now()
});
}
}, 25000);
}
stopPingInterval() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
send(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`Attempting reconnect ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
setTimeout(() => this.connect(), 3000 * this.reconnectAttempts);
}
}
// Override this method to handle application messages
onMessage(message) {
console.log('Received message:', message);
}
disconnect() {
this.stopPingInterval();
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
}
// Usage
const client = new WebSocketClient('ws://localhost:8080/ws');
client.onMessage = (message) => {
// Handle your application-specific messages
console.log('Application message:', message);
};
client.connect();
Error Handling and Resilience
Graceful Error Handling
type ResilientConn struct {
gohttp.JSONConn
errorCount int
maxErrors int
lastErrorTime time.Time
}
func (r *ResilientConn) OnError(err error) error {
r.errorCount++
r.lastErrorTime = time.Now()
log.Printf("WebSocket error #%d: %v", r.errorCount, err)
// Close connection after too many errors
if r.errorCount > r.maxErrors {
log.Printf("Too many errors (%d), closing connection", r.errorCount)
return err // This will close the connection
}
// Reset error count after successful period
if time.Since(r.lastErrorTime) > time.Minute*5 {
r.errorCount = 0
}
return nil // Continue with connection
}
func (r *ResilientConn) OnTimeout() bool {
log.Printf("Connection timeout for %s", r.ConnId())
// Try to send a final message before closing
r.Writer.Send(gohttp.OutgoingMessage[any]{Data: &map[string]any{
"type": "timeout_warning",
"message": "Connection will be closed due to inactivity",
}})
return true // Close the connection
}
Performance Optimization
Connection Pooling and Resource Management
type ConnectionManager struct {
connections map[string]*ManagedConn
mu sync.RWMutex
maxConns int
connCount int64
}
func (cm *ConnectionManager) AddConnection(conn *ManagedConn) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
if len(cm.connections) >= cm.maxConns {
log.Printf("Connection limit reached (%d), rejecting new connection", cm.maxConns)
return false
}
cm.connections[conn.ConnId()] = conn
atomic.AddInt64(&cm.connCount, 1)
return true
}
func (cm *ConnectionManager) RemoveConnection(connId string) {
cm.mu.Lock()
defer cm.mu.Unlock()
if _, exists := cm.connections[connId]; exists {
delete(cm.connections, connId)
atomic.AddInt64(&cm.connCount, -1)
}
}
func (cm *ConnectionManager) BroadcastToRoom(roomId string, message any) {
cm.mu.RLock()
defer cm.mu.RUnlock()
for _, conn := range cm.connections {
if conn.roomId == roomId {
conn.Writer.Send(gohttp.OutgoingMessage[any]{Data: &message})
}
}
}
func (cm *ConnectionManager) GetStats() map[string]any {
return map[string]any{
"total_connections": atomic.LoadInt64(&cm.connCount),
"max_connections": cm.maxConns,
}
}
Server-Sent Events (SSE)
The http package provides SSEConn[O] and SSEHub[O] for server-sent events — the write-only counterpart to WebSocket connections. SSE is ideal for server-push scenarios (notifications, live updates, streaming responses).
Basic SSE Endpoint
import gohttp "github.com/panyam/servicekit/http"
// Define your handler
type MySSEHandler struct{}
func (h *MySSEHandler) Validate(w http.ResponseWriter, r *http.Request) (*gohttp.BaseSSEConn[any], bool) {
return &gohttp.BaseSSEConn[any]{
Codec: &gohttp.JSONCodec{},
NameStr: "MySSE",
}, true
}
// Mount the endpoint
router.HandleFunc("/events", gohttp.SSEServe[any](&MySSEHandler{}, nil))
Sending Events
// Plain data event (SSE type defaults to "message")
conn.SendOutput(map[string]any{"status": "updated"})
// Named event (clients use addEventListener("alert", ...))
conn.SendEvent("alert", map[string]any{"level": "warning"})
// Named event with ID (enables reconnection via Last-Event-ID)
conn.SendEventWithID("update", "42", map[string]any{"version": 3})
Session Management with SSEHub
hub := gohttp.NewSSEHub[any]()
// Register/unregister in lifecycle hooks
hub.Register(conn)
hub.Unregister(conn.ConnId())
// Targeted and broadcast delivery
hub.Send(sessionId, msg) // one connection
hub.Broadcast(msg) // all connections
hub.BroadcastEvent("alert", msg) // all, with event type
// Graceful shutdown
hub.CloseAll()
Important Notes
- Set
WriteTimeout = 0onhttp.Serverfor SSE endpoints (long-lived connections) - Keepalive comments (
: keepalive) are sent automatically at a configurable interval (default 30s) to prevent proxy timeouts - SSE follows the WHATWG Server-Sent Events spec
gRPC-over-WebSocket (grpcws Package)
The grpcws package provides WebSocket transport for gRPC streaming RPCs, supporting all three streaming patterns with full lifecycle management.
Streaming Patterns
| Pattern | gRPC Definition | Use Case |
|---|---|---|
| Server Streaming | rpc Subscribe(Req) returns (stream Resp) |
Real-time updates, notifications |
| Client Streaming | rpc SendBatch(stream Req) returns (Resp) |
Bulk uploads, aggregations |
| Bidirectional | rpc Chat(stream Req) returns (stream Resp) |
Real-time collaboration, gaming |
Server Streaming Example
import (
"github.com/panyam/servicekit/grpcws"
gohttp "github.com/panyam/servicekit/http"
)
// Server streaming: Subscribe to game events
router.HandleFunc("/ws/v1/subscribe", gohttp.WSServe(
grpcws.NewServerStreamHandler(
func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
return grpcClient.Subscribe(ctx, req)
},
func(r *http.Request) (*pb.SubscribeRequest, error) {
return &pb.SubscribeRequest{
GameId: mux.Vars(r)["game_id"],
}, nil
},
),
nil,
))
Client Streaming Example
// Client streaming: Send multiple commands, get summary
router.HandleFunc("/ws/v1/commands", gohttp.WSServe(
grpcws.NewClientStreamHandler(
func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
return grpcClient.SendCommands(ctx)
},
func() *pb.GameCommand { return &pb.GameCommand{} },
),
nil,
))
Bidirectional Streaming Example
// Bidirectional streaming: Real-time game sync
router.HandleFunc("/ws/v1/sync", gohttp.WSServe(
grpcws.NewBidiStreamHandler(
func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
return grpcClient.SyncGame(ctx)
},
func() *pb.PlayerAction { return &pb.PlayerAction{} },
),
nil,
))
Message Protocol
All gRPC-WS messages use a JSON envelope:
// Server → Client
{"type": "data", "data": <proto-as-json>}
{"type": "error", "error": "message"}
{"type": "stream_end"}
{"type": "ping", "pingId": 123}
// Client → Server
{"type": "data", "data": <proto-as-json>}
{"type": "pong", "pingId": 123}
{"type": "cancel"}
{"type": "end_send"} // Half-close for client/bidi streaming
Running the Demo
# Run the grpcws demo server
go run ./cmd/grpcws-demo
# Open http://localhost:8080 in browser to test all streaming patterns
Graceful Shutdown
ListenAndServeGraceful handles signal registration, connection draining, and cleanup callbacks for production HTTP servers.
hub := gohttp.NewSSEHub[any]()
err := gohttp.ListenAndServeGraceful(srv,
gohttp.WithDrainTimeout(10*time.Second),
gohttp.WithOnShutdown(hub.CloseAll), // notify SSE clients before drain
)
Options:
WithDrainTimeout(d)— max time to wait for in-flight requests (default 30s)WithOnShutdown(fn)— callbacks invoked before drain (SSEHub.CloseAll, flush logs, etc.)WithSignals(sigs...)— OS signals to catch (default SIGTERM, SIGINT)WithContext(ctx)— parent context; shutdown on cancellation
OnShutdown callbacks run before srv.Shutdown() so they can send goodbye events while connections are still open.
Streamable HTTP
StreamableServe implements the "POST-that-optionally-streams" pattern from MCP 2025-03-26 Streamable HTTP. A single endpoint returns either a JSON response or an SSE event stream.
router.HandleFunc("/rpc", gohttp.StreamableServe(
func(ctx context.Context, r *http.Request) gohttp.StreamableResponse {
if wantsStreaming(r) {
ch := make(chan gohttp.SSEEvent)
go func() {
defer close(ch)
ch <- gohttp.SSEEvent{Data: result}
}()
return gohttp.StreamResponse{Events: ch}
}
return gohttp.SingleResponse{Body: result}
},
nil,
))
- SingleResponse —
Content-Type: application/json, custom status codes - StreamResponse —
Content-Type: text/event-stream, channel-based for backpressure - Request-scoped streams (simpler than SSEConn for one-shot streaming)
Upgrading
See UPGRADING.md for migration guides (e.g., JSONConn to typed BaseConn).
Middleware Package (middleware/)
Production-grade HTTP/WebSocket middleware with zero app-specific imports. All components are nil-safe.
| Middleware | Purpose |
|---|---|
| ClientIPExtractor | Trusted proxy IP extraction (X-Forwarded-For / X-Real-IP) |
| RateLimiter | Token-bucket rate limiting (global + per-key with KeyFunc) |
| ConnLimiter | Concurrent connection limiting (503 when full) |
| BodyLimiter | Request body size limiting via http.MaxBytesReader (413 on exceed) |
| OriginChecker | WebSocket origin allowlist |
| CORS | Origin-aware CORS headers |
| RequestID | X-Request-Id generation/propagation + context injection |
| RequestLogger | Structured HTTP request logging (includes request ID when available) |
| Recovery | Panic recovery with structured logging |
| HealthCheck | Health/readiness endpoint (http.Handler) |
| Guard | Composable middleware chain |
| ApplyDefaults | http.Server timeout defaults (helper function) |
Quick Start
import "github.com/panyam/servicekit/middleware"
// Compose middleware chain
guard := &middleware.Guard{}
guard.Use(
middleware.NewRequestID().Middleware,
middleware.RequestLogger("/healthz"),
middleware.CORS(middleware.NewOriginChecker([]string{"*.example.com"})),
middleware.NewBodyLimiter(1 << 20).Middleware, // 1MB
middleware.NewRateLimiter(middleware.RateLimitConfig{PerKeyPerSec: 10}).Middleware(nil),
middleware.NewConnLimiter(1000).Middleware,
middleware.Recovery,
)
// Mount health check directly (bypasses Guard)
mux.Handle(middleware.NewHealthCheck().Path(), middleware.NewHealthCheck())
// Apply Guard to your routes
mux.Handle("/api/", guard.Wrap(apiHandler))
// Apply server timeout defaults
srv := &http.Server{Addr: ":8080", Handler: mux}
middleware.ApplyDefaults(srv)
Testing
See the comprehensive test file ws2_test.go for examples of:
- Unit testing WebSocket handlers
- Integration testing with real WebSocket connections
- Load testing with multiple concurrent connections
- Ping-pong mechanism testing
- Error scenario testing
Best Practices
- Always call parent OnStart/OnClose: When embedding JSONConn, call the parent methods first
- Handle ping-pong appropriately: Implement custom ping-pong logic if needed for your use case
- Implement proper authentication: Never trust client-side data, validate everything server-side
- Use connection limits: Prevent resource exhaustion with maximum connection limits
- Graceful error handling: Don't crash on client errors, log and continue
- Resource cleanup: Always clean up resources in OnClose methods
- Thread safety: Use mutexes when managing shared state across connections
- Monitor connection health: Track metrics like connection count, error rates, and latency
Common Patterns
Hub Pattern for Broadcasting
Use a central hub to manage connections and broadcasting:
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
delete(h.clients, client)
case message := <-h.broadcast:
for client := range h.clients {
client.send <- message
}
}
}
}
Room-based Messaging
Organize connections into rooms for targeted messaging:
type Room struct {
name string
clients map[string]*Client
mu sync.RWMutex
}
func (r *Room) AddClient(client *Client) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[client.id] = client
}
func (r *Room) Broadcast(message any, excludeId string) {
r.mu.RLock()
defer r.mu.RUnlock()
for id, client := range r.clients {
if id != excludeId {
client.Send(message)
}
}
}
This tutorial provides a solid foundation for building production-ready WebSocket applications with the servicekit package. The key is to leverage the built-in lifecycle hooks and connection management while implementing your application-specific logic in the message handlers.
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
grpcws-demo
command
grpcws-demo demonstrates the grpcws package for gRPC-over-WebSocket streaming.
|
grpcws-demo demonstrates the grpcws package for gRPC-over-WebSocket streaming. |
|
timews
command
|
|
|
Package grpcws provides WebSocket transport for gRPC streaming RPCs.
|
Package grpcws provides WebSocket transport for gRPC streaming RPCs. |
|
Package http provides utilities for HTTP request handling and production-grade WebSocket connections.
|
Package http provides utilities for HTTP request handling and production-grade WebSocket connections. |
|
Package middleware provides reusable HTTP middleware for production hardening.
|
Package middleware provides reusable HTTP middleware for production hardening. |