task

package
v0.0.0-...-7cfd609 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2026 License: MIT Imports: 11 Imported by: 0

README

Task Queue - 企业级本地任务队列

高性能、高效率、高可靠性的本地任务队列系统,支持多工作器并发执行。

特性

  • 高性能: 基于优先级队列的任务调度,支持批量提交
  • 多工作器: 自动根据 CPU 核心数配置工作器数量,支持自定义
  • 任务重试: 内置重试机制,可配置最大重试次数
  • 超时控制: 支持任务超时自动终止
  • 优先级调度: 支持 0-100 优先级,数值越高优先级越高
  • 监控指标: 实时统计任务执行情况,支持健康检查
  • 线程安全: 完全线程安全,支持高并发提交

快速开始

基本使用
package main

import (
    "leyline-doc-backend/pkg/task"
    "time"
)

func main() {
    // 创建任务队列
    queue := task.NewTaskQueue(task.DefaultConfig())
    queue.Start()
    defer queue.Stop()
    
    // 提交任务
    taskID, err := queue.SubmitFunc("my_task", func() error {
        // 任务逻辑
        return nil
    })
    
    if err != nil {
        // 处理错误
    }
    
    // 等待任务执行
    time.Sleep(100 * time.Millisecond)
}
配置选项
queue := task.NewTaskQueue(task.TaskQueueConfig{
    WorkerCount:  8,      // 工作器数量 (0=自动)
    MaxQueueSize: 10000,  // 最大队列大小
})

// 提交带配置的任务
taskID, err := queue.SubmitFunc(
    "configured_task",
    taskFunc,
    task.WithPriority(80),            // 高优先级 (0-100)
    task.WithMaxRetries(5),           // 最多重试 5 次
    task.WithTimeout(5*time.Second),  // 5 秒超时
    task.WithMetadata("key", value),  // 元数据
)
批量提交
tasks := make([]*task.Task, 0, 10)
for i := 0; i < 10; i++ {
    t := task.NewTask(
        fmt.Sprintf("batch_task_%d", i),
        taskFunc,
        task.WithPriority(50),
    )
    tasks = append(tasks, t)
}

err := queue.pool.SubmitBatch(tasks)
监控和指标
// 获取统计信息
stats := queue.GetStats()
fmt.Printf("总任务数:%d\n", stats.TotalTasks)
fmt.Printf("成功任务数:%d\n", stats.SuccessTasks)
fmt.Printf("活跃工作器:%d/%d\n", stats.ActiveWorkers, stats.WorkerCount)

// 导出指标 JSON
exporter := task.NewMetricsExporter(queue)
metricsJSON, _ := exporter.ExportMetrics()

// 健康检查
health := exporter.HealthCheck()
if health.Status != "healthy" {
    // 处理不健康状态
}

API 参考

TaskQueue
方法 说明
NewTaskQueue(config) 创建任务队列
Start() 启动任务队列
Stop() 停止任务队列
Submit(task) 提交任务
SubmitFunc(name, fn, opts...) 提交任务函数
GetTask(taskID) 获取任务
CancelTask(taskID) 取消任务
GetStats() 获取统计信息
GetQueueSize() 获取队列大小
Task 选项
函数 说明
WithPriority(n) 设置优先级 (0-100)
WithMaxRetries(n) 设置最大重试次数
WithTimeout(d) 设置超时时间
WithMetadata(k, v) 设置元数据
任务状态
状态 说明
TaskStatusPending 等待执行
TaskStatusRunning 执行中
TaskStatusSuccess 执行成功
TaskStatusFailed 执行失败
TaskStatusCancelled 已取消

统计指标

指标 说明
TotalTasks 总任务数
SuccessTasks 成功任务数
FailedTasks 失败任务数
RetryTasks 重试任务数
ActiveWorkers 活跃工作器数
QueuedTasks 排队任务数
AvgExecuteTime 平均执行时间 (纳秒)

最佳实践

  1. 合理设置工作器数量: 默认根据 CPU 核心数自动配置,IO 密集型任务可适当增加
  2. 设置合适的超时时间: 避免任务长时间占用工作器
  3. 配置重试机制: 对于可能失败的任务,配置适当的重试次数
  4. 使用优先级: 重要任务设置高优先级
  5. 监控队列状态: 定期检查队列大小和失败率
  6. 优雅关闭: 使用 defer queue.Stop() 确保资源释放

测试

cd pkg/task
go test -v -race

注意事项

  • 任务函数应该是幂等的,因为可能会重试执行
  • 避免在任务函数中执行阻塞操作,如需阻塞请使用超时控制
  • 任务队列停止时会等待所有执行中的任务完成
  • 取消操作只对等待执行的任务有效,执行中的任务无法取消

Documentation

Overview

Example (BasicUsage)

Example_basicUsage 基本使用示例

// 创建任务队列
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()

// 提交简单任务
taskID, err := queue.SubmitFunc("simple_task", func() error {
	fmt.Println("执行简单任务")
	return nil
})

if err != nil {
	fmt.Printf("提交任务失败:%v\n", err)
	return
}

fmt.Printf("任务已提交:ID=%s\n", taskID)

// 等待任务执行
time.Sleep(100 * time.Millisecond)
Example (BatchSubmit)

Example_batchSubmit 批量提交示例

queue := NewTaskQueue(TaskQueueConfig{
	WorkerCount:  4,
	MaxQueueSize: 1000,
})
queue.Start()
defer queue.Stop()

// 批量创建任务
tasks := make([]*Task, 0, 10)
for i := 0; i < 10; i++ {
	task := NewTask(
		fmt.Sprintf("batch_task_%d", i),
		func() error {
			fmt.Println("执行批量任务")
			return nil
		},
		WithPriority(50),
	)
	tasks = append(tasks, task)
}

// 批量提交
err := queue.pool.SubmitBatch(tasks)
if err != nil {
	fmt.Printf("批量提交失败:%v\n", err)
	return
}

fmt.Printf("已批量提交 %d 个任务\n", len(tasks))
time.Sleep(500 * time.Millisecond)
Example (Monitor)

Example_monitor 监控示例

queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()

// 提交一些任务
for i := 0; i < 5; i++ {
	queue.SubmitFunc(
		fmt.Sprintf("monitor_task_%d", i),
		func() error {
			time.Sleep(10 * time.Millisecond)
			return nil
		},
	)
}

time.Sleep(500 * time.Millisecond)

// 获取统计信息
stats := queue.GetStats()
fmt.Printf("总任务数:%d\n", stats.TotalTasks)
fmt.Printf("成功任务数:%d\n", stats.SuccessTasks)
fmt.Printf("失败任务数:%d\n", stats.FailedTasks)
fmt.Printf("活跃工作器:%d/%d\n", stats.ActiveWorkers, stats.WorkerCount)
fmt.Printf("平均执行时间:%d ms\n", stats.AvgExecuteTime/1e6)

// 导出指标
exporter := NewMetricsExporter(queue)
metricsJSON, _ := exporter.ExportMetrics()
fmt.Printf("指标 JSON: %s\n", string(metricsJSON))

// 健康检查
health := exporter.HealthCheck()
fmt.Printf("健康状态:%s\n", health.Status)
Example (Retry)

Example_retry 重试机制示例

queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()

var attempt int
maxAttempts := 3

queue.SubmitFunc(
	"retry_example",
	func() error {
		attempt++
		fmt.Printf("执行尝试 %d/%d\n", attempt, maxAttempts)

		if attempt < maxAttempts {
			return fmt.Errorf("模拟失败 (尝试 %d)", attempt)
		}

		fmt.Println("任务执行成功")
		return nil
	},
	WithMaxRetries(3),
)

time.Sleep(500 * time.Millisecond)
Example (Timeout)

Example_timeout 超时控制示例

queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()

queue.SubmitFunc(
	"timeout_example",
	func() error {
		fmt.Println("开始执行长时间任务")
		time.Sleep(2 * time.Second) // 模拟长时间任务
		fmt.Println("任务完成")
		return nil
	},
	WithTimeout(500*time.Millisecond), // 设置 500ms 超时
)

time.Sleep(3 * time.Second)

// 任务会因超时而失败
Example (WithOptions)

Example_withOptions 带配置选项的任务

queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()

taskID, err := queue.SubmitFunc(
	"configured_task",
	func() error {
		fmt.Println("执行配置任务")
		return nil
	},
	WithPriority(80),           // 高优先级
	WithMaxRetries(5),          // 最多重试 5 次
	WithTimeout(5*time.Second), // 5 秒超时
	WithMetadata("user_id", 123),
)

if err != nil {
	fmt.Printf("提交任务失败:%v\n", err)
	return
}

fmt.Printf("任务已提交:ID=%s\n", taskID)
time.Sleep(100 * time.Millisecond)

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrPoolShutdown 工作池已关闭
	ErrPoolShutdown = errors.New("worker pool is shutdown")

	// ErrTaskNotFound 任务未找到
	ErrTaskNotFound = errors.New("task not found")

	// ErrQueueFull 队列已满
	ErrQueueFull = errors.New("task queue is full")
)

Functions

This section is empty.

Types

type HealthStatus

type HealthStatus struct {
	Status    string          `json:"status"`
	Timestamp string          `json:"timestamp"`
	Checks    map[string]bool `json:"checks"`
}

HealthStatus 健康状态

type MetricsExporter

type MetricsExporter struct {
	// contains filtered or unexported fields
}

MetricsExporter 指标导出器

func NewMetricsExporter

func NewMetricsExporter(queue *TaskQueue) *MetricsExporter

NewMetricsExporter 创建指标导出器

func (*MetricsExporter) ExportMetrics

func (e *MetricsExporter) ExportMetrics() ([]byte, error)

ExportMetrics 导出指标为 JSON

func (*MetricsExporter) GetMetricsMap

func (e *MetricsExporter) GetMetricsMap() map[string]interface{}

GetMetricsMap 获取指标 Map

func (*MetricsExporter) HealthCheck

func (e *MetricsExporter) HealthCheck() HealthStatus

HealthCheck 健康检查

type PoolStats

type PoolStats struct {
	TotalTasks     int64 // 总任务数
	SuccessTasks   int64 // 成功任务数
	FailedTasks    int64 // 失败任务数
	RetryTasks     int64 // 重试任务数
	ActiveWorkers  int64 // 活跃工作器数
	QueuedTasks    int64 // 排队任务数
	AvgExecuteTime int64 // 平均执行时间 (纳秒)
	// contains filtered or unexported fields
}

PoolStats 工作池统计信息

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue 优先级队列

func (*PriorityQueue) PeekTask

func (pq *PriorityQueue) PeekTask() *Task

PeekTask 查看队首任务(不取出)

func (*PriorityQueue) PopTask

func (pq *PriorityQueue) PopTask() *Task

PopTask 从队列取出任务

func (*PriorityQueue) PushTask

func (pq *PriorityQueue) PushTask(task *Task)

PushTask 添加任务到队列

func (*PriorityQueue) Size

func (pq *PriorityQueue) Size() int

Size 获取队列大小

type QueueStats

type QueueStats struct {
	TotalTasks     int64 // 总任务数
	SuccessTasks   int64 // 成功任务数
	FailedTasks    int64 // 失败任务数
	RetryTasks     int64 // 重试任务数
	ActiveWorkers  int64 // 活跃工作器数
	QueuedTasks    int64 // 排队任务数
	WorkerCount    int64 // 工作器总数
	MaxQueueSize   int64 // 最大队列大小
	AvgExecuteTime int64 // 平均执行时间 (纳秒)
}

QueueStats 队列统计信息

type Task

type Task struct {
	ID         string                 // 任务唯一标识
	Name       string                 // 任务名称
	TaskFunc   TaskFunc               // 任务执行函数
	Status     TaskStatus             // 任务状态
	Priority   int                    // 优先级 (0-100, 越高优先级越高)
	MaxRetries int                    // 最大重试次数
	RetryCount int                    // 当前重试次数
	Timeout    time.Duration          // 任务超时时间
	CreatedAt  time.Time              // 创建时间
	StartedAt  *time.Time             // 开始执行时间
	FinishedAt *time.Time             // 完成时间
	Error      error                  // 错误信息
	Metadata   map[string]interface{} // 元数据
	// contains filtered or unexported fields
}

Task 任务定义

func NewTask

func NewTask(name string, fn TaskFunc, opts ...TaskOption) *Task

NewTask 创建新任务

func (*Task) GetResult

func (t *Task) GetResult() interface{}

GetResult 获取任务执行结果

type TaskFunc

type TaskFunc func() error

TaskFunc 任务函数类型

type TaskOption

type TaskOption func(*Task)

TaskOption 任务配置选项

func WithMaxRetries

func WithMaxRetries(maxRetries int) TaskOption

WithMaxRetries 设置最大重试次数

func WithMetadata

func WithMetadata(key string, value interface{}) TaskOption

WithMetadata 设置任务元数据

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority 设置任务优先级

func WithTimeout

func WithTimeout(timeout time.Duration) TaskOption

WithTimeout 设置任务超时时间

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue 任务队列管理器

func NewTaskQueue

func NewTaskQueue(config TaskQueueConfig) *TaskQueue

NewTaskQueue 创建任务队列

func (*TaskQueue) CancelTask

func (q *TaskQueue) CancelTask(taskID string) error

CancelTask 取消任务

func (*TaskQueue) GetQueueSize

func (q *TaskQueue) GetQueueSize() int

GetQueueSize 获取队列大小

func (*TaskQueue) GetStats

func (q *TaskQueue) GetStats() *QueueStats

GetStats 获取统计信息

func (*TaskQueue) GetTask

func (q *TaskQueue) GetTask(taskID string) (*Task, bool)

GetTask 获取任务

func (*TaskQueue) Start

func (q *TaskQueue) Start()

Start 启动任务队列

func (*TaskQueue) Stop

func (q *TaskQueue) Stop()

Stop 停止任务队列

func (*TaskQueue) Submit

func (q *TaskQueue) Submit(task *Task) error

Submit 提交任务

func (*TaskQueue) SubmitFunc

func (q *TaskQueue) SubmitFunc(name string, fn TaskFunc, opts ...TaskOption) (string, error)

SubmitFunc 提交任务函数(便捷方法)

func (*TaskQueue) SubmitFuncSync

func (q *TaskQueue) SubmitFuncSync(name string, fn TaskFunc, timeout time.Duration, opts ...TaskOption) (interface{}, error)

SubmitFuncSync 同步提交任务并等待结果

type TaskQueueConfig

type TaskQueueConfig struct {
	WorkerCount  int // 工作器数量
	MaxQueueSize int // 最大队列大小
}

TaskQueueConfig 任务队列配置

func DefaultConfig

func DefaultConfig() TaskQueueConfig

DefaultConfig 默认配置

type TaskStatus

type TaskStatus int

TaskStatus 任务状态

const (
	TaskStatusPending   TaskStatus = iota // 等待执行
	TaskStatusRunning                     // 执行中
	TaskStatusSuccess                     // 执行成功
	TaskStatusFailed                      // 执行失败
	TaskStatusCancelled                   // 已取消
)

func (TaskStatus) String

func (s TaskStatus) String() string

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker 工作器

func (*Worker) GetCurrentTask

func (w *Worker) GetCurrentTask() *Task

GetCurrentTask 获取当前执行的任务

func (*Worker) IsBusy

func (w *Worker) IsBusy() bool

IsBusy 检查工作器是否忙碌

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool 工作池

func NewWorkerPool

func NewWorkerPool(workerCount int) *WorkerPool

NewWorkerPool 创建工作池

func (*WorkerPool) GetQueueSize

func (p *WorkerPool) GetQueueSize() int

GetQueueSize 获取队列大小

func (*WorkerPool) GetStats

func (p *WorkerPool) GetStats() *PoolStats

GetStats 获取统计信息

func (*WorkerPool) GetWorkerCount

func (p *WorkerPool) GetWorkerCount() int

GetWorkerCount 获取工作器数量

func (*WorkerPool) IsShutdown

func (p *WorkerPool) IsShutdown() bool

IsShutdown 检查是否已关闭

func (*WorkerPool) OnTaskFailed

func (p *WorkerPool) OnTaskFailed(fn func(*Task))

OnTaskFailed 设置任务失败回调

func (*WorkerPool) OnTaskSuccess

func (p *WorkerPool) OnTaskSuccess(fn func(*Task))

OnTaskSuccess 设置任务成功回调

func (*WorkerPool) Start

func (p *WorkerPool) Start()

Start 启动工作池

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop 停止工作池

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task *Task) error

Submit 提交任务

func (*WorkerPool) SubmitBatch

func (p *WorkerPool) SubmitBatch(tasks []*Task) error

SubmitBatch 批量提交任务

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL