l8chat-pool
并发工具库,提供协程池、键锁、等待组等并发控制工具。
📦 安装
go get github.com/L8CHAT/l8chat-pool
✨ 功能特性
- ✅ 协程池(Worker Pool)- 基于 ants
- ✅ 键锁(Key Lock)- 基于键的互斥锁
- ✅ 等待组(Wait Group)- 增强的 sync.WaitGroup
📖 使用示例
协程池(Pool)
package main
import (
"fmt"
"github.com/L8CHAT/l8chat-pool/pool"
)
func main() {
// 创建协程池,最多100个协程
dispatcher := pool.NewDispatcher(100)
// 提交任务
for i := 0; i < 1000; i++ {
taskID := i
dispatcher.Submit(func() {
fmt.Printf("Task %d is running\n", taskID)
// 执行任务逻辑
})
}
// 等待所有任务完成
dispatcher.Wait()
// 释放资源
dispatcher.Release()
}
键锁(KeyLock)
package main
import (
"fmt"
"github.com/L8CHAT/l8chat-pool/keylock"
)
func main() {
kl := keylock.NewKeyLock()
// 锁定特定的键
key := "user:123"
kl.Lock(key)
defer kl.Unlock(key)
// 在锁保护下执行操作
fmt.Printf("Processing %s\n", key)
// ... 执行需要互斥的操作
}
使用场景:
- 用户数据更新(避免同一用户的并发修改)
- 订单处理(避免同一订单的并发操作)
- 缓存更新(避免同一缓存键的并发写入)
// 示例:用户余额更新
func UpdateUserBalance(userID string, amount float64) error {
key := fmt.Sprintf("user:balance:%s", userID)
keyLock.Lock(key)
defer keyLock.Unlock(key)
// 读取当前余额
balance := getUserBalance(userID)
// 更新余额
newBalance := balance + amount
// 保存新余额
return saveUserBalance(userID, newBalance)
}
等待组(Wait)
package main
import (
"fmt"
"github.com/L8CHAT/l8chat-pool/wait"
"time"
)
func main() {
wg := wait.NewWait()
// 启动多个 goroutine
for i := 0; i < 10; i++ {
taskID := i
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Task %d started\n", taskID)
time.Sleep(time.Second)
fmt.Printf("Task %d completed\n", taskID)
}()
}
// 等待所有 goroutine 完成
wg.Wait()
fmt.Println("All tasks completed")
}
🎯 高级用法
协程池 - 带返回值的任务
type Result struct {
ID int
Value string
Error error
}
results := make(chan Result, 100)
dispatcher := pool.NewDispatcher(10)
for i := 0; i < 100; i++ {
taskID := i
dispatcher.Submit(func() {
// 执行任务
value, err := processTask(taskID)
results <- Result{ID: taskID, Value: value, Error: err}
})
}
// 等待所有任务完成
dispatcher.Wait()
close(results)
// 收集结果
for result := range results {
if result.Error != nil {
fmt.Printf("Task %d failed: %v\n", result.ID, result.Error)
} else {
fmt.Printf("Task %d succeeded: %s\n", result.ID, result.Value)
}
}
键锁 - TryLock
// 尝试获取锁,如果已被锁定则立即返回
if kl.TryLock(key) {
defer kl.Unlock(key)
// 执行操作
} else {
// 锁已被占用,执行其他逻辑
fmt.Println("Key is locked by another goroutine")
}
等待组 - 超时等待
wg := wait.NewWait()
// 启动任务
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 执行任务
}()
}
// 等待最多 5 秒
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
fmt.Println("All tasks completed")
case <-time.After(5 * time.Second):
fmt.Println("Timeout: some tasks are still running")
}
📝 性能优化
协程池大小选择
// CPU 密集型任务:协程数 = CPU 核心数
numCPU := runtime.NumCPU()
dispatcher := pool.NewDispatcher(numCPU)
// IO 密集型任务:协程数 = CPU 核心数 * 2 或更多
dispatcher := pool.NewDispatcher(numCPU * 2)
// 混合型任务:根据实际情况调整
dispatcher := pool.NewDispatcher(100)
📝 依赖
github.com/panjf2000/ants/v2 - 高性能协程池
github.com/eapache/queue - 队列实现
📄 许可证
MIT License
🤝 贡献
欢迎提交 Issue 和 Pull Request!