Documentation
¶
Overview ¶
Package queue provides an offline command queue for devices that may not be connected when a command is issued. Commands are stored and delivered when the device reconnects.
Features:
- TTL-based expiration (commands expire if not delivered in time)
- Command deduplication by type (a newer command of the same type replaces the undelivered earlier one, so only the latest intent is delivered rather than every intermediate command)
- Optional confirmation workflow (command waits for approval before delivery)
- Drain-on-reconnect (deliver all ready pending commands when the device comes online)
Index ¶
- type MemoryStore
- func (m *MemoryStore) CancelCommand(_ context.Context, commandID string) error
- func (m *MemoryStore) Cleanup(_ context.Context) (int64, error)
- func (m *MemoryStore) ConfirmCommand(_ context.Context, commandID string) error
- func (m *MemoryStore) DrainPending(_ context.Context, deviceID string) ([][]byte, error)
- func (m *MemoryStore) Enqueue(_ context.Context, deviceID string, payload []byte, ttl time.Duration) error
- func (m *MemoryStore) EnqueueOrUpdate(_ context.Context, deviceID, commandType string, payload []byte, ...) error
- func (m *MemoryStore) EnqueueWithConfirmation(_ context.Context, deviceID, commandType string, payload []byte, ...) (string, error)
- func (m *MemoryStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)
- func (m *MemoryStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)
- type PendingCommand
- type PostgresStore
- func (s *PostgresStore) CancelCommand(ctx context.Context, commandID string) error
- func (s *PostgresStore) Cleanup(ctx context.Context) (int64, error)
- func (s *PostgresStore) ConfirmCommand(ctx context.Context, commandID string) error
- func (s *PostgresStore) DrainPending(ctx context.Context, deviceID string) ([][]byte, error)
- func (s *PostgresStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error
- func (s *PostgresStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ...) error
- func (s *PostgresStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ...) (string, error)
- func (s *PostgresStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)
- func (s *PostgresStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)
- type RabbitMQConfig
- type RabbitMQStore
- func (s *RabbitMQStore) CancelCommand(_ context.Context, commandID string) error
- func (s *RabbitMQStore) Cleanup(_ context.Context) (int64, error)
- func (s *RabbitMQStore) Close() error
- func (s *RabbitMQStore) ConfirmCommand(ctx context.Context, commandID string) error
- func (s *RabbitMQStore) Consume(ctx context.Context, deviceID string) (<-chan []byte, error)
- func (s *RabbitMQStore) DrainPending(_ context.Context, deviceID string) ([][]byte, error)
- func (s *RabbitMQStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error
- func (s *RabbitMQStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ...) error
- func (s *RabbitMQStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ...) (string, error)
- func (s *RabbitMQStore) EnsureQueue(deviceID string) error
- func (s *RabbitMQStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)
- func (s *RabbitMQStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)
- type RedisStore
- func (s *RedisStore) CancelCommand(ctx context.Context, commandID string) error
- func (s *RedisStore) Cleanup(ctx context.Context) (int64, error)
- func (s *RedisStore) ConfirmCommand(ctx context.Context, commandID string) error
- func (s *RedisStore) DrainPending(ctx context.Context, deviceID string) ([][]byte, error)
- func (s *RedisStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error
- func (s *RedisStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ...) error
- func (s *RedisStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ...) (string, error)
- func (s *RedisStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)
- func (s *RedisStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-memory implementation of Store, useful for testing and development. Not suitable for production (no persistence across restarts).
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore creates an in-memory command queue.
func (*MemoryStore) CancelCommand ¶
func (m *MemoryStore) CancelCommand(_ context.Context, commandID string) error
func (*MemoryStore) ConfirmCommand ¶
func (m *MemoryStore) ConfirmCommand(_ context.Context, commandID string) error
func (*MemoryStore) DrainPending ¶
func (*MemoryStore) EnqueueOrUpdate ¶
func (*MemoryStore) EnqueueWithConfirmation ¶
func (*MemoryStore) GetPending ¶
func (m *MemoryStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)
func (*MemoryStore) GetPendingConfirmations ¶
func (m *MemoryStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)
type PendingCommand ¶
type PendingCommand struct {
ID string
CommandType string
Payload []byte
RequiresConfirmation bool
}
PendingCommand represents a queued command awaiting delivery.
type PostgresStore ¶
type PostgresStore struct {
// contains filtered or unexported fields
}
PostgresStore implements Store using PostgreSQL.
Required table schema:
CREATE TABLE command_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
device_id TEXT NOT NULL,
command_type TEXT,
payload BYTEA NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
delivered BOOLEAN NOT NULL DEFAULT false,
requires_confirmation BOOLEAN NOT NULL DEFAULT false,
confirmed BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX idx_command_queue_dedup
ON command_queue (device_id, command_type)
WHERE delivered = false;
func NewPostgresStore ¶
func NewPostgresStore(db *pgxpool.Pool) *PostgresStore
NewPostgresStore creates a Postgres-backed command queue.
func (*PostgresStore) CancelCommand ¶
func (s *PostgresStore) CancelCommand(ctx context.Context, commandID string) error
func (*PostgresStore) ConfirmCommand ¶
func (s *PostgresStore) ConfirmCommand(ctx context.Context, commandID string) error
func (*PostgresStore) DrainPending ¶
func (*PostgresStore) EnqueueOrUpdate ¶
func (*PostgresStore) EnqueueWithConfirmation ¶
func (*PostgresStore) GetPending ¶
func (s *PostgresStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)
func (*PostgresStore) GetPendingConfirmations ¶
func (s *PostgresStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)
type RabbitMQConfig ¶
type RabbitMQConfig struct {
// Exchange name for command messages. Default: "ble.commands"
Exchange string
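// Whether the exchange and queues should be declared as durable. Default: true
// NOTE(review): the zero value of a Go bool is false, so a zero-valued RabbitMQConfig cannot by itself express a default of true — confirm how NewRabbitMQStore applies this default.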
Durable bool
}
RabbitMQConfig configures the RabbitMQ store.
type RabbitMQStore ¶
type RabbitMQStore struct {
// contains filtered or unexported fields
}
RabbitMQStore implements Store using RabbitMQ for command delivery with an in-memory index for confirmation tracking and dedup.
Best for: reliable delivery with message persistence, fan-out to multiple consumers, integration with existing AMQP infrastructure.
Trade-off: confirmation/dedup state is in-memory (lost on restart). For full durability, use Postgres or Redis for state and RabbitMQ only for delivery notification.
func NewRabbitMQStore ¶
func NewRabbitMQStore(conn *amqp.Connection, cfg RabbitMQConfig) (*RabbitMQStore, error)
NewRabbitMQStore creates a RabbitMQ-backed command queue on top of an already-established connection. The caller keeps ownership of the connection: Close closes only the channel, not the connection.
func (*RabbitMQStore) CancelCommand ¶
func (s *RabbitMQStore) CancelCommand(_ context.Context, commandID string) error
func (*RabbitMQStore) Close ¶
func (s *RabbitMQStore) Close() error
Close closes the channel (not the connection).
func (*RabbitMQStore) ConfirmCommand ¶
func (s *RabbitMQStore) ConfirmCommand(ctx context.Context, commandID string) error
func (*RabbitMQStore) Consume ¶
Consume starts consuming messages from a device's queue. Call this when a device connects and you want to receive queued commands. The returned channel delivers messages until the context is cancelled or the connection drops.
This is an alternative to DrainPending — use Consume for real-time delivery from the broker, or DrainPending to pull from the in-memory index.
func (*RabbitMQStore) DrainPending ¶
func (*RabbitMQStore) EnqueueOrUpdate ¶
func (*RabbitMQStore) EnqueueWithConfirmation ¶
func (*RabbitMQStore) EnsureQueue ¶
func (s *RabbitMQStore) EnsureQueue(deviceID string) error
EnsureQueue declares the queue for a device and binds it to the exchange. Call this when a device connects.
func (*RabbitMQStore) GetPending ¶
func (s *RabbitMQStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)
func (*RabbitMQStore) GetPendingConfirmations ¶
func (s *RabbitMQStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore implements Store using Redis. Commands are stored as hashes with a sorted set index per device for ordered retrieval.
Good for: fast access, natural TTL support, no schema migrations. Trade-off: less durable than Postgres (depends on your Redis persistence config).
func NewRedisStore ¶
func NewRedisStore(rdb *redis.Client) *RedisStore
NewRedisStore creates a Redis-backed command queue.
func (*RedisStore) CancelCommand ¶
func (s *RedisStore) CancelCommand(ctx context.Context, commandID string) error
func (*RedisStore) ConfirmCommand ¶
func (s *RedisStore) ConfirmCommand(ctx context.Context, commandID string) error
func (*RedisStore) DrainPending ¶
func (*RedisStore) EnqueueOrUpdate ¶
func (*RedisStore) EnqueueWithConfirmation ¶
func (*RedisStore) GetPending ¶
func (s *RedisStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)
func (*RedisStore) GetPendingConfirmations ¶
func (s *RedisStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)
type Store ¶
type Store interface {
// Enqueue adds a command to the queue.
Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error
// EnqueueOrUpdate upserts a command using commandType as a dedup key.
// If an undelivered command of the same type exists, it updates the payload
// and expiry instead of inserting a duplicate.
EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error
// EnqueueWithConfirmation adds a command that requires confirmation before delivery.
// Returns the command ID for later confirmation or cancellation.
EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)
// GetPending returns all unexpired, undelivered commands without marking them delivered.
GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)
// DrainPending returns and marks as delivered all commands that are ready
// (not requiring confirmation, or already confirmed).
DrainPending(ctx context.Context, deviceID string) ([][]byte, error)
// GetPendingConfirmations returns commands awaiting confirmation.
GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)
// ConfirmCommand marks a command as confirmed so it will be included in the next drain.
ConfirmCommand(ctx context.Context, commandID string) error
// CancelCommand removes a queued command.
CancelCommand(ctx context.Context, commandID string) error
// Cleanup removes expired commands. Returns the number of commands removed.
Cleanup(ctx context.Context) (int64, error)
}
Store defines the interface for an offline command queue. This package provides MemoryStore, PostgresStore, RabbitMQStore, and RedisStore; to use a different storage backend (SQLite, etc.), implement this interface against it.