queue

package
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2026 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Overview

Package queue provides a durable, file-backed FIFO queue for QueueItems.

Items are persisted as JSONL (one JSON object per line) in a single flat file. All operations are protected by a mutex; Enqueue and Dequeue atomically rewrite the backing file to ensure consistency.
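The persistence model described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `Item` is a hypothetical stand-in for model.QueueItem, the helper names are invented, and the temp-file-plus-rename strategy is only one assumed way to make a rewrite atomic.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// Item is a hypothetical stand-in for model.QueueItem.
type Item struct {
	ID string `json:"id"`
}

// writeJSONL replaces path with one JSON object per line. It writes a
// temporary file in the same directory and renames it over path.
// ASSUMPTION: temp-file-plus-rename is one way to get an atomic rewrite;
// the package may do it differently.
func writeJSONL(path string, items []Item) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), "queue-*.tmp")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // cleans up on failure; harmless after rename

	enc := json.NewEncoder(tmp) // Encode writes a trailing newline per value
	for _, it := range items {
		if err := enc.Encode(it); err != nil {
			tmp.Close()
			return err
		}
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// readJSONL loads every non-empty line of path as an Item.
func readJSONL(path string) ([]Item, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var items []Item
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		if len(sc.Bytes()) == 0 {
			continue
		}
		var it Item
		if err := json.Unmarshal(sc.Bytes(), &it); err != nil {
			return nil, err
		}
		items = append(items, it)
	}
	return items, sc.Err()
}

// roundTrip writes items to a throwaway file and reads them back.
func roundTrip(items []Item) ([]Item, error) {
	dir, err := os.MkdirTemp("", "queue-sketch")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "demo.jsonl")
	if err := writeJSONL(path, items); err != nil {
		return nil, err
	}
	return readJSONL(path)
}

func main() {
	got, err := roundTrip([]Item{{ID: "a"}, {ID: "b"}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got) // [{a} {b}]
}
```

Rewriting the whole file on every change keeps the on-disk state simple to reason about, at the cost of I/O proportional to the queue length.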

Functions

func GenerateItemID

func GenerateItemID() string

GenerateItemID generates a unique queue item ID in the form "item_<yyyymmdd>_<HHMM>_<4hex>".
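To make the ID format concrete, here is a small sketch that produces strings of the same shape. It is not the package's code: `newItemID`, `validItemID`, and `demoID` are hypothetical names, and the choice of crypto/rand for the four hex characters is an assumption. Under that assumption, two IDs generated in the same minute can collide, since the suffix has only 65536 possible values.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"regexp"
	"time"
)

// newItemID builds an ID shaped like "item_<yyyymmdd>_<HHMM>_<4hex>".
// ASSUMPTION: the 4-hex suffix is random; the real source of the suffix
// is not documented.
func newItemID(now time.Time) (string, error) {
	var b [2]byte // 2 bytes encode to 4 hex characters
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	date := now.Format("20060102") // yyyymmdd
	clock := now.Format("1504")    // HHMM, 24-hour
	return fmt.Sprintf("item_%s_%s_%s", date, clock, hex.EncodeToString(b[:])), nil
}

var itemIDPattern = regexp.MustCompile(`^item_\d{8}_\d{4}_[0-9a-f]{4}$`)

// validItemID reports whether id has the documented shape.
func validItemID(id string) bool {
	return itemIDPattern.MatchString(id)
}

// demoID generates an ID for a fixed timestamp so the prefix is predictable.
func demoID() (string, error) {
	return newItemID(time.Date(2026, time.June, 1, 9, 5, 0, 0, time.UTC))
}

func main() {
	id, err := demoID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, validItemID(id)) // e.g. item_20260601_0905_<4hex> true
}
```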

Types

type FileQueue

type FileQueue struct {
	// contains filtered or unexported fields
}

FileQueue is a mutex-protected, JSONL-backed FIFO queue. Create with NewFileQueue; all exported methods are safe for concurrent use.

func NewFileQueue

func NewFileQueue(path string) (*FileQueue, error)

NewFileQueue creates a FileQueue backed by path, loading any existing items. If path does not exist, the queue starts empty and the file is created on the first Enqueue call.

func (*FileQueue) Dequeue

func (q *FileQueue) Dequeue() (model.QueueItem, bool, error)

Dequeue removes and returns the head item. It returns (item, true, nil) on success, (zero QueueItem, false, nil) when the queue is empty, and (zero QueueItem, false, err) if a persistence error occurs.
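A consumer has to tell the "empty" result apart from the "error" result. The loop below is a sketch of that pattern against the documented three-value signature. `Item`, the `dequeuer` interface, and `memQueue` are hypothetical stand-ins that exist only to make the example runnable without the real package.

```go
package main

import (
	"errors"
	"fmt"
)

// Item is a hypothetical stand-in for model.QueueItem.
type Item struct{ ID string }

// dequeuer mirrors the documented Dequeue signature.
type dequeuer interface {
	Dequeue() (Item, bool, error)
}

// memQueue is an in-memory stand-in, not the real FileQueue.
type memQueue struct {
	items []Item
	fail  error // when set, every Dequeue reports this error
}

func (q *memQueue) Dequeue() (Item, bool, error) {
	if q.fail != nil {
		return Item{}, false, q.fail
	}
	if len(q.items) == 0 {
		return Item{}, false, nil
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head, true, nil
}

// drainAll pops items until the queue reports empty. It checks the error
// first, then the bool, and returns the IDs seen so far on failure.
func drainAll(q dequeuer) ([]string, error) {
	var ids []string
	for {
		it, ok, err := q.Dequeue()
		if err != nil {
			return ids, err
		}
		if !ok {
			return ids, nil
		}
		ids = append(ids, it.ID)
	}
}

func main() {
	ids, err := drainAll(&memQueue{items: []Item{{ID: "a"}, {ID: "b"}}})
	fmt.Println(ids, err) // [a b] <nil>

	_, err = drainAll(&memQueue{fail: errors.New("disk full")})
	fmt.Println(err) // disk full
}
```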

func (*FileQueue) DequeueByIDs

func (q *FileQueue) DequeueByIDs(ids []string) ([]model.QueueItem, error)

DequeueByIDs removes and returns the items whose IDs appear in ids, preserving queue order for the remaining items. IDs not found in the queue are silently ignored. Returns the matched items in queue order and persists the change.
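The selection step described above amounts to an order-preserving partition. The sketch below shows one way to write it. `Item` and `removeByIDs` are hypothetical names, and the persistence step is omitted.

```go
package main

import "fmt"

// Item is a hypothetical stand-in for model.QueueItem.
type Item struct{ ID string }

// removeByIDs splits items into those whose ID appears in ids (removed)
// and the rest (kept). Both results keep the original queue order, and
// IDs that match nothing are ignored.
func removeByIDs(items []Item, ids []string) (removed, kept []Item) {
	want := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		want[id] = struct{}{}
	}
	for _, it := range items {
		if _, ok := want[it.ID]; ok {
			removed = append(removed, it)
		} else {
			kept = append(kept, it)
		}
	}
	return removed, kept
}

func main() {
	queue := []Item{{ID: "a"}, {ID: "b"}, {ID: "c"}, {ID: "d"}}
	removed, kept := removeByIDs(queue, []string{"c", "missing", "a"})
	fmt.Println(removed) // [{a} {c}]
	fmt.Println(kept)    // [{b} {d}]
}
```

Note that the removed items come back in queue order, not in the order the IDs were requested.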

func (*FileQueue) Drain

func (q *FileQueue) Drain() ([]model.QueueItem, error)

Drain removes and returns all items from the queue. The backing file is truncated.

func (*FileQueue) Enqueue

func (q *FileQueue) Enqueue(item model.QueueItem) error

Enqueue appends item to the tail of the queue and persists to disk.

func (*FileQueue) Len

func (q *FileQueue) Len() int

Len returns the number of items currently in the queue.

func (*FileQueue) Peek

func (q *FileQueue) Peek() (model.QueueItem, bool)

Peek returns the head item without removing it. It returns (zero QueueItem, false) when the queue is empty.

func (*FileQueue) Scan

func (q *FileQueue) Scan() []model.QueueItem

Scan returns a snapshot of all items currently in the queue without removing any. The returned slice is a copy; mutating it does not affect the queue.
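The snapshot guarantee can be illustrated with a copy taken under the lock. This is a sketch with hypothetical types (`Item`, `queue`), not the package's code. It makes a shallow copy, so if the real QueueItem holds pointers, maps, or slices, whether those are shared with the queue is not something this page documents.

```go
package main

import (
	"fmt"
	"sync"
)

// Item is a hypothetical stand-in for model.QueueItem.
type Item struct{ ID string }

// queue is a minimal in-memory stand-in for FileQueue.
type queue struct {
	mu    sync.Mutex
	items []Item
}

// Scan copies the items while holding the lock, so the caller gets a
// slice with its own backing array.
func (q *queue) Scan() []Item {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := make([]Item, len(q.items))
	copy(out, q.items)
	return out
}

func main() {
	q := &queue{items: []Item{{ID: "a"}, {ID: "b"}}}

	snap := q.Scan()
	snap[0].ID = "changed" // mutates only the snapshot

	fmt.Println(snap[0].ID)     // changed
	fmt.Println(q.Scan()[0].ID) // a
}
```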

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager holds named FileQueues, creating new ones on first access. All methods are safe for concurrent use.

func NewManager

func NewManager(dir string) *Manager

NewManager returns a Manager that stores queue files under dir. The directory is created lazily when the first queue file is written.

func (*Manager) Delete

func (m *Manager) Delete(name string) error

Delete removes the named queue from the in-memory map and deletes its backing file. For single-use queues stored in a subdirectory, the subdirectory is removed when it becomes empty. Returns nil when the file does not exist (idempotent).

func (*Manager) Depth

func (m *Manager) Depth(name string) int

Depth returns the number of items currently in the named queue. It returns 0 when the queue has not been opened yet, even if its backing file still holds items; the count is accurate only after Get has been called at least once for that queue.

func (*Manager) Enqueue

func (m *Manager) Enqueue(queueName string, item model.QueueItem) error

Enqueue is a convenience method that calls Get(queueName) and then Enqueue on the returned queue.

func (*Manager) EnqueueIfAbsent

func (m *Manager) EnqueueIfAbsent(name string, item model.QueueItem) (bool, error)

EnqueueIfAbsent enqueues item into the named queue only when no item with the same ID already exists in the queue. It returns (true, nil) when enqueued and (false, nil) when skipped due to a duplicate. An error is returned only for I/O failures.
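For the duplicate check to be reliable under concurrency, the lookup and the append have to happen under the same lock. The sketch below shows that idea on an in-memory stand-in. `Item`, `queue`, `enqueueIfAbsent`, and `raceDemo` are hypothetical names, and persistence and the error return are left out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Item is a hypothetical stand-in for model.QueueItem.
type Item struct{ ID string }

// queue is a minimal in-memory stand-in for a named queue.
type queue struct {
	mu    sync.Mutex
	items []Item
}

// enqueueIfAbsent appends it only when no queued item has the same ID.
// The check and the append share one critical section, so two callers
// cannot both see "absent" for the same ID.
func (q *queue) enqueueIfAbsent(it Item) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, have := range q.items {
		if have.ID == it.ID {
			return false
		}
	}
	q.items = append(q.items, it)
	return true
}

// raceDemo starts n goroutines that all try to enqueue the same ID and
// reports how many succeeded and the final queue length.
func raceDemo(n int) (wins, length int) {
	q := &queue{}
	var won int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if q.enqueueIfAbsent(Item{ID: "item_20260601_0905_ab12"}) {
				atomic.AddInt32(&won, 1)
			}
		}()
	}
	wg.Wait()
	return int(won), len(q.items)
}

func main() {
	wins, length := raceDemo(8)
	fmt.Println(wins, length) // 1 1
}
```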

func (*Manager) Get

func (m *Manager) Get(name string) (*FileQueue, error)

Get returns the named queue, creating it if it does not yet exist. The backing file is <dir>/<name>.jsonl for static queues, or <dir>/<prefix>/<id>.jsonl for single-use queues whose name contains a "/".
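The name-to-file mapping can be written as a single path join. The function below is a sketch of the documented layout. `queuePath` and `queuePathSlash` are hypothetical helpers, and how the package validates or escapes names is not covered here.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// queuePath maps a queue name to its backing file under dir.
// A plain name becomes <dir>/<name>.jsonl; a name containing "/" lands
// in a subdirectory, giving <dir>/<prefix>/<id>.jsonl.
func queuePath(dir, name string) string {
	return filepath.Join(dir, filepath.FromSlash(name)+".jsonl")
}

// queuePathSlash returns the same path with forward slashes, which keeps
// the printed output identical across operating systems.
func queuePathSlash(dir, name string) string {
	return filepath.ToSlash(queuePath(dir, name))
}

func main() {
	fmt.Println(queuePathSlash("data", "jobs"))        // data/jobs.jsonl
	fmt.Println(queuePathSlash("data", "hide/abc123")) // data/hide/abc123.jsonl
}
```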

func (*Manager) Names

func (m *Manager) Names() []string

Names returns the names of all queues that have been accessed via Get.

func (*Manager) NamesContaining

func (m *Manager) NamesContaining(substr string) ([]string, error)

NamesContaining returns the names of all queues on disk whose names contain substr as a substring. It reads the queue directory and one level of subdirectories directly so it discovers queues created by other workers or not yet opened via Get. Use this to check whether a hide_id is still referenced by any live queue.
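A directory scan of this kind might look like the sketch below, which reads the queue directory plus one level of subdirectories and filters by substring. All function names are hypothetical. Treating a missing directory as "no queues" is an assumption, chosen because the directory is created lazily.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

const ext = ".jsonl"

// namesOnDisk lists "<name>" for dir/<name>.jsonl and "<sub>/<id>" for
// dir/<sub>/<id>.jsonl. Deeper nesting is not visited.
func namesOnDisk(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if os.IsNotExist(err) {
		return nil, nil // ASSUMPTION: a missing directory means no queues
	}
	if err != nil {
		return nil, err
	}
	var names []string
	for _, e := range entries {
		if !e.IsDir() {
			if strings.HasSuffix(e.Name(), ext) {
				names = append(names, strings.TrimSuffix(e.Name(), ext))
			}
			continue
		}
		subs, err := os.ReadDir(filepath.Join(dir, e.Name()))
		if err != nil {
			return nil, err
		}
		for _, s := range subs {
			if !s.IsDir() && strings.HasSuffix(s.Name(), ext) {
				names = append(names, e.Name()+"/"+strings.TrimSuffix(s.Name(), ext))
			}
		}
	}
	return names, nil
}

// namesContaining keeps the on-disk names that contain substr.
func namesContaining(dir, substr string) ([]string, error) {
	all, err := namesOnDisk(dir)
	if err != nil {
		return nil, err
	}
	var out []string
	for _, n := range all {
		if strings.Contains(n, substr) {
			out = append(out, n)
		}
	}
	return out, nil
}

// demoContaining builds a throwaway directory tree and searches it.
func demoContaining(substr string) ([]string, error) {
	dir, err := os.MkdirTemp("", "queues")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	if err := os.MkdirAll(filepath.Join(dir, "hide"), 0o755); err != nil {
		return nil, err
	}
	for _, f := range []string{"jobs.jsonl", "notes.txt", "hide/abc123.jsonl", "hide/xyz789.jsonl"} {
		if err := os.WriteFile(filepath.Join(dir, filepath.FromSlash(f)), nil, 0o644); err != nil {
			return nil, err
		}
	}
	return namesContaining(dir, substr)
}

func main() {
	names, err := demoContaining("abc123")
	fmt.Println(names, err) // [hide/abc123] <nil>
}
```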

func (*Manager) NamesWithPrefix

func (m *Manager) NamesWithPrefix(prefix string) ([]string, error)

NamesWithPrefix returns the names of all single-use queues stored under the <dir>/<prefix>/ subdirectory. Each returned name has the form "<prefix>/<id>". It reads the subdirectory directly so it discovers queues created by other processes or not yet opened via Get.
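Listing a single prefix only needs one directory read. The sketch below builds names of the form "<prefix>/<id>" from the files found. `namesWithPrefix` and `demoPrefix` are hypothetical, and returning an empty result for a missing subdirectory is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// namesWithPrefix reads <dir>/<prefix>/ and returns "<prefix>/<id>" for
// every <id>.jsonl file in it. os.ReadDir returns entries sorted by
// filename, so the result is sorted too.
func namesWithPrefix(dir, prefix string) ([]string, error) {
	entries, err := os.ReadDir(filepath.Join(dir, prefix))
	if os.IsNotExist(err) {
		return nil, nil // ASSUMPTION: no subdirectory means no queues
	}
	if err != nil {
		return nil, err
	}
	var names []string
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
			continue
		}
		names = append(names, prefix+"/"+strings.TrimSuffix(e.Name(), ".jsonl"))
	}
	return names, nil
}

// demoPrefix creates a throwaway tree with two single-use queues and one
// static queue, then lists the given prefix.
func demoPrefix(prefix string) ([]string, error) {
	dir, err := os.MkdirTemp("", "queues")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	if err := os.MkdirAll(filepath.Join(dir, "hide"), 0o755); err != nil {
		return nil, err
	}
	for _, f := range []string{"jobs.jsonl", "hide/b.jsonl", "hide/a.jsonl"} {
		if err := os.WriteFile(filepath.Join(dir, filepath.FromSlash(f)), nil, 0o644); err != nil {
			return nil, err
		}
	}
	return namesWithPrefix(dir, prefix)
}

func main() {
	names, err := demoPrefix("hide")
	fmt.Println(names, err) // [hide/a hide/b] <nil>
}
```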
