redisq

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2025 License: MIT Imports: 11 Imported by: 0

README ¶

Redisq

Go Reference Go Report Card Go Version CI codecov

A lightweight, thread-safe Redis-backed queue implementation in Go with support for distributed notifications and priority queuing.

Features

  • Thread-safe queue operations
  • Priority queue support
  • Distributed queue with real-time notifications
  • Automatic message expiration support
  • Graceful shutdown handling
  • Concurrent operations support
  • Simple API for queue operations

Installation

go get github.com/goptics/redisq

Quick Start

import "github.com/goptics/redisq"

func main() {
    // Initialize queue system
    qs := redisq.New("redis://localhost:6379")
    defer qs.Close()

    // Create a simple queue
    queue := qs.NewQueue("my-queue")
    defer queue.Close()

    // Enqueue items
    queue.Enqueue("hello world")

    // Dequeue items
    data, ok := queue.Dequeue()
    if ok {
        fmt.Println(string(data.([]byte)))
    }
}

Usage

Simple Queue
// Create a queue
queue := qs.NewQueue("my-queue")

// Optional: Set expiration for queue items
queue.SetExpiration(time.Hour)

// Enqueue items (supports []byte and string)
queue.Enqueue("test message")
queue.Enqueue([]byte("binary data"))

// Get queue length
length := queue.Len()

// Get all values
values := queue.Values()

// Purge the queue
queue.Purge()
Priority Queue
// Create a priority queue
pq := qs.NewPriorityQueue("priority-queue")

// Enqueue items with priority (lower number = higher priority)
pq.Enqueue("high priority", 1)
pq.Enqueue("medium priority", 2)
pq.Enqueue("low priority", 3)

// Items will be dequeued in priority order
data, ok := pq.Dequeue() // Returns "high priority"
Distributed Priority Queue
// Create a distributed priority queue
dpq := qs.NewDistributedPriorityQueue("distributed-priority-queue")

// Subscribe to notifications for enqueue and dequeue events
dpq.Subscribe(func(action string) {
    fmt.Printf("Action: %s\n", action)
})


// Enqueue with priority will trigger "enqueued" notification
dpq.Enqueue("important message", 1)

// Dequeue will trigger "dequeued" notification
data, ok := dpq.Dequeue()

Distributed Queue with Notifications
// Create a distributed queue
dq := qs.NewDistributedQueue("distributed-queue")

// Subscribe to notifications for enqueue and dequeue events
dq.Subscribe(func(action string) {
    fmt.Printf("Action: %s\n", action)
})

// Enqueue will trigger "enqueued" notification
dq.Enqueue("test message")

// Dequeue will trigger "dequeued" notification
data, ok := dq.Dequeue()
Queue with Acknowledgment (Reliable Processing)
// Create a queue with acknowledgment support
queue := qs.NewQueue("ack-queue")

// Set acknowledgment timeout (how long before unacknowledged items are requeued)
queue.SetAckTimeout(time.Minute * 5)

// Dequeue an item together with its unique acknowledgment ID
item, ok, ackID := queue.DequeueWithAckId()
if ok {
    // Process the item
    processItem(item)

    // Mark the item as successfully processed
    queue.Acknowledge(ackID)
}

// For manual control of the acknowledgment process:
// 1. Prepare an item for future acknowledgment
ackID := "job-456"
data := "important job"
err := queue.PrepareForFutureAck(ackID, data)

// 2. Acknowledge the item after processing
success := queue.Acknowledge(ackID)

// 3. Trigger requeue of all unacknowledged items
queue.RequeueNackedItems()

// 4. Get count of pending unacknowledged items
pendingCount := queue.GetNackedItemsCount()

Configuration

If you have Docker installed, run the following:

cp .env.example .env
docker compose up -d

You can change REDIS_PORT in the .env file.

Testing

go test -race -v ./...

Requirements

  • Go 1.24.1 or higher
  • Redis 6.0 or higher

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

👤 Author

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type DistributedPriorityQueue ¶

type DistributedPriorityQueue struct {
	*PriorityQueue
	*Notification
}

func (*DistributedPriorityQueue) Close ¶

func (q *DistributedPriorityQueue) Close() error

func (*DistributedPriorityQueue) Dequeue ¶

func (q *DistributedPriorityQueue) Dequeue() (any, bool)

func (*DistributedPriorityQueue) Enqueue ¶

func (q *DistributedPriorityQueue) Enqueue(item any, priority int) bool

type DistributedQueue ¶

type DistributedQueue struct {
	*Queue
	*Notification
}

func (*DistributedQueue) Close ¶

func (q *DistributedQueue) Close() error

func (*DistributedQueue) Dequeue ¶

func (q *DistributedQueue) Dequeue() (any, bool)

func (*DistributedQueue) Enqueue ¶

func (q *DistributedQueue) Enqueue(item any) bool

type Notification ¶

type Notification struct {
	// contains filtered or unexported fields
}

func (*Notification) Send ¶

func (n *Notification) Send(action string)

func (*Notification) Start ¶

func (n *Notification) Start()

func (*Notification) Stop ¶

func (n *Notification) Stop()

func (*Notification) Subscribe ¶

func (n *Notification) Subscribe(handler func(action string))

type PriorityQueue ¶

type PriorityQueue struct {
	*Queue
}

func (*PriorityQueue) Dequeue ¶

func (pq *PriorityQueue) Dequeue() (any, bool)

Dequeue removes and returns the highest priority item from the queue (lowest score)

func (*PriorityQueue) Enqueue ¶

func (pq *PriorityQueue) Enqueue(item any, priority int) bool

Enqueue adds an item to the queue with a specified priority. Lower priority values (closer to 1) will be dequeued first.

func (*PriorityQueue) Len ¶

func (pq *PriorityQueue) Len() int

Len returns the number of items in the priority queue

func (*PriorityQueue) Remove ¶

func (pq *PriorityQueue) Remove(item any) bool

func (*PriorityQueue) Values ¶

func (pq *PriorityQueue) Values() []any

Values returns all items in the priority queue ordered by priority (highest to lowest)

type Queue ¶

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) Acknowledge ¶

func (q *Queue) Acknowledge(ackID string) bool

Acknowledge removes an item from the pending list indicating successful processing

func (*Queue) Close ¶

func (q *Queue) Close() error

func (*Queue) Dequeue ¶

func (q *Queue) Dequeue() (any, bool)

Dequeue removes and returns an item from the queue without acknowledgment. For reliable processing with acknowledgment, use DequeueWithAckId instead.

func (*Queue) DequeueWithAckId ¶

func (q *Queue) DequeueWithAckId() (any, bool, string)

func (*Queue) Enqueue ¶

func (q *Queue) Enqueue(item any) bool

func (*Queue) GetNackedItemsCount ¶

func (q *Queue) GetNackedItemsCount() int

GetNackedItemsCount returns the number of items in the nacked list

func (*Queue) Len ¶

func (q *Queue) Len() int

func (*Queue) Listen ¶

func (q *Queue) Listen()

func (*Queue) PrepareForFutureAck ¶

func (q *Queue) PrepareForFutureAck(ackID string, item any) error

PrepareForFutureAck adds an item to the pending list for acknowledgment tracking. Returns an error if the operation fails.

func (*Queue) Purge ¶

func (q *Queue) Purge()

func (*Queue) Remove ¶

func (q *Queue) Remove(item any) bool

func (*Queue) RequeueNackedItems ¶

func (q *Queue) RequeueNackedItems() error

RequeueNackedItems checks for un-acknowledged items in the nacked list and returns them to the main queue to be processed again.

func (*Queue) SetAckTimeout ¶

func (q *Queue) SetAckTimeout(ackTimeout time.Duration)

SetAckTimeout sets the acknowledgment timeout for jobs. This controls how long a job can be processing before being requeued.

func (*Queue) SetExpiration ¶

func (q *Queue) SetExpiration(expiration time.Duration)

SetExpiration sets the expiration time for the Queue

func (*Queue) Values ¶

func (q *Queue) Values() []any

type Queues ¶

type Queues interface {
	NewQueue(queueKey string) *Queue
	NewPriorityQueue(queueKey string) *PriorityQueue
	NewDistributedQueue(queueKey string) *DistributedQueue
	NewDistributedPriorityQueue(queueKey string) *DistributedPriorityQueue
	Close() error
}

func New ¶

func New(url string) Queues

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL