分布式训练平台测试工具包

一个专为Go语言分布式训练平台设计的单元测试工具包。本项目提供了完整的Mock框架,用于模拟Kubernetes集群、GPU资源管理和分布式存储系统,让开发者能够在本地环境中进行全面的单元测试。
🚀 核心特性
- 依赖隔离: 完整的Kubernetes、GPU资源、存储系统Mock实现
- 场景构建: 预置常用测试场景模板,快速构建测试用例
- 分布式测试: 模拟多节点训练环境,无需真实集群
- 资源管理: 完整的GPU分配、内存管理、资源调度测试
- 框架无关: 支持PyTorch、TensorFlow等多种训练框架
- 线程安全: 所有Mock实现支持并发测试
- 易于扩展: 模块化设计,便于添加新的Mock和断言
📋 目录结构
📦 安装
go get github.com/heibot/aiut
🚀 快速开始
# 克隆仓库
git clone https://github.com/heibot/aiut.git
cd aiut
# 安装依赖
go mod tidy
# 运行测试
go test ./...
# 运行覆盖率测试
go test ./... -coverprofile=coverage.out
🏗️ 核心组件
Mock层
k8s/ - Kubernetes API完整Mock实现
resource/ - GPU和资源管理Mock
storage/ - 对象存储(S3/GCS)Mock
monitoring/ - 性能监控和告警Mock
communication/ - 分布式通信Mock
场景生成器
scenarios/ - 预置测试场景模板
builders/ - 流畅API构建复杂测试对象
断言扩展
- 分布式训练专用断言
- 任务状态验证
- 资源分配验证
- 分布式一致性检查
- 性能监控断言
- 告警状态验证
💡 使用示例
基础任务创建测试
package main
import (
"context"
"testing"
"github.com/heibot/aiut/k8s"
"github.com/heibot/aiut/scenarios"
"github.com/stretchr/testify/assert"
)
func TestJobScheduler_CreateJob(t *testing.T) {
// 创建Mock Kubernetes客户端
mockK8s := k8s.NewMockK8sClient()
// 使用场景构建器创建测试任务
job := scenarios.NewPyTorchJob("test-job").
WithGPUs("4").
WithNamespace("default").
Build()
// 设置Mock期望
mockK8s.TrainJobs().(*k8s.MockTrainJobClient).
On("Create", mock.Anything, job).Return(nil)
// 测试任务创建
err := mockK8s.TrainJobs().Create(context.Background(), job)
assert.NoError(t, err)
}
GPU资源管理测试
func TestGPUAllocation(t *testing.T) {
// 创建测试环境
env := scenarios.NewTestEnvironment()
// 添加GPU到环境
gpu1 := scenarios.NewGPU("gpu-1").
WithNode("node-1").
WithStatus("available").
Build()
gpu2 := scenarios.NewGPU("gpu-2").
WithNode("node-1").
WithStatus("available").
Build()
env.WithGPU(gpu1).WithGPU(gpu2)
env.Setup()
defer env.Cleanup()
// 测试GPU分配
gpuID, err := env.GPUManager.AllocateGPU("node-1", "job-1", "pod-1")
assert.NoError(t, err)
assert.NotEmpty(t, gpuID)
// 验证分配结果
allocations, err := env.GPUManager.GetGPUAllocations()
assert.NoError(t, err)
assert.Len(t, allocations, 1)
assert.Equal(t, "job-1", allocations[0].JobID)
}
任务状态流转测试
func TestJobStatusTransitions(t *testing.T) {
// 创建初始状态的任务
job := scenarios.NewPyTorchJob("test-job").
WithStatus(k8s.JobPhasePending).
WithCondition("JobPhase", "True", "Pending", "任务等待中").
Build()
// 模拟状态流转
job.Status.Phase = k8s.JobPhaseRunning
job.Status.Conditions = append(job.Status.Conditions, k8s.JobCondition{
Type: "JobPhase",
Status: "True",
Reason: "Running",
Message: "任务运行中",
Time: time.Now(),
})
// 测试状态流转断言
expectedPhases := []k8s.JobPhase{k8s.JobPhasePending, k8s.JobPhaseRunning}
assertions.JobStatusTransition(t, job, expectedPhases)
}
📚 API文档
Kubernetes Mock
// 创建Mock客户端
mockK8s := k8s.NewMockK8sClient()
// Mock任务操作
mockK8s.TrainJobs().(*k8s.MockTrainJobClient).
On("Create", mock.Anything, job).Return(nil)
// Mock Pod操作
mockK8s.Pods().(*k8s.MockPodClient).
On("Get", mock.Anything, "pod-1").Return(pod, nil)
GPU资源管理
// 创建GPU管理器
gpuManager := resource.NewMockGPUManager()
// 添加GPU到管理器
gpu := &resource.GPUInfo{
ID: "gpu-1",
NodeName: "node-1",
Status: "available",
}
gpuManager.AddGPU(gpu)
// 分配GPU
gpuID, err := gpuManager.AllocateGPU("node-1", "job-1", "pod-1")
存储Mock
// 创建存储客户端
storageClient := storage.NewMockStorageClient()
// Mock对象操作
storageClient.On("PutObject", mock.Anything, "bucket", "key", mock.Anything, mock.Anything).
Return(nil)
// 添加测试数据
storageClient.AddObject("bucket", "key", []byte("test data"), nil)
监控Mock
// 创建监控客户端
monitoringClient := monitoring.NewMockMonitoringClient()
// 记录性能指标
err := monitoringClient.Metrics().RecordMetric(context.Background(), &monitoring.Metric{
Name: "gpu_utilization",
Type: monitoring.MetricTypeGauge,
Value: 85.5,
Labels: map[string]string{"gpu_id": "gpu-1", "node": "node-1"},
})
// 创建告警
err = monitoringClient.Alerts().CreateAlert(context.Background(), &monitoring.Alert{
Name: "high_gpu_temp",
Severity: "warning",
Message: "GPU temperature is high",
Labels: map[string]string{"gpu_id": "gpu-1"},
Status: "firing",
})
🤝 贡献指南
我们欢迎所有形式的贡献!请查看贡献指南了解详细信息。
开发环境设置
-
Fork仓库
-
克隆你的Fork
git clone https://github.com/heibot/aiut.git
cd aiut
-
创建功能分支
git checkout -b feature/amazing-feature
-
进行修改
- 为新功能添加测试
- 确保所有测试通过
- 遵循Go代码规范
-
提交修改
git commit -m "feat: 添加新功能"
-
推送到你的Fork
git push origin feature/amazing-feature
-
创建Pull Request
代码规范
- 遵循Go代码审查注释
- 使用
gofmt格式化代码
- 为新功能编写测试
- 根据需要更新文档
📄 许可证
本项目采用MIT许可证 - 详见LICENSE文件。
🙏 致谢
- 感谢所有帮助改进本项目的贡献者
- 受分布式训练平台测试挑战启发
- 基于testify构建Mock能力
📞 支持
为分布式训练社区而构建 ❤️