bridgekit

Go toolkit for server-to-device communication through intermediary bridges. WebSocket hub, request-response correlation, offline command queue.
The Problem
Your server needs to send commands to a BLE device, but BLE is a local radio protocol — servers can't reach devices directly. A phone physically near the device acts as a bridge.
Server ←──WebSocket──→ Phone App ←──BLE──→ Device
This library handles the server side of that architecture:
- WebSocket hub — manages connections from phones and users, with lifecycle hooks and subscription-based broadcasting
- Session manager — correlates outgoing commands with responses using request IDs and timeouts
- Offline queue — stores commands when a device is offline, delivers them on reconnect
- Device config registry — maps device types to their BLE service/characteristic UUIDs
Install
go get github.com/khavrks/bridgekit
Packages
| Package | What it does |
|---------|--------------|
| relay | BLE protocol types, session manager, device config registry |
| ws | WebSocket hub, client lifecycle, read/write pumps, rate limiter |
| queue | Offline command queue interface + 4 backends (Memory, Postgres, Redis, RabbitMQ) |
Quick Start
package main

import (
	"log"
	"time"

	"github.com/khavrks/bridgekit/relay"
	"github.com/khavrks/bridgekit/ws"
)

func main() {
	// 1. Create and run the WebSocket hub
	hub := ws.NewHub()
	go hub.Run()

	// 2. Register device BLE configs
	configs := relay.NewConfigRegistry(relay.DeviceConfig{
		ServiceUUID: "0000fff0-0000-1000-8000-00805f9b34fb",
		WriteUUID:   "0000fff1-0000-1000-8000-00805f9b34fb",
		ListenUUID:  "0000fff2-0000-1000-8000-00805f9b34fb",
	})

	// 3. Create session manager (sends commands through the hub)
	sessions := relay.NewSessionManager(func(deviceID string, req relay.WriteRequest) bool {
		return hub.SendToDevice(deviceID, ws.Message{Type: "ble_write", DeviceID: deviceID})
	}, configs)

	// 4. Send a command (blocks until response or timeout)
	resp, err := sessions.SendCommand("device-123", "smart-lock", "aabbccdd", 10*time.Second)
	if err != nil {
		log.Fatalf("send command: %v", err)
	}
	log.Printf("response: %+v", resp)
}
Architecture
WebSocket Hub
The hub manages two types of connections:
- User connections — a user can have multiple (tabs, devices). The hub detects first-connect and last-disconnect.
- Device connections — one per device. This is the phone bridging BLE.
hub := ws.NewHub()

hub.SetOnDeviceConnect(func(deviceID string) {
	log.Printf("device %s is online", deviceID)
})
hub.SetOnDeviceDisconnect(func(deviceID string) {
	log.Printf("device %s went offline", deviceID)
})

// Subscription-based broadcasting: notify all users who care about a device
hub.SetSubscriptionLoader(func(userID string) []string {
	return db.GetDeviceIDsForUser(userID) // your DB query
})
hub.BroadcastToDevice("device-123", ws.Message{Type: "state_changed"})
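To illustrate how first-connect and last-disconnect detection can work for users with several open connections, here is a minimal sketch using a per-user reference count. `ConnTracker`, `Add`, and `Remove` are hypothetical names chosen for this example; they are not part of bridgekit, and the hub's actual bookkeeping may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// ConnTracker counts live connections per user.
// Hypothetical helper for illustration; not a bridgekit type.
type ConnTracker struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewConnTracker() *ConnTracker {
	return &ConnTracker{counts: make(map[string]int)}
}

// Add registers one connection and reports whether it is the user's first.
func (c *ConnTracker) Add(userID string) (first bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[userID]++
	return c.counts[userID] == 1
}

// Remove drops one connection and reports whether it was the user's last.
// Removing a user with no tracked connections is a no-op and reports false.
func (c *ConnTracker) Remove(userID string) (last bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	n, ok := c.counts[userID]
	if !ok {
		return false
	}
	if n <= 1 {
		delete(c.counts, userID)
		return true
	}
	c.counts[userID] = n - 1
	return false
}

func main() {
	tracker := NewConnTracker()
	fmt.Println(tracker.Add("alice"))    // true: first tab opened
	fmt.Println(tracker.Add("alice"))    // false: second tab
	fmt.Println(tracker.Remove("alice")) // false: one tab still open
	fmt.Println(tracker.Remove("alice")) // true: last tab closed
}
```

A connect hook would fire only when `Add` reports true, and a disconnect hook only when `Remove` reports true, so opening a second tab does not retrigger "user online" logic.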
Session Manager
Request-response correlation over an async WebSocket channel:
// Server sends command → phone writes to BLE → device responds → phone sends back
resp, err := sessions.SendCommand("device-123", "lock-type", "hex-payload", 10*time.Second)
// In your WebSocket message handler, route responses back:
sessions.HandleResponse(bleResponse)
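The correlation idea can be sketched with the standard library alone: each outgoing request gets an ID and a channel, and the response handler looks the channel up by ID. Everything below (`Correlator`, `Send`, `Resolve`, the error values) is a hypothetical illustration under that assumption, not the session manager's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
	"time"
)

var (
	ErrTimeout = errors.New("timed out waiting for response")
	ErrNotSent = errors.New("request could not be sent")
)

// Correlator matches responses to in-flight requests by request ID.
// Hypothetical type for illustration; not a bridgekit type.
type Correlator struct {
	mu      sync.Mutex
	nextID  int
	pending map[string]chan string
}

func NewCorrelator() *Correlator {
	return &Correlator{pending: make(map[string]chan string)}
}

// Send registers a waiter, passes the new request ID to send, then blocks
// until Resolve is called with that ID or the timeout elapses.
func (c *Correlator) Send(send func(requestID string) bool, timeout time.Duration) (string, error) {
	c.mu.Lock()
	c.nextID++
	id := strconv.Itoa(c.nextID)
	ch := make(chan string, 1) // buffered so Resolve never blocks
	c.pending[id] = ch
	c.mu.Unlock()

	// Always forget the request on exit so late responses are dropped.
	defer func() {
		c.mu.Lock()
		delete(c.pending, id)
		c.mu.Unlock()
	}()

	if !send(id) {
		return "", ErrNotSent
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case resp := <-ch:
		return resp, nil
	case <-timer.C:
		return "", ErrTimeout
	}
}

// Resolve delivers a response to its waiter. It reports false when no
// request with that ID is pending (an unknown or late response).
func (c *Correlator) Resolve(requestID, payload string) bool {
	c.mu.Lock()
	ch, ok := c.pending[requestID]
	if ok {
		delete(c.pending, requestID)
	}
	c.mu.Unlock()
	if !ok {
		return false
	}
	ch <- payload
	return true
}

func main() {
	c := NewCorrelator()
	// Fake transport: "the phone" answers asynchronously with the same ID.
	echo := func(id string) bool {
		go c.Resolve(id, "ok")
		return true
	}
	resp, err := c.Send(echo, time.Second)
	fmt.Println(resp, err) // ok <nil>
}
```

Registering the waiter before sending avoids a race in which a fast response arrives before anyone is listening for it.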
Offline Queue
Commands for offline devices are stored and delivered on reconnect:
// Pick one backend; all implement queue.Store
q := queue.NewMemoryStore() // dev/testing
// q := queue.NewPostgresStore(pgPool) // durable, battle-tested
// q := queue.NewRedisStore(redisClient) // fast, natural TTL
// q, err := queue.NewRabbitMQStore(amqpConn, queue.RabbitMQConfig{}) // reliable delivery

// Queue a command (with dedup: a pending command of the same type is overwritten)
q.EnqueueOrUpdate(ctx, "device-123", "lock", payload, 1*time.Hour)

// On reconnect, drain all pending commands
payloads, _ := q.DrainPending(ctx, "device-123")

// Commands requiring approval before execution
cmdID, _ := q.EnqueueWithConfirmation(ctx, "device-123", "unlock", payload, 1*time.Hour)
q.ConfirmCommand(ctx, cmdID) // user approves
// or: q.CancelCommand(ctx, cmdID) // user denies
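The dedup and TTL behaviour described above can be sketched as a small in-memory structure. This is an illustrative stand-in, assuming one pending command per device and command type; `DedupQueue` and its methods are hypothetical and simplified (no context, no error returns), and they are not the library's `MemoryStore`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	cmdType   string
	payload   []byte
	expiresAt time.Time
}

// DedupQueue keeps at most one pending command per (device, command type).
// Hypothetical type for illustration; not a bridgekit type.
type DedupQueue struct {
	mu      sync.Mutex
	pending map[string][]entry // deviceID -> commands in first-enqueue order
}

func NewDedupQueue() *DedupQueue {
	return &DedupQueue{pending: make(map[string][]entry)}
}

// EnqueueOrUpdate stores a command. A pending command of the same type
// for the same device is overwritten in place.
func (q *DedupQueue) EnqueueOrUpdate(deviceID, cmdType string, payload []byte, ttl time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	e := entry{cmdType: cmdType, payload: payload, expiresAt: time.Now().Add(ttl)}
	list := q.pending[deviceID]
	for i := range list {
		if list[i].cmdType == cmdType {
			list[i] = e
			return
		}
	}
	q.pending[deviceID] = append(list, e)
}

// DrainPending removes every command for the device and returns the
// payloads of those that have not expired.
func (q *DedupQueue) DrainPending(deviceID string) [][]byte {
	q.mu.Lock()
	defer q.mu.Unlock()
	now := time.Now()
	var out [][]byte
	for _, e := range q.pending[deviceID] {
		if e.expiresAt.After(now) {
			out = append(out, e.payload)
		}
	}
	delete(q.pending, deviceID)
	return out
}

func main() {
	q := NewDedupQueue()
	q.EnqueueOrUpdate("device-123", "lock", []byte("v1"), time.Hour)
	q.EnqueueOrUpdate("device-123", "lock", []byte("v2"), time.Hour) // overwrites v1
	q.EnqueueOrUpdate("device-123", "reboot", []byte("now"), time.Hour)
	for _, p := range q.DrainPending("device-123") {
		fmt.Println(string(p)) // prints "v2", then "now"
	}
}
```

Overwriting by command type means a device that was offline through ten "lock" requests receives only the most recent one on reconnect.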
RabbitMQ Consumer
RabbitMQ also supports real-time consumption from per-device queues:
rmq, _ := queue.NewRabbitMQStore(conn, queue.RabbitMQConfig{})
// When a device connects, start consuming its queue
payloads, _ := rmq.Consume(ctx, "device-123")
go func() {
	for payload := range payloads {
		sessions.SendCommand("device-123", "lock-type", string(payload), 10*time.Second)
	}
}()
Choosing a Backend
| Backend | Durability | Speed | Best for |
|---------|------------|-------|----------|
| Memory | None (lost on restart) | Fastest | Testing, development |
| Postgres | Full | Moderate | Primary store, complex queries, existing PG infra |
| Redis | Configurable (AOF/RDB) | Fast | High throughput, natural TTL, existing Redis infra |
| RabbitMQ | Full (persistent msgs) | Fast | Reliable delivery, fan-out, existing AMQP infra |
Framework-Agnostic WebSocket Handler
The handler works with any HTTP framework — just pass http.ResponseWriter and *http.Request:
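// net/http
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
	// IDs arrive as query parameters: ?userId=X&deviceId=Y
	userID := r.URL.Query().Get("userId")
	deviceID := r.URL.Query().Get("deviceId")
	ws.HandleWebSocket(hub, w, r, userID, deviceID, onMessage)
})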
// Fiber (via fasthttpadaptor)
// Chi, Gin, Echo — same pattern, extract w and r from your framework
Postgres Schema
If using PostgresStore, create this table:
CREATE TABLE command_queue (
    id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    device_id             TEXT NOT NULL,
    command_type          TEXT,
    payload               BYTEA NOT NULL,
    expires_at            TIMESTAMPTZ NOT NULL,
    delivered             BOOLEAN NOT NULL DEFAULT false,
    requires_confirmation BOOLEAN NOT NULL DEFAULT false,
    confirmed             BOOLEAN NOT NULL DEFAULT false,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX idx_command_queue_dedup
    ON command_queue (device_id, command_type)
    WHERE delivered = false;
Phone App Side
Your phone app needs to:
- Connect to the WebSocket server with ?userId=X&deviceId=Y
- Connect to the BLE device
- When receiving a ble_write message:
  - Write payload (hex-decoded) to the device's write characteristic
  - Read the response from the listen characteristic
  - Send back a ble_response with the same requestId
- Send ble_status messages when BLE connectivity changes
See the protocol types in relay/protocol.go for message formats.
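The ble_write handling steps above can be sketched as a single function. A real bridge would be written in the phone platform's language; Go is used here only to match the rest of this README. The JSON field names (`type`, `requestId`, `payload`, `error`) and the `Envelope` type are assumptions made for this example, so check relay/protocol.go for the actual message formats. The `exchange` callback is a hypothetical stand-in for "write to the write characteristic, then read the listen characteristic".

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// Envelope is an ASSUMED wire shape for illustration only;
// the real message types live in relay/protocol.go.
type Envelope struct {
	Type      string `json:"type"`
	RequestID string `json:"requestId"`
	Payload   string `json:"payload,omitempty"` // hex-encoded bytes
	Error     string `json:"error,omitempty"`   // assumed field
}

// HandleWrite performs the bridge's steps for one ble_write message and
// returns the encoded ble_response. BLE failures are reported inside the
// response; only malformed input yields a Go error.
func HandleWrite(raw []byte, exchange func([]byte) ([]byte, error)) ([]byte, error) {
	var in Envelope
	if err := json.Unmarshal(raw, &in); err != nil {
		return nil, err
	}
	if in.Type != "ble_write" {
		return nil, fmt.Errorf("unexpected message type %q", in.Type)
	}

	// Echo the request ID so the server can correlate the response.
	out := Envelope{Type: "ble_response", RequestID: in.RequestID}

	data, err := hex.DecodeString(in.Payload)
	if err == nil {
		data, err = exchange(data)
	}
	if err != nil {
		out.Error = err.Error()
	} else {
		out.Payload = hex.EncodeToString(data)
	}
	return json.Marshal(out)
}

func main() {
	// Fake device that echoes whatever bytes it is sent.
	echo := func(b []byte) ([]byte, error) { return b, nil }
	msg := []byte(`{"type":"ble_write","requestId":"42","payload":"aabbccdd"}`)
	reply, err := HandleWrite(msg, echo)
	fmt.Println(string(reply), err)
}
```

Whatever the exact wire format, the important property is that the response carries the same requestId as the request, since that is what the server uses to match it to the waiting command.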
Dependencies
License
MIT