Documentation ¶
Overview ¶
Package goroutinepool provides handy functions for running goroutines concurrently similar to `multiprocess.Pool` in python or `concurrency.ThreadPoolExecutor` in java.
This package is safe for parallel execution.
Index ¶
- func DisableDebugLog()
- func EnableDebugLog()
- func NonBufferedChan(ctx context.Context, fns []func(context.Context)) chan func(context.Context)
- func PopulateAndCloseChan(ctx context.Context, ch chan func(context.Context), ...) chan func(context.Context)
- func RunInPool(ctx context.Context, poolSize int, fns []func(context.Context))
- func RunInPoolWithChan(ctx context.Context, poolSize int, ch chan func(context.Context))
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DisableDebugLog ¶
func DisableDebugLog()
DisableDebugLog makes further calls to RunInPool or RunInPoolWithChan not log trace messages.
func EnableDebugLog ¶
func EnableDebugLog()
EnableDebugLog makes further calls to RunInPool or RunInPoolWithChan to log trace messages.
func NonBufferedChan ¶
NonBufferedChan creates a non-buffered channel out of provided functions.
Channel is auto-closed once all its elements consumed or context canceled.
Example ¶
package main import ( "context" "fmt" "github.com/ykhrustalev/goroutinepool" "sync/atomic" ) func main() { ctx := context.Background() var counter int32 // a simple job to increment a number increment := func(delta int32) func(context.Context) { return func(ctx context.Context) { atomic.AddInt32(&counter, delta) } } // jobs fns := []func(context.Context){ increment(1), // executed and waited increment(10), // executed and waited increment(100), // will be cancelled by timeout } // use two workers ch := goroutinepool.NonBufferedChan(ctx, fns) for fn := range ch { fn(ctx) } fmt.Println(atomic.LoadInt32(&counter)) }
Output: 111
func PopulateAndCloseChan ¶
func PopulateAndCloseChan(ctx context.Context, ch chan func(context.Context), fns []func(context.Context)) chan func(context.Context)
PopulateAndCloseChan passes all provided elements into the given channel. This approach allows providing own buffered channel. The provided channel must not be populated anywhere else otherwise there will be a data race.
See RunInPoolWithChan examples for the use cases.
Channel is auto-closed once all its elements consumed or context canceled.
func RunInPool ¶
RunInPool concurrently runs provided functions with supplied concurrency level unless the context is triggered. Uses at least one worker.
Example (Basic) ¶
package main import ( "context" "fmt" "github.com/ykhrustalev/goroutinepool" "sync/atomic" ) func main() { ctx := context.Background() var counter int32 // a simple job to increment a number increment := func(delta int32) func(context.Context) { return func(ctx context.Context) { atomic.AddInt32(&counter, delta) } } // jobs fns := []func(context.Context){ increment(1), increment(10), increment(100), } // use two workers goroutinepool.RunInPool(ctx, 2, fns) fmt.Println(atomic.LoadInt32(&counter)) }
Output: 111
Example (Cancellation) ¶
package main import ( "context" "fmt" "github.com/ykhrustalev/goroutinepool" "sync/atomic" "time" ) func main() { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() var counter int32 // a simple job to increment a number increment := func(delta int32) func(context.Context) { return func(ctx context.Context) { atomic.AddInt32(&counter, delta) select { case <-ctx.Done(): break // won't exit unless cancelled } time.Sleep(200 * time.Millisecond) } } // jobs fns := []func(context.Context){ increment(1), // executed and waited increment(10), // executed and waited increment(100), // will be cancelled by timeout } // use two workers goroutinepool.RunInPool(ctx, 2, fns) fmt.Println(atomic.LoadInt32(&counter)) }
Output: 11
func RunInPoolWithChan ¶
RunInPoolWithChan concurrently runs items from a provided channel unless till the context is triggered or channel is closed. Uses at least one worker.
Example (BufferedChannel) ¶
package main import ( "context" "fmt" "github.com/ykhrustalev/goroutinepool" "sync/atomic" ) func main() { ctx := context.Background() var counter int32 // a simple job to increment a number increment := func(delta int32) func(context.Context) { return func(ctx context.Context) { atomic.AddInt32(&counter, delta) } } // create a buffered channel ch := make(chan func(context.Context), 2) // extend channel with jobs // note, that channel gets closed using this function goroutinepool.PopulateAndCloseChan(ctx, ch, []func(context.Context){ increment(1), increment(10), increment(100), }) // use two workers goroutinepool.RunInPoolWithChan(ctx, 2, ch) fmt.Println(atomic.LoadInt32(&counter)) }
Output: 111
Types ¶
This section is empty.