Version: v0.0.0-...-9649366

Published: Nov 21, 2019 License: Apache-2.0 Imports: 9 Imported by: 0



Package sync implements synchronization facilities such as worker pools.






type NowFn

type NowFn func() time.Time

NowFn is a function that returns the current time.

type PooledWorkerPool

type PooledWorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go assigns the Work to be executed by a goroutine. Whether it waits
	// for an existing goroutine to become available is determined by the
	// GrowOnDemand() option. If GrowOnDemand is not set, the call to Go()
	// blocks until a goroutine is available. If GrowOnDemand() is set, the
	// pool expands its set of goroutines to accommodate the work. The newly
	// allocated goroutine temporarily participates in the pool in an effort
	// to amortize its allocation cost, but is eventually killed. This allows
	// the pool to respond dynamically to workloads without causing excessive
	// memory pressure. The pool grows when the workload exceeds its capacity
	// and shrinks back to its original size if and when the burst subsides.
	Go(work Work)
}

PooledWorkerPool provides a pool for goroutines, but unlike WorkerPool, the actual goroutines themselves are re-used. This can be useful from a performance perspective in scenarios where the allocation and growth of the new goroutine and its stack is a bottleneck. Specifically, if the work function being performed has a very deep call-stack, calls to runtime.morestack can dominate the workload. Re-using existing goroutines allows the stack to be grown once, and then re-used for many invocations.

To prevent abnormally large goroutine stacks from persisting over the life cycle of an application, the PooledWorkerPool randomly kills existing goroutines and spawns new ones to replace them.

The PooledWorkerPool also implements sharding of its underlying worker channels to prevent excessive lock contention.

func NewPooledWorkerPool

func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPool, error)

NewPooledWorkerPool creates a new worker pool.

type PooledWorkerPoolOptions

type PooledWorkerPoolOptions interface {
	// SetGrowOnDemand sets whether the GrowOnDemand feature is enabled.
	SetGrowOnDemand(value bool) PooledWorkerPoolOptions

	// GrowOnDemand returns whether the GrowOnDemand feature is enabled.
	GrowOnDemand() bool

	// SetNumShards sets the number of worker channel shards.
	SetNumShards(value int64) PooledWorkerPoolOptions

	// NumShards returns the number of worker channel shards.
	NumShards() int64

	// SetKillWorkerProbability sets the probability to kill a worker.
	SetKillWorkerProbability(value float64) PooledWorkerPoolOptions

	// KillWorkerProbability returns the probability to kill a worker.
	KillWorkerProbability() float64

	// SetNowFn sets the now function.
	SetNowFn(value NowFn) PooledWorkerPoolOptions

	// NowFn returns the now function.
	NowFn() NowFn

	// SetInstrumentOptions sets the instrument options.
	SetInstrumentOptions(value instrument.Options) PooledWorkerPoolOptions

	// InstrumentOptions returns the instrument options.
	InstrumentOptions() instrument.Options
}

PooledWorkerPoolOptions holds the options for a PooledWorkerPool.

func NewPooledWorkerPoolOptions

func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions

NewPooledWorkerPoolOptions returns a new PooledWorkerPoolOptions with default options.

type Work

type Work func()

Work is a unit of work to be executed.

type WorkerPool

type WorkerPool interface {
	// Init initializes the pool.
	Init()

	// Go waits until the next worker becomes available and executes the
	// work on it.
	Go(work Work)

	// GoIfAvailable performs the work inside a worker if one is available and
	// returns true, or false otherwise.
	GoIfAvailable(work Work) bool

	// GoWithTimeout waits up to the given timeout for a worker to become
	// available, returning true if a worker becomes available, or false
	// otherwise.
	GoWithTimeout(work Work, timeout time.Duration) bool
}

WorkerPool provides a pool for goroutines.

package main

import (
	"fmt"
	"log"
	"sync"

	xsync "" // import path of this sync package
)

type response struct {
	a int
}

func main() {
	var (
		wg          sync.WaitGroup
		workers     = xsync.NewWorkerPool(3)
		errorCh     = make(chan error, 1)
		numRequests = 9
		responses   = make([]response, numRequests)
	)

	// Register every request up front and start the pool.
	wg.Add(numRequests)
	workers.Init()

	for i := 0; i < numRequests; i++ {
		// Capture loop variable.
		i := i

		// Execute request on worker pool.
		workers.Go(func() {
			defer wg.Done()

			var err error

			// Perform some work which may fail.
			resp := response{a: i}

			if err != nil {
				// Return the first error that is encountered.
				select {
				case errorCh <- err:
				default:
					// An earlier error is already buffered; drop this one.
				}
				return
			}

			// Can concurrently modify responses since each iteration updates a
			// different index.
			responses[i] = resp
		})
	}

	// Wait for all requests to finish.
	wg.Wait()

	// Close the channel so the receive below does not block when no
	// error was sent.
	close(errorCh)
	if err := <-errorCh; err != nil {
		log.Fatal(err)
	}

	var total int
	for _, r := range responses {
		total += r.a
	}

	fmt.Printf("Total is %v", total)
}

Total is 36

func NewWorkerPool

func NewWorkerPool(size int) WorkerPool

NewWorkerPool creates a new worker pool.
