laminar

package
v0.1.6
Warning

This package is not in the latest version of its module.
Published: Jan 13, 2023 | License: AGPL-3.0 | Imports: 7 | Imported by: 0

Documentation

Overview

Package laminar manages goroutines that depend on each other, respecting dependency relationships and context cancellation. It combines an errgroup.Group with a DAG, and executes tasks in topological order.

Create a new group, which encapsulates a dependency graph, with laminar.NewGroup. Then add tasks to the group with Group.NewTask and declare dependencies between them with Task.After. Once all tasks have been added, start the group asynchronously with Group.Start, and wait for the group to finish with Group.Wait.

Constants

const (
	// NoLimit indicates that the Group can run any number of goroutines at once.
	NoLimit int = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}
Example
// This example simulates long-running operations to get two values
// then sums them up after both operations complete.
g := NewGroup(context.Background(), NoLimit)

// Since tasks cannot return values to their dependents,
// they can instead communicate with shared variables
// captured by the anonymous functions passed to NewTask.
// If sharing variables feels unidiomatic, tasks can share channels instead.
var one, two int

getOne := g.NewTask("getOne", func(ctx context.Context) error {
	// Context cancellation should be respected when blocking
	// in a long-running operation. Here it's time.After, but
	// in practice this would be some kind of I/O.
	// In other words, pass ctx into http.NewRequestWithContext,
	// sql.DB.QueryContext, etc...
	select {
	case <-time.After(10 * time.Millisecond):
		one = 1
	case <-ctx.Done():
		fmt.Println("getOne context:", ctx.Err().Error())
	}

	fmt.Println("getOne exits")
	return nil
})

getTwo := g.NewTask("getTwo", func(ctx context.Context) error {
	select {
	case <-time.After(20 * time.Millisecond):
		two = 2
	case <-ctx.Done():
		fmt.Println("getTwo context:", ctx.Err().Error())
	}

	fmt.Println("getTwo exits")
	return nil
})

g.NewTask("sum", func(ctx context.Context) error {
	// If context is canceled before this starts,
	// or if getOne or getTwo return an error,
	// this won't start
	fmt.Println("sum starts")

	// This is race-free as the writes to one and two
	// happen-before the reads here
	fmt.Println(one + two)
	return nil
}).After(getOne, getTwo)

// The dependency graph looks like:
// getOne -.
//         |--> sum
// getTwo -'
//
// The execution order of getOne relative to getTwo is
// not guaranteed, however it is guaranteed that sum
// starts after getOne and getTwo finish.

// This will print the group state
// as well as its task relationships
fmt.Println(g)
fmt.Println()

err := g.Start()
if err != nil {
	panic(err)
}

err = g.Wait()
if err != nil {
	panic(err)
}

fmt.Println()
fmt.Println(g)
fmt.Println()
Output:

Group: started=false
getOne [created] -> sum [created]
getTwo [created] -> sum [created]
sum [created] ->

getOne exits
getTwo exits
sum starts
3

Group: started=true
getOne [finished] -> sum [finished]
getTwo [finished] -> sum [finished]
sum [finished] ->

func NewGroup

func NewGroup(ctx context.Context, limit int) *Group

NewGroup creates a new Group. It accepts a context from which another context is derived and passed to tasks in the group. limit is the maximum number of goroutines that can run simultaneously. Pass NoLimit to disable the limit.
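How laminar enforces limit internally is not described here. As a rough illustration of what a goroutine limit means, the following standard-library sketch bounds concurrency with a buffered channel used as a semaphore. runLimited is a hypothetical helper, not part of laminar, and treating every non-positive limit as "no limit" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited is a hypothetical helper, not part of laminar. It runs every
// task on its own goroutine while keeping at most limit of them running at
// once, and returns the highest concurrency it observed. Assumption: any
// non-positive limit means "no limit" in this sketch.
func runLimited(limit int, tasks []func()) int {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit)
	}

	var wg sync.WaitGroup
	var running, peak int64
	for _, task := range tasks {
		if sem != nil {
			sem <- struct{}{} // blocks while limit tasks are already running
		}
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			if sem != nil {
				defer func() { <-sem }() // free the slot once the task exits
			}
			n := atomic.AddInt64(&running, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt64(&running, -1)
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	tasks := make([]func(), 8)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	fmt.Println("peak concurrency within limit:", runLimited(2, tasks) <= 2)
}
```

The slot is acquired before the goroutine is launched, so the number of live goroutines, not just the number of running task bodies, stays within the limit.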

func (*Group) NewTask

func (g *Group) NewTask(name string, f func(context.Context) error) *Task

NewTask creates a new Task in this Group. The context passed to f is canceled once the function of any other task in the Group returns an error. name is shown when g.String() is called. There is no "get task by name" method: instead, retain the *Task and use it directly. NewTask must not be called after Start.
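The cancellation behavior described for f can be pictured with a standard-library sketch: every function receives the same derived context, and the first function to fail cancels it. runAll and demo are hypothetical names, and this is not laminar's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll is a hypothetical helper, not laminar's implementation. It runs
// every f concurrently with a context derived from parent and cancels that
// context as soon as any f returns a non-nil error.
func runAll(parent context.Context, fs ...func(context.Context) error) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var wg sync.WaitGroup
	for _, f := range fs {
		wg.Add(1)
		go func(f func(context.Context) error) {
			defer wg.Done()
			if err := f(ctx); err != nil {
				cancel() // wake up every other f blocked on ctx.Done()
			}
		}(f)
	}
	wg.Wait()
}

// demo returns the context error observed by a task that blocks until
// another task fails.
func demo() error {
	var seen error
	failing := func(ctx context.Context) error {
		return errors.New("boom")
	}
	waiting := func(ctx context.Context) error {
		<-ctx.Done() // only returns once the shared context is canceled
		seen = ctx.Err()
		return nil
	}
	runAll(context.Background(), failing, waiting)
	return seen // safe to read: wg.Wait ordered the write before this point
}

func main() {
	fmt.Println("waiting task saw:", demo()) // waiting task saw: context canceled
}
```

A task that ignores its context keeps running after cancellation, which is why the package example selects on ctx.Done() while blocking.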

func (*Group) Start

func (g *Group) Start() error

Start starts the Tasks added to the Group in their dependency order. It returns an error if the order cannot be established because there is a cyclic dependency. Start must not be called twice.

The actual task execution order is not guaranteed to be identical across runs, even when a group is created anew with the same dependency graph.
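The algorithm Start uses to establish the order is not documented here. One standard way to compute a dependency order and detect cycles is Kahn's algorithm, sketched below with the standard library only. topoOrder and errCycle are hypothetical names; the sorting exists only to make this sketch's output deterministic, which Start explicitly does not promise.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errCycle is a hypothetical sentinel error for this sketch.
var errCycle = errors.New("cyclic dependency")

// topoOrder returns task names so that every task appears after all of its
// prerequisites. deps maps a task to the tasks that must finish before it.
// It returns errCycle if no such order exists.
func topoOrder(deps map[string][]string) ([]string, error) {
	indegree := map[string]int{}        // unfinished prerequisites per task
	dependents := map[string][]string{} // tasks unblocked by each task
	for task, befores := range deps {
		if _, ok := indegree[task]; !ok {
			indegree[task] = 0
		}
		for _, b := range befores {
			if _, ok := indegree[b]; !ok {
				indegree[b] = 0
			}
			indegree[task]++
			dependents[b] = append(dependents[b], task)
		}
	}

	var ready []string // tasks with no unfinished prerequisites
	for task, n := range indegree {
		if n == 0 {
			ready = append(ready, task)
		}
	}
	sort.Strings(ready)

	var order []string
	for len(ready) > 0 {
		task := ready[0]
		ready = ready[1:]
		order = append(order, task)
		next := dependents[task]
		sort.Strings(next)
		for _, d := range next {
			indegree[d]--
			if indegree[d] == 0 {
				ready = append(ready, d)
			}
		}
	}

	// Tasks on a cycle never reach indegree zero, so they are never emitted.
	if len(order) != len(indegree) {
		return nil, errCycle
	}
	return order, nil
}

func main() {
	fmt.Println(topoOrder(map[string][]string{"sum": {"getOne", "getTwo"}}))
	// [getOne getTwo sum] <nil>
	fmt.Println(topoOrder(map[string][]string{"a": {"b"}, "b": {"a"}}))
	// [] cyclic dependency
}
```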

func (*Group) String

func (g *Group) String() string

String returns the string representation of this Group.

func (*Group) Wait

func (g *Group) Wait() error

Wait waits for all goroutines started in the Group to exit. Wait returns the first error returned from any task, or else any context error that prevented all tasks from starting. If Wait returns nil, all tasks completed successfully.
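The "first error wins" rule can be sketched with sync.Once and sync.WaitGroup from the standard library. firstErr, waitAll, errEarly, and errLate are hypothetical names, and this is only an approximation of the documented behavior, not laminar's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical errors used only by this sketch.
var (
	errEarly = errors.New("early failure")
	errLate  = errors.New("late failure")
)

// firstErr is a hypothetical helper that keeps only the first non-nil error
// it is given. It is safe for concurrent use.
type firstErr struct {
	once sync.Once
	err  error
}

func (f *firstErr) set(err error) {
	if err != nil {
		f.once.Do(func() { f.err = err })
	}
}

// waitAll runs every f on its own goroutine, waits for all of them to exit,
// and returns the first error recorded, or nil if every f succeeded.
func waitAll(fs ...func() error) error {
	var wg sync.WaitGroup
	var first firstErr
	for _, f := range fs {
		wg.Add(1)
		go func(f func() error) {
			defer wg.Done()
			first.set(f())
		}(f)
	}
	wg.Wait()
	return first.err
}

func main() {
	var first firstErr
	first.set(errEarly)
	first.set(errLate)     // ignored: an error was already recorded
	fmt.Println(first.err) // early failure

	err := waitAll(
		func() error { return nil },
		func() error { return errLate },
	)
	fmt.Println(err) // late failure
}
```

When several tasks fail at nearly the same time, which error counts as "first" depends on goroutine scheduling, so callers should not rely on a particular one being returned.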

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) After

func (t *Task) After(befores ...*Task) *Task

After establishes an ordering: all befores complete before this task starts. After must not be called after the parent Group has started. Only one goroutine may call After on any given Task at a time; however, multiple goroutines may call After on different Tasks concurrently. After returns its Task to make this pattern possible:

task := group.NewTask(...).After(beforeTask)

This should feel familiar to users of libraries like gomock.
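To see why an "after" ordering lets dependents read variables written by their prerequisites without a data race, consider this standard-library sketch. Each task closes a done channel when it finishes, and a dependent receives from every prerequisite's channel before it runs. The task type and the newTask, after, start, and demo functions are hypothetical stand-ins, not laminar's types, and the sketch assumes an acyclic graph in which every prerequisite is also passed to start (otherwise it deadlocks).

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a laminar Task.
type task struct {
	run     func()
	befores []*task
	done    chan struct{} // closed when run returns
}

func newTask(run func()) *task {
	return &task{run: run, done: make(chan struct{})}
}

// after records prerequisites and returns t so that calls can be chained.
func (t *task) after(befores ...*task) *task {
	t.befores = append(t.befores, befores...)
	return t
}

// start launches every task on its own goroutine and waits for all of them.
// Each task first waits for the done channel of each of its prerequisites.
func start(tasks ...*task) {
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			for _, b := range t.befores {
				<-b.done // returns only after b.run has finished
			}
			t.run()
			close(t.done)
		}(t)
	}
	wg.Wait()
}

// demo mirrors the getOne/getTwo/sum shape of the package example.
func demo() int {
	var one, two, sum int
	getOne := newTask(func() { one = 1 })
	getTwo := newTask(func() { two = 2 })
	total := newTask(func() { sum = one + two }).after(getOne, getTwo)

	// Launch order does not matter: total blocks until both are done.
	start(total, getOne, getTwo)
	return sum
}

func main() {
	fmt.Println(demo()) // 3
}
```

Closing a channel happens-before a receive that observes the close, so the writes to one and two are visible to the summing task.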

func (*Task) String

func (t *Task) String() string

String returns the string representation of this Task.
