Documentation ¶
Index ¶
Examples ¶
Constants ¶
const ( ErrPoolAlreadyRunning = Error("pool already running") ErrPoolHasNilSleepTimeFunc = Error("pool has a nil sleep time func") )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents a group of workers performing the same task simultaneously
Example ¶
startedWorkers := safeCounter{} finishedWorkers := safeCounter{} workInputChan := make(chan string, 5) var work WorkFunc = func(ctx context.Context) error { startedWorkers.Inc() fmt.Println("Starting worker") for { select { case <-ctx.Done(): finishedWorkers.Inc() fmt.Println("Finishing worker") return ctx.Err() case x, ok := <-workInputChan: if !ok { return fmt.Errorf("unexpected closed input chan") } fmt.Printf("Doing work: %s\n", x) } } } var workerCount WorkerCountFunc = func() uint64 { return 3 } var sleepTime SleepTimeFunc = func() time.Duration { return time.Millisecond * 100 } safeCounterHasVal := func(started *safeCounter, startedVal int, finished *safeCounter, finishedVal int) func() error { return func() error { got := started.Val() if got != startedVal { return fmt.Errorf("expected %d workers to be started, got %d", startedVal, got) } got = finished.Val() if got != finishedVal { return fmt.Errorf("expected %d workers to be finished, got %d", finishedVal, got) } return nil } } p := NewPool("test", work, workerCount, sleepTime, context.Background()) if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 0, &finishedWorkers, 0)); err != nil { panic(err) } fmt.Printf("%d started - %d finished\n", startedWorkers.Val(), finishedWorkers.Val()) cancel, err := p.Start() if err != nil { panic(err) } defer cancel() if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 3, &finishedWorkers, 0)); err != nil { panic(err) } fmt.Printf("%d started - %d finished\n", startedWorkers.Val(), finishedWorkers.Val()) // Note: sleeps are used to ensure output order workInputChan <- "One" time.Sleep(time.Millisecond * 20) workInputChan <- "Two" time.Sleep(time.Millisecond * 20) workInputChan <- "Three" time.Sleep(time.Millisecond * 20) cancel() if err := timeoutAfter(time.Second, safeCounterHasVal(&startedWorkers, 3, &finishedWorkers, 3)); err != nil { panic(err) } time.Sleep(time.Millisecond * 50) fmt.Printf("%d started - %d finished\n", 
startedWorkers.Val(), finishedWorkers.Val())
Output: 0 started - 0 finished Starting worker Starting worker Starting worker 3 started - 0 finished Doing work: One Doing work: Two Doing work: Three Finishing worker Finishing worker Finishing worker 3 started - 3 finished
func NewPool ¶
func NewPool(id string, work WorkFunc, desiredWorkerCount WorkerCountFunc, sleepTime SleepTimeFunc, ctx context.Context) *Pool
NewPool returns a new pool
func (*Pool) Done ¶
func (p *Pool) Done() <-chan struct{}
Done returns a channel that is closed when the pool is no longer running. Done returns nil if called when the pool hasn't started running yet.
func (*Pool) Start ¶
func (p *Pool) Start() (context.CancelFunc, error)
Start initiates the pool monitor
func (*Pool) StartOnce ¶
func (p *Pool) StartOnce() (context.CancelFunc, error)
Example ¶
StartOnce will start a pool with the desired worker count, and will close the done channel when all workers have finished.
ctx := context.TODO() // outputSlice is just a concurrently safe slice outputSlice := &safeIntSlice{} // notice that it initially contains 10 values of 0 outputSlice.Init([]int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) workerInput := make(chan int) var work WorkFunc = func(ctx context.Context) error { fmt.Println("worker started") for { select { case <-ctx.Done(): return ctx.Err() case key, ok := <-workerInput: if !ok { return nil } // when we are given a key, the worker will set it to a value of 1 outputSlice.Set(key, 1) } } } // in this case we will have 3 workers var workerCount WorkerCountFunc = func() uint64 { return 3 } p := NewPool("test", work, workerCount, nil, ctx) fmt.Println("check #1") for i := 0; i < 10; i++ { fmt.Printf("%d = %d\n", i, outputSlice.Get(i)) } fmt.Println("starting workers") cancel, err := p.StartOnce() if err != nil { panic(err) } defer cancel() <-time.After(time.Millisecond * 100) workerInput <- 0 workerInput <- 2 workerInput <- 4 workerInput <- 6 workerInput <- 8 <-time.After(time.Millisecond * 100) fmt.Println("check #2") for i := 0; i < 10; i++ { fmt.Printf("%d = %d\n", i, outputSlice.Get(i)) } workerInput <- 1 workerInput <- 3 workerInput <- 5 workerInput <- 7 fmt.Println("closing worker input chan") close(workerInput) select { case <-p.Done(): fmt.Println("done") case <-time.After(time.Second): panic("timeout") } fmt.Println("check #3") for i := 0; i < 10; i++ { fmt.Printf("%d = %d\n", i, outputSlice.Get(i)) }
Output: check #1 0 = 0 1 = 0 2 = 0 3 = 0 4 = 0 5 = 0 6 = 0 7 = 0 8 = 0 9 = 0 starting workers worker started worker started worker started check #2 0 = 1 1 = 0 2 = 1 3 = 0 4 = 1 5 = 0 6 = 1 7 = 0 8 = 1 9 = 0 closing worker input chan done check #3 0 = 1 1 = 1 2 = 1 3 = 1 4 = 1 5 = 1 6 = 1 7 = 1 8 = 1 9 = 0
type SleepTimeFunc ¶
SleepTimeFunc should return the time.Duration to sleep between checks of a pool's worker count.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is the definition for a single worker
Example ¶
ctx := context.TODO() inputChan := make(chan string, 5) var work WorkFunc = func(ctx context.Context) error { for { select { case <-ctx.Done(): return ctx.Err() case in, ok := <-inputChan: if !ok { return nil } fmt.Print(in) } } return nil } w := NewWorker("name printer", work, ctx) inputChan <- "Tom" inputChan <- "Jim" inputChan <- "Frank" inputChan <- "John" inputChan <- "Tony" close(inputChan) fmt.Print("Start") cancel := w.Start() defer cancel() timer := time.NewTimer(time.Second) defer timer.Stop() select { case <-timer.C: panic("worker did not finish in time") case <-w.Done(): fmt.Print("Done") }
Output: StartTomJimFrankJohnTonyDone
func (*Worker) Done ¶
func (w *Worker) Done() chan struct{}
Done returns a channel that can be used to detect when the worker has finished.
func (*Worker) Start ¶
func (w *Worker) Start() context.CancelFunc
Start initiates a goroutine for the worker and returns the context's cancel function.
type WorkerCountFunc ¶
type WorkerCountFunc func() uint64
WorkerCountFunc should return the number of workers you want to be running in the pool