queue

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueClosed  = errors.New("queue is closed")
	ErrTaskNotFound = errors.New("task not found")
	ErrNilTask      = errors.New("task cannot be nil")
	ErrEmptyQueue   = errors.New("queue name cannot be empty")
)

队列操作公共错误

Functions

This section is empty.

Types

type Handler

type Handler interface {
	// Handle 处理任务
	Handle(ctx context.Context, task *Task) error
}

Handler 任务处理器接口

type HandlerFunc

type HandlerFunc func(ctx context.Context, task *Task) error

HandlerFunc 函数类型适配 Handler 接口

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, task *Task) error

Handle 实现 Handler 接口

type Queue

type Queue interface {
	// Enqueue 立即入队任务
	Enqueue(ctx context.Context, task *Task) error

	// EnqueueWithDelay 延迟入队任务
	EnqueueWithDelay(ctx context.Context, task *Task, delay time.Duration) error

	// Dequeue 出队一个可消费任务,无任务时返回 (nil, nil)
	Dequeue(ctx context.Context) (*Task, error)

	// Ack 通过任务 ID 确认处理成功
	Ack(ctx context.Context, taskID string) error

	// Nack 确认处理失败,自动重试或移入死信队列
	Nack(ctx context.Context, taskID string, taskErr error) error

	// GetTask 通过 ID 获取任务详情
	GetTask(ctx context.Context, taskID string) (*Task, error)

	// GetStats 获取队列统计信息
	GetStats(ctx context.Context) (*QueueStats, error)

	// Purge 清空队列所有数据
	Purge(ctx context.Context) error

	// Close 释放队列资源
	Close() error
}

Queue 工作队列核心接口,队列只存储任务 ID,任务详情独立存储

type QueueConfig

type QueueConfig struct {
	Name                      string             // 队列名称
	MaxConcurrency            int                // 最大并发工作线程数
	VisibilityTimeout         int                // 任务可见性超时(秒)
	MaxRetries                int                // 最大重试次数
	RetryDelay                RetryDelayStrategy // 重试延迟策略
	EnableDeadLetter          bool               // 是否启用死信队列
	DeadLetterMaxRetries      int                // 死信队列最大重试次数
	DelayQueueScanInterval    time.Duration      // 延迟队列扫描间隔
	ProcessingCleanupInterval time.Duration      // 处理中任务清理间隔
	Prefix                    string             // Redis key 前缀
}

QueueConfig 队列配置

func DefaultConfig

func DefaultConfig() QueueConfig

DefaultConfig 返回默认队列配置

func (QueueConfig) WithDeadLetter

func (c QueueConfig) WithDeadLetter(enable bool, maxRetries int) QueueConfig

WithDeadLetter 设置死信队列配置

func (QueueConfig) WithMaxConcurrency

func (c QueueConfig) WithMaxConcurrency(max int) QueueConfig

WithMaxConcurrency 设置最大并发数

func (QueueConfig) WithMaxRetries

func (c QueueConfig) WithMaxRetries(retries int) QueueConfig

WithMaxRetries 设置最大重试次数

func (QueueConfig) WithName

func (c QueueConfig) WithName(name string) QueueConfig

WithName 设置队列名称

func (QueueConfig) WithRetryDelay

func (c QueueConfig) WithRetryDelay(strategy RetryDelayStrategy) QueueConfig

WithRetryDelay 设置重试延迟策略

func (QueueConfig) WithScanIntervals

func (c QueueConfig) WithScanIntervals(delayScan, cleanup time.Duration) QueueConfig

WithScanIntervals 设置扫描间隔

func (QueueConfig) WithVisibilityTimeout

func (c QueueConfig) WithVisibilityTimeout(timeout int) QueueConfig

WithVisibilityTimeout 设置可见性超时

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager 管理多个队列和工作线程池

func NewQueueManager

func NewQueueManager(client *redis.Client) *QueueManager

NewQueueManager 创建队列管理器

func (*QueueManager) EnqueueTask

func (m *QueueManager) EnqueueTask(ctx context.Context, queueName string, task *Task) error

EnqueueTask 入队任务到指定队列

func (*QueueManager) EnqueueTaskWithDelay

func (m *QueueManager) EnqueueTaskWithDelay(ctx context.Context, queueName string, task *Task, delay time.Duration) error

EnqueueTaskWithDelay 延迟入队任务到指定队列

func (*QueueManager) GetOrCreateQueue

func (m *QueueManager) GetOrCreateQueue(name string, config QueueConfig) *RedisQueue

GetOrCreateQueue 获取或创建指定名称的队列

func (*QueueManager) GetQueueStats

func (m *QueueManager) GetQueueStats(ctx context.Context) (map[string]*QueueStats, error)

GetQueueStats 获取所有队列的统计信息

func (*QueueManager) RegisterWorkerPool

func (m *QueueManager) RegisterWorkerPool(queueName string, handler Handler, config QueueConfig) (*WorkerPoolImpl, error)

RegisterWorkerPool 注册工作线程池,如果队列不存在则自动创建

func (*QueueManager) StartAllWorkerPools

func (m *QueueManager) StartAllWorkerPools(ctx context.Context) error

StartAllWorkerPools 启动所有工作线程池

func (*QueueManager) StopAllWorkerPools

func (m *QueueManager) StopAllWorkerPools() error

StopAllWorkerPools 停止所有工作线程池

type QueueStats

type QueueStats struct {
	Name       string `json:"name"`        // 队列名称
	Pending    int64  `json:"pending"`     // 等待处理任务数
	Processing int64  `json:"processing"`  // 处理中任务数
	Delayed    int64  `json:"delayed"`     // 延迟队列任务数
	Completed  int64  `json:"completed"`   // 已完成任务总数
	Failed     int64  `json:"failed"`      // 已失败任务总数
	DeadLetter int64  `json:"dead_letter"` // 死信队列任务数
}

QueueStats 队列统计信息

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

RedisQueue 基于 Redis 的工作队列,队列只存任务 ID,详情独立存储,通过 Lua 脚本保证原子性。

Redis 数据结构:

{prefix}IMMEDIATE     → List   立即执行队列
{prefix}DELAYED       → ZSet   延迟队列(score=执行时间戳)
{prefix}PROCESSING    → List   处理中队列
{prefix}DEAD          → List   死信队列
{prefix}TASK:{id}     → String 任务 JSON 数据
{prefix}TIMEOUT:{id}  → String 可见性超时追踪(带 TTL)
{prefix}STATS         → Hash   统计计数

func NewRedisQueue

func NewRedisQueue(client *redis.Client, config QueueConfig) *RedisQueue

NewRedisQueue 创建 Redis 队列实例,Redis 客户端由调用方管理生命周期

func (*RedisQueue) Ack

func (q *RedisQueue) Ack(ctx context.Context, taskID string) error

Ack 通过任务 ID 确认处理成功

func (*RedisQueue) Close

func (q *RedisQueue) Close() error

Close 释放队列资源(Redis 客户端由外部管理)

func (*RedisQueue) Dequeue

func (q *RedisQueue) Dequeue(ctx context.Context) (*Task, error)

Dequeue 出队一个可消费任务,无任务时返回 (nil, nil)

func (*RedisQueue) Enqueue

func (q *RedisQueue) Enqueue(ctx context.Context, task *Task) error

Enqueue 立即入队任务

func (*RedisQueue) EnqueueWithDelay

func (q *RedisQueue) EnqueueWithDelay(ctx context.Context, task *Task, delay time.Duration) error

EnqueueWithDelay 延迟入队任务

func (*RedisQueue) GetStats

func (q *RedisQueue) GetStats(ctx context.Context) (*QueueStats, error)

GetStats 获取队列统计信息

func (*RedisQueue) GetTask

func (q *RedisQueue) GetTask(ctx context.Context, taskID string) (*Task, error)

GetTask 通过 ID 获取任务详情

func (*RedisQueue) Nack

func (q *RedisQueue) Nack(ctx context.Context, taskID string, taskErr error) error

Nack 确认处理失败,未超重试上限则延迟重试,否则移入死信队列

func (*RedisQueue) Purge

func (q *RedisQueue) Purge(ctx context.Context) error

Purge 清空队列所有数据

type RetryDelayStrategy

type RetryDelayStrategy string

RetryDelayStrategy 重试延迟策略

const (
	// RetryDelayFixed 固定延迟
	RetryDelayFixed RetryDelayStrategy = "fixed"
	// RetryDelayExponential 指数退避
	RetryDelayExponential RetryDelayStrategy = "exponential"
	// RetryDelayRandom 随机延迟
	RetryDelayRandom RetryDelayStrategy = "random"
)

type Task

type Task struct {
	ID          string            `json:"id"`           // 任务ID
	Queue       string            `json:"queue"`        // 队列名称
	Payload     []byte            `json:"payload"`      // 任务数据
	Type        string            `json:"type"`         // 任务类型
	MaxRetries  int               `json:"max_retries"`  // 最大重试次数
	RetryCount  int               `json:"retry_count"`  // 当前重试次数
	DelayUntil  int64             `json:"delay_until"`  // 延迟执行时间(Unix时间戳,秒)
	CreatedAt   int64             `json:"created_at"`   // 任务创建时间
	StartedAt   int64             `json:"started_at"`   // 任务开始处理时间
	CompletedAt int64             `json:"completed_at"` // 任务完成时间
	Status      TaskStatus        `json:"status"`       // 任务状态
	Error       string            `json:"error"`        // 错误信息
	Metadata    map[string]string `json:"metadata"`     // 元数据
}

Task 表示一个工作队列任务

func NewTask

func NewTask(queue, taskType string, payload []byte) *Task

NewTask 创建新任务

func UnmarshalTask

func UnmarshalTask(data []byte) (*Task, error)

UnmarshalTask 从JSON反序列化任务

func (*Task) Marshal

func (t *Task) Marshal() ([]byte, error)

Marshal 序列化任务为JSON

func (*Task) WithDelay

func (t *Task) WithDelay(delay time.Duration) *Task

WithDelay 设置延迟执行

func (*Task) WithMaxRetries

func (t *Task) WithMaxRetries(maxRetries int) *Task

WithMaxRetries 设置最大重试次数

func (*Task) WithMetadata

func (t *Task) WithMetadata(key, value string) *Task

WithMetadata 设置元数据

type TaskStatus

type TaskStatus string

TaskStatus 任务状态

const (
	// TaskStatusPending 等待处理
	TaskStatusPending TaskStatus = "pending"
	// TaskStatusProcessing 处理中
	TaskStatusProcessing TaskStatus = "processing"
	// TaskStatusCompleted 已完成
	TaskStatusCompleted TaskStatus = "completed"
	// TaskStatusFailed 已失败
	TaskStatusFailed TaskStatus = "failed"
	// TaskStatusDeadLetter 死信
	TaskStatusDeadLetter TaskStatus = "dead_letter"
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker 从队列消费任务并交给 Handler 处理

func NewWorker

func NewWorker(id string, queue Queue, handler Handler) *Worker

NewWorker 创建工作线程

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start 启动工作线程

func (*Worker) Stop

func (w *Worker) Stop() error

Stop 停止工作线程,等待当前任务处理完成

func (*Worker) WithLogger

func (w *Worker) WithLogger(log logger.Logger) *Worker

WithLogger 设置工作线程的日志实例

type WorkerPool

type WorkerPool interface {
	// Start 启动工作线程池
	Start(ctx context.Context) error

	// Stop 停止工作线程池
	Stop() error

	// IsRunning 线程池是否正在运行
	IsRunning() bool
}

WorkerPool 工作线程池接口

type WorkerPoolImpl

type WorkerPoolImpl struct {
	// contains filtered or unexported fields
}

WorkerPoolImpl 工作线程池,管理多个 Worker 协程

func NewWorkerPool

func NewWorkerPool(queue Queue, handler Handler, config QueueConfig) *WorkerPoolImpl

NewWorkerPool 创建工作线程池

func (*WorkerPoolImpl) IsRunning

func (p *WorkerPoolImpl) IsRunning() bool

IsRunning 报告线程池是否正在运行

func (*WorkerPoolImpl) Start

func (p *WorkerPoolImpl) Start(ctx context.Context) error

Start 启动工作线程池,创建 config.MaxConcurrency 个工作线程

func (*WorkerPoolImpl) Stop

func (p *WorkerPoolImpl) Stop() error

Stop 优雅停止工作线程池,等待所有工作线程完成当前任务

func (*WorkerPoolImpl) WithLogger

func (p *WorkerPoolImpl) WithLogger(log logger.Logger) *WorkerPoolImpl

WithLogger 设置工作线程池的日志实例

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL