workgroup

package module
v0.23.1
Published: Feb 1, 2023 License: MIT Imports: 5 Imported by: 1

README

Workgroup

Workgroup is a generic concurrent task runner for Go. It requires Go 1.20+.

Execute heterogeneous tasks

To execute heterogeneous tasks with a set number of workers, use workgroup.DoFuncs:

err := workgroup.DoFuncs(3,
    func() error {
        return doThingA()
    },
    func() error {
        return doThingB()
    },
    func() error {
        return doThingC()
    })

Execute homogeneous tasks

To execute homogeneous tasks with a set number of workers, use workgroup.DoTasks:

things := []someType{thingA, thingB, thingC}

err := workgroup.DoTasks(len(things), things, func(thing someType) error {
    foo := thing.Frobincate()
    return foo.DoSomething()
})

Manage tasks that spawn new tasks

For tasks that may create more work, use workgroup.Do. Create a manager function that is executed serially: it saves the results and examines the output of completed tasks to decide whether there is more work to do.

// Task fetches a page and extracts the URLs
task := func(u string) ([]string, error) {
    page, err := getURL(ctx, u)
    if err != nil {
        return nil, err
    }
    return getLinks(page), nil
}

// Map from page to links
// Doesn't need a lock because only the manager touches it
results := map[string][]string{}

// Manager keeps track of which pages have been visited and the results graph
manager := func(req string, links []string, err error) ([]string, error) {
    // Halt execution after the first error
    if err != nil {
        return nil, err
    }
    // Save final results in map
    results[req] = links

    // Check for new pages to scrape
    var newpages []string
    for _, link := range links {
        if _, ok := results[link]; ok {
            // Seen it, try the next link
            continue
        }
        // Add to list of new pages
        newpages = append(newpages, link)
        // Add placeholder to map to prevent double scraping
        results[link] = nil
    }
    return newpages, nil
}

// Process the tasks with as many workers as GOMAXPROCS
err := workgroup.Do(workgroup.MaxProcs, task, manager, "http://example.com/")
if err != nil {
    fmt.Println("error", err)
}

Documentation

Overview

Package workgroup contains generic concurrent task runners.

Index

Examples

Constants

const MaxProcs = -1

Use GOMAXPROCS workers when doing tasks.
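
By way of illustration (hypothetical inputs and task, not from the package's examples), passing MaxProcs lets the worker count track GOMAXPROCS:

names := []string{"a", "b", "c"}
err := workgroup.DoTasks(workgroup.MaxProcs, names, func(name string) error {
    fmt.Println("processing", name) // placeholder work
    return nil
})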

Variables

This section is empty.

Functions

func Do added in v0.23.1

func Do[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) error

Do tasks using n concurrent workers (or GOMAXPROCS workers if n < 1) which produce output consumed by a serially run manager. The manager should return a slice of new task inputs based on prior task results, or return an error to halt processing. If a task panics during execution, the panic will be caught and returned as an error halting further execution.

Example
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing/fstest"

	"github.com/carlmjohnson/workgroup"
	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
)

func main() {
	// Example site to crawl with recursive links
	srv := httptest.NewServer(http.FileServer(http.FS(fstest.MapFS{
		"index.html": &fstest.MapFile{
			Data: []byte("/a.html"),
		},
		"a.html": &fstest.MapFile{
			Data: []byte("/b1.html\n/b2.html"),
		},
		"b1.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"b2.html": &fstest.MapFile{
			Data: []byte("/c.html"),
		},
		"c.html": &fstest.MapFile{
			Data: []byte("/"),
		},
	})))
	defer srv.Close()
	cl := srv.Client()

	// Task fetches a page and extracts the URLs
	task := func(u string) ([]string, error) {
		res, err := cl.Get(srv.URL + u)
		if err != nil {
			return nil, err
		}
		defer res.Body.Close()
		body, err := io.ReadAll(res.Body)
		if err != nil {
			return nil, err
		}

		return strings.Split(string(body), "\n"), nil
	}

	// Manager keeps track of which pages have been visited and the results graph
	tried := map[string]int{}
	results := map[string][]string{}
	manager := func(req string, urls []string, err error) ([]string, error) {
		if err != nil {
			// If there's a problem fetching a page, try three times
			if tried[req] < 3 {
				tried[req]++
				return []string{req}, nil
			}
			return nil, err
		}
		results[req] = urls
		var newurls []string
		for _, u := range urls {
			if tried[u] == 0 {
				newurls = append(newurls, u)
				tried[u]++
			}
		}
		return newurls, nil
	}

	// Process the tasks with as many workers as GOMAXPROCS
	err := workgroup.Do(workgroup.MaxProcs, task, manager, "/")
	if err != nil {
		fmt.Println("error", err)
	}

	keys := maps.Keys(results)
	slices.Sort(keys)
	for _, key := range keys {
		fmt.Println(key, "links to:")
		for _, v := range results[key] {
			fmt.Println("- ", v)
		}
	}

}
Output:

/ links to:
-  /a.html
/a.html links to:
-  /b1.html
-  /b2.html
/b1.html links to:
-  /c.html
/b2.html links to:
-  /c.html
/c.html links to:
-  /

func DoFuncs added in v0.23.1

func DoFuncs(n int, fns ...func() error) error

DoFuncs starts n concurrent workers (or GOMAXPROCS workers if n < 1) that execute each function. Errors returned by a function do not halt execution, but are joined into a multierror return value. If a function panics during execution, the panic will be caught and returned as an error halting further execution.

Example
package main

import (
	"fmt"
	"time"

	"github.com/carlmjohnson/workgroup"
)

func main() {
	start := time.Now()
	err := workgroup.DoFuncs(3, func() error {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("hello")
		return nil
	}, func() error {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("world")
		return nil
	}, func() error {
		time.Sleep(200 * time.Millisecond)
		fmt.Println("from workgroup.DoFuncs")
		return nil
	})
	if err != nil {
		fmt.Println("error", err)
	}
	fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
}
Output:

hello
world
from workgroup.DoFuncs
executed concurrently? true
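
As a supplementary sketch (not one of the package's examples), the joined-error behavior described above can be checked roughly like this, assuming the returned multierror supports errors.Is unwrapping in the way errors.Join does:

package main

import (
	"errors"
	"fmt"

	"github.com/carlmjohnson/workgroup"
)

func main() {
	// Hypothetical sentinel errors, used only for this sketch.
	errA := errors.New("a failed")
	errB := errors.New("b failed")
	err := workgroup.DoFuncs(2,
		func() error { return errA },
		func() error { return nil }, // failures elsewhere do not stop this func
		func() error { return errB },
	)
	// Both failures should surface through the single joined error.
	fmt.Println(errors.Is(err, errA), errors.Is(err, errB))
}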

func DoTasks added in v0.23.1

func DoTasks[Input any](n int, items []Input, task func(Input) error) error

DoTasks starts n concurrent workers (or GOMAXPROCS workers if n < 1) and processes each initial input as a task. Errors returned by a task do not halt execution, but are joined into a multierror return value. If a task panics during execution, the panic will be caught and returned as an error halting further execution.

Example
package main

import (
	"fmt"
	"time"

	"github.com/carlmjohnson/workgroup"
)

func main() {
	times := []time.Duration{
		50 * time.Millisecond,
		100 * time.Millisecond,
		200 * time.Millisecond,
	}
	start := time.Now()
	err := workgroup.DoTasks(3, times, func(d time.Duration) error {
		time.Sleep(d)
		fmt.Println("slept", d)
		return nil
	})
	if err != nil {
		fmt.Println("error", err)
	}
	fmt.Println("executed concurrently?", time.Since(start) < 300*time.Millisecond)
}
Output:

slept 50ms
slept 100ms
slept 200ms
executed concurrently? true
Example (Cancel)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/carlmjohnson/workgroup"
)

func main() {
	// To cancel execution early, communicate via a context.CancelFunc
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	times := []time.Duration{
		50 * time.Millisecond,
		100 * time.Millisecond,
		300 * time.Millisecond,
	}
	task := func(d time.Duration) error {
		// simulate doing some work with a context
		t := time.NewTimer(d)
		defer t.Stop()

		select {
		case <-t.C:
			fmt.Println("slept", d)
		case <-ctx.Done():
			fmt.Println("canceled")
		}

		// if some condition applies, cancel the context for everyone
		if d == 100*time.Millisecond {
			cancel()
		}
		return nil
	}
	start := time.Now()
	if err := workgroup.DoTasks(3, times, task); err != nil {
		fmt.Println("error", err)
	}
	fmt.Println("exited promptly?", time.Since(start) < 150*time.Millisecond)
}
Output:

slept 50ms
slept 100ms
canceled
exited promptly? true

Types

type Manager

type Manager[Input, Output any] func(Input, Output, error) ([]Input, error)

Manager is a function that serially examines Task results to see if it produced any new Inputs.

type Task

type Task[Input, Output any] func(in Input) (out Output, err error)

Task is a function that can concurrently transform an input into an output.
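
To make the two types concrete, here is a minimal, hypothetical sketch (all names invented for illustration) that wires a Task[string, int] and a Manager[string, int] into Do: the task measures input lengths concurrently, and the manager only gathers outputs, scheduling no follow-up work.

package main

import (
	"fmt"

	"github.com/carlmjohnson/workgroup"
)

func main() {
	// Task[string, int]: runs concurrently, one call per input.
	task := func(in string) (int, error) {
		return len(in), nil
	}

	// Manager[string, int]: runs serially, so the map needs no lock.
	lengths := map[string]int{}
	manager := func(in string, out int, err error) ([]string, error) {
		if err != nil {
			return nil, err // halt on the first failure
		}
		lengths[in] = out
		return nil, nil // no new inputs to schedule
	}

	if err := workgroup.Do(workgroup.MaxProcs, task, manager, "alpha", "be"); err != nil {
		fmt.Println("error", err)
	}
	fmt.Println(lengths["alpha"], lengths["be"])
}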
