queue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package queue provides an offline command queue for devices that may not be connected when a command is issued. Commands are stored and delivered when the device reconnects.

Features:

  • TTL-based expiration (commands expire if not delivered in time)
  • Command deduplication by type (only the latest command of each type is kept; an earlier undelivered command of the same type is updated rather than duplicated)
  • Optional confirmation workflow (command waits for approval before delivery)
  • Drain-on-reconnect (deliver all pending commands when device comes online)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory implementation of Store, useful for testing and development. Not suitable for production (no persistence across restarts).

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates an in-memory command queue.

func (*MemoryStore) CancelCommand

func (m *MemoryStore) CancelCommand(_ context.Context, commandID string) error

func (*MemoryStore) Cleanup

func (m *MemoryStore) Cleanup(_ context.Context) (int64, error)

func (*MemoryStore) ConfirmCommand

func (m *MemoryStore) ConfirmCommand(_ context.Context, commandID string) error

func (*MemoryStore) DrainPending

func (m *MemoryStore) DrainPending(_ context.Context, deviceID string) ([][]byte, error)

func (*MemoryStore) Enqueue

func (m *MemoryStore) Enqueue(_ context.Context, deviceID string, payload []byte, ttl time.Duration) error

func (*MemoryStore) EnqueueOrUpdate

func (m *MemoryStore) EnqueueOrUpdate(_ context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error

func (*MemoryStore) EnqueueWithConfirmation

func (m *MemoryStore) EnqueueWithConfirmation(_ context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)

func (*MemoryStore) GetPending

func (m *MemoryStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)

func (*MemoryStore) GetPendingConfirmations

func (m *MemoryStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)

type PendingCommand

type PendingCommand struct {
	ID                   string
	CommandType          string
	Payload              []byte
	RequiresConfirmation bool
}

PendingCommand represents a queued command awaiting delivery.

type PostgresStore

type PostgresStore struct {
	// contains filtered or unexported fields
}

PostgresStore implements Store using PostgreSQL.

Required table schema:

CREATE TABLE command_queue (
    id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    device_id             TEXT NOT NULL,
    command_type          TEXT,
    payload               BYTEA NOT NULL,
    expires_at            TIMESTAMPTZ NOT NULL,
    delivered             BOOLEAN NOT NULL DEFAULT false,
    requires_confirmation BOOLEAN NOT NULL DEFAULT false,
    confirmed             BOOLEAN NOT NULL DEFAULT false,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX idx_command_queue_dedup
    ON command_queue (device_id, command_type)
    WHERE delivered = false;

func NewPostgresStore

func NewPostgresStore(db *pgxpool.Pool) *PostgresStore

NewPostgresStore creates a Postgres-backed command queue.

func (*PostgresStore) CancelCommand

func (s *PostgresStore) CancelCommand(ctx context.Context, commandID string) error

func (*PostgresStore) Cleanup

func (s *PostgresStore) Cleanup(ctx context.Context) (int64, error)

func (*PostgresStore) ConfirmCommand

func (s *PostgresStore) ConfirmCommand(ctx context.Context, commandID string) error

func (*PostgresStore) DrainPending

func (s *PostgresStore) DrainPending(ctx context.Context, deviceID string) ([][]byte, error)

func (*PostgresStore) Enqueue

func (s *PostgresStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error

func (*PostgresStore) EnqueueOrUpdate

func (s *PostgresStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error

func (*PostgresStore) EnqueueWithConfirmation

func (s *PostgresStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)

func (*PostgresStore) GetPending

func (s *PostgresStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)

func (*PostgresStore) GetPendingConfirmations

func (s *PostgresStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)

type RabbitMQConfig

type RabbitMQConfig struct {
	// Exchange name for command messages. Default: "ble.commands"
	Exchange string
	// Whether the exchange and queues should be declared as durable. Default: true
	Durable bool
}

RabbitMQConfig configures the RabbitMQ store.

type RabbitMQStore

type RabbitMQStore struct {
	// contains filtered or unexported fields
}

RabbitMQStore implements Store using RabbitMQ for command delivery with an in-memory index for confirmation tracking and dedup.

Best for: reliable delivery with message persistence, fan-out to multiple consumers, integration with existing AMQP infrastructure.

Trade-off: confirmation/dedup state is in-memory (lost on restart). For full durability, use Postgres or Redis for state and RabbitMQ only for delivery notification.

func NewRabbitMQStore

func NewRabbitMQStore(conn *amqp.Connection, cfg RabbitMQConfig) (*RabbitMQStore, error)

NewRabbitMQStore creates a RabbitMQ-backed command queue. The AMQP connection must already be established; it is the only handle passed in, and Close closes the store's channel but not this connection.

func (*RabbitMQStore) CancelCommand

func (s *RabbitMQStore) CancelCommand(_ context.Context, commandID string) error

func (*RabbitMQStore) Cleanup

func (s *RabbitMQStore) Cleanup(_ context.Context) (int64, error)

func (*RabbitMQStore) Close

func (s *RabbitMQStore) Close() error

Close closes the channel (not the connection).

func (*RabbitMQStore) ConfirmCommand

func (s *RabbitMQStore) ConfirmCommand(ctx context.Context, commandID string) error

func (*RabbitMQStore) Consume

func (s *RabbitMQStore) Consume(ctx context.Context, deviceID string) (<-chan []byte, error)

Consume starts consuming messages from a device's queue. Call this when a device connects and you want to receive queued commands. The returned channel delivers messages until the context is cancelled or the connection drops.

This is an alternative to DrainPending — use Consume for real-time delivery from the broker, or DrainPending to pull from the in-memory index.

func (*RabbitMQStore) DrainPending

func (s *RabbitMQStore) DrainPending(_ context.Context, deviceID string) ([][]byte, error)

func (*RabbitMQStore) Enqueue

func (s *RabbitMQStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error

func (*RabbitMQStore) EnqueueOrUpdate

func (s *RabbitMQStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error

func (*RabbitMQStore) EnqueueWithConfirmation

func (s *RabbitMQStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)

func (*RabbitMQStore) EnsureQueue

func (s *RabbitMQStore) EnsureQueue(deviceID string) error

EnsureQueue declares the queue for a device and binds it to the exchange. Call this when a device connects.

func (*RabbitMQStore) GetPending

func (s *RabbitMQStore) GetPending(_ context.Context, deviceID string) ([]PendingCommand, error)

func (*RabbitMQStore) GetPendingConfirmations

func (s *RabbitMQStore) GetPendingConfirmations(_ context.Context, deviceID string) ([]PendingCommand, error)

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore implements Store using Redis. Commands are stored as hashes with a sorted set index per device for ordered retrieval.

Good for: fast access, natural TTL support, no schema migrations. Trade-off: less durable than Postgres (depends on your Redis persistence config).

func NewRedisStore

func NewRedisStore(rdb *redis.Client) *RedisStore

NewRedisStore creates a Redis-backed command queue.

func (*RedisStore) CancelCommand

func (s *RedisStore) CancelCommand(ctx context.Context, commandID string) error

func (*RedisStore) Cleanup

func (s *RedisStore) Cleanup(ctx context.Context) (int64, error)

func (*RedisStore) ConfirmCommand

func (s *RedisStore) ConfirmCommand(ctx context.Context, commandID string) error

func (*RedisStore) DrainPending

func (s *RedisStore) DrainPending(ctx context.Context, deviceID string) ([][]byte, error)

func (*RedisStore) Enqueue

func (s *RedisStore) Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error

func (*RedisStore) EnqueueOrUpdate

func (s *RedisStore) EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error

func (*RedisStore) EnqueueWithConfirmation

func (s *RedisStore) EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)

func (*RedisStore) GetPending

func (s *RedisStore) GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)

func (*RedisStore) GetPendingConfirmations

func (s *RedisStore) GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)

type Store

type Store interface {
	// Enqueue adds a command to the queue.
	Enqueue(ctx context.Context, deviceID string, payload []byte, ttl time.Duration) error

	// EnqueueOrUpdate upserts a command using commandType as a dedup key.
	// If an undelivered command of the same type exists, it updates the payload
	// and expiry instead of inserting a duplicate.
	EnqueueOrUpdate(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) error

	// EnqueueWithConfirmation adds a command that requires confirmation before delivery.
	// Returns the command ID for later confirmation or cancellation.
	EnqueueWithConfirmation(ctx context.Context, deviceID, commandType string, payload []byte, ttl time.Duration) (string, error)

	// GetPending returns all unexpired, undelivered commands without marking them delivered.
	GetPending(ctx context.Context, deviceID string) ([]PendingCommand, error)

	// DrainPending returns and marks as delivered all commands that are ready
	// (not requiring confirmation, or already confirmed).
	DrainPending(ctx context.Context, deviceID string) ([][]byte, error)

	// GetPendingConfirmations returns commands awaiting confirmation.
	GetPendingConfirmations(ctx context.Context, deviceID string) ([]PendingCommand, error)

	// ConfirmCommand marks a command as confirmed so it will be included in the next drain.
	ConfirmCommand(ctx context.Context, commandID string) error

	// CancelCommand removes a queued command.
	CancelCommand(ctx context.Context, commandID string) error

	// Cleanup removes expired commands. Returns the number of commands removed.
	Cleanup(ctx context.Context) (int64, error)
}

Store defines the interface for an offline command queue. Implement this against your preferred storage backend (Postgres, SQLite, Redis, etc.).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL