README

worker-pools

Go package for managing a set of lazily constructed, self-expiring, concurrency-limited worker pools.

maxConcurrentWorkloads := 500
stalePoolExpiration := 10*time.Minute
maxPoolLifetime := 4*time.Hour
poolManager := pool.NewWorkerPoolManager(
  maxConcurrentWorkloads, stalePoolExpiration, maxPoolLifetime,
)

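// GetPool also takes sendSize: the number of workers to spawn, capped at the manager's pool size.
sendSize := 10 // illustrative value
workerPool, doneUsing := poolManager.GetPool("pool 1", sendSize)
workerPool.Submit(func() {
  // Do anything here. Only maxConcurrentWorkloads will be allowed to execute concurrently per pool.
  // This is useful for limiting concurrent usage of external resources.
})
// Signal that this caller no longer needs the pool, so it can expire.
close(doneUsing)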

Each pool instance is constructed when it is first required and then cached. Every use keeps the pool cached for a further stalePoolExpiration, up to a hard limit of maxPoolLifetime for pools that receive constant usage. Multiple goroutines may safely reserve and use pools concurrently. Each pool spins up its worker goroutines lazily, as they are required, which allows high levels of concurrency and a high cardinality of pools in the manager.
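
The expiry rule described above can be sketched as a small calculation. This is one reading of that paragraph, not the package's actual bookkeeping: expiryOffset is a hypothetical helper, and it assumes that all times are measured as offsets from the moment the pool was created.

```go
package main

import (
	"fmt"
	"time"
)

// expiryOffset returns how long after its creation a pool would expire,
// given the offset of its most recent use. Hypothetical helper, not part
// of the package: each use keeps the pool alive for stalePoolExpiration,
// but never beyond maxPoolLifetime.
func expiryOffset(lastUsed, stalePoolExpiration, maxPoolLifetime time.Duration) time.Duration {
	staleAt := lastUsed + stalePoolExpiration
	if staleAt > maxPoolLifetime {
		return maxPoolLifetime
	}
	return staleAt
}

func main() {
	stale := 10 * time.Minute
	maxLife := 4 * time.Hour

	// Last used 30 minutes after creation: expires 10 minutes later.
	fmt.Println(expiryOffset(30*time.Minute, stale, maxLife)) // 40m0s

	// Last used 3h55m after creation: capped by the maximum lifetime.
	fmt.Println(expiryOffset(3*time.Hour+55*time.Minute, stale, maxLife)) // 4h0m0s
}
```

Under this reading, a pool that is used continuously is still rebuilt at least once every maxPoolLifetime.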

If you want to attach shared data or behavior to each pool instance:

type myPooledData struct {
	pool.WorkerPool

	// You can put shared data of any type here
	myData string
}
func (p *myPooledData) Dispose() {
	p.WorkerPool.Dispose()
	// Release shared data here.
}

poolManager := pool.NewWorkerPoolManager(500, 10*time.Minute, 4*time.Hour)

var poolFactory pool.Factory = func(maxSize int) (pool.WorkerPool, error) {
	workerPool, err := pool.NewWorkerPool(maxSize)
	if err != nil {
		return nil, err
	}

	// Build shared resources here, return errors, etc.

	return &myPooledData{
		WorkerPool: workerPool,
		myData:     "my shared data",
	}, nil
}

// sendSize is the number of workers to spawn, capped at the manager's pool size.
workerPool, doneUsing, err := poolManager.GetPoolWithFactory("pool 1", sendSize, poolFactory)
// Any error returned in the Factory function will bubble up here
if err != nil {
  // Handle
}
// Only use the pool once the error has been checked.
fmt.Println(workerPool.(*myPooledData).myData)
workerPool.Submit(func() {
  // Do anything here. Only maxConcurrentWorkloads will be allowed to execute concurrently per pool.
})
close(doneUsing)
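
The type assertion in the example above panics if the pool was built by a different factory. A checked assertion avoids that. The sketch below is self-contained and does not import the package: workerPool, inlinePool, pooledData and sharedData are hypothetical stand-ins that only mirror the shape of the README example.

```go
package main

import "fmt"

// workerPool is a hypothetical stand-in for the package's WorkerPool interface.
type workerPool interface {
	Submit(w func())
	Dispose()
}

// inlinePool is a trivial stand-in implementation that runs work synchronously.
type inlinePool struct{}

func (inlinePool) Submit(w func()) { w() }
func (inlinePool) Dispose()        {}

// pooledData embeds the interface and attaches shared data, mirroring the
// myPooledData type in the README example.
type pooledData struct {
	workerPool
	myData string
}

// sharedData returns the data attached to p, or false if p is some other
// implementation that carries no shared data.
func sharedData(p workerPool) (string, bool) {
	d, ok := p.(*pooledData)
	if !ok {
		return "", false
	}
	return d.myData, true
}

func main() {
	var p workerPool = &pooledData{workerPool: inlinePool{}, myData: "my shared data"}
	if data, ok := sharedData(p); ok {
		fmt.Println(data) // my shared data
	}

	// A pool built without the custom factory carries no shared data.
	_, ok := sharedData(inlinePool{})
	fmt.Println(ok) // false
}
```

The comma-ok form matters when the same key might be requested both with and without the custom factory, since the cached pool could then be of either type.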

See GoDoc for more details.

Documentation

Types

type BaseWorkerPool

type BaseWorkerPool struct {
	// contains filtered or unexported fields
}

BaseWorkerPool is the base implementation of WorkerPool.

func (*BaseWorkerPool) Dispose

func (p *BaseWorkerPool) Dispose()

Dispose the pool, closing down the workers and releasing any shared resources.

func (*BaseWorkerPool) Submit

func (p *BaseWorkerPool) Submit(w Work)

Submit an item of Work to be executed.

When all workers are busy and a further workerPoolMaxSize items of pending work are already enqueued, this method blocks until a worker becomes available.
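
The blocking behaviour described for Submit can be pictured with a buffered channel. This is a minimal sketch under stated assumptions, not the BaseWorkerPool implementation: sketchPool and its methods are hypothetical, maxSize is assumed to be at least 1, and Submit must not be called after or concurrently with Dispose.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchPool is a hypothetical bounded pool: at most maxSize workers, plus a
// queue that holds up to maxSize pending work items.
type sketchPool struct {
	queue   chan func()
	maxSize int

	mu      sync.Mutex
	workers int
	wg      sync.WaitGroup
}

func newSketchPool(maxSize int) *sketchPool {
	return &sketchPool{queue: make(chan func(), maxSize), maxSize: maxSize}
}

// Submit starts a worker if fewer than maxSize exist, then enqueues the work.
// The send blocks once the queue is full and every worker is busy.
func (p *sketchPool) Submit(w func()) {
	p.mu.Lock()
	if p.workers < p.maxSize {
		p.workers++
		p.wg.Add(1)
		go p.run()
	}
	p.mu.Unlock()
	p.queue <- w
}

// run executes queued work until the queue is closed and drained.
func (p *sketchPool) run() {
	defer p.wg.Done()
	for w := range p.queue {
		w()
	}
}

// Dispose stops accepting work and waits for the workers to finish.
func (p *sketchPool) Dispose() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newSketchPool(2)
	results := make(chan int, 4)
	for i := 0; i < 4; i++ {
		n := i
		p.Submit(func() { results <- n * n })
	}
	p.Dispose()
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	fmt.Println(sum) // 14
}
```

In this sketch, workers are started on demand rather than up front, which is one simple way to get the lazy start-up the README describes. It starts a new worker on each Submit until the limit is reached, even when an existing worker is idle.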

type Factory

type Factory func(maxSize int) (WorkerPool, error)

Factory builds a new WorkerPool

type Work

type Work func()

Work - a unit of work

type WorkerPool

type WorkerPool interface {
	Submit(w Work)
	Dispose()
	// contains filtered or unexported methods
}

WorkerPool is a fixed-size pool of workers.

func NewWorkerPool

func NewWorkerPool(maxSize int) (WorkerPool, error)

NewWorkerPool builds a new BaseWorkerPool and returns it as a WorkerPool. This is the default pool factory.

type WorkerPoolManager

type WorkerPoolManager struct {
	// contains filtered or unexported fields
}

WorkerPoolManager - Self-expiring, lazily constructed map of fixed-size worker pools safe for concurrent use

func NewWorkerPoolManager

func NewWorkerPoolManager(
	poolSize int, stalePoolExpiration time.Duration, maxPoolLifetime time.Duration,
) *WorkerPoolManager

NewWorkerPoolManager factory constructor

* poolSize - the max number of workers for each key
* stalePoolExpiration - how long to cache unused pools for
* maxPoolLifetime - max time to allow pools to live

func (*WorkerPoolManager) GetPool

func (m *WorkerPoolManager) GetPool(key string, sendSize int) (WorkerPool, chan<- bool)

GetPool returns the WorkerPool for this key, building a BaseWorkerPool and caching it if necessary. Spawns sendSize workers, up to a max of the manager's poolSize.

This returns the pool in an "unexpirable" state: the caller should signal the returned done channel (for example, by closing it) when it no longer requires the returned pool.

func (*WorkerPoolManager) GetPoolWithFactory

func (m *WorkerPoolManager) GetPoolWithFactory(
	key string, sendSize int, factory Factory,
) (WorkerPool, chan<- bool, error)

GetPoolWithFactory returns the WorkerPool for this key. It lets you specify a custom pool.Factory, for cases where you want to build a custom WorkerPool implementation that embeds a BaseWorkerPool and attaches supplementary shared data for the pool.