Documentation ¶
Index ¶
- type DBConnector
- func (d *DBConnector) CleanupCompletedTasks(threshold time.Time) (int64, error)
- func (d *DBConnector) Close() error
- func (d *DBConnector) CompleteTask(taskID int64) error
- func (d *DBConnector) CreateQueue(name string, workerCount int) error
- func (d *DBConnector) DeleteQueue(name string) error
- func (d *DBConnector) DequeueTask(ctx context.Context, queueName, workerID string) (*Task, error)
- func (d *DBConnector) EnqueueTask(queueName string, payload []byte) (int64, error)
- func (d *DBConnector) FailTask(taskID int64, errMsg string) error
- func (d *DBConnector) GetAllQueues() ([]*Queue, error)
- func (d *DBConnector) GetPendingTaskCount(queueName string) (int, error)
- func (d *DBConnector) GetQueue(name string) (*Queue, error)
- func (d *DBConnector) InitSchema() error
- func (d *DBConnector) ResetStaleTasks(threshold time.Time) (int64, error)
- func (d *DBConnector) UpdateQueueWorkerCount(name string, workerCount int) error
- type DefaultTaskProcessor
- type Monitor
- type Queue
- type QueueManager
- func (qm *QueueManager) CreateQueue(queueName string, workerCount int) error
- func (qm *QueueManager) DeleteQueue(queueName string) error
- func (qm *QueueManager) GetAllQueueStatus() ([]*QueueStatus, error)
- func (qm *QueueManager) GetQueueStatus(queueName string) (*QueueStatus, error)
- func (qm *QueueManager) RegisterTaskProcessor(queueName string, processor TaskProcessor)
- func (qm *QueueManager) SetCleanupInterval(interval time.Duration)
- func (qm *QueueManager) SetDefaultTaskProcessor(processor TaskProcessor)
- func (qm *QueueManager) Start() error
- func (qm *QueueManager) Stop()
- func (qm *QueueManager) UpdateWorkerCount(queueName string, newWorkerCount int) error
- type QueueStatus
- type Task
- type TaskProcessor
- type TaskProcessorRegistry
- type WorkerPool
- func (wp *WorkerPool) GetActiveWorkerCount() int
- func (wp *WorkerPool) GetWorkerCount() int
- func (wp *WorkerPool) SetPollInterval(interval time.Duration)
- func (wp *WorkerPool) SetProcessTimeout(timeout time.Duration)
- func (wp *WorkerPool) SetProcessor(processor TaskProcessor)
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Stop()
- func (wp *WorkerPool) UpdateWorkerCount(newCount int)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DBConnector ¶
type DBConnector struct {
// contains filtered or unexported fields
}
DBConnector 处理与PostgreSQL的交互
func NewDBConnector ¶
func NewDBConnector(connectionString string) (*DBConnector, error)
NewDBConnector 创建一个新的数据库连接器
func (*DBConnector) CleanupCompletedTasks ¶
func (d *DBConnector) CleanupCompletedTasks(threshold time.Time) (int64, error)
CleanupCompletedTasks 清理已完成的任务
func (*DBConnector) CompleteTask ¶
func (d *DBConnector) CompleteTask(taskID int64) error
CompleteTask 标记任务为已完成
func (*DBConnector) CreateQueue ¶
func (d *DBConnector) CreateQueue(name string, workerCount int) error
CreateQueue 在数据库中创建队列
func (*DBConnector) DeleteQueue ¶
func (d *DBConnector) DeleteQueue(name string) error
DeleteQueue 删除队列
func (*DBConnector) DequeueTask ¶
func (d *DBConnector) DequeueTask(ctx context.Context, queueName, workerID string) (*Task, error)
DequeueTask 从队列中获取一个待处理的任务
func (*DBConnector) EnqueueTask ¶
func (d *DBConnector) EnqueueTask(queueName string, payload []byte) (int64, error)
EnqueueTask 将任务加入队列
func (*DBConnector) FailTask ¶
func (d *DBConnector) FailTask(taskID int64, errMsg string) error
FailTask 标记任务为失败
func (*DBConnector) GetAllQueues ¶
func (d *DBConnector) GetAllQueues() ([]*Queue, error)
GetAllQueues 获取所有队列
func (*DBConnector) GetPendingTaskCount ¶
func (d *DBConnector) GetPendingTaskCount(queueName string) (int, error)
GetPendingTaskCount 获取队列中待处理任务的数量
func (*DBConnector) GetQueue ¶
func (d *DBConnector) GetQueue(name string) (*Queue, error)
GetQueue 获取队列信息
func (*DBConnector) ResetStaleTasks ¶
func (d *DBConnector) ResetStaleTasks(threshold time.Time) (int64, error)
ResetStaleTasks 重置卡住的任务
func (*DBConnector) UpdateQueueWorkerCount ¶
func (d *DBConnector) UpdateQueueWorkerCount(name string, workerCount int) error
UpdateQueueWorkerCount 更新队列的worker数量
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager 负责队列的创建、删除和worker数量的动态调整
func NewQueueManager ¶
func NewQueueManager(db *DBConnector) *QueueManager
NewQueueManager 创建一个新的队列管理器
func (*QueueManager) CreateQueue ¶
func (qm *QueueManager) CreateQueue(queueName string, workerCount int) error
CreateQueue 创建一个新队列并启动指定数量的worker
func (*QueueManager) DeleteQueue ¶
func (qm *QueueManager) DeleteQueue(queueName string) error
DeleteQueue 删除指定队列
func (*QueueManager) GetAllQueueStatus ¶
func (qm *QueueManager) GetAllQueueStatus() ([]*QueueStatus, error)
GetAllQueueStatus 获取所有队列的状态
func (*QueueManager) GetQueueStatus ¶
func (qm *QueueManager) GetQueueStatus(queueName string) (*QueueStatus, error)
GetQueueStatus 获取指定队列的状态
func (*QueueManager) RegisterTaskProcessor ¶
func (qm *QueueManager) RegisterTaskProcessor(queueName string, processor TaskProcessor)
RegisterTaskProcessor 注册队列的任务处理器
func (*QueueManager) SetCleanupInterval ¶
func (qm *QueueManager) SetCleanupInterval(interval time.Duration)
SetCleanupInterval 设置清理间隔
func (*QueueManager) SetDefaultTaskProcessor ¶
func (qm *QueueManager) SetDefaultTaskProcessor(processor TaskProcessor)
SetDefaultTaskProcessor 设置默认任务处理器
func (*QueueManager) UpdateWorkerCount ¶
func (qm *QueueManager) UpdateWorkerCount(queueName string, newWorkerCount int) error
UpdateWorkerCount 更新指定队列的worker数量
type QueueStatus ¶
type QueueStatus struct {
	QueueName     string `json:"queue_name"`
	WorkerCount   int    `json:"worker_count"`
	PendingTasks  int    `json:"pending_tasks"`
	ActiveWorkers int    `json:"active_workers"`
}
QueueStatus 表示队列的状态信息
type Task ¶
type Task struct {
	ID        int64
	QueueName string
	Payload   []byte
	Status    string // pending, processing, completed, failed
	CreatedAt time.Time
	UpdatedAt time.Time
	StartedAt *time.Time
	EndedAt   *time.Time
	Error     string
	WorkerID  string
}
Task 表示队列中的任务
type TaskProcessor ¶
type TaskProcessor interface {
	// Process 处理任务,返回错误表示处理失败
	Process(ctx context.Context, task *Task) error
}
TaskProcessor 定义任务处理器接口
type TaskProcessorRegistry ¶
type TaskProcessorRegistry struct {
// contains filtered or unexported fields
}
TaskProcessorRegistry 任务处理器注册表
func NewTaskProcessorRegistry ¶
func NewTaskProcessorRegistry() *TaskProcessorRegistry
NewTaskProcessorRegistry 创建一个新的任务处理器注册表
func (*TaskProcessorRegistry) GetProcessor ¶
func (r *TaskProcessorRegistry) GetProcessor(queueName string) TaskProcessor
GetProcessor 获取队列的任务处理器
func (*TaskProcessorRegistry) Register ¶
func (r *TaskProcessorRegistry) Register(queueName string, processor TaskProcessor)
Register 注册队列的任务处理器
func (*TaskProcessorRegistry) SetDefaultProcessor ¶
func (r *TaskProcessorRegistry) SetDefaultProcessor(processor TaskProcessor)
SetDefaultProcessor 设置默认任务处理器
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool 管理goroutine工作器的生命周期
func NewWorkerPool ¶
func NewWorkerPool(db *DBConnector, queueName string, workerCount int) *WorkerPool
NewWorkerPool 创建一个新的工作器池
func (*WorkerPool) GetActiveWorkerCount ¶
func (wp *WorkerPool) GetActiveWorkerCount() int
GetActiveWorkerCount 获取活跃工作器数量
func (*WorkerPool) GetWorkerCount ¶
func (wp *WorkerPool) GetWorkerCount() int
GetWorkerCount 获取工作器数量
func (*WorkerPool) SetPollInterval ¶
func (wp *WorkerPool) SetPollInterval(interval time.Duration)
SetPollInterval 设置轮询间隔
func (*WorkerPool) SetProcessTimeout ¶
func (wp *WorkerPool) SetProcessTimeout(timeout time.Duration)
SetProcessTimeout 设置任务处理超时时间
func (*WorkerPool) SetProcessor ¶
func (wp *WorkerPool) SetProcessor(processor TaskProcessor)
SetProcessor 设置任务处理器
func (*WorkerPool) UpdateWorkerCount ¶
func (wp *WorkerPool) UpdateWorkerCount(newCount int)
UpdateWorkerCount 更新工作器数量