parallel

package module
v0.0.0-...-9d5b0af
Published: Jan 6, 2022 License: MIT Imports: 6 Imported by: 0

README

See godoc.

Allows easy parallelization of a known number of functions and waiting for them to finish. The examples show how you can return results from goroutines without using channels or locking. The general idea is to pre-reserve a slice for the results and have each goroutine write into its own index in the slice. This can use more memory, but depending on how many things you're actually running in parallel and what sort of results you're returning, it can perform better than more complex solutions.

		// reserve enough space for the results
		results := make([]resultType, len(input))

		// this is the function we want to parallelize.
		// idx will go from 0 to len(input)-1 (see the Do call below)
		fn := Func(func(idx int) error {
			// no mutex needed because every goroutine gets a unique
			// idx, so writes never overlap
			var err error
			results[idx], err = doSomethingCPUIntensiveTo(input[idx])
			return err
		})

		// run fn in parallel len(input) times, with at most 
		// GOMAXPROCS at a time.
		err := fn.Do(len(input))

		for i, r := range results {
			// do something to all the results
		}

Documentation

Overview

Package parallel provides tooling for running a bounded number of functions in parallel and waiting for them to finish.

Returning results from goroutines

The Func type's examples show how you can return results from goroutines without using channels or locking. The general idea is to pre-reserve a slice for the results and have each goroutine write into its own index in the slice. This can use more memory, but depending on how many things you're actually running in parallel and what sort of results you're returning, it can perform better than more complex solutions.

// reserve enough space for the results
results := make([]resultType, len(input))

// this is the function we want to parallelize.
// idx will go from 0 to len(input)-1 (see the Do call below)
fn := Func(func(idx int) error {
	// no mutex needed because every goroutine gets a unique
	// idx, so writes never overlap
	var err error
	results[idx], err = doSomethingCPUIntensiveTo(input[idx])

	return err
})

// run fn in parallel len(input) times, with at most
// GOMAXPROCS at a time.
err := fn.Do(len(input))

for i, r := range results {
	// do something to all the results
}

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Errors

type Errors struct {
	// contains filtered or unexported fields
}

Errors is for gathering errors in a thread-safe manner. The zero value is usable.

Lock-free. If no errors are added, Errors uses a pointer's worth of memory.

func (*Errors) Add

func (p *Errors) Add(err error)

Add err to p. Thread-safe.

func (Errors) Err

func (p Errors) Err() error

Err returns a multierror (go.uber.org/multierr) of errors added to p, or nil if none were added. NOTE: not thread-safe.

func (Errors) List

func (p Errors) List() []error

List returns errors added to p, if any. NOTE: not thread-safe.
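
For example, a minimal sketch of gathering errors from plain goroutines (assuming fmt and sync are imported; the worker loop and the error messages are only illustrative):

var (
	errs Errors // zero value is usable
	wg   sync.WaitGroup
)
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		if i%2 == 0 {
			errs.Add(fmt.Errorf("worker %d failed", i))
		}
	}(i)
}
// Add is thread-safe, but Err and List are not, so wait for all
// goroutines to finish before reading
wg.Wait()
if err := errs.Err(); err != nil {
	fmt.Println(err) // a multierror combining the added errors
}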

type Func

type Func func(idx int) error

Func is a parallelizable function. Each function running in parallel with others is given a unique index number, starting from 0.

Example
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"

	"golang.org/x/net/html"
)

func main() {
	// get all the links on the Go blog page, then on the pages those
	// link to, and so on, up to 3 levels deep.
	links := Scrape(3, "https://blog.golang.org/examples")

	fmt.Printf("links %d: %v", len(links), links)
}

// HTMLLinks is a more real-world example of how Func can be used. It
// fetches the URLs given in parallel and returns all the unique links
// found.
func HTMLLinks(urls ...string) (links StringSet, err error) {
	nSites := len(urls)

	// we want to get a list of links from each of the sites in urls.
	//
	// Reserve space for each of their link sets up front. This
	// way each goroutine has its own "slot" in results that it can
	// write to without worrying about concurrent access.
	//
	// This trades off memory for ease of implementation and
	// simplicity.
	results := make([]StringSet, nSites)

	// We want to iterate over urls in parallel, fetching the HTML and
	// parsing the links from each.
	//
	// This is the func that does the work. idx will go from 0 to
	// nSites
	fn := Func(func(idx int) error {
		// reading urls is OK since nobody's modifying it concurrently
		r, err := http.Get(urls[idx])
		if err != nil {
			return err
		}
		defer r.Body.Close()
		doc, err := html.Parse(r.Body)
		if err != nil {
			return err
		}

		// writing to results[idx] without synchronization is OK since
		// this is the only goroutine that's writing to that location
		results[idx] = parseLinks(doc, results[idx])
		return nil
	})

	// run fn a total of nSites times. Run at most 5 concurrently.
	// Blocks until all are finished, and returns a multierror
	// (go.uber.org/multierr) of all the errors returned
	err = fn.Do(nSites, 5)

	// return unique links only
	var resultSet StringSet
	for _, res := range results {
		resultSet.AddSet(res)
	}
	return resultSet, err
}

func Scrape(maxDepth int, urls ...string) (links StringSet) {
	if maxDepth == 0 {
		return
	}
	// even if there was an error we might still have gotten some
	// links
	links, _ = HTMLLinks(urls...)
	// remove the original urls from found links
	links.Remove(urls...)

	// scrape the links we got
	more := Scrape(maxDepth-1, links.List()...)

	return links.AddSet(more)
}

// note: everything below here is just utilities

func parseLinks(n *html.Node, found StringSet) StringSet {
	if n.Type == html.ElementNode && n.Data == "a" {
		for _, a := range n.Attr {
			if a.Key == "href" && strings.HasPrefix(a.Val,
				"http:") || strings.HasPrefix(a.Val, "https:") {
				u, err := url.Parse(a.Val)
				if err != nil {
					break
				}
				u.RawQuery = ""
				u.RawFragment = ""
				u.Fragment = ""
				found.Add(u.String())
				break
			}
		}
	}
	for c := n.FirstChild; c != nil; c = c.NextSibling {
		found = parseLinks(c, found)
	}

	return found
}

type StringSet map[string]struct{}

func (s *StringSet) Add(strs ...string) *StringSet {
	if len(*s) == 0 {
		*s = make(StringSet, len(strs))
	}

	for _, h := range strs {
		(*s)[h] = struct{}{}
	}

	return s
}

func (s *StringSet) AddSet(strs StringSet) StringSet {
	if len(*s) == 0 {
		*s = make(StringSet, len(strs))
	}

	for v := range strs {
		(*s)[v] = struct{}{}
	}

	return *s
}

func (s *StringSet) Remove(vs ...string) StringSet {
	for _, v := range vs {
		delete(*s, v)
	}
	return *s
}

func (s *StringSet) Sub(vs StringSet) StringSet {
	for v := range vs {
		delete(*s, v)
	}
	return *s
}

func (s StringSet) List() (vs []string) {
	count := len(s)
	if count == 0 {
		return
	}
	vs = make([]string, 0, count)
	for handle := range s {
		vs = append(vs, handle)
	}
	return
}

func (s StringSet) String() string {
	var (
		sb        strings.Builder
		firstDone bool
	)
	sb.WriteByte('[')
	for str := range s {
		if firstDone {
			sb.WriteString(", ")
		} else {
			firstDone = true
		}
		sb.WriteByte('"')
		sb.WriteString(str)
		sb.WriteByte('"')
	}
	sb.WriteByte(']')
	return sb.String()
}
Output:

Example (Simple)
// we want to transform this list in parallel
input := []int{10, 20, 30, 40, 50}
count := len(input)

// reserve enough space for the results
results := make([]int, len(input))

// this is the function we'll run.
// idx goes from 0 to count-1.
fn := Func(func(idx int) error {
	// no mutex needed because every goroutine gets a unique idx,
	// so writes never overlap
	results[idx] = input[idx] * 10
	return nil
})

// run fn count times in parallel. Run at most 5 at a time
// (concurrently).
if err := fn.Do(count, 5); err != nil {
	panic(err)
}

fmt.Printf("%#v\n", results)
Output:

[]int{100, 200, 300, 400, 500}

func (Func) Do

func (fn Func) Do(count int, maxParallel ...int) (err error)

Do runs fn count times in parallel. All are run even if one or more returns an error. The idx parameter for fn goes from 0 to count-1.

If a maxParallel is given, only that many are run concurrently. Defaults to GOMAXPROCS.
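
A quick sketch of that behaviour (assuming fmt is imported; the failing indices are made up for illustration): even when some calls return an error, the remaining indices still run, and the combined error comes back once everything has finished.

fn := Func(func(idx int) error {
	if idx%2 == 0 {
		return fmt.Errorf("index %d failed", idx)
	}
	return nil
})

// all 4 calls run, at most GOMAXPROCS at a time; err combines the
// failures from idx 0 and 2
err := fn.Do(4)
fmt.Println(err)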

type Funcs

type Funcs []Func

Funcs allows running multiple different Funcs in parallel. Use Add to add a Func to be run, and Do to run them. The zero value of Funcs is usable.

Example
// zero value is usable
var ops Funcs

// just here as an example to show that both funcs run
op1Ran := false
ops.Add(func(idx int) error {
	op1Ran = true
	// ... do some work here
	return nil
})

op2Ran := false
ops.Add(func(idx int) error {
	op2Ran = true
	// ... do some work here
	return nil
})

// run the operations we added. Runs at most GOMAXPROCS in
// parallel
if err := ops.Do(); err != nil {
	panic(err)
}

fmt.Println("done! Ops ran:", op1Ran, op2Ran)
Output:

done! Ops ran: true true

func (*Funcs) Add

func (fns *Funcs) Add(fn Func)

Add a new Func to fns.

func (Funcs) Do

func (fns Funcs) Do(maxParallel ...int) error

Do runs all the Funcs in fns. Each Func gets its index number in fns as an argument.

Do is not idempotent: all Funcs run on each call.

If a maxParallel is given, only that many Funcs execute concurrently. Defaults to GOMAXPROCS.
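
A minimal sketch of the index argument (assuming fmt is imported; the results slice is only for illustration): each Func sees its own position in fns, so the slot-per-goroutine pattern from the Overview works here as well.

var fns Funcs
results := make([]string, 2)

fns.Add(func(idx int) error {
	results[idx] = fmt.Sprintf("first op ran as #%d", idx)
	return nil
})
fns.Add(func(idx int) error {
	results[idx] = fmt.Sprintf("second op ran as #%d", idx)
	return nil
})

// run both, at most one at a time
if err := fns.Do(1); err != nil {
	panic(err)
}
fmt.Println(results)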

type Semaphore

type Semaphore chan struct{}

Semaphore is a counting semaphore. It allows limiting concurrency.

s := NewSemaphore(10)
// we want only 10 of these to run in parallel
fn := func() {
	// Calls s.AcquireRelease() which acquires the
	// semaphore, and then defers the release
	// func returned by AcquireRelease.
	defer s.AcquireRelease()()
	// ... do stuff ...
}
// start a bunch of goroutines running fn: go fn()

func NewSemaphore

func NewSemaphore(max int) Semaphore

NewSemaphore creates a new Semaphore with a maximum count of max.

func (Semaphore) Acquire

func (s Semaphore) Acquire()

Acquire the semaphore. Blocks until available. See also AcquireRelease.

func (Semaphore) AcquireRelease

func (s Semaphore) AcquireRelease() (release func())

AcquireRelease acquires the semaphore and returns a func that can be called to release it. Blocks until available. Intended to be used with defer:

// Calls s.AcquireRelease() which acquires the semaphore,
// and then defers the release
// func returned by AcquireRelease.
defer s.AcquireRelease()()

func (Semaphore) MaybeAcquire

func (s Semaphore) MaybeAcquire() (ok bool, release func())

MaybeAcquire is like AcquireRelease but it doesn't block. ok is false if the semaphore couldn't be acquired. The release func is a no-op if ok was false, so it's always safe to call:

ok, release := s.MaybeAcquire()
defer release()
if !ok {
	// couldn't acquire; skip the work or fall back
	return
}
// ... do stuff ...

func (Semaphore) Release

func (s Semaphore) Release()

Release the semaphore. See also AcquireRelease.
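
A minimal sketch of bounding ad-hoc goroutines with Acquire and Release directly (assuming sync is imported; the worker body is left as a placeholder):

sem := NewSemaphore(3)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()

		// at most 3 goroutines get past this point at once
		sem.Acquire()
		defer sem.Release()

		// ... do bounded work for item i ...
	}(i)
}
wg.Wait()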
