taskset

Version: v0.2.0
Published: Dec 2, 2021 | License: MIT | Imports: 4 | Imported by: 3

README

taskset

Taskset is a library for running concurrent tasks. Tasks can do arbitrary work and depend on other tasks' results.

You can decorate the behaviour of all your tasks with middlewares. For example, you can add logging, measure execution duration, or limit the number of tasks allowed to run at the same time. The standard middlewares live in github.com/bennydictor/taskset/middlewares.

A basic example:

ctx := context.Background()

taskSet := taskset.NewTaskSet()

taskA := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
	time.Sleep(2 * time.Second)
	return 1, nil
})

taskB := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
	time.Sleep(2 * time.Second)
	return 2, nil
})

taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
	if t := depend.ErrGroup(ctx, taskA, taskB); t != nil {
		return nil, depend(ctx, t).Err
	}

	a := depend(ctx, taskA).Value.(int)
	b := depend(ctx, taskB).Value.(int)

	time.Sleep(2 * time.Second)
	return a + b, nil
})

start := time.Now()
taskSet.Start(ctx)
taskSet.Wait(ctx)
totalTime := time.Since(start)

fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
fmt.Println("result:", taskSet.Result(ctx, taskC).Value)

// Output:
// total time: 4s
// result: 3

3rd-party middlewares

We also provide some middlewares that integrate with 3rd-party libraries. They are published as separate modules: opentracing, prometheus, and zap (see Directories).

Documentation

Types

type Depend

type Depend func(context.Context, *Task) Result

Depend is used by RunFunc to declare a dependency on another task from the same task set. DO NOT use TaskSet.Result to get the result of a task from the same task set. Depend blocks until the dependency's result is ready. To wait for multiple tasks in parallel, use Depend.ErrGroup or Depend.SyncGroup. Depend implicitly starts lazy tasks if they are not already running.

func (Depend) C (added in v0.2.0)

func (depend Depend) C(ctx context.Context, task *Task) <-chan Result

C is a convenience method. It returns a channel with buffer 1. The result of the task will be sent to this channel when the task finishes.
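
A channel is mainly useful where a blocking call is not, for example inside a select. The sketch below takes whichever of two results arrives first. It uses only the standard library: Result is a local copy of the struct documented on this page, firstOf is a hypothetical helper, and the channels stand in for what depend.C(ctx, task) would return.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Result is a local copy of the taskset.Result struct shown on this page,
// so the sketch compiles without the module.
type Result struct {
	Value interface{}
	Err   error
}

// firstOf (hypothetical helper) returns whichever result arrives first,
// or a failed Result if ctx is done before either channel delivers.
func firstOf(ctx context.Context, a, b <-chan Result) Result {
	select {
	case r := <-a:
		return r
	case r := <-b:
		return r
	case <-ctx.Done():
		return Result{Err: ctx.Err()}
	}
}

// demo races an already delivered result against one that arrives later.
func demo() string {
	fast := make(chan Result, 1) // buffer 1, like the channel C returns
	slow := make(chan Result, 1)
	fast <- Result{Value: "fast"}
	go func() {
		time.Sleep(50 * time.Millisecond)
		slow <- Result{Value: "slow"}
	}()
	return firstOf(context.Background(), fast, slow).Value.(string)
}

func main() {
	fmt.Println(demo()) // prints "fast"
}
```

Because the channel has a buffer of 1, the sender never blocks even if the loser of the race is never read.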

func (Depend) ErrGroup

func (depend Depend) ErrGroup(ctx context.Context, tasks ...*Task) (result *Task)

ErrGroup starts waiting for a list of tasks in parallel. If any of the tasks fail, ErrGroup won't wait for the other tasks' results and will return the failed task immediately. In this case, context will NOT be cancelled for the other tasks. If all tasks complete successfully, ErrGroup will return nil.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet()

	taskA := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		return nil, errors.New("fail")
	})

	taskB := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		if errTask := depend.ErrGroup(ctx, taskA, taskB); errTask != nil {
			return nil, depend(ctx, errTask).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)

		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	cResult := taskSet.Result(ctx, taskC)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	if cResult.Err != nil {
		fmt.Println("C failed:", cResult.Err.Error())
	} else {
		fmt.Println("C result:", cResult.Value.(int))
	}

}
Output:

total time: 0s
C failed: fail

func (Depend) SyncGroup

func (depend Depend) SyncGroup(ctx context.Context, tasks ...*Task)

SyncGroup waits for a list of tasks in parallel, blocking until every task has returned a result, successful or not.

SyncGroup(ctx, tasks...) is different from calling depend() on each task in succession: SyncGroup ensures that all tasks are actually running before waiting for all of them to complete, whereas waiting for tasks in succession won't start running a lazy task until every task before it completes.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet()

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return nil, errors.New("fail")
	})

	taskB := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		depend.SyncGroup(ctx, taskA, taskB)

		var a, b int
		if err := depend(ctx, taskA).Err; err == nil {
			a = depend(ctx, taskA).Value.(int)
		}
		if err := depend(ctx, taskB).Err; err == nil {
			b = depend(ctx, taskB).Value.(int)
		}

		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	cResult := taskSet.Result(ctx, taskC)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	if cResult.Err != nil {
		fmt.Println("C failed:", cResult.Err.Error())
	} else {
		fmt.Println("C result:", cResult.Value.(int))
	}

}
Output:

total time: 2s
C result: 2

type Middleware

type Middleware struct {
	// Run injects code into task execution. Middlewares must call next() exactly once
	// during Run.  Middlewares may examine and modify task's properties at any point
	// during Run.  Middlewares may pass a modified context to next(), although it must
	// be derived from the input context.  Middlewares may examine and modify the
	// task's result before returning it.  Leave Run equal to nil to not do anything on
	// task execution.
	Run func(ctx context.Context, task *Task, next func(ctx context.Context) Result) Result

	// Depend injects code into task dependency declaration.  Middlewares must call
	// next() exactly once during Depend.  Middlewares may examine and modify both
	// tasks' properties at any point during Depend.  Middlewares may pass a modified
	// context to next(), although it must be derived from input context.  Middlewares
	// may examine and modify the result of the dependency task before returning it.
	// Note that if the result is modified, the changes are only propagated to the
	// caller of depend(); the dependency task's own result is not modified.
	// Leave Depend equal to nil to not do anything on dependency declaration.
	Depend func(ctx context.Context, task, dependency *Task, next func(ctx context.Context) Result) Result
}

Middleware is used to inject code into task execution, as well as into dependency declaration.

For some examples of middlewares, see github.com/bennydictor/taskset/middlewares.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bennydictor/taskset"
	"github.com/bennydictor/taskset/properties"
)

func NewPrinter() taskset.Middleware {
	return taskset.Middleware{
		Run: func(ctx context.Context, task *taskset.Task, next func(ctx context.Context) taskset.Result) taskset.Result {
			fmt.Println(properties.Name(task), "starting")
			result := next(ctx)
			fmt.Println(properties.Name(task), "finished")
			return result
		},
		Depend: func(ctx context.Context, task, dependency *taskset.Task, next func(ctx context.Context) taskset.Result) taskset.Result {
			fmt.Println(properties.Name(task), "depend on", properties.Name(dependency), "starting")
			result := next(ctx)
			fmt.Println(properties.Name(task), "depend on", properties.Name(dependency), "finished")
			return result
		},
	}
}

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet(
		NewPrinter(),
	)

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		return 1, nil
	},
		properties.WithName("A"),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		a := depend(ctx, taskA).Value.(int)
		return a + 1, nil
	},
		properties.WithName("B"),
	)

	taskSet.Start(ctx)
	taskSet.Wait(ctx)

}
Output:

B starting
B depend on A starting
A starting
A finished
B depend on A finished
B finished
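
The README also mentions limiting how many tasks run at the same time. The following is a minimal, stdlib-only sketch of that idea and not the library's own middleware: runHook is a hypothetical type with the shape of Middleware.Run minus the *taskset.Task argument, and Result is a local copy of the struct documented on this page. In a real middleware the body of the returned function would go into the Run field.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Result is a local copy of the taskset.Result struct shown on this page.
type Result struct {
	Value interface{}
	Err   error
}

// runHook (hypothetical) has the shape of Middleware.Run without the task argument.
type runHook func(ctx context.Context, next func(ctx context.Context) Result) Result

// newLimiter returns a hook that lets at most n calls to next run at once.
// It always calls next exactly once, as the Middleware contract requires.
func newLimiter(n int) runHook {
	slots := make(chan struct{}, n) // buffered channel used as a semaphore
	return func(ctx context.Context, next func(ctx context.Context) Result) Result {
		slots <- struct{}{}        // acquire a slot, blocking while n are in use
		defer func() { <-slots }() // release the slot when next returns
		return next(ctx)
	}
}

// demo pushes 8 concurrent bodies through a limiter of 2 and records
// the highest number seen running at once and how many completed.
func demo() (peak, done int) {
	limit := newLimiter(2)
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		running int
	)
	body := func(ctx context.Context) Result {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()

		time.Sleep(5 * time.Millisecond) // simulated work

		mu.Lock()
		running--
		done++
		mu.Unlock()
		return Result{}
	}
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			limit(context.Background(), body)
		}()
	}
	wg.Wait()
	return peak, done
}

func main() {
	peak, done := demo()
	fmt.Println(peak <= 2, done) // prints "true 8"
}
```

Note that a limiter like this blocks a task's goroutine while it waits for a slot, so a task holding a slot while it waits on a dependency that cannot get one would stall; the sketch does not address that case.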

type Property

type Property func(*Task)

Property represents a key-value property assigned to a task on creation. Properties are used by Middlewares.

For some examples of properties, see github.com/bennydictor/taskset/properties.

type Result

type Result struct {
	Value interface{}
	Err   error
}

Result is the result of running a Task. Result is considered a success if Err == nil, and a failure if Err != nil.
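
Since Value is an interface{}, a bare type assertion such as Value.(int) panics when the task failed (Value is nil) or returned another type. A minimal sketch of a defensive accessor follows; intValue is a hypothetical helper and Result is a local copy of the struct above so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Result is a local copy of the taskset.Result struct shown above.
type Result struct {
	Value interface{}
	Err   error
}

// intValue (hypothetical helper) extracts an int from r and reports
// a failure as an error instead of panicking.
func intValue(r Result) (int, error) {
	if r.Err != nil {
		return 0, fmt.Errorf("task failed: %w", r.Err)
	}
	v, ok := r.Value.(int) // comma-ok form never panics
	if !ok {
		return 0, fmt.Errorf("unexpected value type %T", r.Value)
	}
	return v, nil
}

// demo exercises the success, failure, and wrong-type paths.
func demo() (int, string, string) {
	v, _ := intValue(Result{Value: 3})
	_, failed := intValue(Result{Err: errors.New("fail")})
	_, wrongType := intValue(Result{Value: "three"})
	return v, failed.Error(), wrongType.Error()
}

func main() {
	v, failed, wrongType := demo()
	fmt.Println(v)         // prints "3"
	fmt.Println(failed)    // prints "task failed: fail"
	fmt.Println(wrongType) // prints "unexpected value type string"
}
```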

type RunFunc

type RunFunc func(context.Context, Depend) (interface{}, error)

RunFunc is the body of a Task. This function will be run in a separate goroutine. Context is passed to this function from TaskSet.Start.

The returned values will be stored in the task's Result.

A task may depend on other tasks' results. To declare a dependency, use the provided Depend function. DO NOT use TaskSet.Result.
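
Because the context from TaskSet.Start reaches every RunFunc, a task body can stop early when that context is cancelled. The examples on this page use time.Sleep, which ignores cancellation. Below is a minimal stdlib-only sketch of a cancellable pause that a RunFunc could call instead; sleepCtx is a hypothetical helper, not part of the library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sleepCtx (hypothetical helper) pauses for d, returning early with
// ctx.Err() if ctx is cancelled first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo shows both outcomes: an already cancelled context aborts a long
// pause at once, and a live context lets a short pause finish.
func demo() (cancelled, finished bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = errors.Is(sleepCtx(ctx, time.Hour), context.Canceled)
	finished = sleepCtx(context.Background(), time.Millisecond) == nil
	return cancelled, finished
}

func main() {
	fmt.Println(demo()) // prints "true true"
}
```

Inside a RunFunc, the returned error can be passed straight back as the task's error, for example `if err := sleepCtx(ctx, 2*time.Second); err != nil { return nil, err }`.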

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is the basic unit of work and concurrency. Each task runs in a separate goroutine. A task may depend on other tasks' results.

Tasks are created by a TaskSet using a RunFunc and Properties.

func (*Task) ModifyProperty

func (t *Task) ModifyProperty(key interface{}, modify func(interface{}) interface{})

ModifyProperty modifies a property for this task. The modify function runs under a mutex, to allow for serializable transactions. Because of that, you shouldn't do any long operations in the modify function.

You should define your own key type to use with this method, as you would for context.WithValue.
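
The two points above (a private key type, and a modify function that runs under a mutex) can be illustrated with a toy model. This is a sketch under stated assumptions and NOT the library's implementation: props is a hypothetical stand-in for a task's property storage, and retriesKey is a hypothetical key.

```go
package main

import (
	"fmt"
	"sync"
)

// retriesKey is an unexported key type: no other package can construct
// an equal key, so keys from different middlewares cannot collide.
type retriesKey struct{}

// props is a toy stand-in for a task's property storage.
type props struct {
	mu sync.Mutex
	m  map[interface{}]interface{}
}

// Modify applies modify to the current value while holding the mutex,
// so each read-modify-write is atomic with respect to the others.
func (p *props) Modify(key interface{}, modify func(interface{}) interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.m == nil {
		p.m = make(map[interface{}]interface{})
	}
	p.m[key] = modify(p.m[key])
}

// Get returns the value stored under key, or nil if there is none.
func (p *props) Get(key interface{}) interface{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.m[key]
}

// demo increments one counter from 100 goroutines and then looks up
// a string key that was never set.
func demo() (int, bool) {
	var p props
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Modify(retriesKey{}, func(old interface{}) interface{} {
				n, _ := old.(int) // nil on first use, so n starts at 0
				return n + 1
			})
		}()
	}
	wg.Wait()
	return p.Get(retriesKey{}).(int), p.Get("retries") == nil
}

func main() {
	fmt.Println(demo()) // prints "100 true"
}
```

No increment is lost because the whole read-modify-write happens inside the lock, which is also why the modify function should stay short.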

func (*Task) Property

func (t *Task) Property(key interface{}) interface{}

Property retrieves this task's property by the given key. If there's no property for the given key, nil is returned.

This method should only be used by Middlewares.

type TaskSet

type TaskSet struct {
	// contains filtered or unexported fields
}

TaskSet creates and runs Tasks.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet()

	taskA := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 1, nil
	})

	taskB := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		if t := depend.ErrGroup(ctx, taskA, taskB); t != nil {
			return nil, depend(ctx, t).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)

		time.Sleep(2 * time.Second)
		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	taskSet.Wait(ctx)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	fmt.Println("result:", taskSet.Result(ctx, taskC).Value)

}
Output:

total time: 4s
result: 3

func NewTaskSet

func NewTaskSet(middlewares ...Middleware) *TaskSet

NewTaskSet creates a new TaskSet.

func (*TaskSet) Eager

func (ts *TaskSet) Eager(task *Task)

Eager marks a lazy task as non-lazy. The provided task must belong to this TaskSet.

func (*TaskSet) New

func (ts *TaskSet) New(run RunFunc, properties ...Property) *Task

New creates a new Task given its RunFunc and Properties. The created task will run upon calling Start.

func (*TaskSet) NewLazy

func (ts *TaskSet) NewLazy(run RunFunc, properties ...Property) *Task

NewLazy is like New, except the created Task will not run automatically upon calling Start.

The created Task will start running the first time any other task declares a dependency on it. If no tasks declare dependency on the created task, it will not run at all.

A lazy task can later be converted to a non-lazy task with Eager.

Example

A lazy task that no other task depends on will never run.

package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet()

	var mu sync.Mutex
	var tasksStarted []string
	taskStarted := func(task string) {
		mu.Lock()
		defer mu.Unlock()
		tasksStarted = append(tasksStarted, task)
	}

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("A")
		return 1, nil
	})

	taskB := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("B")
		return 2, nil
	})

	taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("C")
		return 3, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("D")

		if t := depend.ErrGroup(ctx, taskA, taskB); t != nil {
			return nil, depend(ctx, t).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)

		return a + b, nil
	})

	taskSet.Start(ctx)
	taskSet.Wait(ctx)

	sort.Strings(tasksStarted)
	fmt.Println(strings.Join(tasksStarted, ", "))

}
Output:

A, B, D

func (*TaskSet) Result

func (ts *TaskSet) Result(ctx context.Context, task *Task) Result

Result returns the Result of a given Task, blocking until it is ready. The provided task must belong to this TaskSet. Context is only used to cancel Result; it is not passed to any of the tasks' RunFuncs.

Result does not run a task; it only waits for the result. If you call Result on a task that is never run, it will block forever.

DO NOT use Result from inside a RunFunc to get the result of a task from the same task set. Such a call to Result will panic. Use depend(ctx, task) instead.

func (*TaskSet) Start

func (ts *TaskSet) Start(ctx context.Context)

Start runs all non-lazy Tasks created by this task set. Context will be passed to all the tasks' run functions.

func (*TaskSet) Wait

func (ts *TaskSet) Wait(ctx context.Context)

Wait waits for all non-lazy tasks to complete. Context is only used to cancel Wait; it is not passed to any of the tasks' RunFuncs.

Wait does not run any tasks; it only waits for them to finish. If you call Wait and never call Start, it will block forever.

func (*TaskSet) WaitC (added in v0.2.0)

func (ts *TaskSet) WaitC() <-chan struct{}

WaitC is a convenience method. It returns a channel that will be closed when Wait(context.Background()) would unblock.
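
A done-channel like this combines naturally with a timer in a select. Here is a minimal stdlib-only sketch; waitOrTimeout is a hypothetical helper, and the channels in demo stand in for the one WaitC would return.

```go
package main

import (
	"fmt"
	"time"
)

// waitOrTimeout (hypothetical helper) reports whether done was closed
// within d. The done argument is any channel shaped like WaitC's result.
func waitOrTimeout(done <-chan struct{}, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-done:
		return true
	case <-t.C:
		return false
	}
}

// demo checks one channel that is already closed and one that never closes.
func demo() (bool, bool) {
	finished := make(chan struct{})
	close(finished)
	pending := make(chan struct{})
	return waitOrTimeout(finished, time.Second),
		waitOrTimeout(pending, 10*time.Millisecond)
}

func main() {
	fmt.Println(demo()) // prints "true false"
}
```

With a task set, the call would look like `waitOrTimeout(taskSet.WaitC(), 5*time.Second)`.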

Directories

Path Synopsis
opentracing Module
prometheus Module
zap Module
