Documentation ¶
Overview ¶
Package parallel provides tooling for running a bounded number of functions in parallel and waiting for them to finish.
Returning results from goroutines ¶
The Func type's examples show how you can return results from goroutines without using channels or locking. The general idea is to pre-allocate a slice for the results and have each goroutine write into its own index in that slice. This may use more memory, but depending on how many things you're actually doing in parallel and what sort of results you're returning, it often performs better than more complex solutions.
	// reserve enough space for the results
	results := make([]resultType, len(input))

	// this is the function we want to parallelize.
	// idx will go from 0 to len(input)-1 (see Do call below)
	fn := Func(func(idx int) error {
		// no mutex needed because every goroutine gets a unique
		// idx, so writes never overlap
		var err error
		results[idx], err = doSomethingCPUIntensiveTo(input[idx])
		return err
	})

	// run fn in parallel len(input) times, with at most
	// GOMAXPROCS at a time.
	err := fn.Do(len(input))

	for _, r := range results {
		// do something to all the results
	}
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Errors ¶
type Errors struct {
// contains filtered or unexported fields
}
Errors is for gathering errors in a thread-safe manner. The zero value is usable.
Lock-free. If no errors are added, Errors uses a pointer's worth of memory.
type Func ¶
Func is a parallelizable function. Each function running in parallel with others is given a unique index number, starting from 0.
Example ¶
	package main

	import (
		"fmt"
		"net/http"
		"net/url"
		"strings"

		"golang.org/x/net/html"
	)

	func main() {
		// get all the links in the Go blog, then in all the pages those
		// pages link to, up to 3 levels deep.
		links := Scrape(3, "https://blog.golang.org/examples")
		fmt.Printf("links %d: %v", len(links), links)
	}

	// HTMLLinks is a more real-world example of how Func can be used. It
	// fetches the URLs given in parallel and returns all the unique links
	// found.
	func HTMLLinks(urls ...string) (links StringSet, err error) {
		nSites := len(urls)

		// we want to get a list of links from each of the sites in urls.
		//
		// Reserve space for each of their link sets up front. This
		// way each goroutine has its own "slot" in results that it can
		// write to without worrying about concurrent access.
		//
		// This trades off memory for ease of implementation and
		// simplicity.
		results := make([]StringSet, nSites)

		// We want to iterate over urls in parallel, fetching the HTML and
		// parsing the links from each.
		//
		// This is the func that does the work. idx will go from 0 to
		// nSites-1.
		fn := Func(func(idx int) error {
			// reading urls is OK since nobody's modifying it concurrently
			r, err := http.Get(urls[idx])
			if err != nil {
				return err
			}
			defer r.Body.Close()

			doc, err := html.Parse(r.Body)
			if err != nil {
				return err
			}

			// writing to results[idx] without synchronization is OK since
			// this is the only goroutine that's writing to that location
			results[idx] = parseLinks(doc, results[idx])
			return nil
		})

		// run fn a total of nSites times. Run at most 5 concurrently.
		// Blocks until all are finished, and returns a multierror
		// (go.uber.org/multierr) of all the errors returned.
		err = fn.Do(nSites, 5)

		// return unique links only
		var resultSet StringSet
		for _, res := range results {
			resultSet.AddSet(res)
		}
		return resultSet, err
	}

	func Scrape(maxDepth int, urls ...string) (links StringSet) {
		if maxDepth == 0 {
			return
		}
		// even if there was an error we might still have gotten some
		// links
		links, _ = HTMLLinks(urls...)

		// remove the original urls from found links
		links.Remove(urls...)

		// scrape the links we got
		more := Scrape(maxDepth-1, links.List()...)
		return links.AddSet(more)
	}

	// note: everything below here is just utilities

	func parseLinks(n *html.Node, found StringSet) StringSet {
		if n.Type == html.ElementNode && n.Data == "a" {
			for _, a := range n.Attr {
				if a.Key == "href" && (strings.HasPrefix(a.Val, "http:") || strings.HasPrefix(a.Val, "https:")) {
					u, err := url.Parse(a.Val)
					if err != nil {
						break
					}
					u.RawQuery = ""
					u.RawFragment = ""
					u.Fragment = ""
					found.Add(u.String())
					break
				}
			}
		}
		for c := n.FirstChild; c != nil; c = c.NextSibling {
			found = parseLinks(c, found)
		}
		return found
	}

	type StringSet map[string]struct{}

	func (s *StringSet) Add(strs ...string) *StringSet {
		if len(*s) == 0 {
			*s = make(StringSet, len(strs))
		}
		for _, h := range strs {
			(*s)[h] = struct{}{}
		}
		return s
	}

	func (s *StringSet) AddSet(strs StringSet) StringSet {
		if len(*s) == 0 {
			*s = make(StringSet, len(strs))
		}
		for v := range strs {
			(*s)[v] = struct{}{}
		}
		return *s
	}

	func (s *StringSet) Remove(vs ...string) StringSet {
		for _, v := range vs {
			delete(*s, v)
		}
		return *s
	}

	func (s *StringSet) Sub(vs StringSet) StringSet {
		for v := range vs {
			delete(*s, v)
		}
		return *s
	}

	func (s StringSet) List() (vs []string) {
		count := len(s)
		if count == 0 {
			return
		}
		vs = make([]string, 0, count)
		for handle := range s {
			vs = append(vs, handle)
		}
		return
	}

	func (s StringSet) String() string {
		var (
			sb        strings.Builder
			firstDone bool
		)
		sb.WriteByte('[')
		for str := range s {
			if firstDone {
				sb.WriteString(", ")
			} else {
				firstDone = true
			}
			sb.WriteByte('"')
			sb.WriteString(str)
			sb.WriteByte('"')
		}
		sb.WriteByte(']')
		return sb.String()
	}
Output:
Example (Simple) ¶
	// we want to transform this list in parallel
	input := []int{10, 20, 30, 40, 50}
	count := len(input)

	// reserve enough space for the results
	results := make([]int, len(input))

	// this is the function we'll run.
	// idx goes from 0 to count-1.
	fn := Func(func(idx int) error {
		// no mutex needed because every goroutine gets a unique idx,
		// so writes never overlap
		results[idx] = input[idx] * 10
		return nil
	})

	// run fn count times in parallel. Run at most 5 at a time
	// (concurrently).
	if err := fn.Do(count, 5); err != nil {
		panic(err)
	}

	fmt.Printf("%#v\n", results)
Output:

	[]int{100, 200, 300, 400, 500}
type Funcs ¶
type Funcs []Func
Funcs allows running multiple different Funcs in parallel. Use Add to add a Func to be run, and Do to run them. The zero value of Funcs is usable.
Example ¶
	// zero value is usable
	var ops Funcs

	// just here as an example to show that both funcs run
	op1Ran := false
	ops.Add(func(idx int) error {
		op1Ran = true
		// ... do some work here
		return nil
	})

	op2Ran := false
	ops.Add(func(idx int) error {
		op2Ran = true
		// ... do some work here
		return nil
	})

	// run the operations we added. Runs at most GOMAXPROCS in
	// parallel
	if err := ops.Do(); err != nil {
		panic(err)
	}

	fmt.Println("done! Ops ran:", op1Ran, op2Ran)
Output:

	done! Ops ran: true true
type Semaphore ¶
type Semaphore chan struct{}
Semaphore is a counting semaphore. It allows limiting concurrency.
	// we want only 10 of these to run in parallel
	s := NewSemaphore(10)

	fn := func() {
		// Calls s.AcquireRelease() which acquires the
		// semaphore, and then defers the release
		// func returned by AcquireRelease.
		defer s.AcquireRelease()()

		// ... do stuff ...
	}

	// start a bunch of fn
	go fn()
func NewSemaphore ¶
func NewSemaphore(max int) Semaphore
NewSemaphore creates a new Semaphore with a maximum count of max.
func (Semaphore) Acquire ¶
func (s Semaphore) Acquire()
Acquire the semaphore. Blocks until available. See also AcquireRelease.
func (Semaphore) AcquireRelease ¶
func (s Semaphore) AcquireRelease() (release func())
AcquireRelease acquires the semaphore and returns a func that can be called to release it. Blocks until available. Intended to be used with defer:
	// Calls s.AcquireRelease() which acquires the semaphore,
	// and then defers the release func returned by
	// AcquireRelease.
	defer s.AcquireRelease()()
func (Semaphore) MaybeAcquire ¶
func (s Semaphore) MaybeAcquire() (ok bool, release func())
MaybeAcquire is like AcquireRelease but it doesn't block. ok is false if the semaphore couldn't be acquired. The release func is a no-op if ok was false, so it's always safe to call:
	ok, release := adder.MaybeAcquire()
	defer release()
	if ok {
		// ... do stuff ...
	}