concurrency

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2026 License: MIT Imports: 4 Imported by: 0

README

concurrency — 并发控制

提供 Worker Pool、信号量、速率限制器、并发限制器等并发原语,简化 Go 中的并发任务管理。

主要组件

WorkerPool — Worker 池

批量并发执行任务,控制最大并发数:

import "github.com/leeforge/framework/concurrency"

pool := concurrency.NewWorkerPool(10) // 10 个 worker
pool.Start()

// 提交任务
pool.Submit(concurrency.JobFunc(func() error {
    // 执行工作
    return nil
}))

// 批量提交
pool.SubmitBatch(jobs)

// 等待所有任务完成后停止
pool.Stop()
Semaphore — 信号量

控制同时进入某段代码的 goroutine 数量:

sem := concurrency.NewSemaphore(5) // 同时最多 5 个

sem.WithSemaphore(func() {
    // 并发安全的关键操作
    callExternalAPI()
})
ConcurrencyLimiter — 并发限制器
limiter := concurrency.NewConcurrencyLimiter(20)

err := limiter.Execute(func() error {
    return processItem(item)
})

// 批量并发执行(自动限制并发数)
errs := limiter.ExecuteBatch([]func() error{fn1, fn2, fn3})
RateLimiter — 令牌桶速率限制器
// 每秒最多 100 个请求
rl := concurrency.NewRateLimiter(100)
defer rl.Stop()

if rl.Allow() {
    // 允许执行
}
// 或阻塞等待令牌
rl.Wait()
ParallelExecutor — 并行执行器
executor := concurrency.NewParallelExecutor(8) // 最多 8 路并行

errs := executor.Execute([]func() error{
    func() error { return processA() },
    func() error { return processB() },
    func() error { return processC() },
})
Future — 异步结果
executor := concurrency.NewAsyncExecutor(4)
executor.Start()
defer executor.Stop()

future := executor.SubmitAsync(job)

// 阻塞获取结果
result := future.Get()

// 带超时获取
result, err := future.GetWithTimeout(5 * time.Second)
TaskQueue — 任务队列
queue := concurrency.NewTaskQueue(100) // 缓冲区 100
queue.Start(4) // 4 个消费者

queue.Submit(func() {
    sendEmail(user)
})

queue.Stop() // 等待队列排空后停止

注意事项

  • WorkerPool.Stop() 有 30 秒超时,若任务长期阻塞请设置合理超时
  • RateLimiter.Wait() 使用轮询,精度约 10ms,高精度场景建议使用 time.Ticker
  • Future 是一次性的,Get 之后不能重复读取

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncExecutor

type AsyncExecutor struct {
	// contains filtered or unexported fields
}

AsyncExecutor 异步执行器

func NewAsyncExecutor

func NewAsyncExecutor(size int) *AsyncExecutor

NewAsyncExecutor 创建异步执行器

func (*AsyncExecutor) Start

func (a *AsyncExecutor) Start()

Start 启动

func (*AsyncExecutor) Stop

func (a *AsyncExecutor) Stop()

Stop 停止

func (*AsyncExecutor) SubmitAsync

func (a *AsyncExecutor) SubmitAsync(job Job) *Future

SubmitAsync 提交异步任务

type ConcurrencyLimiter

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter 并发限制器

func NewConcurrencyLimiter

func NewConcurrencyLimiter(maxConcurrent int) *ConcurrencyLimiter

NewConcurrencyLimiter 创建并发限制器

func (*ConcurrencyLimiter) Execute

func (cl *ConcurrencyLimiter) Execute(fn func() error) error

Execute 在并发限制下执行

func (*ConcurrencyLimiter) ExecuteBatch

func (cl *ConcurrencyLimiter) ExecuteBatch(fns []func() error) []error

ExecuteBatch 批量执行

type ContextPool

type ContextPool struct {
	// contains filtered or unexported fields
}

ContextPool 上下文感知的 Worker 池

func NewContextPool

func NewContextPool(size int) *ContextPool

NewContextPool 创建上下文感知的 Worker 池

func (*ContextPool) Cancel

func (p *ContextPool) Cancel()

Cancel 取消所有任务

func (*ContextPool) StopWithContext

func (p *ContextPool) StopWithContext(ctx context.Context) error

StopWithContext 带上下文停止

func (*ContextPool) Submit

func (p *ContextPool) Submit(job Job) error

Submit 提交任务 (支持上下文)

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future 异步结果

func NewFuture

func NewFuture() *Future

NewFuture 创建 Future

func (*Future) Complete

func (f *Future) Complete(result Result)

Complete 完成任务

func (*Future) Get

func (f *Future) Get() Result

Get 获取结果 (阻塞)

func (*Future) GetWithTimeout

func (f *Future) GetWithTimeout(timeout time.Duration) (Result, error)

GetWithTimeout 带超时获取结果

type Job

type Job interface {
	Execute() error
}

Job 任务接口

type JobFunc

type JobFunc func() error

JobFunc 函数式任务

func (JobFunc) Execute

func (f JobFunc) Execute() error

Execute 执行函数

type ParallelExecutor

type ParallelExecutor struct {
	// contains filtered or unexported fields
}

ParallelParallel 并行执行器

func NewParallelExecutor

func NewParallelExecutor(maxWorkers int) *ParallelExecutor

NewParallelExecutor 创建并行执行器

func (*ParallelExecutor) Execute

func (p *ParallelExecutor) Execute(fns []func() error) []error

Execute 并行执行

type PooledExecutor

type PooledExecutor struct {
	// contains filtered or unexported fields
}

PooledExecutor 池化执行器

func NewPooledExecutor

func NewPooledExecutor(size int) *PooledExecutor

NewPooledExecutor 创建池化执行器

func (*PooledExecutor) Execute

func (e *PooledExecutor) Execute(jobs []Job) ([]Result, error)

Execute 执行任务

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter 速率限制器

func NewRateLimiter

func NewRateLimiter(rate int) *RateLimiter

NewRateLimiter 创建速率限制器

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow() bool

Allow 检查是否允许

func (*RateLimiter) Stop

func (rl *RateLimiter) Stop()

Stop 停止

func (*RateLimiter) Wait

func (rl *RateLimiter) Wait()

Wait 等待可用

type Result

type Result interface {
	GetError() error
}

Result 结果接口

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore 信号量

func NewSemaphore

func NewSemaphore(capacity int) *Semaphore

NewSemaphore 创建信号量

func (*Semaphore) Acquire

func (s *Semaphore) Acquire()

Acquire 获取信号量

func (*Semaphore) Release

func (s *Semaphore) Release()

Release 释放信号量

func (*Semaphore) WithSemaphore

func (s *Semaphore) WithSemaphore(fn func())

WithSemaphore 在信号量控制下执行

type SimpleResult

type SimpleResult struct {
	// contains filtered or unexported fields
}

SimpleResult 简单结果

func (*SimpleResult) GetError

func (r *SimpleResult) GetError() error

GetError 获取错误

type TaskQueue

type TaskQueue struct {
	// contains filtered or unexported fields
}

TaskQueue 任务队列

func NewTaskQueue

func NewTaskQueue(bufferSize int) *TaskQueue

NewTaskQueue 创建任务队列

func (*TaskQueue) Start

func (t *TaskQueue) Start(workers int)

Start 启动队列处理

func (*TaskQueue) Stop

func (t *TaskQueue) Stop()

Stop 停止队列

func (*TaskQueue) Submit

func (t *TaskQueue) Submit(task func())

Submit 提交任务

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker 工作协程

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool Worker 池

func NewWorkerPool

func NewWorkerPool(size int) *WorkerPool

NewWorkerPool 创建 Worker 池

func (*WorkerPool) GetResults

func (p *WorkerPool) GetResults() <-chan Result

GetResults 获取结果通道

func (*WorkerPool) Start

func (p *WorkerPool) Start()

Start 启动 Worker 池

func (*WorkerPool) Stop

func (p *WorkerPool) Stop() error

Stop 停止 Worker 池

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(job Job) error

Submit 提交任务

func (*WorkerPool) SubmitBatch

func (p *WorkerPool) SubmitBatch(jobs []Job) error

SubmitBatch 批量提交任务

func (*WorkerPool) Wait

func (p *WorkerPool) Wait()

Wait 等待所有任务完成

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL