l8chat-pool

module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2025 License: MIT

README

l8chat-pool

并发工具库,提供协程池、键锁、等待组等并发控制工具。

📦 安装

go get github.com/L8CHAT/l8chat-pool

✨ 功能特性

  • ✅ 协程池(Worker Pool)- 基于 ants
  • ✅ 键锁(Key Lock)- 基于键的互斥锁
  • ✅ 等待组(Wait Group)- 增强的 sync.WaitGroup

📖 使用示例

协程池(Pool)
package main

import (
    "fmt"
    "github.com/L8CHAT/l8chat-pool/pool"
)

func main() {
    // 创建协程池,最多100个协程
    dispatcher := pool.NewDispatcher(100)
    
    // 提交任务
    for i := 0; i < 1000; i++ {
        taskID := i
        dispatcher.Submit(func() {
            fmt.Printf("Task %d is running\n", taskID)
            // 执行任务逻辑
        })
    }
    
    // 等待所有任务完成
    dispatcher.Wait()
    
    // 释放资源
    dispatcher.Release()
}
键锁(KeyLock)
package main

import (
    "fmt"
    "github.com/L8CHAT/l8chat-pool/keylock"
)

func main() {
    kl := keylock.NewKeyLock()
    
    // 锁定特定的键
    key := "user:123"
    kl.Lock(key)
    defer kl.Unlock(key)
    
    // 在锁保护下执行操作
    fmt.Printf("Processing %s\n", key)
    // ... 执行需要互斥的操作
}

使用场景

  • 用户数据更新(避免同一用户的并发修改)
  • 订单处理(避免同一订单的并发操作)
  • 缓存更新(避免同一缓存键的并发写入)
// 示例:用户余额更新
func UpdateUserBalance(userID string, amount float64) error {
    key := fmt.Sprintf("user:balance:%s", userID)
    
    keyLock.Lock(key)
    defer keyLock.Unlock(key)
    
    // 读取当前余额
    balance := getUserBalance(userID)
    
    // 更新余额
    newBalance := balance + amount
    
    // 保存新余额
    return saveUserBalance(userID, newBalance)
}
等待组(Wait)
package main

import (
    "fmt"
    "github.com/L8CHAT/l8chat-pool/wait"
    "time"
)

func main() {
    wg := wait.NewWait()
    
    // 启动多个 goroutine
    for i := 0; i < 10; i++ {
        taskID := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Printf("Task %d started\n", taskID)
            time.Sleep(time.Second)
            fmt.Printf("Task %d completed\n", taskID)
        }()
    }
    
    // 等待所有 goroutine 完成
    wg.Wait()
    fmt.Println("All tasks completed")
}

🎯 高级用法

协程池 - 带返回值的任务
type Result struct {
    ID    int
    Value string
    Error error
}

results := make(chan Result, 100)
dispatcher := pool.NewDispatcher(10)

for i := 0; i < 100; i++ {
    taskID := i
    dispatcher.Submit(func() {
        // 执行任务
        value, err := processTask(taskID)
        results <- Result{ID: taskID, Value: value, Error: err}
    })
}

// 等待所有任务完成
dispatcher.Wait()
close(results)

// 收集结果
for result := range results {
    if result.Error != nil {
        fmt.Printf("Task %d failed: %v\n", result.ID, result.Error)
    } else {
        fmt.Printf("Task %d succeeded: %s\n", result.ID, result.Value)
    }
}
键锁 - TryLock
// 尝试获取锁,如果已被锁定则立即返回
if kl.TryLock(key) {
    defer kl.Unlock(key)
    // 执行操作
} else {
    // 锁已被占用,执行其他逻辑
    fmt.Println("Key is locked by another goroutine")
}
等待组 - 超时等待
wg := wait.NewWait()

// 启动任务
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // 执行任务
    }()
}

// 等待最多 5 秒
done := make(chan struct{})
go func() {
    wg.Wait()
    close(done)
}()

select {
case <-done:
    fmt.Println("All tasks completed")
case <-time.After(5 * time.Second):
    fmt.Println("Timeout: some tasks are still running")
}

📝 性能优化

协程池大小选择
// CPU 密集型任务:协程数 = CPU 核心数
numCPU := runtime.NumCPU()
dispatcher := pool.NewDispatcher(numCPU)

// IO 密集型任务:协程数 = CPU 核心数 * 2 或更多
dispatcher := pool.NewDispatcher(numCPU * 2)

// 混合型任务:根据实际情况调整
dispatcher := pool.NewDispatcher(100)

📝 依赖

  • github.com/panjf2000/ants/v2 - 高性能协程池
  • github.com/eapache/queue - 队列实现

📄 许可证

MIT License

🤝 贡献

欢迎提交 Issue 和 Pull Request!

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL