Documentation ¶
Overview ¶
Package ws provides a WebSocket connection hub for managing user and device relay connections with subscription-based broadcasting.
The hub tracks two types of connections:
- User connections: a user can have multiple concurrent WebSocket connections (e.g., multiple browser tabs). The hub detects each user's first connect and last disconnect.
- Device connections: each device has exactly one relay connection (the phone bridging BLE), which the hub uses to send commands to the device.
Subscriptions let you broadcast device state updates to all users who should see them (e.g., everyone who has access to a device).
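The first-connect and last-disconnect detection described above can be pictured as a per-user connection count. The following is a minimal sketch, not the package's implementation: connTracker and its methods are hypothetical names, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// connTracker is a hypothetical stand-in for the hub's per-user bookkeeping.
type connTracker struct {
	counts map[string]int // userID -> number of open connections
}

func newConnTracker() *connTracker {
	return &connTracker{counts: make(map[string]int)}
}

// add records a new connection and reports whether it is the user's first.
func (t *connTracker) add(userID string) (first bool) {
	t.counts[userID]++
	return t.counts[userID] == 1
}

// remove records a closed connection and reports whether it was the user's last.
func (t *connTracker) remove(userID string) (last bool) {
	if t.counts[userID] == 0 {
		return false // unknown user; nothing to remove
	}
	t.counts[userID]--
	if t.counts[userID] == 0 {
		delete(t.counts, userID)
		return true
	}
	return false
}

func main() {
	tr := newConnTracker()
	fmt.Println(tr.add("alice"))    // true: first tab opened
	fmt.Println(tr.add("alice"))    // false: second tab
	fmt.Println(tr.remove("alice")) // false: one tab still open
	fmt.Println(tr.remove("alice")) // true: last tab closed
}
```

A connect hook would fire only when add reports true, and a disconnect hook only when remove reports true.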
Index ¶
- Constants
- Variables
- func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request, userID string, ...) error
- func ReadPump(hub *Hub, client *Client, onMessage MessageHandler)
- func WritePump(client *Client)
- type Client
- type DeviceConnectHandler
- type DeviceDisconnectHandler
- type Hub
- func NewHub() *Hub
- func (h *Hub) BroadcastToDevice(deviceID string, msg Message)
- func (h *Hub) InvalidateDeviceUsers(deviceID string)
- func (h *Hub) InvalidateUserDevices(userID string)
- func (h *Hub) IsDeviceOnline(deviceID string) bool
- func (h *Hub) IsUserOnline(userID string) bool
- func (h *Hub) LoadUserSubscriptions(userID string)
- func (h *Hub) Register(client *Client)
- func (h *Hub) RemoveUserSubscriptions(userID string)
- func (h *Hub) Run()
- func (h *Hub) SendToDevice(deviceID string, msg Message) bool
- func (h *Hub) SendToUser(userID string, msg Message)
- func (h *Hub) SetOnDeviceConnect(fn DeviceConnectHandler)
- func (h *Hub) SetOnDeviceDisconnect(fn DeviceDisconnectHandler)
- func (h *Hub) SetOnUserConnect(fn UserConnectHandler)
- func (h *Hub) SetOnUserDisconnect(fn UserDisconnectHandler)
- func (h *Hub) SetSubscriptionLoader(loader SubscriptionLoader)
- func (h *Hub) Unregister(client *Client)
- type Message
- type MessageHandler
- type OnConnectHook
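- type RateLimiter
- func NewRateLimiter(maxPerWindow int, window time.Duration) *RateLimiter
- func (rl *RateLimiter) Allow() bool
- func (rl *RateLimiter) RetryAfterMs() int64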
- type SubscriptionLoader
- type UserConnectHandler
- type UserDisconnectHandler
Constants ¶
Variables ¶
var Upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin:     func(r *http.Request) bool { return true },
}
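Upgrader is the default WebSocket upgrader. Its CheckOrigin accepts every origin, which is convenient in development but leaves a production deployment open to cross-site WebSocket hijacking. Override CheckOrigin before deploying.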
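One way to override CheckOrigin is an allow-list of origin hosts. The sketch below uses only the standard library. originAllowed and originChecker are hypothetical helpers, not part of this package, and rejecting requests without an Origin header is a policy choice made here for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// originAllowed reports whether the host of the given Origin header value
// is in the allow-list. Note that url.URL.Host includes the port, if any.
func originAllowed(origin string, allowed map[string]bool) bool {
	if origin == "" {
		return false // policy choice: reject requests without an Origin header
	}
	u, err := url.Parse(origin)
	if err != nil {
		return false
	}
	return allowed[u.Host]
}

// originChecker returns a function with the same shape as the
// CheckOrigin field shown above: func(r *http.Request) bool.
func originChecker(hosts ...string) func(r *http.Request) bool {
	allowed := make(map[string]bool, len(hosts))
	for _, h := range hosts {
		allowed[h] = true
	}
	return func(r *http.Request) bool {
		return originAllowed(r.Header.Get("Origin"), allowed)
	}
}

func main() {
	check := originChecker("app.example.com")

	req, err := http.NewRequest(http.MethodGet, "http://localhost/ws", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Origin", "https://app.example.com")
	fmt.Println(check(req)) // true

	req.Header.Set("Origin", "https://evil.example.net")
	fmt.Println(check(req)) // false
}
```

Because Upgrader is a package-level variable, an assignment such as ws.Upgrader.CheckOrigin = originChecker("app.example.com") at startup would install the check. The host name is a placeholder.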
Functions ¶
func HandleWebSocket ¶
func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request, userID string, deviceID string, onMessage MessageHandler, onConnect ...OnConnectHook) error
HandleWebSocket is a framework-agnostic WebSocket upgrade handler. It upgrades the HTTP connection, creates a Client, registers it with the hub, and starts the read/write pumps.
Parameters:
- hub: the connection hub
- w, r: the HTTP response writer and request of the connection being upgraded
- userID: identifier for the connecting user
- deviceID: identifier for the device being relayed (empty string for user-only connections)
- onMessage: callback for incoming messages
- onConnect: optional hooks called after registration
This function blocks until the connection closes. Call it from your HTTP handler.
func ReadPump ¶
func ReadPump(hub *Hub, client *Client, onMessage MessageHandler)
ReadPump reads messages from the WebSocket connection until it closes. It handles pong keep-alive and delegates messages to onMessage.
Types ¶
type Client ¶
type Client struct {
	Conn        *websocket.Conn
	UserID      string
	DeviceID    string // non-empty only for device relay connections
	Send        chan []byte
	RateLimiter *RateLimiter
}
Client represents a single WebSocket connection.
type DeviceConnectHandler ¶
type DeviceConnectHandler func(deviceID string)
DeviceConnectHandler is called when a device relay connection opens.
type DeviceDisconnectHandler ¶
type DeviceDisconnectHandler func(deviceID string)
DeviceDisconnectHandler is called when a device relay connection closes.
type Hub ¶
type Hub struct {
	// contains filtered or unexported fields
}
Hub manages all active WebSocket connections and provides:
- Multiple concurrent connections per user (tabs, devices)
- Single device relay connection per device
- First-connect / last-disconnect lifecycle hooks
- Subscription-based broadcasting (device updates → interested users)
func NewHub ¶
func NewHub() *Hub
NewHub creates a new connection hub. Call Run() in a goroutine to start processing connections.
func (*Hub) BroadcastToDevice ¶
func (h *Hub) BroadcastToDevice(deviceID string, msg Message)
BroadcastToDevice sends a message to all users subscribed to a device. Use this for device state updates (locked/unlocked, battery, online/offline).
func (*Hub) InvalidateDeviceUsers ¶
func (h *Hub) InvalidateDeviceUsers(deviceID string)
InvalidateDeviceUsers reloads subscriptions for all users subscribed to a device. Call this when a device's access list changes.
func (*Hub) InvalidateUserDevices ¶
func (h *Hub) InvalidateUserDevices(userID string)
InvalidateUserDevices reloads subscriptions for a user. Call this when a user's device access changes (e.g., new contract, permission change).
func (*Hub) IsDeviceOnline ¶
func (h *Hub) IsDeviceOnline(deviceID string) bool
IsDeviceOnline returns true if a device has an active relay connection.
func (*Hub) IsUserOnline ¶
func (h *Hub) IsUserOnline(userID string) bool
IsUserOnline returns true if a user has any active connections.
func (*Hub) LoadUserSubscriptions ¶
func (h *Hub) LoadUserSubscriptions(userID string)
LoadUserSubscriptions queries the subscription loader for a user's device access and registers the subscriptions for broadcasting.
func (*Hub) RemoveUserSubscriptions ¶
func (h *Hub) RemoveUserSubscriptions(userID string)
RemoveUserSubscriptions removes all device subscriptions for a user.
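The subscription methods above (loading, removing, and broadcasting by device) suggest a device-to-users index. The sketch below shows one way such an index could work. subIndex and loaderFunc are hypothetical, and the loader signature in particular is an assumption, since the actual SubscriptionLoader type is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// loaderFunc is an assumed loader shape: it returns the device IDs a user may see.
type loaderFunc func(userID string) []string

// subIndex maps each device to the set of users subscribed to it.
type subIndex struct {
	byDevice map[string]map[string]bool // deviceID -> set of userIDs
	byUser   map[string][]string        // userID -> deviceIDs, kept for removal
}

func newSubIndex() *subIndex {
	return &subIndex{
		byDevice: make(map[string]map[string]bool),
		byUser:   make(map[string][]string),
	}
}

// load replaces the user's subscriptions with whatever the loader returns.
func (s *subIndex) load(userID string, loader loaderFunc) {
	s.remove(userID)
	devices := append([]string(nil), loader(userID)...) // private copy
	for _, d := range devices {
		if s.byDevice[d] == nil {
			s.byDevice[d] = make(map[string]bool)
		}
		s.byDevice[d][userID] = true
	}
	s.byUser[userID] = devices
}

// remove drops every subscription held by the user.
func (s *subIndex) remove(userID string) {
	for _, d := range s.byUser[userID] {
		delete(s.byDevice[d], userID)
		if len(s.byDevice[d]) == 0 {
			delete(s.byDevice, d)
		}
	}
	delete(s.byUser, userID)
}

// subscribers returns the users a broadcast for deviceID would reach, sorted.
func (s *subIndex) subscribers(deviceID string) []string {
	users := make([]string, 0, len(s.byDevice[deviceID]))
	for u := range s.byDevice[deviceID] {
		users = append(users, u)
	}
	sort.Strings(users)
	return users
}

func main() {
	// Hypothetical access data; a real loader would query a database.
	access := map[string][]string{
		"alice": {"lock-1", "lock-2"},
		"bob":   {"lock-1"},
	}
	loader := func(userID string) []string { return access[userID] }

	idx := newSubIndex()
	idx.load("alice", loader)
	idx.load("bob", loader)
	fmt.Println(idx.subscribers("lock-1")) // [alice bob]

	idx.remove("bob")
	fmt.Println(idx.subscribers("lock-1")) // [alice]
}
```

In this sketch, reloading a user is just load called again, which is roughly what an invalidation would need to do after an access change.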
func (*Hub) Run ¶
func (h *Hub) Run()
Run processes client registration and unregistration. It blocks forever, so start it in its own goroutine.
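A loop like Run is commonly built as a single goroutine that owns the connection state and receives changes over channels. The sketch below illustrates that pattern under stated assumptions: miniHub, its channels, and the stop channel are hypothetical and exist only to make the example terminate. It uses unbuffered channels, so its sends block until the loop receives them, which is simpler than a queued, non-blocking unregister.

```go
package main

import "fmt"

// miniHub is a hypothetical, reduced hub used only for illustration.
type miniHub struct {
	register   chan string   // user IDs of newly opened connections
	unregister chan string   // user IDs of closed connections
	query      chan chan int // requests for the current connection total
	stop       chan struct{} // closes the loop; added for the demo
}

func newMiniHub() *miniHub {
	return &miniHub{
		register:   make(chan string),
		unregister: make(chan string),
		query:      make(chan chan int),
		stop:       make(chan struct{}),
	}
}

// run owns the connection map. No other goroutine touches it,
// so no mutex is needed.
func (h *miniHub) run() {
	conns := make(map[string]int)
	for {
		select {
		case id := <-h.register:
			conns[id]++
		case id := <-h.unregister:
			if conns[id] > 1 {
				conns[id]--
			} else {
				delete(conns, id)
			}
		case reply := <-h.query:
			total := 0
			for _, n := range conns {
				total += n
			}
			reply <- total
		case <-h.stop:
			return
		}
	}
}

// count asks the loop for the current number of open connections.
func (h *miniHub) count() int {
	reply := make(chan int)
	h.query <- reply
	return <-reply
}

func main() {
	h := newMiniHub()
	go h.run() // the loop blocks, so it gets its own goroutine

	h.register <- "alice"
	h.register <- "alice"
	h.unregister <- "alice"
	fmt.Println(h.count()) // 1

	close(h.stop)
}
```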
func (*Hub) SendToDevice ¶
func (h *Hub) SendToDevice(deviceID string, msg Message) bool
SendToDevice sends a message to a device's relay connection. Returns false if the device is not connected or the send buffer is full.
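Reporting false on a full send buffer is typically done with a non-blocking channel send. The sketch below shows the idiom on a chan []byte, the same type as Client.Send. trySend is a hypothetical helper, and whether SendToDevice is implemented this way is an assumption.

```go
package main

import "fmt"

// trySend queues msg without blocking. It reports false when the
// channel's buffer is full (or the channel is nil).
func trySend(ch chan []byte, msg []byte) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	send := make(chan []byte, 1) // buffer of one message

	fmt.Println(trySend(send, []byte("lock")))   // true: buffer had room
	fmt.Println(trySend(send, []byte("unlock"))) // false: buffer is full

	<-send                                       // a writer drains one message
	fmt.Println(trySend(send, []byte("unlock"))) // true: room again
}
```

The caller decides what a false result means: retry later, drop the message, or treat the connection as stalled.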
func (*Hub) SendToUser ¶
func (h *Hub) SendToUser(userID string, msg Message)
SendToUser sends a message to all of a user's connections.
func (*Hub) SetOnDeviceConnect ¶
func (h *Hub) SetOnDeviceConnect(fn DeviceConnectHandler)
SetOnDeviceConnect sets a callback fired when a device relay connects.
func (*Hub) SetOnDeviceDisconnect ¶
func (h *Hub) SetOnDeviceDisconnect(fn DeviceDisconnectHandler)
SetOnDeviceDisconnect sets a callback fired when a device relay disconnects.
func (*Hub) SetOnUserConnect ¶
func (h *Hub) SetOnUserConnect(fn UserConnectHandler)
SetOnUserConnect sets a callback fired when a user's first connection opens.
func (*Hub) SetOnUserDisconnect ¶
func (h *Hub) SetOnUserDisconnect(fn UserDisconnectHandler)
SetOnUserDisconnect sets a callback fired when a user's last connection closes.
func (*Hub) SetSubscriptionLoader ¶
func (h *Hub) SetSubscriptionLoader(loader SubscriptionLoader)
SetSubscriptionLoader sets the function used to load device subscriptions.
func (*Hub) Unregister ¶
func (h *Hub) Unregister(client *Client)
Unregister queues a client for removal. Non-blocking.
type Message ¶
type Message struct {
	Type     relay.MessageType `json:"type"`
	DeviceID string            `json:"deviceId,omitempty"`
	UserID   string            `json:"userId,omitempty"`
	Payload  json.RawMessage   `json:"payload,omitempty"`
}
Message is the envelope for all WebSocket messages.
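To see what the envelope looks like on the wire, the sketch below marshals a local copy of the struct. The field tags match the definition above, but MessageType here is a stand-in: treating relay.MessageType as a string-based type is an assumption, and "device.state" is a made-up type value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageType stands in for relay.MessageType; a string-based type is an assumption.
type MessageType string

// Message mirrors the envelope's fields and JSON tags.
type Message struct {
	Type     MessageType     `json:"type"`
	DeviceID string          `json:"deviceId,omitempty"`
	UserID   string          `json:"userId,omitempty"`
	Payload  json.RawMessage `json:"payload,omitempty"`
}

// encode marshals m to its JSON wire form.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(Message{
		Type:     "device.state", // hypothetical message type
		DeviceID: "lock-1",
		Payload:  json.RawMessage(`{"locked":true}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// {"type":"device.state","deviceId":"lock-1","payload":{"locked":true}}
}
```

Because of omitempty, the empty UserID is left out, and json.RawMessage lets the payload be embedded as-is without the envelope knowing its schema.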
type MessageHandler ¶
MessageHandler is called for each incoming WebSocket message.
type OnConnectHook ¶
type OnConnectHook func(client *Client)
OnConnectHook is called after a client is registered.
type RateLimiter ¶
type RateLimiter struct {
	// contains filtered or unexported fields
}
RateLimiter implements fixed-window rate limiting for a single connection. Each window starts with maxTokens tokens, and each Allow() call consumes one. Unlike a classic token bucket, tokens are not replenished gradually: the full allowance is restored only when the window expires.
func NewRateLimiter ¶
func NewRateLimiter(maxPerWindow int, window time.Duration) *RateLimiter
NewRateLimiter creates a rate limiter allowing maxPerWindow messages within each window duration.
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow() bool
Allow returns true if the client is within rate limits and consumes one token.
func (*RateLimiter) RetryAfterMs ¶
func (rl *RateLimiter) RetryAfterMs() int64
RetryAfterMs returns milliseconds until the rate limit window resets.
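The fixed-window behavior described above can be sketched in a few lines. This is an illustration under assumptions, not the package's code: fixedWindow, fakeClock, and the injectable now function are hypothetical, with the clock injected only so the demo is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fixedWindow is a hypothetical fixed-window limiter.
type fixedWindow struct {
	mu        sync.Mutex
	max       int
	window    time.Duration
	tokens    int
	windowEnd time.Time
	now       func() time.Time // injectable clock
}

func newFixedWindow(max int, window time.Duration, now func() time.Time) *fixedWindow {
	return &fixedWindow{max: max, window: window, tokens: max, windowEnd: now().Add(window), now: now}
}

// allow consumes one token if any remain in the current window.
func (f *fixedWindow) allow() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if t := f.now(); !t.Before(f.windowEnd) {
		// The window expired: restore the full allowance and start a new window.
		f.tokens = f.max
		f.windowEnd = t.Add(f.window)
	}
	if f.tokens == 0 {
		return false
	}
	f.tokens--
	return true
}

// retryAfterMs returns the milliseconds left until the window resets.
func (f *fixedWindow) retryAfterMs() int64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	d := f.windowEnd.Sub(f.now())
	if d < 0 {
		return 0
	}
	return d.Milliseconds()
}

// fakeClock is a manually advanced clock for the demo.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clock := &fakeClock{}
	rl := newFixedWindow(2, time.Second, clock.now)

	fmt.Println(rl.allow(), rl.allow(), rl.allow()) // true true false
	fmt.Println(rl.retryAfterMs())                  // 1000

	clock.advance(time.Second)
	fmt.Println(rl.allow()) // true: a new window has started
}
```

A read loop could call the equivalent of allow for each incoming message and, on false, reply with the retryAfterMs value so the client knows when to resume.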
type SubscriptionLoader ¶
SubscriptionLoader returns the device IDs a user should receive broadcasts for. Implement this to query your database for device access (ownership, permissions, etc.).
type UserConnectHandler ¶
type UserConnectHandler func(userID string)
UserConnectHandler is called when a user's first connection opens.
type UserDisconnectHandler ¶
type UserDisconnectHandler func(userID string)
UserDisconnectHandler is called when a user's last connection closes.