queue

package
v0.0.0-...-9b26579 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DBConnector

type DBConnector struct {
	// contains filtered or unexported fields
}

DBConnector 处理与PostgreSQL的交互

func NewDBConnector

func NewDBConnector(connectionString string) (*DBConnector, error)

NewDBConnector 创建一个新的数据库连接器

func (*DBConnector) CleanupCompletedTasks

func (d *DBConnector) CleanupCompletedTasks(threshold time.Time) (int64, error)

CleanupCompletedTasks 清理已完成的任务

func (*DBConnector) Close

func (d *DBConnector) Close() error

Close 关闭数据库连接

func (*DBConnector) CompleteTask

func (d *DBConnector) CompleteTask(taskID int64) error

CompleteTask 标记任务为已完成

func (*DBConnector) CreateQueue

func (d *DBConnector) CreateQueue(name string, workerCount int) error

CreateQueue 在数据库中创建队列

func (*DBConnector) DeleteQueue

func (d *DBConnector) DeleteQueue(name string) error

DeleteQueue 删除队列

func (*DBConnector) DequeueTask

func (d *DBConnector) DequeueTask(ctx context.Context, queueName, workerID string) (*Task, error)

DequeueTask 从队列中获取一个待处理的任务

func (*DBConnector) EnqueueTask

func (d *DBConnector) EnqueueTask(queueName string, payload []byte) (int64, error)

EnqueueTask 将任务加入队列

func (*DBConnector) FailTask

func (d *DBConnector) FailTask(taskID int64, errMsg string) error

FailTask 标记任务为失败

func (*DBConnector) GetAllQueues

func (d *DBConnector) GetAllQueues() ([]*Queue, error)

GetAllQueues 获取所有队列

func (*DBConnector) GetPendingTaskCount

func (d *DBConnector) GetPendingTaskCount(queueName string) (int, error)

GetPendingTaskCount 获取队列中待处理任务的数量

func (*DBConnector) GetQueue

func (d *DBConnector) GetQueue(name string) (*Queue, error)

GetQueue 获取队列信息

func (*DBConnector) InitSchema

func (d *DBConnector) InitSchema() error

InitSchema 初始化数据库表结构

func (*DBConnector) ResetStaleTasks

func (d *DBConnector) ResetStaleTasks(threshold time.Time) (int64, error)

ResetStaleTasks 重置卡住的任务

func (*DBConnector) UpdateQueueWorkerCount

func (d *DBConnector) UpdateQueueWorkerCount(name string, workerCount int) error

UpdateQueueWorkerCount 更新队列的worker数量

type DefaultTaskProcessor

type DefaultTaskProcessor struct{}

DefaultTaskProcessor 默认任务处理器实现

func (*DefaultTaskProcessor) Process

func (p *DefaultTaskProcessor) Process(ctx context.Context, task *Task) error

Process 实现默认的任务处理逻辑

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor 提供系统状态查询功能

func NewMonitor

func NewMonitor(qm *QueueManager, db *DBConnector) *Monitor

NewMonitor 创建一个新的监控接口

func (*Monitor) Start

func (m *Monitor) Start(addr string) error

Start 启动监控HTTP服务

func (*Monitor) Stop

func (m *Monitor) Stop() error

Stop 停止监控HTTP服务

type Queue

type Queue struct {
	Name        string
	WorkerCount int
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

Queue 表示队列配置

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager 负责队列的创建、删除和worker数量的动态调整

func NewQueueManager

func NewQueueManager(db *DBConnector) *QueueManager

NewQueueManager 创建一个新的队列管理器

func (*QueueManager) CreateQueue

func (qm *QueueManager) CreateQueue(queueName string, workerCount int) error

CreateQueue 创建一个新队列并启动指定数量的worker

func (*QueueManager) DeleteQueue

func (qm *QueueManager) DeleteQueue(queueName string) error

DeleteQueue 删除指定队列

func (*QueueManager) GetAllQueueStatus

func (qm *QueueManager) GetAllQueueStatus() ([]*QueueStatus, error)

GetAllQueueStatus 获取所有队列的状态

func (*QueueManager) GetQueueStatus

func (qm *QueueManager) GetQueueStatus(queueName string) (*QueueStatus, error)

GetQueueStatus 获取指定队列的状态

func (*QueueManager) RegisterTaskProcessor

func (qm *QueueManager) RegisterTaskProcessor(queueName string, processor TaskProcessor)

RegisterTaskProcessor 注册队列的任务处理器

func (*QueueManager) SetCleanupInterval

func (qm *QueueManager) SetCleanupInterval(interval time.Duration)

SetCleanupInterval 设置清理间隔

func (*QueueManager) SetDefaultTaskProcessor

func (qm *QueueManager) SetDefaultTaskProcessor(processor TaskProcessor)

SetDefaultTaskProcessor 设置默认任务处理器

func (*QueueManager) Start

func (qm *QueueManager) Start() error

Start 启动队列管理器

func (*QueueManager) Stop

func (qm *QueueManager) Stop()

Stop 停止队列管理器

func (*QueueManager) UpdateWorkerCount

func (qm *QueueManager) UpdateWorkerCount(queueName string, newWorkerCount int) error

UpdateWorkerCount 更新指定队列的worker数量

type QueueStatus

type QueueStatus struct {
	QueueName     string `json:"queue_name"`
	WorkerCount   int    `json:"worker_count"`
	PendingTasks  int    `json:"pending_tasks"`
	ActiveWorkers int    `json:"active_workers"`
}

QueueStatus 表示队列的状态信息

type Task

type Task struct {
	ID        int64
	QueueName string
	Payload   []byte
	Status    string // pending, processing, completed, failed
	CreatedAt time.Time
	UpdatedAt time.Time
	StartedAt *time.Time
	EndedAt   *time.Time
	Error     string
	WorkerID  string
}

Task 表示队列中的任务

type TaskProcessor

type TaskProcessor interface {
	// Process 处理任务,返回错误表示处理失败
	Process(ctx context.Context, task *Task) error
}

TaskProcessor 定义任务处理器接口

type TaskProcessorRegistry

type TaskProcessorRegistry struct {
	// contains filtered or unexported fields
}

TaskProcessorRegistry 任务处理器注册表

func NewTaskProcessorRegistry

func NewTaskProcessorRegistry() *TaskProcessorRegistry

NewTaskProcessorRegistry 创建一个新的任务处理器注册表

func (*TaskProcessorRegistry) GetProcessor

func (r *TaskProcessorRegistry) GetProcessor(queueName string) TaskProcessor

GetProcessor 获取队列的任务处理器

func (*TaskProcessorRegistry) Register

func (r *TaskProcessorRegistry) Register(queueName string, processor TaskProcessor)

Register 注册队列的任务处理器

func (*TaskProcessorRegistry) SetDefaultProcessor

func (r *TaskProcessorRegistry) SetDefaultProcessor(processor TaskProcessor)

SetDefaultProcessor 设置默认任务处理器

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool 管理goroutine工作器的生命周期

func NewWorkerPool

func NewWorkerPool(db *DBConnector, queueName string, workerCount int) *WorkerPool

NewWorkerPool 创建一个新的工作器池

func (*WorkerPool) GetActiveWorkerCount

func (wp *WorkerPool) GetActiveWorkerCount() int

GetActiveWorkerCount 获取活跃工作器数量

func (*WorkerPool) GetWorkerCount

func (wp *WorkerPool) GetWorkerCount() int

GetWorkerCount 获取工作器数量

func (*WorkerPool) SetPollInterval

func (wp *WorkerPool) SetPollInterval(interval time.Duration)

SetPollInterval 设置轮询间隔

func (*WorkerPool) SetProcessTimeout

func (wp *WorkerPool) SetProcessTimeout(timeout time.Duration)

SetProcessTimeout 设置任务处理超时时间

func (*WorkerPool) SetProcessor

func (wp *WorkerPool) SetProcessor(processor TaskProcessor)

SetProcessor 设置任务处理器

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start 启动工作器池

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop 停止工作器池

func (*WorkerPool) UpdateWorkerCount

func (wp *WorkerPool) UpdateWorkerCount(newCount int)

UpdateWorkerCount 更新工作器数量

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL