Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Workers int `default:"100"`
}
Config is the configuration for the pool implementation
type ErrCantAcquireWorker ¶
type ErrCantAcquireWorker struct {
// contains filtered or unexported fields
}
ErrCantAcquireWorker is returned by the Pool.Worker() function when a worker can't be acquired.
func (ErrCantAcquireWorker) Error ¶
func (e ErrCantAcquireWorker) Error() string
func (ErrCantAcquireWorker) Unwrap ¶
func (e ErrCantAcquireWorker) Unwrap() error
Unwrap implements error unwrapping, so the wrapped error can be inspected with errors.Is and errors.As.
type Pool ¶
type Pool interface {
	// Worker returns a Worker if can be acquired before context is canceled, otherwise returns the
	// ctx.Err() result
	Worker(ctx context.Context) (Worker, error)

	// Stats provides statistics about pool's size, status & config
	Stats() Stats
}
Pool represents a pool of runners, where a runner is any func(func()) function, such as func(f func()) { go f() }.
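Since a runner is just a func(func()) value, runners can be wrapped like any other function. The following is a minimal sketch, independent of the package itself: `goRunner` is the runner from the description, while `tracked` and `runTasks` are hypothetical helpers that show one way to decorate a runner so the caller can wait for everything it started.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// goRunner is the runner from the description: it starts f in a new goroutine.
func goRunner(f func()) { go f() }

// tracked wraps a runner so that wg can be used to wait for everything it started.
// The helper is hypothetical; it only illustrates that runners compose as plain functions.
func tracked(wg *sync.WaitGroup, runner func(func())) func(func()) {
	return func(f func()) {
		wg.Add(1)
		runner(func() {
			defer wg.Done()
			f()
		})
	}
}

// runTasks starts n tasks through a tracked goRunner and returns how many ran.
func runTasks(n int) int64 {
	var wg sync.WaitGroup
	var ran int64
	run := tracked(&wg, goRunner)
	for i := 0; i < n; i++ {
		run(func() { atomic.AddInt64(&ran, 1) })
	}
	wg.Wait()
	return atomic.LoadInt64(&ran)
}

func main() {
	fmt.Println(runTasks(3))
}
```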
Example ¶
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/cabify/runnerpool"
)

func main() {
	const workers = 2
	const tasks = workers + 1
	const timeout = 250 * time.Millisecond

	cfg := runnerpool.Config{
		Workers: workers,
	}

	runner := func(f func()) { go f() }

	pool := runnerpool.New(cfg, runner)
	err := pool.Start()
	if err != nil {
		log.Fatal(err)
	}

	wg := sync.WaitGroup{}
	wg.Add(tasks)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for i := 1; i <= tasks; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i) * timeout / 10)

			worker, err := pool.Worker(ctx)
			if err != nil {
				fmt.Printf("Can't acquire worker %d\n", i)
				wg.Done()
				return
			}
			defer worker.Release()

			worker.Run(func(ctx context.Context) {
				time.Sleep(time.Duration(i) * timeout * 2)
				fmt.Printf("Worker %d done\n", i)
				wg.Done()
			})
		}(i)
	}

	wg.Wait()
}
Output:
Can't acquire worker 3
Worker 1 done
Worker 2 done
type Worker ¶
type Worker interface {
	// Run runs the given function.
	// The context provided will be canceled if WorkerPool.Stop is called.
	Run(func(context.Context))

	// Release should be called at least once every time a worker is acquired
	// It is safe to call Release twice and calling Release on a worker that is already running has no effect,
	// so the safest way is to defer worker.Release() once the worker has been acquired.
	Release()
}
Worker represents a pool worker that offers a one-time usable runner
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool implements Pool using a runner & goroutines pool
func New ¶
func New(cfg Config, runner func(func())) *WorkerPool
New returns a new WorkerPool, which must be started before being used. The runner should be provided to choose whether to recover from or report panics. Note that if you recover from panics, the workers won't be re-established, so you'll eventually exhaust them.
func (*WorkerPool) Stats ¶
func (p *WorkerPool) Stats() Stats
Stats provides thread-safe statistics about pool's size, status & config
Directories ¶
Path | Synopsis |
---|---|
runnerpooltest | Package runnerpooltest provides an implementation of runnerpool.Pool to be used from tests of third party libraries where we just need to provide a dependency that will execute the code synchronously. |