routine

package
v1.0.2
Published: Jul 31, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

README

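Concurrency control

In day-to-day development we run into many similar concurrency scenarios, such as waiting for an event to occur or calling several different services in parallel. Go provides very efficient built-in primitives for these problems: channels and goroutines. However, using these "gears" effectively and correctly to build efficient, correct code is still a challenge for most CMers. This library gathers these typical scenarios and provides a general-purpose solution for each, so that less code achieves more efficient control flow.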

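Common scenarios

Concurrent execution

Several mutually independent tasks, with no dependencies on one another, need to run quickly. Depending on how the task results are handled, there are a few sub-cases: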

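  • Every task must run at least once, and the caller waits until the last task finishes. errgroup.Group
  • If any task fails, stop the other tasks, whether they are already running or still waiting in the queue. errgroup.WithContext, routine.ConcurrentTask
  • A large number of similar compute tasks must run, and there are far more tasks than CPU cores. To run them as efficiently as possible, cap the maximum number of concurrent workers, which limits task switching and scheduling, to get high throughput and low latency. routine.Map, routine.ProcessStream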

Batch processing

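Databases and third-party APIs often offer batch operations. Packing data together into one batch call and then handling each result individually once the response arrives is a very common, general need. routine.BulkEmitter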

Map

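A very common operation is to iterate over an array and apply some create, update, or delete operation to each element in parallel. When that operation involves something unstable or long-running (a database, a call to another service), overall performance easily degrades and becomes unpredictable. The most common approach in this situation is to start a separate goroutine for each element. But this can produce extremely high concurrency and congestion, which ends up lowering overall efficiency.

The newly added routine library addresses exactly this problem. With Map, or better, MapWithTimeout, you can control the maximum concurrency numWorker and the time allowed for each subtask, timeout. A concrete example: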

itemIDs := []int{1, 2, 3}
items := make([]*Item, len(itemIDs))
err := routine.MapWithTimeout(len(items), 5, time.Second, func(ctx context.Context, i int) error {
    item := getItemByID(itemIDs[i])
    // Write only if the task has not been canceled or timed out.
    if ctx.Err() == nil {
        items[i] = item
    }
    return nil
})

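There are a few tricks for handling results inside the callback:

  1. When the results go into a slice, make a fixed-size slice up front and assign to it directly using the index i passed to the callback (as in the example above). No lock is needed in this case.
  2. If results may be missing, for example because of errors, consider returning the data through a channel, or leave the slot empty and strip the empty values afterwards, for example with slices.DeleteFunc (slices.Compact only collapses runs of consecutive equal elements, so it does not remove empty values outright).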

BulkEmitter

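Some operations need to be gathered and executed as a batch, such as calling a database's batch API, while the requests come from different goroutines. This calls for a mechanism that collects requests, processes them together at some point, and finally dispatches each result back to the corresponding requester. BulkEmitter is a utility type designed to simplify this.

Usage has two parts: BulkEmitterHandler and Do().

The user supplies a handler function for the batched requests: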

type BulkEmitterHandler[T, K any] func(reqs map[string]T) (resps map[string]K, err error)

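The collected requests arrive in the reqs parameter, and the results must be returned in resps. If some of the requests fail, the corresponding errors can be returned individually through the error value using the BulkError type.

A concrete example follows. Upload() is a server API, and each user call runs in a new goroutine. Defining a BatchUpload() function and pairing it with BulkEmitter gives a clear and simple batch implementation: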

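func BatchUpload(reqs map[string]*SingleReq) (map[string]*SingleResp, error) {
    batchReq := &BatchReqs{}
    for _, req := range reqs {
        batchReq.Reqs = append(batchReq.Reqs, req)
    }
    resps, err := client.Do(context.Background(), batchReq)
    if err != nil {
        return nil, err
    }
    ret := make(map[string]*SingleResp)
    // Collect per-request failures separately from the call-level error.
    bulkErr := routine.BulkError{}
    for _, resp := range resps {
        if !resp.GetFound() {
            bulkErr[resp.GetId()] = errors.New("not found")
            continue
        }
        ret[resp.GetId()] = resp
    }
    // Return a nil error when nothing failed: an empty BulkError stored in
    // an error value would still compare as non-nil.
    if len(bulkErr) == 0 {
        return ret, nil
    }
    return ret, bulkErr
}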

func NewServer() *server {
    return &server{
        UploadEmitter: routine.NewBulkEmitter(
            BatchUpload,
            routine.WithInterval[*SingleReq, *SingleResp](50 * time.Millisecond),
        ),
    }
}

func (s server) Upload(ctx context.Context, in *SingleReq) (*SingleResp, error) {
    return s.UploadEmitter.Do(ctx, in.GetId(), in)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConcurrentTasksWithCtxTimeout

func ConcurrentTasksWithCtxTimeout(baseCtx context.Context, numWorker int, tasks []*ConcurrentTask) error

ConcurrentTasksWithCtxTimeout processes task functions concurrently.

func Map

func Map(size, numWorker int, f func(i int) error) error

Map is a helper function for operating on a slice in parallel. Example usage:

```
itemIDs := []int{1, 2, 3}
items := make([]*Item, len(itemIDs))
err := routine.Map(len(items), 10, func(i int) error {
  items[i] = getItemByID(itemIDs[i])
  return nil
})
```

func MapWithContext

func MapWithContext(ctx context.Context, size, numWorker int, f func(ctx context.Context, i int) error) error

func MapWithCtxTimeout

func MapWithCtxTimeout(baseCtx context.Context, size, numWorker int, timeout time.Duration, f func(ctx context.Context, i int) error) error

func MapWithTimeout

func MapWithTimeout(size, numWorker int, timeout time.Duration, f func(ctx context.Context, i int) error) error

MapWithTimeout is the same as Map, except that each function must finish within timeout. Important reminder about the provided context: check that `ctx.Err() == nil` before any write operation, because the function may be canceled at any moment and the underlying resource it refers to may already have been discarded.

func ProcessStream

func ProcessStream[T any, K comparable](ctx context.Context, numWorker int, inputChan <-chan T, outputChan chan<- K, fn ProcessFunc[T, K]) error

ProcessStream processes stream data in multiple goroutines while preserving the input order. The caller only needs to provide a process func containing the concrete processing logic. If the func returns a nil value or a non-nil error, the value is ignored. The parameters `inputChan` and `outputChan` accept any kind of channel. Notice: the process func should honor the input context, otherwise goroutines will leak.

func RunTasks

func RunTasks(ctx context.Context, fs ...func(ctx context.Context) error) (err error)

func Select

func Select[T any](ctx context.Context, src []<-chan T) <-chan *ValueType[T]

Select helps the caller read data from multiple channels within a limited time. The caller can tell which channel a result came from by mapping ValueType.Index back to the input channels. Example:

sources := make([]<-chan int32, 10) // fill with real channels before use
output := routine.Select(ctx, sources)
for v := range output {
 fmt.Println(v.Index, v.Value)
}

func SuggestNum

func SuggestNum(i int) int

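SuggestNum returns a suitable number of concurrent goroutines for the current runtime conditions. The current heuristic compares the number of tasks to process with twice the number of CPU cores and picks the smaller of the two.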

func WithSignal

func WithSignal(ctx context.Context) context.Context

Types

type BulkEmitter

type BulkEmitter[T, K any] struct {
	// contains filtered or unexported fields
}

BulkEmitter stacks multiple requests into batch requests. It is mostly used to invoke the batch API of a database or service.

func NewBulkEmitter

func NewBulkEmitter[T, K any](handler BulkEmitterHandler[T, K], opts ...BulkEmitterOption[T, K]) *BulkEmitter[T, K]

NewBulkEmitter returns a BulkEmitter object.

func (*BulkEmitter[T, K]) Close

func (be *BulkEmitter[T, K]) Close()

Close releases the resources used by the emitter. It panics when called more than once.

func (*BulkEmitter[T, K]) CloseAndWait

func (be *BulkEmitter[T, K]) CloseAndWait()

CloseAndWait releases the resources used by the emitter and waits until the emitter finishes. It panics when called more than once.

func (*BulkEmitter[T, K]) Commit

func (be *BulkEmitter[T, K]) Commit(ctx context.Context, key string, request T, handler func(reply K, err error)) error

Commit submits the individual request. Rather than waiting until the whole action is completed, it returns immediately after the request has been accepted by the bulk emitter, subject to the constraint of the processor limit. Key is used to identify and deduplicate the requests. The response is handled by the `handler` argument after the reply has returned. If the handler is nil, the reply and error are ignored.

func (*BulkEmitter[T, K]) Do

func (be *BulkEmitter[T, K]) Do(ctx context.Context, key string, request T) (K, error)

Do submits the individual request and waits until the action is completed. Key is used to identify and deduplicate the requests. Requests with the same key receive the same response.

WARNING: It is dangerous to invoke Do() from an unbounded number of goroutines, where goroutine leaks are inevitable. In that case, it is strongly suggested to use Commit() instead of Do().

func (*BulkEmitter[T, K]) Run

func (be *BulkEmitter[T, K]) Run(ctx context.Context) error

Run starts emitter looping logic. Must be called once.

type BulkEmitterHandler

type BulkEmitterHandler[T, K any] func(requests map[string]T) (replies map[string]K, err error)

BulkEmitterHandler is the callback invoked when a batch of requests has been grouped together and needs to be acted on.

type BulkEmitterOption

type BulkEmitterOption[T, K any] func(be *BulkEmitter[T, K])

BulkEmitterOption helps set variables of BulkEmitter.

func WithInterval

func WithInterval[T, K any](interval time.Duration) BulkEmitterOption[T, K]

WithInterval sets emitting interval.

func WithMaxSize

func WithMaxSize[T, K any](size int) BulkEmitterOption[T, K]

WithMaxSize sets max size of a bulk.

func WithWorker

func WithWorker[T, K any](num int) BulkEmitterOption[T, K]

WithWorker sets the number of workers that may run concurrently.

type BulkError

type BulkError map[string]error

BulkError can be returned from a `BulkEmitterHandler` to indicate the error for each individual response.

func (BulkError) Error

func (e BulkError) Error() string

Error returns the error message.

type ConcurrentTask

type ConcurrentTask struct {
	F                 func(ctx context.Context) error
	WithBlockSignal   bool
	IgnoreBlockSignal bool
	Timeout           time.Duration
}

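ConcurrentTask defines functions as tasks. F is the concrete task, which can be run in parallel. WithBlockSignal indicates whether the task, if it fails, emits a block signal that tries to stop further tasks in the group from running (tasks that have already started are affected only if they handle the signal internally). IgnoreBlockSignal indicates that the task ignores the block signal and is affected only by its timeout. Timeout is the task's own timeout.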

type ConcurrentTaskRunner

type ConcurrentTaskRunner struct {
	// contains filtered or unexported fields
}

func NewConcurrentTaskRunner

func NewConcurrentTaskRunner(ctx context.Context, opts ...RunnerOption) *ConcurrentTaskRunner

func (*ConcurrentTaskRunner) AddTasks

func (c *ConcurrentTaskRunner) AddTasks(fs ...func(ctx context.Context) error) *ConcurrentTaskRunner

func (*ConcurrentTaskRunner) Run

func (c *ConcurrentTaskRunner) Run() (err error)

type ProcessFunc

type ProcessFunc[T any, K comparable] func(context.Context, T) (K, error)

type RunnerOption

type RunnerOption func(c *ConcurrentTaskRunner)

func WithNumOfWorker

func WithNumOfWorker(numOfWorker int) RunnerOption

func WithTimeout

func WithTimeout(duration time.Duration) RunnerOption

type Safe

type Safe struct {
	// contains filtered or unexported fields
}

Safe helps users access the fields of a struct safely from concurrent code. The error returned from the given function is passed through. Suggested usage:

```
type MyClass struct {
  routine.Safe
  ... Other fields
}
err := myclass.View(func() error {
  got := myclass.somefield

  ... DO NOT invoke other methods of the same instance,
      otherwise a deadlock may occur by accident.
})
```

func (*Safe) Update

func (s *Safe) Update(fn func() error) error

func (*Safe) View

func (s *Safe) View(fn func() error) error

type ValueType

type ValueType[T any] struct {
	Value T
	Index int
}
