
Package workerpool

v0.0.9
Published: Jun 24, 2020 | License: Apache-2.0 | Module: github.com/grailbio/base

type Task

type Task interface {
	Do(grp *TaskGroup) error
}

Task provides an interface for an individual task. Workers execute a Task by calling its Do method.
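As an illustration of what satisfying this interface can look like, here is a minimal sketch. The Task and TaskGroup types below are local stand-ins that mirror the documented declarations so the example compiles on its own; real code would import the workerpool package instead. The sumTask type is hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskGroup and Task are local stand-ins that mirror the documented
// declarations so this sketch compiles on its own. They are assumptions
// for illustration, not the package's real types.
type TaskGroup struct {
	Name string
}

type Task interface {
	Do(grp *TaskGroup) error
}

// sumTask is a hypothetical Task that adds up a slice of integers.
type sumTask struct {
	nums   []int
	result *int
}

// Do computes the sum, or returns an error if there is nothing to add.
func (t sumTask) Do(grp *TaskGroup) error {
	if len(t.nums) == 0 {
		return errors.New(grp.Name + ": no numbers to sum")
	}
	total := 0
	for _, n := range t.nums {
		total += n
	}
	*t.result = total
	return nil
}

func main() {
	var result int
	var task Task = sumTask{nums: []int{1, 2, 3}, result: &result}
	err := task.Do(&TaskGroup{Name: "demo"})
	fmt.Println(result, err) // 6 <nil>
}
```

Because Do receives the TaskGroup it runs in, a Task has what it needs to enqueue follow-up Tasks into the same group.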

type TaskGroup

type TaskGroup struct {
	Name       string
	ErrHandler *multierror.MultiError
	Wp         *WorkerPool
	// contains filtered or unexported fields
}

TaskGroup is used to group Tasks together so the consumer can wait for a specific subgroup of Tasks to complete.

func (*TaskGroup) Enqueue

func (grp *TaskGroup) Enqueue(t Task, block bool) bool

Enqueue puts a Task in the queue. If block is true and the channel is full, the call blocks until there is room. If block is false and the channel is full, Enqueue returns false instead of blocking.
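The blocking and non-blocking behavior described here corresponds to a common Go idiom: a send on a buffered channel, optionally wrapped in a select with a default case. The following is a minimal standard-library sketch of that idiom, not the package's actual implementation. The names queue, newQueue, and enqueue are hypothetical, and returning true after a successful send is an assumption.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for a pool's internal task channel.
type queue struct {
	tasks chan func()
}

// newQueue creates a queue that can buffer up to size tasks.
func newQueue(size int) *queue {
	return &queue{tasks: make(chan func(), size)}
}

// enqueue mirrors the documented semantics: with block set it waits for
// room in the channel; without it, it reports false when the channel is full.
func (q *queue) enqueue(t func(), block bool) bool {
	if block {
		q.tasks <- t
		return true
	}
	select {
	case q.tasks <- t:
		return true
	default:
		return false
	}
}

func main() {
	q := newQueue(1)
	noop := func() {}
	fmt.Println(q.enqueue(noop, false)) // true: the buffer has room
	fmt.Println(q.enqueue(noop, false)) // false: the buffer is full
}
```

The non-blocking form is useful when the caller would rather shed or defer work than stall while the queue is full.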

func (*TaskGroup) Wait

func (grp *TaskGroup) Wait()

Wait blocks until all Tasks in this TaskGroup have completed.

type WorkerPool

type WorkerPool struct {
	Ctx         context.Context
	Concurrency int
	// contains filtered or unexported fields
}

WorkerPool provides a mechanism for executing Tasks with a specific concurrency. A Task is an interface containing a single method, Do. A TaskGroup allows Tasks to be grouped together so the caller can wait for all Tasks in that TaskGroup to complete. Tasks can create new Tasks and add them to the TaskGroup, or create new TaskGroups and add them to the WorkerPool. A simple example looks like this:

// errs is a caller-supplied *multierror.MultiError (the errHandler argument).
wp := workerpool.New(context.Background(), 3)
tg1 := wp.NewTaskGroup("context1", errs)
tg1.Enqueue(MyFirstTask, true)
tg2 := wp.NewTaskGroup("context2", errs)
tg2.Enqueue(MyFourthTask, true)
tg1.Enqueue(MySecondTask, true)
tg2.Enqueue(MyFifthTask, true)
tg1.Enqueue(MyThirdTask, true)
tg1.Wait()
tg2.Enqueue(MySixthTask, true)
tg2.Wait()
wp.Wait()

TaskGroups can come and go until wp.Wait() has been called. Tasks can come and go in a TaskGroup until tg.Wait() has been called. All the Tasks in this example are executed by 3 goroutines.

Note: Each WorkerPool will create a goroutine to keep track of active TaskGroups. Each TaskGroup will create a goroutine to keep track of pending/active tasks.

func New

func New(ctx context.Context, concurrency int) *WorkerPool

New creates a WorkerPool with the given concurrency.

TODO(pknudsgaard): Should return a closure calling Wait.

func (*WorkerPool) Err

func (wp *WorkerPool) Err() error

Err returns the context.Context error, which can be used to determine whether the WorkerPool finished because of the context.

func (*WorkerPool) NewTaskGroup

func (wp *WorkerPool) NewTaskGroup(name string, errHandler *multierror.MultiError) *TaskGroup

NewTaskGroup creates a TaskGroup for Tasks to be executed in.

TODO(pknudsgaard): TaskGroup should have a context.Context which is separate from the WorkerPool context.Context.

TODO(pknudsgaard): Should return a closure calling Wait.

func (*WorkerPool) Wait

func (wp *WorkerPool) Wait()

Wait blocks until all TaskGroups in the WorkerPool have completed.

Documentation was rendered with GOOS=linux and GOARCH=amd64.
