Documentation
¶
Index ¶
- Variables
- type Handler
- type HandlerFunc
- type Queue
- type QueueConfig
- func (c QueueConfig) WithDeadLetter(enable bool, maxRetries int) QueueConfig
- func (c QueueConfig) WithMaxConcurrency(max int) QueueConfig
- func (c QueueConfig) WithMaxRetries(retries int) QueueConfig
- func (c QueueConfig) WithName(name string) QueueConfig
- func (c QueueConfig) WithRetryDelay(strategy RetryDelayStrategy) QueueConfig
- func (c QueueConfig) WithScanIntervals(delayScan, cleanup time.Duration) QueueConfig
- func (c QueueConfig) WithVisibilityTimeout(timeout int) QueueConfig
- type QueueManager
- func (m *QueueManager) EnqueueTask(ctx context.Context, queueName string, task *Task) error
- func (m *QueueManager) EnqueueTaskWithDelay(ctx context.Context, queueName string, task *Task, delay time.Duration) error
- func (m *QueueManager) GetOrCreateQueue(name string, config QueueConfig) *RedisQueue
- func (m *QueueManager) GetQueueStats(ctx context.Context) (map[string]*QueueStats, error)
- func (m *QueueManager) RegisterWorkerPool(queueName string, handler Handler, config QueueConfig) (*WorkerPoolImpl, error)
- func (m *QueueManager) StartAllWorkerPools(ctx context.Context) error
- func (m *QueueManager) StopAllWorkerPools() error
- type QueueStats
- type RedisQueue
- func (q *RedisQueue) Ack(ctx context.Context, taskID string) error
- func (q *RedisQueue) Close() error
- func (q *RedisQueue) Dequeue(ctx context.Context) (*Task, error)
- func (q *RedisQueue) Enqueue(ctx context.Context, task *Task) error
- func (q *RedisQueue) EnqueueWithDelay(ctx context.Context, task *Task, delay time.Duration) error
- func (q *RedisQueue) GetStats(ctx context.Context) (*QueueStats, error)
- func (q *RedisQueue) GetTask(ctx context.Context, taskID string) (*Task, error)
- func (q *RedisQueue) Nack(ctx context.Context, taskID string, taskErr error) error
- func (q *RedisQueue) Purge(ctx context.Context) error
- type RetryDelayStrategy
- type Task
- type TaskStatus
- type Worker
- type WorkerPool
- type WorkerPoolImpl
Constants ¶
This section is empty.
Variables ¶
var ( ErrQueueClosed = errors.New("queue is closed") ErrTaskNotFound = errors.New("task not found") ErrNilTask = errors.New("task cannot be nil") ErrEmptyQueue = errors.New("queue name cannot be empty") )
队列操作公共错误
Functions ¶
This section is empty.
Types ¶
type HandlerFunc ¶
HandlerFunc 函数类型适配 Handler 接口
type Queue ¶
type Queue interface {
// Enqueue 立即入队任务
Enqueue(ctx context.Context, task *Task) error
// EnqueueWithDelay 延迟入队任务
EnqueueWithDelay(ctx context.Context, task *Task, delay time.Duration) error
// Dequeue 出队一个可消费任务,无任务时返回 (nil, nil)
Dequeue(ctx context.Context) (*Task, error)
// Ack 通过任务 ID 确认处理成功
Ack(ctx context.Context, taskID string) error
// Nack 确认处理失败,自动重试或移入死信队列
Nack(ctx context.Context, taskID string, taskErr error) error
// GetTask 通过 ID 获取任务详情
GetTask(ctx context.Context, taskID string) (*Task, error)
// GetStats 获取队列统计信息
GetStats(ctx context.Context) (*QueueStats, error)
// Purge 清空队列所有数据
Purge(ctx context.Context) error
// Close 释放队列资源
Close() error
}
Queue 工作队列核心接口,队列只存储任务 ID,任务详情独立存储
type QueueConfig ¶
type QueueConfig struct {
Name string // 队列名称
MaxConcurrency int // 最大并发工作线程数
VisibilityTimeout int // 任务可见性超时(秒)
MaxRetries int // 最大重试次数
RetryDelay RetryDelayStrategy // 重试延迟策略
EnableDeadLetter bool // 是否启用死信队列
DeadLetterMaxRetries int // 死信队列最大重试次数
DelayQueueScanInterval time.Duration // 延迟队列扫描间隔
ProcessingCleanupInterval time.Duration // 处理中任务清理间隔
Prefix string // Redis key 前缀
}
QueueConfig 队列配置
func (QueueConfig) WithDeadLetter ¶
func (c QueueConfig) WithDeadLetter(enable bool, maxRetries int) QueueConfig
WithDeadLetter 设置死信队列配置
func (QueueConfig) WithMaxConcurrency ¶
func (c QueueConfig) WithMaxConcurrency(max int) QueueConfig
WithMaxConcurrency 设置最大并发数
func (QueueConfig) WithMaxRetries ¶
func (c QueueConfig) WithMaxRetries(retries int) QueueConfig
WithMaxRetries 设置最大重试次数
func (QueueConfig) WithName ¶
func (c QueueConfig) WithName(name string) QueueConfig
WithName 设置队列名称
func (QueueConfig) WithRetryDelay ¶
func (c QueueConfig) WithRetryDelay(strategy RetryDelayStrategy) QueueConfig
WithRetryDelay 设置重试延迟策略
func (QueueConfig) WithScanIntervals ¶
func (c QueueConfig) WithScanIntervals(delayScan, cleanup time.Duration) QueueConfig
WithScanIntervals 设置扫描间隔
func (QueueConfig) WithVisibilityTimeout ¶
func (c QueueConfig) WithVisibilityTimeout(timeout int) QueueConfig
WithVisibilityTimeout 设置可见性超时
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager 管理多个队列和工作线程池
func NewQueueManager ¶
func NewQueueManager(client *redis.Client) *QueueManager
NewQueueManager 创建队列管理器
func (*QueueManager) EnqueueTask ¶
EnqueueTask 入队任务到指定队列
func (*QueueManager) EnqueueTaskWithDelay ¶
func (m *QueueManager) EnqueueTaskWithDelay(ctx context.Context, queueName string, task *Task, delay time.Duration) error
EnqueueTaskWithDelay 延迟入队任务到指定队列
func (*QueueManager) GetOrCreateQueue ¶
func (m *QueueManager) GetOrCreateQueue(name string, config QueueConfig) *RedisQueue
GetOrCreateQueue 获取或创建指定名称的队列
func (*QueueManager) GetQueueStats ¶
func (m *QueueManager) GetQueueStats(ctx context.Context) (map[string]*QueueStats, error)
GetQueueStats 获取所有队列的统计信息
func (*QueueManager) RegisterWorkerPool ¶
func (m *QueueManager) RegisterWorkerPool(queueName string, handler Handler, config QueueConfig) (*WorkerPoolImpl, error)
RegisterWorkerPool 注册工作线程池,如果队列不存在则自动创建
func (*QueueManager) StartAllWorkerPools ¶
func (m *QueueManager) StartAllWorkerPools(ctx context.Context) error
StartAllWorkerPools 启动所有工作线程池
func (*QueueManager) StopAllWorkerPools ¶
func (m *QueueManager) StopAllWorkerPools() error
StopAllWorkerPools 停止所有工作线程池
type QueueStats ¶
type QueueStats struct {
Name string `json:"name"` // 队列名称
Pending int64 `json:"pending"` // 等待处理任务数
Processing int64 `json:"processing"` // 处理中任务数
Delayed int64 `json:"delayed"` // 延迟队列任务数
Completed int64 `json:"completed"` // 已完成任务总数
Failed int64 `json:"failed"` // 已失败任务总数
DeadLetter int64 `json:"dead_letter"` // 死信队列任务数
}
QueueStats 队列统计信息
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue 基于 Redis 的工作队列,队列只存任务 ID,详情独立存储,通过 Lua 脚本保证原子性。
Redis 数据结构:
{prefix}IMMEDIATE → List 立即执行队列
{prefix}DELAYED → ZSet 延迟队列(score=执行时间戳)
{prefix}PROCESSING → List 处理中队列
{prefix}DEAD → List 死信队列
{prefix}TASK:{id} → String 任务 JSON 数据
{prefix}TIMEOUT:{id} → String 可见性超时追踪(带 TTL)
{prefix}STATS → Hash 统计计数
func NewRedisQueue ¶
func NewRedisQueue(client *redis.Client, config QueueConfig) *RedisQueue
NewRedisQueue 创建 Redis 队列实例,Redis 客户端由调用方管理生命周期
func (*RedisQueue) Ack ¶
func (q *RedisQueue) Ack(ctx context.Context, taskID string) error
Ack 通过任务 ID 确认处理成功
func (*RedisQueue) Dequeue ¶
func (q *RedisQueue) Dequeue(ctx context.Context) (*Task, error)
Dequeue 出队一个可消费任务,无任务时返回 (nil, nil)
func (*RedisQueue) Enqueue ¶
func (q *RedisQueue) Enqueue(ctx context.Context, task *Task) error
Enqueue 立即入队任务
func (*RedisQueue) EnqueueWithDelay ¶
EnqueueWithDelay 延迟入队任务
func (*RedisQueue) GetStats ¶
func (q *RedisQueue) GetStats(ctx context.Context) (*QueueStats, error)
GetStats 获取队列统计信息
type RetryDelayStrategy ¶
type RetryDelayStrategy string
RetryDelayStrategy 重试延迟策略
const ( // RetryDelayFixed 固定延迟 RetryDelayFixed RetryDelayStrategy = "fixed" // RetryDelayExponential 指数退避 RetryDelayExponential RetryDelayStrategy = "exponential" // RetryDelayRandom 随机延迟 RetryDelayRandom RetryDelayStrategy = "random" )
type Task ¶
type Task struct {
ID string `json:"id"` // 任务ID
Queue string `json:"queue"` // 队列名称
Payload []byte `json:"payload"` // 任务数据
Type string `json:"type"` // 任务类型
MaxRetries int `json:"max_retries"` // 最大重试次数
RetryCount int `json:"retry_count"` // 当前重试次数
DelayUntil int64 `json:"delay_until"` // 延迟执行时间(Unix时间戳,秒)
CreatedAt int64 `json:"created_at"` // 任务创建时间
StartedAt int64 `json:"started_at"` // 任务开始处理时间
CompletedAt int64 `json:"completed_at"` // 任务完成时间
Status TaskStatus `json:"status"` // 任务状态
Error string `json:"error"` // 错误信息
Metadata map[string]string `json:"metadata"` // 元数据
}
Task 表示一个工作队列任务
func (*Task) WithMaxRetries ¶
WithMaxRetries 设置最大重试次数
type TaskStatus ¶
type TaskStatus string
TaskStatus 任务状态
const ( // TaskStatusPending 等待处理 TaskStatusPending TaskStatus = "pending" // TaskStatusProcessing 处理中 TaskStatusProcessing TaskStatus = "processing" // TaskStatusCompleted 已完成 TaskStatusCompleted TaskStatus = "completed" // TaskStatusFailed 已失败 TaskStatusFailed TaskStatus = "failed" // TaskStatusDeadLetter 死信 TaskStatusDeadLetter TaskStatus = "dead_letter" )
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker 从队列消费任务并交给 Handler 处理
type WorkerPool ¶
type WorkerPool interface {
// Start 启动工作线程池
Start(ctx context.Context) error
// Stop 停止工作线程池
Stop() error
// IsRunning 线程池是否正在运行
IsRunning() bool
}
WorkerPool 工作线程池接口
type WorkerPoolImpl ¶
type WorkerPoolImpl struct {
// contains filtered or unexported fields
}
WorkerPoolImpl 工作线程池,管理多个 Worker 协程
func NewWorkerPool ¶
func NewWorkerPool(queue Queue, handler Handler, config QueueConfig) *WorkerPoolImpl
NewWorkerPool 创建工作线程池
func (*WorkerPoolImpl) Start ¶
func (p *WorkerPoolImpl) Start(ctx context.Context) error
Start 启动工作线程池,创建 config.MaxConcurrency 个工作线程
func (*WorkerPoolImpl) WithLogger ¶
func (p *WorkerPoolImpl) WithLogger(log logger.Logger) *WorkerPoolImpl
WithLogger 设置工作线程池的日志实例