uniqw

package module
v0.0.0-...-e6c155e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2025 License: MIT Imports: 14 Imported by: 0

README

UniQw-Go

Go Reference Go Report Card

UniQw-Go is a Redis-based task queue library for Go that prioritizes performance, reliability, and ease of use. Designed to handle millions of tasks with unique features such as separate retention for success/failure, progress tracking, and unique task locking.

Key Features

  • At-Least-Once Delivery: Guarantees that tasks are executed at least once.
  • Separate Retention: Set different retention times for successful tasks vs failed tasks (Dead Letter Queue).
  • Task Uniqueness: Prevent task duplication with unique keys per queue.
  • Progress & Result Tracking: Handlers can report progress percentage and store execution results (JSON).
  • Cluster Friendly: Uses Redis Hash Tags {queue} for full compatibility with Redis Cluster.
  • Graceful Shutdown: Supports safe server termination without losing currently running tasks.
  • Comprehensive Inspection: APIs to list, delete, and retry tasks in various states.

Installation

go get github.com/UniQw/uniqw-go

Quick Start

Producer (Client)
package main

import (
	"context"
	"time"
	"github.com/UniQw/uniqw-go"
	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	client := uniqw.NewClient(rdb)

	// Enqueue task with options
	err := client.Enqueue(context.Background(), "default", "send:email",
		map[string]string{"to": "user@example.com"},
		uniqw.MaxRetry(3),
		uniqw.Retention(1*time.Hour),       // Retain for 1 hour if successful
		uniqw.RetentionError(24*time.Hour), // Retain for 24 hours if fully failed
	)
	if err != nil {
		panic(err)
	}
}
Consumer (Server)
package main

import (
	"context"
	"github.com/UniQw/uniqw-go"
	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	mux := uniqw.NewMux()

	mux.Handle("send:email", func(ctx context.Context, payload []byte) error {
		// Report progress to dashboard
		uniqw.SetProgress(ctx, 50)

		// Store execution result
		uniqw.SetResult(ctx, map[string]string{"status": "delivered"})

		return nil
	})

	server := uniqw.NewServer(rdb, uniqw.ServerConfig{
		Queues:      map[string]int{"default": 1},
		Concurrency: 10,
	}, mux)

	server.Start()
}

Advanced Concepts

Task States

Tasks move through several states:

  1. Pending: Task is ready for execution.
  2. Active: Task is being processed by a worker (with visibility TTL).
  3. Delayed: Task scheduled for the future or in backoff retry period.
  4. Succeeded: Task completed successfully (retained according to Retention).
  5. Dead: Task failed after reaching MaxRetry (retained according to RetentionError).
Task Management

You can inspect and manage the queue using the Client:

// List failed tasks
tasks, _ := client.ListTasks(ctx, "default", uniqw.StateDead, nil)

// Retry task from Dead Letter Queue
client.RetryDead(ctx, "default", "task-id-123", uniqw.Delay(1*time.Minute))

// Delete a specific task (searches across all states)
client.DeleteTask(ctx, "default", "task-id-123")

Testing

Tests
# Run all tests (unit and integration)
make test

# With coverage
make cover

Note: Integration tests require Docker to run Redis via testcontainers.

Full Examples

See the examples/ folder for more detailed producer and consumer implementation examples.

License

Distributed under the MIT License. See LICENSE for more information.

Documentation

Index

Constants

This section is empty.

Variables

AllStates lists every valid queue state in a stable order.

View Source
var ErrActiveState = errors.New("uniqw: operation not allowed on active state")

ErrActiveState is returned when an operation is not allowed on the active state.

View Source
var ErrDuplicateTask = errors.New("uniqw: duplicate task id")

ErrDuplicateTask is returned when Enqueue is called with an ID that already exists for the queue.

View Source
var ErrTaskNotFound = errors.New("uniqw: task not found")

ErrTaskNotFound is returned when a task with the specified ID is not found.

View Source
var ErrUnknownState = errors.New("uniqw: unknown state")

ErrUnknownState is returned when an invalid state is used.

Functions

func ExtractQueueName

func ExtractQueueName(key string) string

ExtractQueueName parses a queue name from a raw Redis key (e.g. "uniqw:{default}:pending"). It returns an empty string if the format is invalid.

func SetProgress

func SetProgress(ctx context.Context, p int)

SetProgress allows a handler to report progress (0..100) for the current task. It is a no-op if the context is not provided by the UniQw runtime.

func SetResult

func SetResult(ctx context.Context, v any) error

SetResult encodes the provided value using the default JSON encoder and attaches it as the handler result. It is safe to call multiple times; last wins. It is a no-op if the context is not provided by the UniQw runtime.

func SetResultBytes

func SetResultBytes(ctx context.Context, b []byte)

SetResultBytes attaches raw bytes as the handler result without encoding. It is a no-op if the context is not provided by the UniQw runtime.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides APIs to enqueue and manage tasks in Redis.

func NewClient

func NewClient(rdb redis.UniversalClient) *Client

NewClient creates a new UniQw client.

func (*Client) DeleteTask

func (c *Client) DeleteTask(ctx context.Context, queue string, id string, opts ...Option) error

DeleteTask removes a task from the specified queue by its ID. It searches across all states (Pending, Delayed, Succeeded, Dead) and removes the first match. It returns ErrTaskNotFound if the ID is not found in any of those states.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, queue, taskType string, payload any, opts ...Option) error

Enqueue adds a new task to the specified queue. It returns ErrDuplicateTask if the task ID (explicit or generated) already exists in the queue.

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, queue string, state State, filter TaskFilter) ([]*Task, error)

ListTasks returns a list of tasks in a specific state for the given queue. It supports filtering tasks by any field.

func (*Client) RetryDead

func (c *Client) RetryDead(ctx context.Context, queue string, id string, opts ...Option) error

RetryDead moves a task from the Dead Letter Queue back to Pending or Delayed state. It resets retry counts and errors. You can override retention settings or add a delay.

type Encoder

type Encoder interface {
	// Encode serializes a value to bytes.
	Encode(any) ([]byte, error)
	// Decode deserializes bytes to a value.
	Decode([]byte, any) error
}

Encoder defines the interface for task payload serialization.

type FmtLogger

type FmtLogger struct{}

FmtLogger is a minimal logger that prints messages with level prefixes. Debug/Info go to stdout; Warn/Error go to stderr.

func NewFmtLogger

func NewFmtLogger() *FmtLogger

NewFmtLogger creates a new FmtLogger.

func (FmtLogger) Debugf

func (FmtLogger) Debugf(format string, args ...any)

func (FmtLogger) Errorf

func (FmtLogger) Errorf(format string, args ...any)

func (FmtLogger) Infof

func (FmtLogger) Infof(format string, args ...any)

func (FmtLogger) Warnf

func (FmtLogger) Warnf(format string, args ...any)

type HandlerFunc

type HandlerFunc func(ctx context.Context, payload []byte) error

HandlerFunc is the function signature for processing a task.

type JSONEncoder

type JSONEncoder struct{}

JSONEncoder is the default implementation of Encoder using JSON. It uses standard library for encoding and sonic for decoding.

func (*JSONEncoder) Decode

func (*JSONEncoder) Decode(data []byte, v any) error

Decode deserializes JSON bytes using sonic.

func (*JSONEncoder) Encode

func (*JSONEncoder) Encode(v any) ([]byte, error)

Encode serializes a value to JSON using standard library.

type Logger

type Logger interface {
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

Logger defines logging methods used by the library. Implementations should be cheap. Default is FmtLogger which writes to stdout/stderr using fmt.

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

Middleware is a function that wraps a HandlerFunc to provide cross-cutting concerns.

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux routes tasks to their respective handlers based on task type.

func NewMux

func NewMux() *Mux

NewMux creates a new Task Mux.

func (*Mux) Handle

func (m *Mux) Handle(taskType string, fn func(context.Context, []byte) error)

Handle registers a handler for a specific task type.

func (*Mux) Use

func (m *Mux) Use(mw Middleware)

Use adds middleware(s) to the mux. Middlewares are executed in the order they are added.

type Option

type Option func(*options)

Option is a function that configures task behavior during Enqueue or RetryDead.

func Deadline

func Deadline(t time.Time) Option

Deadline sets an absolute deadline for the task. The task will not be processed if the current time is past the deadline.

func Delay

func Delay(d time.Duration) Option

Delay schedules the task to be executed after the specified duration.

func ExpireIn

func ExpireIn(d time.Duration) Option

ExpireIn sets a relative deadline for the task. The task will not be processed if the current time is past the deadline.

func MaxRetry

func MaxRetry(n int) Option

MaxRetry sets the maximum number of retry attempts for the task.

func Retention

func Retention(d time.Duration) Option

Retention sets how long (in seconds) the task is kept in the Succeeded state.

func RetentionError

func RetentionError(d time.Duration) Option

RetentionError sets how long (in seconds) the task is kept in the Dead state. If d is 0, the task will be dropped immediately after final failure. If d is negative, the task will be kept forever (default).

func TaskID

func TaskID(id string) Option

TaskID sets a custom ID for the task. If not provided, a random UUID will be generated.

func WithKeepUniqueLock

func WithKeepUniqueLock() Option

WithKeepUniqueLock ensures that the uniqueness lock for the task ID is NOT released even after the task is deleted (unless it reached Succeeded state).

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server processes tasks from Redis queues using workers.

func NewServer

func NewServer(rdb *redis.Client, cfg ServerConfig, mux *Mux) *Server

NewServer creates a new UniQw server.

func (*Server) Start

func (s *Server) Start()

Start launches the server workers and background maintenance routines. It is idempotent and non-blocking.

func (*Server) Stop

func (s *Server) Stop()

Stop gracefully shuts down the server, waiting for workers to finish current tasks.

type ServerConfig

type ServerConfig struct {
	// Queues defines the queues to process and their relative weights.
	Queues map[string]int
	// Concurrency is the number of worker goroutines.
	Concurrency int
	// VisibilityTTL is the duration for which a task is leased by a worker.
	// If the worker fails or crashes, the task will be reclaimed after this TTL.
	VisibilityTTL time.Duration
	// Logger is the logger used for server events.
	Logger Logger
}

ServerConfig defines the configuration for a UniQw server.

type State

type State string

State represents a queue state used to store and inspect tasks. Use the exported constants (StatePending, StateActive, etc.) instead of raw strings to avoid typos.

const (
	// StatePending contains tasks ready for execution (LIST).
	StatePending State = "pending"
	// StateActive contains tasks currently being processed by workers (ZSET).
	StateActive State = "active"
	// StateDelayed contains scheduled tasks or tasks in backoff retry (ZSET).
	StateDelayed State = "delayed"
	// StateSucceeded contains successfully completed tasks (ZSET).
	StateSucceeded State = "succeeded"
	// StateDead contains permanently failed tasks (LIST).
	StateDead State = "dead"
)

func ParseState

func ParseState(s string) (State, error)

ParseState converts a string into a State, returning an error for unknown values.

func (State) String

func (s State) String() string

String returns the raw string value of the state.

type Task

type Task struct {
	// ID is the unique identifier for the task.
	ID string `json:"id"`
	// Type defines the task category, used by Mux to route to the correct handler.
	Type string `json:"type"`
	// Queue is the name of the queue this task belongs to.
	Queue string `json:"queue"`
	// Payload is the raw task data.
	Payload []byte `json:"payload"`
	// Retry is the current number of retry attempts made.
	Retry int `json:"retry"`
	// MaxRetry is the maximum number of retries allowed before moving to Dead state.
	MaxRetry int `json:"max_retry"`
	// Retention is the duration (in seconds) to keep the task after successful completion.
	Retention int64 `json:"retention"`
	// ErrRetention is the duration (in seconds) to keep the task after it has permanently failed.
	ErrRetention int64 `json:"err_retention,omitempty"`
	// CreatedAt is the timestamp (ms) when the task was enqueued.
	CreatedAt int64 `json:"created_at,omitempty"`
	// DeadlineMs is the absolute timestamp (ms) after which the task should not be processed.
	DeadlineMs int64 `json:"deadline_ms,omitempty"`
	// StartedAt is the timestamp (ms) when the worker started processing the task.
	StartedAt int64 `json:"started_at,omitempty"`
	// CompletedAt is the timestamp (ms) when the task was finished (success or final failure).
	CompletedAt int64 `json:"completed_at,omitempty"`
	// LastError is the error message from the last failed attempt.
	LastError string `json:"last_error,omitempty"`
	// LastErrorAt is the timestamp (ms) of the last failed attempt.
	LastErrorAt int64 `json:"last_error_at,omitempty"`
	// Progress is the current task progress (0..100).
	Progress int `json:"progress,omitempty"`
	// Result is the execution result stored as JSON.
	Result []byte `json:"result,omitempty"`
}

Task represents a unit of work to be processed by a worker. It is serialized to JSON and stored in Redis.

type TaskFilter

type TaskFilter func(*Task) bool

TaskFilter is a function used to filter tasks during ListTasks.

Directories

Path Synopsis
examples
client command
server command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL