Documentation
¶
Overview ¶
Package workgroup contains generic concurrent task runners.
Index ¶
Examples ¶
Constants ¶
const MaxProcs = -1
Use GOMAXPROCS workers when doing tasks.
Variables ¶
This section is empty.
Functions ¶
func Do ¶ added in v0.23.1
func Do[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) error
Do tasks using n concurrent workers (or GOMAXPROCS workers if n < 1) which produce output consumed by a serially run manager. The manager should return a slice of new task inputs based on prior task results, or return an error to halt processing. If a task panics during execution, the panic will be caught and returned as an error halting further execution.
Example ¶
// Example: use workgroup.Do to crawl a small in-memory site concurrently.
// Workers fetch pages in parallel; the manager runs serially, so its maps
// need no locking.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing/fstest"

	"github.com/carlmjohnson/workgroup"
	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
)

func main() {
	// Example site to crawl with recursive links
	srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
		"index.html": &fstest.MapFile{
			Data: []byte("/a.html"),
		},
		"a.html": &fstest.MapFile{
			Data: []byte("/b1.html\n/b2.html"),
		},
		"b1.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"b2.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"c.html": &fstest.MapFile{
			Data: []byte("/"),
		},
	})))
	defer srv.Close()
	cl := srv.Client()

	// Task fetches a page and extracts the URLs
	// (each newline-separated line of the body is treated as a link).
	task := func(u string) ([]string, error) {
		res, err := cl.Get(srv.URL + u)
		if err != nil {
			return nil, err
		}
		defer res.Body.Close()
		body, err := io.ReadAll(res.Body)
		if err != nil {
			return nil, err
		}
		return strings.Split(string(body), "\n"), nil
	}

	// Manager keeps track of which pages have been visited and the results graph
	tried := map[string]int{}        // URL -> fetch attempts; nonzero also marks "seen"
	results := map[string][]string{} // URL -> outbound links found on that page
	manager := func(req string, urls []string, err error) ([]string, error) {
		if err != nil {
			// If there's a problem fetching a page, try three times
			if tried[req] < 3 {
				tried[req]++
				return []string{req}, nil
			}
			// Give up: returning an error halts all further processing.
			return nil, err
		}
		results[req] = urls
		// Enqueue only URLs that have never been seen before.
		var newurls []string
		for _, u := range urls {
			if tried[u] == 0 {
				newurls = append(newurls, u)
				tried[u]++
			}
		}
		return newurls, nil
	}

	// Process the tasks with as many workers as GOMAXPROCS
	err := workgroup.Do(workgroup.MaxProcs, task, manager, "/")
	if err != nil {
		fmt.Println("error", err)
	}

	// Print the link graph with sorted keys for deterministic output.
	keys := maps.Keys(results)
	slices.Sort(keys)
	for _, key := range keys {
		fmt.Println(key, "links to:")
		for _, v := range results[key] {
			fmt.Println("- ", v)
		}
	}
}
Output:

/ links to:
-  /a.html
/a.html links to:
-  /b1.html
-  /b2.html
/b1.html links to:
-  /c.html
/b2.html links to:
-  /c.html
/c.html links to:
-  /
func DoFuncs ¶ added in v0.23.1
func DoFuncs(n int, fns ...func() error) error
DoFuncs starts n concurrent workers (or GOMAXPROCS workers if n < 1) that execute each function. Errors returned by a function do not halt execution, but are joined into a multierror return value. If a function panics during execution, the panic will be caught and returned as an error halting further execution.
Example ¶
package main
import (
"fmt"
"time"
"github.com/carlmjohnson/workgroup"
)
func main() {
start := time.Now()
err := workgroup.DoFuncs(3, func() error {
time.Sleep(50 * time.Millisecond)
fmt.Println("hello")
return nil
}, func() error {
time.Sleep(100 * time.Millisecond)
fmt.Println("world")
return nil
}, func() error {
time.Sleep(200 * time.Millisecond)
fmt.Println("from workgroup.DoFuncs")
return nil
})
if err != nil {
fmt.Println("error", err)
}
fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
}
Output:

hello
world
from workgroup.DoFuncs
executed concurrently? true
func DoTasks ¶ added in v0.23.1
func DoTasks[Input any](n int, items []Input, task func(Input) error) error
DoTasks starts n concurrent workers (or GOMAXPROCS workers if n < 1) and processes each initial input as a task. Errors returned by a task do not halt execution, but are joined into a multierror return value. If a task panics during execution, the panic will be caught and returned as an error halting further execution.
Example ¶
package main
import (
"fmt"
"time"
"github.com/carlmjohnson/workgroup"
)
func main() {
times := []time.Duration{
50 * time.Millisecond,
100 * time.Millisecond,
200 * time.Millisecond,
}
start := time.Now()
err := workgroup.DoTasks(3, times, func(d time.Duration) error {
time.Sleep(d)
fmt.Println("slept", d)
return nil
})
if err != nil {
fmt.Println("error", err)
}
fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
}
Output:

slept 50ms
slept 100ms
slept 200ms
executed concurrently? true
Example (Cancel) ¶
package main
import (
"context"
"fmt"
"time"
"github.com/carlmjohnson/workgroup"
)
func main() {
// To cancel execution early, communicate via a context.CancelFunc
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
times := []time.Duration{
50 * time.Millisecond,
100 * time.Millisecond,
300 * time.Millisecond,
}
task := func(d time.Duration) error {
// simulate doing some work with a context
t := time.NewTimer(d)
defer t.Stop()
select {
case <-t.C:
fmt.Println("slept", d)
case <-ctx.Done():
fmt.Println("canceled")
}
// if some condition applies, cancel the context for everyone
if d == 100*time.Millisecond {
cancel()
}
return nil
}
start := time.Now()
if err := workgroup.DoTasks(3, times, task); err != nil {
fmt.Println("error", err)
}
fmt.Println("exited promptly?", time.Since(start) < 150*time.Millisecond)
}
Output:

slept 50ms
slept 100ms
canceled
exited promptly? true