README

neilotoole/errgroup

neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for interaction with rate-limited APIs, databases, and the like.

Overview

In effect, neilotoole/errgroup is sync/errgroup but with a worker pool of N goroutines. The exported API is identical but for an additional function WithContextN, which allows the caller to specify the maximum number of goroutines (numG) and the capacity of the queue channel (qSize) used to hold work before it is picked up by a worker goroutine. The zero Group and the Group returned by WithContext have numG and qSize equal to runtime.NumCPU.

Usage

The exported API of this package mirrors the sync/errgroup package. The only change needed is the import path of the package, from:

import (
  "golang.org/x/sync/errgroup"
)

to

import (
  "github.com/neilotoole/errgroup"
)

Then use in the normal manner. See the godoc for more.

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
    // do something
    return nil
})

err := g.Wait()

Many users will have no need to tweak the numG and qSize params. However, benchmarking may suggest particular values for your workload. For that you'll need WithContextN:

numG, qSize := 8, 4
g, ctx := errgroup.WithContextN(ctx, numG, qSize)

Performance

The motivation for creating neilotoole/errgroup was to bound concurrency (useful, for example, with rate-limited APIs) while keeping the lovely sync/errgroup semantics. Some loss of performance relative to sync/errgroup was expected. However, benchmarking suggests that, when tuned for a specific workload, this implementation can outperform sync/errgroup.

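Below is a selection of benchmark results. Each benchmark name encodes its workload: tasks_X is the number of tasks and complexity_Y is the complexity of each task. Each workload is executed with:

  • sync/errgroup, listed as sync_errgroup
  • a non-parallel implementation, listed as sequential
  • various {numG, qSize} configurations of neilotoole/errgroup, listed as errgroupn_{numG}_{qSize}; errgroupn_default_16_16 is the default configuration (runtime.NumCPU for both, which was 16 on the benchmark machine)

The trailing -16 on each name is the GOMAXPROCS value that go test appends to benchmark names.
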
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_default_16_16-16         	   25574	     46867 ns/op	     688 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_4_4-16                   	   24908	     48926 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_16_4-16                  	   24895	     48313 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/errgroupn_32_4-16                  	   24853	     48284 ns/op	     592 B/op	      12 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/sync_errgroup-16                   	   18784	     65826 ns/op	    1858 B/op	      55 allocs/op
BenchmarkGroup_Short/complexity_5/tasks_50/sequential-16                      	   10000	    111483 ns/op	       0 B/op	       0 allocs/op

BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_default_16_16-16        	    3745	    325993 ns/op	    1168 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_4_4-16                  	    5186	    227034 ns/op	    1072 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_16_4-16                 	    3970	    312816 ns/op	    1076 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/errgroupn_32_4-16                 	    3715	    320757 ns/op	    1073 B/op	      27 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/sync_errgroup-16                  	    2739	    432093 ns/op	    1862 B/op	      55 allocs/op
BenchmarkGroup_Short/complexity_20/tasks_50/sequential-16                     	    2306	    520947 ns/op	       0 B/op	       0 allocs/op

BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_default_16_16-16       	     354	   3602666 ns/op	    1822 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_4_4-16                 	     420	   2468605 ns/op	    1712 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_16_4-16                	     334	   3581349 ns/op	    1716 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/errgroupn_32_4-16                	     310	   3890316 ns/op	    1712 B/op	      47 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/sync_errgroup-16                 	     253	   4740462 ns/op	    8303 B/op	     255 allocs/op
BenchmarkGroup_Short/complexity_40/tasks_250/sequential-16                    	     200	   5924693 ns/op	       0 B/op	       0 allocs/op

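The overall impression is that neilotoole/errgroup can provide higher throughput than sync/errgroup for these (CPU-intensive) workloads, sometimes significantly so. For example, at complexity_40/tasks_250, errgroupn_4_4 takes about 2.5 ms/op versus about 4.7 ms/op for sync_errgroup, with 47 rather than 255 allocations per op. As always, these benchmark results should not be taken as gospel: your results may vary.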

Design Note

Why require an explicit qSize limit?

If calls to Group.Go fill the queue channel (qCh), Go blocks until worker goroutines drain qCh. In contrast, sync/errgroup's Go method doesn't block. neilotoole/errgroup aims to behave as much like sync/errgroup as possible, so that it can serve as a "drop-in" alternative. Even so, this blocking behavior is a conscious deviation.
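At bottom, this blocking is just a send on a full buffered channel. The stdlib-only sketch below shows that. The channel stands in for qCh, and no worker goroutines are draining it. The helper trySubmit is hypothetical: it uses a select with a default case only to detect when a plain send would block, instead of actually blocking.

```go
package main

import "fmt"

// trySubmit (hypothetical helper) attempts a non-blocking send of f onto q.
// It returns false when q is full, i.e. exactly when a plain `q <- f`
// would block.
func trySubmit(q chan func() error, f func() error) bool {
	select {
	case q <- f:
		return true
	default:
		return false
	}
}

func main() {
	const qSize = 2
	// Stand-in for the Group's queue channel. Nothing drains it,
	// so it is full after qSize sends.
	qCh := make(chan func() error, qSize)
	task := func() error { return nil }

	accepted := 0
	for i := 0; i < qSize+1; i++ {
		if trySubmit(qCh, task) {
			accepted++
		} else {
			fmt.Printf("submission %d would block: queue full (%d/%d)\n", i+1, len(qCh), cap(qCh))
		}
	}
	fmt.Println("accepted:", accepted)
}
```

In the design described here, the third submission would park the caller of Go until a worker receives from qCh.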

Since the capacity of qCh is controlled by qSize, an alternative implementation could probably be built that uses a growable slice as a buffer for functions passed to Go whenever qCh is full. Considering this design led to a Go issue regarding unlimited-capacity channels, perhaps better characterized in this case as "growable-capacity channels". If such a feature existed in the language, this implementation might have taken advantage of it, at least in the first release (benchmarking notwithstanding). However, benchmarking suggests that a relatively small qSize has performance benefits for some workloads, so the explicit qSize requirement may be the better design choice regardless.
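For comparison, the "growable buffer" alternative might look roughly like the following sketch. It is not what this package does. A forwarding goroutine receives eagerly from an input channel and parks items in a slice until a consumer takes them from the output channel. The name growableQueue is hypothetical, and the sketch assumes Go 1.18+ for generics.

```go
package main

import "fmt"

// growableQueue (hypothetical) returns an input channel whose sends are
// accepted by a forwarding goroutine even when nobody is reading the output
// channel yet. Pending items live in a slice that grows as needed. Closing
// the input drains the buffer, then closes the output.
func growableQueue[T any]() (chan<- T, <-chan T) {
	in := make(chan T)
	out := make(chan T)
	go func() {
		defer close(out)
		var buf []T
		src := in // set to nil once in is closed, disabling that select case
		for src != nil || len(buf) > 0 {
			var dst chan T // nil (never ready) unless buf has an item to deliver
			var next T
			if len(buf) > 0 {
				dst, next = out, buf[0]
			}
			select {
			case v, ok := <-src:
				if !ok {
					src = nil
					continue
				}
				buf = append(buf, v)
			case dst <- next:
				buf = buf[1:]
			}
		}
	}()
	return in, out
}

func main() {
	in, out := growableQueue[int]()
	// No consumer is running yet, but these sends still complete:
	// the forwarding goroutine keeps appending to its slice.
	for i := 0; i < 100; i++ {
		in <- i
	}
	close(in)

	sum := 0
	for v := range out {
		sum += v
	}
	fmt.Println("sum:", sum) // sum: 4950
}
```

The trade-off is the one the note weighs. Submitters never block, but memory grows without bound when producers outpace workers, whereas an explicit qSize applies back-pressure.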

Documentation

Overview

Package errgroup is a drop-in alternative to sync/errgroup but limited to N goroutines. In effect, neilotoole/errgroup is sync/errgroup but with a worker pool of N goroutines.

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

A zero Group is valid and does not cancel on error.

This Group implementation differs from sync/errgroup in that, instead of each call to Go spawning a new goroutine, the f passed to Go is sent to a queue channel (qCh) and picked up by one of N worker goroutines. The number of goroutines (numG) and the queue channel size (qSize) are args to WithContextN. The zero Group and the Group returned by WithContext both use default values (the value of runtime.NumCPU) for the numG and qSize args. A side effect of this implementation is that the Go method will block while qCh is full. In contrast, sync/errgroup's Group.Go method never blocks (it always spawns a new goroutine).

Example (JustErrors)

JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.

package main

import (
	"fmt"
	"net/http"

	"github.com/neilotoole/errgroup"
)

func main() {
	var g errgroup.Group
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}

Example (Parallel)

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/neilotoole/errgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string
type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		g, ctx := errgroup.WithContext(ctx)

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			g.Go(func() error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

web result for "golang"
image result for "golang"
video result for "golang"
Example (Pipeline)

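Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines. Note that Go may block while qCh is full. This example therefore relies on the Group accepting all 21 functions (one walker plus numDigesters = 20 digesters) before the main goroutine starts receiving from c. With a small numG and qSize, such as the defaults on a machine where runtime.NumCPU is low, the remaining Go calls can block for good once every running digester is waiting to send on c.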

// Note: This file is copied directly from sync/errgroup
// with the one-line change that pkg neilotoole/errgroup is imported
// as errgroup. The purpose is to test if neilotoole/errgroup can be
// characterized as a "drop-in" replacement for sync/errgroup, by
// seamlessly passing all of sync/errgroup's tests.

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/neilotoole/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

func WithContext

func WithContext(ctx context.Context) (*Group, context.Context)

WithContext returns a new Group and an associated Context derived from ctx. It is equivalent to WithContextN(ctx, 0, 0).

func WithContextN

func WithContextN(ctx context.Context, numG, qSize int) (*Group, context.Context)

WithContextN returns a new Group and an associated Context derived from ctx.

The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

Param numG controls the number of worker goroutines. Param qSize controls the size of the queue channel that holds functions passed to method Go: while the queue channel is full, Go blocks. If numG <= 0, the value of runtime.NumCPU is used; if qSize is also <= 0, a qSize of runtime.NumCPU is used.

func (*Group) Go

func (g *Group) Go(f func() error)

Go adds the given function to a queue of functions that are called by one of g's worker goroutines.

The first call to return a non-nil error cancels the group; its error will be returned by Wait.

Go may block while g's qCh is full.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.
