errgroup

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2023 License: Apache-2.0 Imports: 3 Imported by: 12

README

Package cloudeng.io/sync/errgroup

CircleCI Go Report Card

import cloudeng.io/sync/errgroup

Package errgroup simplifies common patterns of goroutine use, in particular making it straightforward to reliably wait on parallel or pipelined goroutines, exiting either when the first error is encountered or waiting for all goroutines to finish regardless of error outcome. Contexts are used to control cancelation. It is modeled on golang.org/x/sync/errgroup and other similar packages. It makes use of cloudeng.io/errors to simplify collecting multiple errors.

Types

Type T
type T struct {
	// contains filtered or unexported fields
}

T represents a set of goroutines working on some common coordinated sets of tasks.

T may be instantiated directly, in which case, all go routines will run to completion and all errors will be collected and made available vie the Errors field and the return value of Wait. Alternatively WithContext can be used to create Group with an embedded cancel function that will be called once either when the first error occurs or when Wait is called. WithCancel behaves like WithContext but allows both the context and cancel function to be supplied which is required for working with context.WithDeadline and context.WithTimeout.

Functions
func WithCancel(cancel func()) *T

WithCancel returns a new T that will call the supplied cancel function once on either a first non-nil error being returned or when Wait is called.

func WithConcurrency(g *T, n int) *T

WithConcurrency returns a new Group that will limit the number of goroutines to n. Note that the Go method will block when this limit is reached. A value of 0 for n implies no limit on the number of goroutines to use.

func WithContext(ctx context.Context) (*T, context.Context)

WithContext returns a new Group that will call the cancel function derived from the supplied context once on either a first non-nil error being returned by a goroutine or when Wait is called.

Methods
func (g *T) Go(f func() error)

Go runs the supplied function from a goroutine. If this group was created using WithLimit then Go will block until a goroutine is available.

func (g *T) GoContext(ctx context.Context, f func() error)

GoContext is a drop-in alternative to the Go method that checks for ctx.Done() before calling g.Go. If the ctx has been canceled it will return immediately recoding the error and calling the internal stored cancel function.

func (g *T) Wait() error

Wait waits for all goroutines to finish.

Examples

ExampleT
ExampleT_parallel
ExampleT_pipeline

Documentation

Overview

Package errgroup simplifies common patterns of goroutine use, in particular making it straightforward to reliably wait on parallel or pipelined goroutines, exiting either when the first error is encountered or waiting for all goroutines to finish regardless of error outcome. Contexts are used to control cancelation. It is modeled on golang.org/x/sync/errgroup and other similar packages. It makes use of cloudeng.io/errors to simplify collecting multiple errors.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type T

type T struct {
	// contains filtered or unexported fields
}

T represents a set of goroutines working on some common coordinated sets of tasks.

T may be instantiated directly, in which case, all go routines will run to completion and all errors will be collected and made available vie the Errors field and the return value of Wait. Alternatively WithContext can be used to create Group with an embedded cancel function that will be called once either when the first error occurs or when Wait is called. WithCancel behaves like WithContext but allows both the context and cancel function to be supplied which is required for working with context.WithDeadline and context.WithTimeout.

Example
package main

import (
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// Wait for all goroutines to finish and catalogue all of their
	// errors.
	var g errgroup.T
	msg := []string{"a", "b", "c"}
	for _, m := range msg {
		m := m
		g.Go(func() error {
			return errors.New(m)
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:

--- 1 of 3 errors
  --- 2 of 3 errors
  --- 3 of 3 errors
  a
  b
  c
Example (Parallel)
package main

import (
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/sync/errgroup"
)

func main() {
	// Execute a set of gourtines in parallel.
	var g errgroup.T
	msg := []string{"a", "b", "c"}
	out := make([]string, len(msg))
	for i, m := range msg {
		i, m := i, m
		g.Go(func() error {
			out[i] = m
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("failed: %v", err)
	}
	// Sort the error messages for stable output.
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:

a
b
c
Example (Pipeline)
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// A pipeline to generate random numbers and measure the uniformity of
	// their distribution. The pipeline runs for 2 seconds.
	// The use of errgroup.T ensures that on return all of the goroutines
	// have completed and the channels used are closed.

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	g := errgroup.WithCancel(cancel)
	numGenerators, numCounters := 4, 8

	numCh := make(chan int64)
	src := rand.New(rand.NewSource(1234))
	var srcMu sync.Mutex

	// numGenerators goroutines produce random numbers in the range of 0..99.
	for i := 0; i < numGenerators; i++ {
		g.Go(func() error {
			for {
				srcMu.Lock()
				n := src.Int63n(100)
				srcMu.Unlock()
				select {
				case numCh <- n:
				case <-ctx.Done():
					err := ctx.Err()
					if errors.Is(err, context.DeadlineExceeded) {
						return nil
					}
					return err
				default:
					break
				}
			}
		})
	}

	counters := make([]int64, 10)
	var total int64

	// numCounters consume the random numbers and count which decile
	// each one falls into.
	for i := 0; i < numCounters; i++ {
		g.Go(func() error {
			for {
				select {
				case num := <-numCh:
					atomic.AddInt64(&counters[num%10], 1)
					atomic.AddInt64(&total, 1)
				case <-ctx.Done():
					err := ctx.Err()
					if errors.Is(err, context.DeadlineExceeded) {
						return nil
					}
					return err
				}
			}
		})
	}

	go func() {
		if err := g.Wait(); err != nil {
			panic(err)
		}
		close(numCh)
	}()

	if err := g.Wait(); err != nil {
		fmt.Printf("failed: %v", err)
	}
	// After some time, measure the normalized number of random numbers
	// per decile with appropriate rounding. Print the distribution
	// to verify the expected values.
	for i, v := range counters {
		ratio := total / v
		if ratio >= 8 || ratio <= 12 {
			// 8..12 is close enough to an even distribution so round
			// it up to 10.
			ratio = 10
		}
		fmt.Printf("%v: %v\n", i, ratio)
	}
}
Output:

0: 10
1: 10
2: 10
3: 10
4: 10
5: 10
6: 10
7: 10
8: 10
9: 10

func WithCancel

func WithCancel(cancel func()) *T

WithCancel returns a new T that will call the supplied cancel function once on either a first non-nil error being returned or when Wait is called.

Example
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"cloudeng.io/sync/errgroup"
)

func main() {
	// Exit all goroutines when a deadline has passed.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now())
	g := errgroup.WithCancel(cancel)
	var msg = []string{"a", "b", "c"}
	for _, m := range msg {
		m := m
		g.Go(func() error {
			ctx.Done()
			// deadline is already past.
			return fmt.Errorf("%v: %w", m, ctx.Err())
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:

--- 1 of 3 errors
  --- 2 of 3 errors
  --- 3 of 3 errors
  a: context deadline exceeded
  b: context deadline exceeded
  c: context deadline exceeded

func WithConcurrency added in v0.0.5

func WithConcurrency(g *T, n int) *T

WithConcurrency returns a new Group that will limit the number of goroutines to n. Note that the Go method will block when this limit is reached. A value of 0 for n implies no limit on the number of goroutines to use.

func WithContext

func WithContext(ctx context.Context) (*T, context.Context)

WithContext returns a new Group that will call the cancel function derived from the supplied context once on either a first non-nil error being returned by a goroutine or when Wait is called.

Example
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// Terminate all remaining goroutines after a single error is encountered.
	g, ctx := errgroup.WithContext(context.Background())
	var msg = []string{"a", "b", "c"}
	for i, m := range msg {
		i, m := i, m
		g.Go(func() error {
			if i == 1 {
				return errors.New("first")
			}
			<-ctx.Done()
			return fmt.Errorf("%v: %w", m, ctx.Err())
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:

--- 1 of 3 errors
  --- 2 of 3 errors
  --- 3 of 3 errors
  a: context canceled
  c: context canceled
  first

func (*T) Go

func (g *T) Go(f func() error)

Go runs the supplied function from a goroutine. If this group was created using WithLimit then Go will block until a goroutine is available.

func (*T) GoContext added in v0.0.5

func (g *T) GoContext(ctx context.Context, f func() error)

GoContext is a drop-in alternative to the Go method that checks for ctx.Done() before calling g.Go. If the ctx has been canceled it will return immediately recoding the error and calling the internal stored cancel function.

func (*T) Wait

func (g *T) Wait() error

Wait waits for all goroutines to finish.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL