Documentation ¶
Index ¶
- type Depend
- type Middleware
- type Property
- type Result
- type RunFunc
- type Task
- type TaskSet
- func (ts *TaskSet) Eager(task *Task)
- func (ts *TaskSet) New(run RunFunc, properties ...Property) *Task
- func (ts *TaskSet) NewLazy(run RunFunc, properties ...Property) *Task
- func (ts *TaskSet) Result(ctx context.Context, task *Task) Result
- func (ts *TaskSet) Start(ctx context.Context)
- func (ts *TaskSet) Wait(ctx context.Context)
- func (ts *TaskSet) WaitC() <-chan struct{}
Types ¶
type Depend ¶
Depend is used by RunFunc to declare a dependency on another task from the same task set. DO NOT use TaskSet.Result to get the result of a task from the same task set. Depend blocks until the dependency's result is ready. To wait for multiple tasks in parallel, use Depend.ErrGroup or Depend.SyncGroup. Depend implicitly starts a lazy task if it is not already running.
func (Depend) C ¶ added in v0.2.0
C is a convenience method. It returns a channel with buffer 1. The result of the task will be sent to this channel when the task finishes.
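A channel with a one-slot buffer lets the producing goroutine deliver its result and exit even when nobody is receiving yet, which is what makes such a channel convenient inside a select. The sketch below illustrates that idea with the standard library only. It does not call the taskset package, and result, resultChan, waitOrTimeout, fastTask, and slowTask are hypothetical names invented for the illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// result mirrors the two fields of taskset.Result for this sketch.
type result struct {
	Value interface{}
	Err   error
}

var errTimeout = errors.New("timed out")

const timeout = 50 * time.Millisecond

// resultChan (hypothetical) runs fn in a goroutine and delivers its outcome
// on a channel with buffer 1, so the send never blocks the goroutine.
func resultChan(fn func() (interface{}, error)) <-chan result {
	ch := make(chan result, 1)
	go func() {
		v, err := fn()
		ch <- result{Value: v, Err: err}
	}()
	return ch
}

// waitOrTimeout (hypothetical) selects between the result and a timer.
func waitOrTimeout(ch <-chan result, d time.Duration) result {
	select {
	case r := <-ch:
		return r
	case <-time.After(d):
		return result{Err: errTimeout}
	}
}

func fastTask() (interface{}, error) { return 42, nil }

func slowTask() (interface{}, error) {
	time.Sleep(10 * timeout)
	return 1, nil
}

func main() {
	fmt.Println(waitOrTimeout(resultChan(fastTask), timeout))
	fmt.Println(waitOrTimeout(resultChan(slowTask), timeout))
}
```

Because the buffer holds one value, the goroutine running slowTask can still complete its send after the waiter has given up, so it does not leak.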
func (Depend) ErrGroup ¶
ErrGroup starts waiting for a list of tasks in parallel. If any of the tasks fail, ErrGroup won't wait for the other tasks' results and will return the failed task immediately. In this case, context will NOT be cancelled for the other tasks. If all tasks complete successfully, ErrGroup will return nil.
Example ¶
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()
	taskSet := taskset.NewTaskSet()

	taskA := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		return nil, errors.New("fail")
	})

	taskB := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		if errTask := depend.ErrGroup(ctx, taskA, taskB); errTask != nil {
			return nil, depend(ctx, errTask).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)
		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	cResult := taskSet.Result(ctx, taskC)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	if cResult.Err != nil {
		fmt.Println("C failed:", cResult.Err.Error())
	} else {
		fmt.Println("C result:", cResult.Value.(int))
	}
}
Output:
total time: 0s
C failed: fail
func (Depend) SyncGroup ¶
SyncGroup waits for a list of tasks in parallel, blocking until every task has returned a result, successful or not.
SyncGroup(ctx, tasks...) is different from calling depend() on each task in succession: SyncGroup ensures that all tasks are actually running before waiting for all of them to complete, whereas waiting for tasks in succession won't start running a lazy task until every task before it completes.
Example ¶
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()
	taskSet := taskset.NewTaskSet()

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return nil, errors.New("fail")
	})

	taskB := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		depend.SyncGroup(ctx, taskA, taskB)

		var a, b int
		if err := depend(ctx, taskA).Err; err == nil {
			a = depend(ctx, taskA).Value.(int)
		}
		if err := depend(ctx, taskB).Err; err == nil {
			b = depend(ctx, taskB).Value.(int)
		}
		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	cResult := taskSet.Result(ctx, taskC)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	if cResult.Err != nil {
		fmt.Println("C failed:", cResult.Err.Error())
	} else {
		fmt.Println("C result:", cResult.Value.(int))
	}
}
Output:
total time: 2s
C result: 2
type Middleware ¶
type Middleware struct {
	// Run injects code into task execution. Middlewares must call next() exactly once
	// during Run. Middlewares may examine and modify task's properties at any point
	// during Run. Middlewares may pass a modified context to next(), although it must
	// be derived from the input context. Middlewares may examine and modify the
	// task's result before returning it. Leave Run equal to nil to not do anything on
	// task execution.
	Run func(ctx context.Context, task *Task, next func(ctx context.Context) Result) Result

	// Depend injects code into task dependency declaration. Middlewares must call
	// next() exactly once during Depend. Middlewares may examine and modify both
	// tasks' properties at any point during Depend. Middlewares may pass a modified
	// context to next(), although it must be derived from input context. Middlewares
	// may examine and modify the result of the dependency task before returning it.
	// Note that if the result is modified, the changes are only propagated to the
	// caller of depend(), it doesn't actually modify the dependent task's result.
	// Leave Depend equal to nil to not do anything on dependency declaration.
	Depend func(ctx context.Context, task, dependency *Task, next func(ctx context.Context) Result) Result
}
Middleware is used to inject code into task execution, as well as into dependency declaration.
For some examples of middlewares, see github.com/bennydictor/taskset/middlewares.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/bennydictor/taskset"
	"github.com/bennydictor/taskset/properties"
)

func NewPrinter() taskset.Middleware {
	return taskset.Middleware{
		Run: func(ctx context.Context, task *taskset.Task, next func(ctx context.Context) taskset.Result) taskset.Result {
			fmt.Println(properties.Name(task), "starting")
			result := next(ctx)
			fmt.Println(properties.Name(task), "finished")
			return result
		},
		Depend: func(ctx context.Context, task, dependency *taskset.Task, next func(ctx context.Context) taskset.Result) taskset.Result {
			fmt.Println(properties.Name(task), "depend on", properties.Name(dependency), "starting")
			result := next(ctx)
			fmt.Println(properties.Name(task), "depend on", properties.Name(dependency), "finished")
			return result
		},
	}
}

func main() {
	ctx := context.Background()
	taskSet := taskset.NewTaskSet(
		NewPrinter(),
	)

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		return 1, nil
	},
		properties.WithName("A"),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		a := depend(ctx, taskA).Value.(int)
		return a + 1, nil
	},
		properties.WithName("B"),
	)

	taskSet.Start(ctx)
	taskSet.Wait(ctx)
}
Output:
B starting
B depend on A starting
A starting
A finished
B depend on A finished
B finished
type Property ¶
type Property func(*Task)
Property represents a key-value property assigned to a task on creation. Properties are used by Middlewares.
For some examples of properties, see github.com/bennydictor/taskset/properties.
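Since Property is declared as func(*Task), a property behaves like a functional option that is applied to a task when it is created. The sketch below shows that pattern with stand-in types so it compiles on its own. The names task, property, nameKey, withName, name, and newTask are hypothetical, and the real properties package may be implemented differently.

```go
package main

import "fmt"

// task is a stand-in for taskset.Task, whose real fields are unexported.
type task struct {
	props map[interface{}]interface{}
}

// property mirrors the shape of taskset.Property: a function applied to a task.
type property func(*task)

// nameKey is a private key type, so keys from other packages cannot collide.
type nameKey struct{}

// withName (hypothetical) returns a property that stores a name on the task.
func withName(n string) property {
	return func(t *task) { t.props[nameKey{}] = n }
}

// name (hypothetical) reads the name back, or "" if none was set.
func name(t *task) string {
	s, _ := t.props[nameKey{}].(string)
	return s
}

// newTask applies each property in order, the way a constructor that
// accepts ...property would.
func newTask(properties ...property) *task {
	t := &task{props: make(map[interface{}]interface{})}
	for _, p := range properties {
		p(t)
	}
	return t
}

func main() {
	fmt.Println(name(newTask(withName("A"))))
}
```

Keeping the key type private means only the package that defines the property can read or write it, which is why accessor functions such as name accompany each property in this sketch.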
type Result ¶
type Result struct { Value interface{} Err error }
Result is the result of running a Task. Result is considered a success if Err == nil, and a failure if Err != nil.
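Because Value is an interface{}, reading it usually involves a type assertion, and checking Err first avoids asserting on a nil Value. The following is a minimal sketch of that pattern. It redeclares the Result struct locally so it runs without the taskset package, and intValue is a hypothetical helper, not part of the library.

```go
package main

import (
	"errors"
	"fmt"
)

// Result copies the two exported fields shown above so the sketch compiles
// on its own.
type Result struct {
	Value interface{}
	Err   error
}

// intValue (hypothetical) returns the task's value as an int. It reports the
// task's own error first, then uses a comma-ok assertion so that an
// unexpected type becomes an error instead of a panic.
func intValue(r Result) (int, error) {
	if r.Err != nil {
		return 0, r.Err
	}
	n, ok := r.Value.(int)
	if !ok {
		return 0, fmt.Errorf("unexpected value type %T", r.Value)
	}
	return n, nil
}

func main() {
	fmt.Println(intValue(Result{Value: 2}))
	fmt.Println(intValue(Result{Err: errors.New("fail")}))
	fmt.Println(intValue(Result{Value: "two"}))
}
```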
type RunFunc ¶
RunFunc is the body of a Task. This function will be run in a separate goroutine. Context is passed to this function from TaskSet.Start.
The returned values will be stored in the task's Result.
A task may depend on other tasks' results. To declare a dependency, use the provided Depend function; DO NOT use TaskSet.Result.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is the basic unit of work and concurrency. Each task runs in a separate goroutine. A task may depend on other tasks' results.
Tasks are created by a TaskSet using a RunFunc and Properties.
func (*Task) ModifyProperty ¶
func (t *Task) ModifyProperty(key interface{}, modify func(interface{}) interface{})
ModifyProperty modifies a property for this task. The modify function runs under a mutex, to allow for serializable transactions. Because of that, you shouldn't do any long operations in the modify function.
Define your own key type to use with this method, as you would for context.WithValue.
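The read-modify-write contract described above can be pictured with a small mutex-guarded store. This is a sketch of the idea under stated assumptions, not the library's implementation: store, modify, increment, countConcurrently, and counterKey are hypothetical names, and passing nil for a property that has not been set yet is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// counterKey is a private key type, in the same spirit as context.WithValue keys.
type counterKey struct{}

// store (hypothetical) holds properties behind a mutex.
type store struct {
	mu    sync.Mutex
	props map[interface{}]interface{}
}

// modify runs fn on the current value while holding the lock, so concurrent
// read-modify-write updates cannot interleave. fn should return quickly,
// because every other caller waits for it.
func (s *store) modify(key interface{}, fn func(interface{}) interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.props[key] = fn(s.props[key])
}

// increment treats a missing (nil) value as zero.
func increment(old interface{}) interface{} {
	n, _ := old.(int)
	return n + 1
}

// countConcurrently applies increment from n goroutines and returns the total.
func countConcurrently(n int) int {
	s := &store{props: make(map[interface{}]interface{})}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.modify(counterKey{}, increment)
		}()
	}
	wg.Wait()
	total, _ := s.props[counterKey{}].(int)
	return total
}

func main() {
	fmt.Println(countConcurrently(100))
}
```

If the read and the write were two separate locked calls instead of one modify call, two goroutines could both read the same old value and one increment would be lost.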
type TaskSet ¶
type TaskSet struct {
// contains filtered or unexported fields
}
TaskSet creates and runs Tasks.
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()
	taskSet := taskset.NewTaskSet()

	taskA := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 1, nil
	})

	taskB := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskC := taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		if t := depend.ErrGroup(ctx, taskA, taskB); t != nil {
			return nil, depend(ctx, t).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)
		time.Sleep(2 * time.Second)
		return a + b, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	taskSet.Wait(ctx)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
	fmt.Println("result:", taskSet.Result(ctx, taskC).Value)
}
Output:
total time: 4s
result: 3
func NewTaskSet ¶
func NewTaskSet(middlewares ...Middleware) *TaskSet
NewTaskSet creates a new TaskSet.
func (*TaskSet) Eager ¶
Eager marks a lazy task as non-lazy. The provided task must belong to this TaskSet.
func (*TaskSet) New ¶
New creates a new Task given its RunFunc and Properties. The created task will run upon calling Start.
func (*TaskSet) NewLazy ¶
NewLazy is like New, except the created Task will not run automatically upon calling Start.
The created Task will start running the first time any other task declares a dependency on it. If no task declares a dependency on the created task, it will not run at all.
A lazy task can later be converted to a non-lazy task with Eager.
Example ¶
A lazy task that no other task depends on will never run.
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/bennydictor/taskset"
)

func main() {
	ctx := context.Background()
	taskSet := taskset.NewTaskSet()

	var mu sync.Mutex
	var tasksStarted []string
	taskStarted := func(task string) {
		mu.Lock()
		defer mu.Unlock()
		tasksStarted = append(tasksStarted, task)
	}

	taskA := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("A")
		return 1, nil
	})

	taskB := taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("B")
		return 2, nil
	})

	taskSet.NewLazy(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("C")
		return 3, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		taskStarted("D")
		if t := depend.ErrGroup(ctx, taskA, taskB); t != nil {
			return nil, depend(ctx, t).Err
		}

		a := depend(ctx, taskA).Value.(int)
		b := depend(ctx, taskB).Value.(int)
		return a + b, nil
	})

	taskSet.Start(ctx)
	taskSet.Wait(ctx)

	sort.Strings(tasksStarted)
	fmt.Println(strings.Join(tasksStarted, ", "))
}
Output:
A, B, D
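The lazy rule described for NewLazy (a task starts the first time something depends on it, and never otherwise) resembles a start guarded by sync.Once. The sketch below models only that rule with the standard library. The names lazyTask, newLazyTask, depend, and runDemo are hypothetical, and nothing here is taken from the library's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyTask (hypothetical) starts its body at most once, on first demand.
type lazyTask struct {
	once  sync.Once
	done  chan struct{}
	run   func() int
	value int
	runs  int // how many times the body actually ran
}

func newLazyTask(run func() int) *lazyTask {
	return &lazyTask{done: make(chan struct{}), run: run}
}

// depend starts the task if nobody has yet, then blocks until it finishes.
func (t *lazyTask) depend() int {
	t.once.Do(func() {
		go func() {
			t.runs++
			t.value = t.run()
			close(t.done) // publishes runs and value to every waiter
		}()
	})
	<-t.done
	return t.value
}

// runDemo has three goroutines depend on one lazy task and leaves a second
// lazy task untouched, then reports how often each body ran.
func runDemo() (usedRuns, unusedRuns int) {
	used := newLazyTask(func() int { return 1 })
	unused := newLazyTask(func() int { return 3 })

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			used.depend()
		}()
	}
	wg.Wait()
	return used.runs, unused.runs
}

func main() {
	fmt.Println(runDemo())
}
```

However many dependents call depend, the body of the used task runs once, and the task nobody depends on never runs, which matches the behavior of task C in the example above.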
func (*TaskSet) Result ¶
Result returns the Result of a given Task, blocking until it is ready. The provided task must belong to this TaskSet. The context is used only to cancel the call to Result; it is not passed to any of the tasks' RunFuncs.
Result does not run a task; it only waits for the result. If you call Result on a task that never runs, it will block forever.
DO NOT use Result from inside a RunFunc to get a result of a task from the same task set. Such a call to Result will panic. You should use depend(ctx, task) instead.
func (*TaskSet) Start ¶
Start runs all non-lazy Tasks created by this task set. Context will be passed to all the tasks' run functions.