pool

package
v1.0.208 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2026 License: MIT Imports: 9 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAllTasksDone = errors.New("all tasks completed")

ErrAllTasksDone is the cancellation cause set when all tasks have completed and the pool shuts down normally via Wait or Stream.

Functions

This section is empty.

Types

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool[T any] is a goroutine pool that allows parallel task execution.

It supports two consumption modes:

  • Wait: blocks until all tasks finish, returns collected results. Each Go call spawns a goroutine, limited by a semaphore (tokens).
  • Stream: returns a channel that emits results in real-time. Uses a fixed worker pool — goroutine count equals the concurrency limit, regardless of how many tasks are submitted.

These modes are mutually exclusive — use one or the other per pool lifecycle.

func New

func New[T any]() *Pool[T]

New creates a new goroutine pool.

Example (Wait mode):

p := pool.New[int]()
p.Go(func() Result[int] { return Ok(42) })
results := p.Wait().Collect()

Example (Stream mode):

p := pool.New[int]().Limit(5)
ch := p.Stream(func() {
    for i := range 100 {
        p.Go(func() Result[int] { return Ok(i) })
    }
})
for r := range ch {
    fmt.Println(r.Ok())
}

func (*Pool[T]) ActiveTasks

func (p *Pool[T]) ActiveTasks() int

ActiveTasks returns the number of tasks currently running.

func (*Pool[T]) Cancel

func (p *Pool[T]) Cancel(err ...error)

Cancel cancels all tasks in the pool. An optional error provides the cancellation cause, retrievable via Cause. Defaults to context.Canceled.

func (*Pool[T]) CancelOnError

func (p *Pool[T]) CancelOnError() *Pool[T]

CancelOnError configures the pool to cancel all remaining tasks when any task returns an error or panics.

In Stream mode, the triggering error is guaranteed to be delivered; subsequent results may be dropped.

func (*Pool[T]) Cause

func (p *Pool[T]) Cause() error

Cause returns the reason for the pool's cancellation. Returns nil if the pool has not been canceled.

func (*Pool[T]) ClearMetrics

func (p *Pool[T]) ClearMetrics()

ClearMetrics resets total and failed task counters to zero.

func (*Pool[T]) Context

func (p *Pool[T]) Context(ctx context.Context) *Pool[T]

Context replaces the pool's context with the provided context. If ctx is nil, context.Background() is used. The previous context is canceled before replacement.

func (*Pool[T]) FailedTasks

func (p *Pool[T]) FailedTasks() int

FailedTasks returns the number of tasks that completed with an error.

func (*Pool[T]) GetContext

func (p *Pool[T]) GetContext() context.Context

GetContext returns the current context associated with the pool.

func (*Pool[T]) Go

func (p *Pool[T]) Go(fn func() Result[T])

Go submits a task for execution.

In Wait mode, Go blocks the caller until a worker slot is available (backpressure via semaphore), then spawns a goroutine to execute the task.

In Stream mode, Go sends the task to the worker pool's job queue. It blocks if all workers are busy (backpressure via channel).

If fn is nil, the task completes with an error. If fn panics, the panic is recovered and recorded as an error with a stack trace.

func (*Pool[T]) Limit

func (p *Pool[T]) Limit(workers int) *Pool[T]

Limit sets the maximum number of concurrently running tasks. In Wait mode, this controls the semaphore size. In Stream mode, this determines the number of worker goroutines. Zero or negative values remove the limit (unlimited in Wait mode, GOMAXPROCS workers in Stream mode). Cannot be changed while tasks are running.

On non-Windows systems, the limit is capped by the process stack rlimit.

func (*Pool[T]) Reset

func (p *Pool[T]) Reset() error

Reset restores the pool to its initial state for reuse. Returns an error if tasks are still running.

Example:

p.Wait() // or drain Stream channel
p.Reset()
p.Go(func() Result[int] { return Ok(1) })
p.Wait()

func (*Pool[T]) Stream added in v1.0.208

func (p *Pool[T]) Stream(fn func(), buffer ...int) <-chan Result[T]

Stream spawns a fixed worker pool, runs fn to submit tasks, and returns a channel that emits results as each task completes.

fn is executed in a separate goroutine. Inside fn, call Go to submit tasks. When fn returns, the job queue is closed automatically and workers drain remaining tasks. The channel closes once all workers finish.

The number of worker goroutines equals the Limit (or GOMAXPROCS if no limit is set). Memory usage is constant regardless of how many tasks are submitted.

An optional buffer size prevents slow consumers from blocking workers. With CancelOnError, the first error is guaranteed to be delivered; subsequent results may be dropped after cancellation.

Stream and Wait are mutually exclusive.

Example:

p := pool.New[int]().Limit(5)
ch := p.Stream(func() {
    for i := range 100 {
        p.Go(func() Result[int] { return Ok(i * i) })
    }
})

for r := range ch {
    fmt.Println(r.Ok())
}

func (*Pool[T]) TotalTasks

func (p *Pool[T]) TotalTasks() int

TotalTasks returns the total number of tasks submitted.

func (*Pool[T]) Wait

func (p *Pool[T]) Wait() SeqResult[T]

Wait blocks until all submitted tasks finish and returns their results. Results are returned in an iterator; order is not guaranteed.

Example:

p := pool.New[int]()
p.Go(func() Result[int] { return Ok(1) })
p.Go(func() Result[int] { return Ok(2) })
for r := range p.Wait() {
    fmt.Println(r.Ok())
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL