Documentation
¶
Overview ¶
Example (BasicUsage) ¶
Example_basicUsage 基本使用示例
// 创建任务队列
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()
// 提交简单任务
taskID, err := queue.SubmitFunc("simple_task", func() error {
fmt.Println("执行简单任务")
return nil
})
if err != nil {
fmt.Printf("提交任务失败:%v\n", err)
return
}
fmt.Printf("任务已提交:ID=%s\n", taskID)
// 等待任务执行
time.Sleep(100 * time.Millisecond)
Example (BatchSubmit) ¶
Example_batchSubmit 批量提交示例
queue := NewTaskQueue(TaskQueueConfig{
WorkerCount: 4,
MaxQueueSize: 1000,
})
queue.Start()
defer queue.Stop()
// 批量创建任务
tasks := make([]*Task, 0, 10)
for i := 0; i < 10; i++ {
task := NewTask(
fmt.Sprintf("batch_task_%d", i),
func() error {
fmt.Println("执行批量任务")
return nil
},
WithPriority(50),
)
tasks = append(tasks, task)
}
// 批量提交
err := queue.pool.SubmitBatch(tasks)
if err != nil {
fmt.Printf("批量提交失败:%v\n", err)
return
}
fmt.Printf("已批量提交 %d 个任务\n", len(tasks))
time.Sleep(500 * time.Millisecond)
Example (Monitor) ¶
Example_monitor 监控示例
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()
// 提交一些任务
for i := 0; i < 5; i++ {
queue.SubmitFunc(
fmt.Sprintf("monitor_task_%d", i),
func() error {
time.Sleep(10 * time.Millisecond)
return nil
},
)
}
time.Sleep(500 * time.Millisecond)
// 获取统计信息
stats := queue.GetStats()
fmt.Printf("总任务数:%d\n", stats.TotalTasks)
fmt.Printf("成功任务数:%d\n", stats.SuccessTasks)
fmt.Printf("失败任务数:%d\n", stats.FailedTasks)
fmt.Printf("活跃工作器:%d/%d\n", stats.ActiveWorkers, stats.WorkerCount)
fmt.Printf("平均执行时间:%d ms\n", stats.AvgExecuteTime/1e6)
// 导出指标
exporter := NewMetricsExporter(queue)
metricsJSON, _ := exporter.ExportMetrics()
fmt.Printf("指标 JSON: %s\n", string(metricsJSON))
// 健康检查
health := exporter.HealthCheck()
fmt.Printf("健康状态:%s\n", health.Status)
Example (Retry) ¶
Example_retry 重试机制示例
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()
var attempt int
maxAttempts := 3
queue.SubmitFunc(
"retry_example",
func() error {
attempt++
fmt.Printf("执行尝试 %d/%d\n", attempt, maxAttempts)
if attempt < maxAttempts {
return fmt.Errorf("模拟失败 (尝试 %d)", attempt)
}
fmt.Println("任务执行成功")
return nil
},
WithMaxRetries(3),
)
time.Sleep(500 * time.Millisecond)
Example (Timeout) ¶
Example_timeout 超时控制示例
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()
queue.SubmitFunc(
"timeout_example",
func() error {
fmt.Println("开始执行长时间任务")
time.Sleep(2 * time.Second) // 模拟长时间任务
fmt.Println("任务完成")
return nil
},
WithTimeout(500*time.Millisecond), // 设置 500ms 超时
)
time.Sleep(3 * time.Second)
// 任务会因超时而失败
Example (WithOptions) ¶
Example_withOptions 带配置选项的任务
queue := NewTaskQueue(DefaultConfig())
queue.Start()
defer queue.Stop()
taskID, err := queue.SubmitFunc(
"configured_task",
func() error {
fmt.Println("执行配置任务")
return nil
},
WithPriority(80), // 高优先级
WithMaxRetries(5), // 最多重试 5 次
WithTimeout(5*time.Second), // 5 秒超时
WithMetadata("user_id", 123),
)
if err != nil {
fmt.Printf("提交任务失败:%v\n", err)
return
}
fmt.Printf("任务已提交:ID=%s\n", taskID)
time.Sleep(100 * time.Millisecond)
Index ¶
- Variables
- type HealthStatus
- type MetricsExporter
- type PoolStats
- type PriorityQueue
- type QueueStats
- type Task
- type TaskFunc
- type TaskOption
- type TaskQueue
- func (q *TaskQueue) CancelTask(taskID string) error
- func (q *TaskQueue) GetQueueSize() int
- func (q *TaskQueue) GetStats() *QueueStats
- func (q *TaskQueue) GetTask(taskID string) (*Task, bool)
- func (q *TaskQueue) Start()
- func (q *TaskQueue) Stop()
- func (q *TaskQueue) Submit(task *Task) error
- func (q *TaskQueue) SubmitFunc(name string, fn TaskFunc, opts ...TaskOption) (string, error)
- func (q *TaskQueue) SubmitFuncSync(name string, fn TaskFunc, timeout time.Duration, opts ...TaskOption) (interface{}, error)
- type TaskQueueConfig
- type TaskStatus
- type Worker
- type WorkerPool
- func (p *WorkerPool) GetQueueSize() int
- func (p *WorkerPool) GetStats() *PoolStats
- func (p *WorkerPool) GetWorkerCount() int
- func (p *WorkerPool) IsShutdown() bool
- func (p *WorkerPool) OnTaskFailed(fn func(*Task))
- func (p *WorkerPool) OnTaskSuccess(fn func(*Task))
- func (p *WorkerPool) Start()
- func (p *WorkerPool) Stop()
- func (p *WorkerPool) Submit(task *Task) error
- func (p *WorkerPool) SubmitBatch(tasks []*Task) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type HealthStatus ¶
type HealthStatus struct {
Status string `json:"status"`
Timestamp string `json:"timestamp"`
Checks map[string]bool `json:"checks"`
}
HealthStatus 健康状态
type MetricsExporter ¶
type MetricsExporter struct {
// contains filtered or unexported fields
}
MetricsExporter 指标导出器
func NewMetricsExporter ¶
func NewMetricsExporter(queue *TaskQueue) *MetricsExporter
NewMetricsExporter 创建指标导出器
func (*MetricsExporter) ExportMetrics ¶
func (e *MetricsExporter) ExportMetrics() ([]byte, error)
ExportMetrics 导出指标为 JSON
func (*MetricsExporter) GetMetricsMap ¶
func (e *MetricsExporter) GetMetricsMap() map[string]interface{}
GetMetricsMap 获取指标 Map
func (*MetricsExporter) HealthCheck ¶
func (e *MetricsExporter) HealthCheck() HealthStatus
HealthCheck 健康检查
type PoolStats ¶
type PoolStats struct {
TotalTasks int64 // 总任务数
SuccessTasks int64 // 成功任务数
FailedTasks int64 // 失败任务数
RetryTasks int64 // 重试任务数
ActiveWorkers int64 // 活跃工作器数
QueuedTasks int64 // 排队任务数
AvgExecuteTime int64 // 平均执行时间 (纳秒)
// contains filtered or unexported fields
}
PoolStats 工作池统计信息
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue 优先级队列
type QueueStats ¶
type QueueStats struct {
TotalTasks int64 // 总任务数
SuccessTasks int64 // 成功任务数
FailedTasks int64 // 失败任务数
RetryTasks int64 // 重试任务数
ActiveWorkers int64 // 活跃工作器数
QueuedTasks int64 // 排队任务数
WorkerCount int64 // 工作器总数
MaxQueueSize int64 // 最大队列大小
AvgExecuteTime int64 // 平均执行时间 (纳秒)
}
QueueStats 队列统计信息
type Task ¶
type Task struct {
ID string // 任务唯一标识
Name string // 任务名称
TaskFunc TaskFunc // 任务执行函数
Status TaskStatus // 任务状态
Priority int // 优先级 (0-100, 越高优先级越高)
MaxRetries int // 最大重试次数
RetryCount int // 当前重试次数
Timeout time.Duration // 任务超时时间
CreatedAt time.Time // 创建时间
StartedAt *time.Time // 开始执行时间
FinishedAt *time.Time // 完成时间
Error error // 错误信息
Metadata map[string]interface{} // 元数据
// contains filtered or unexported fields
}
Task 任务定义
type TaskOption ¶
type TaskOption func(*Task)
TaskOption 任务配置选项
func WithMetadata ¶
func WithMetadata(key string, value interface{}) TaskOption
WithMetadata 设置任务元数据
type TaskQueue ¶
type TaskQueue struct {
// contains filtered or unexported fields
}
TaskQueue 任务队列管理器
func (*TaskQueue) SubmitFunc ¶
SubmitFunc 提交任务函数(便捷方法)
func (*TaskQueue) SubmitFuncSync ¶
func (q *TaskQueue) SubmitFuncSync(name string, fn TaskFunc, timeout time.Duration, opts ...TaskOption) (interface{}, error)
SubmitFuncSync 同步提交任务并等待结果
type TaskQueueConfig ¶
TaskQueueConfig 任务队列配置
type TaskStatus ¶
type TaskStatus int
TaskStatus 任务状态
const ( TaskStatusPending TaskStatus = iota // 等待执行 TaskStatusRunning // 执行中 TaskStatusSuccess // 执行成功 TaskStatusFailed // 执行失败 TaskStatusCancelled // 已取消 )
func (TaskStatus) String ¶
func (s TaskStatus) String() string
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool 工作池
func (*WorkerPool) GetWorkerCount ¶
func (p *WorkerPool) GetWorkerCount() int
GetWorkerCount 获取工作器数量
func (*WorkerPool) OnTaskFailed ¶
func (p *WorkerPool) OnTaskFailed(fn func(*Task))
OnTaskFailed 设置任务失败回调
func (*WorkerPool) OnTaskSuccess ¶
func (p *WorkerPool) OnTaskSuccess(fn func(*Task))
OnTaskSuccess 设置任务成功回调
func (*WorkerPool) SubmitBatch ¶
func (p *WorkerPool) SubmitBatch(tasks []*Task) error
SubmitBatch 批量提交任务
Click to show internal directories.
Click to hide internal directories.