Documentation ¶
Overview ¶
Package workerpool provides high-performance concurrency primitives for building scalable worker pools and schedulers.
Design goals ¶
The package is designed around the following principles:
- Minimize allocations and garbage collection pressure
- Avoid locks on hot paths
- Reduce scheduler and wake-up overhead
- Provide predictable throughput under high contention
Rather than optimizing for minimal latency of a single task, workerpool optimizes for sustained throughput and stability when handling large volumes of short-lived jobs.
Architecture overview ¶
The worker pool is composed of three loosely coupled layers:
Scheduling (schedQueue) Responsible for ordering, batching, and dequeuing jobs. Different queue implementations may be plugged in without modifying the pool or worker logic.
Execution (Pool / workers) Workers fetch batches of jobs and execute them sequentially. Parallelism is achieved across workers, not within a batch.
Job lifecycle Jobs carry their payload, execution function, optional context, and optional cleanup logic.
Batching model ¶
Jobs are dequeued in batches to amortize scheduling overhead such as atomic operations, cache misses, and worker wake-ups.
Important: batching amortizes scheduling, not execution.
Jobs within a batch are executed sequentially by a single worker. This preserves cache locality, avoids goroutine churn, and keeps execution costs predictable.
Parallelism is achieved by running multiple workers concurrently, each processing its own batches.
Queue design ¶
The default scheduler uses a lock-free segmented FIFO queue. Jobs are stored in fixed-size segments linked together dynamically.
Key properties of the segmented queue:
- Multiple producers can enqueue concurrently
- Consumers dequeue contiguous batches
- Memory is aggressively reused via segment recycling
- Generation counters prevent ABA issues without clearing buffers
The queue design is optimized for workloads with many producers and relatively small, fast jobs.
Error handling ¶
The pool distinguishes between two classes of errors:
- Job errors: returned by job functions or produced by panic recovery
- Internal errors: unexpected failures inside the pool itself
Errors are reported via user-provided handlers and do not stop worker execution. Panics inside jobs are recovered to prevent worker termination.
CPU pinning ¶
On Linux, workers may optionally be pinned to specific CPUs. When enabled, workers are locked to OS threads and restricted to run on a single CPU core.
This can improve cache locality and reduce scheduler-induced migration for CPU-bound workloads, but is not universally beneficial.
Intended use cases ¶
workerpool is well suited for:
- High-throughput task execution
- Fan-in / fan-out pipelines
- CPU-bound or cache-sensitive workloads
- Systems where allocation behavior matters
It is not intended as a general-purpose goroutine replacement or for workloads dominated by blocking I/O.
Extensibility ¶
The scheduling layer is intentionally abstracted to allow experimentation with alternative queue designs, such as:
- Priority queues
- Bucket-based schedulers
- Time-sliced or aging queues
New queue types can be introduced without changing the worker execution model or public API.
Index ¶
- Constants
- Variables
- func NewSegmentPool[T any](pageSize uint32, prefill int, maxKeep int, fastPut int, fastGet int) *segmentPool[T]
- func NewSegmentedQ[T any](opts Options, spool segmentPoolProvider[T]) *segmentedQ[T]
- func PinToCPU(cpu int) error
- func ShedDumpStats()
- type AtomicMetrics
- type Batch
- type ErrorHandler
- type Job
- type JobFunc
- type JobMeta
- type JobPriority
- type MetricsPolicy
- type NoopMetrics
- type Option
- type Options
- type Pool
- type QueueType
- type RevolvingBucketOptions
- type RevolvingBucketQ
- func (rq *RevolvingBucketQ[T]) BatchPop() (Batch[T], bool)
- func (rq *RevolvingBucketQ[T]) Len() int
- func (rq *RevolvingBucketQ[T]) MaybeHasWork() bool
- func (rq *RevolvingBucketQ[T]) OnBatchDone(b Batch[T])
- func (rq *RevolvingBucketQ[T]) Push(job Job[T]) error
- func (rq *RevolvingBucketQ[T]) StatSnapshot() string
- type WakeupWorker
Constants ¶
const (
	// DefaultSegmentSize is the default number of jobs per segment.
	// It should be large enough to amortize allocation costs but
	// small enough to fit comfortably in cache.
	DefaultSegmentSize = 4096

	DefaultFastPutGet = 1024
)
Variables ¶
var (
	ErrInvalidPriority = errors.New("bucket queue: invalid priority")
	ErrPushToActive    = errors.New("bucket queue: push to active bucket")
	ErrPushSegmentedQ  = errors.New("bucket queue: failed to push into segmented queue")
)
var (
	// ErrQueueFull is returned when the underlying queue
	// cannot accept more jobs.
	ErrQueueFull = errors.New("queue: queue is full")

	// ErrNilFunc is returned when a submitted Job has a nil Fn.
	ErrNilFunc = errors.New("queue: job func is nil")
)
var DefaultSegmentCount uint32 = uint32(runtime.GOMAXPROCS(0) * 16)
DefaultSegmentCount defines the default number of preallocated segments. It scales with GOMAXPROCS to reduce contention under load.
var (
	// ErrClosed is returned when submitting a job to a pool
	// that has already been shut down.
	ErrClosed = errors.New("workerpool: pool is closed")
)
var ErrPoolPanic = errors.New("workerpool: job panic. ")
ErrPoolPanic is returned when a job function panics.
Panics are recovered to prevent worker termination and are converted into regular errors.
Functions ¶
func NewSegmentPool ¶
func NewSegmentedQ ¶
NewSegmentedQ initializes a segmented queue with preallocated segments.
func PinToCPU ¶
PinToCPU pins the current OS thread to a specific CPU.
It restricts the calling thread to run only on the given CPU core. This is typically used in conjunction with runtime.LockOSThread to improve cache locality and reduce scheduler-induced migration.
This function is Linux-specific and has no effect on other platforms.
func ShedDumpStats ¶
func ShedDumpStats()
Types ¶
type AtomicMetrics ¶
type AtomicMetrics struct {
// contains filtered or unexported fields
}
AtomicMetrics is a lock-free metrics implementation backed by atomics.
Writes are optimized for hot paths. Reads are intended for cold-path observation.
func (*AtomicMetrics) BatchDecQueued ¶
func (m *AtomicMetrics) BatchDecQueued(n int64)
BatchDecQueued decrements the queued jobs counter by n.
func (*AtomicMetrics) Executed ¶
func (m *AtomicMetrics) Executed() uint64
Executed returns the total number of executed jobs. Intended for cold-path observation.
func (*AtomicMetrics) IncExecuted ¶
func (m *AtomicMetrics) IncExecuted()
IncExecuted increments the executed jobs counter by one.
func (*AtomicMetrics) IncQueued ¶
func (m *AtomicMetrics) IncQueued()
IncQueued increments the queued jobs counter by one.
func (*AtomicMetrics) Queued ¶
func (m *AtomicMetrics) Queued() int64
Queued returns the current number of queued jobs. Intended for cold-path observation.
type Batch ¶
type Batch[T any] struct {
	Jobs []Job[T]
	Seg  *segment[T]
	End  uint32

	// Meta is an optional, queue-private field.
	// It is opaque to the pool and interpreted only by the queue implementation.
	Meta any
}
Batch represents a contiguous group of jobs dequeued from a schedQueue.
The batch must be completed by calling OnBatchDone on the originating queue to allow proper resource reclamation.
type ErrorHandler ¶
type ErrorHandler func(e error)
ErrorHandler is a user-provided callback invoked on internal or job-level errors.
type Job ¶
Job represents a single unit of work submitted to the pool.
Payload is passed to Fn when executed. Ctx controls cancellation before execution. CleanupFunc, if set, is executed after job completion.
func (Job[T]) GetPriority ¶
func (j Job[T]) GetPriority() JobPriority
func (*Job[T]) SetPriority ¶
func (j *Job[T]) SetPriority(p JobPriority)
type JobPriority ¶
type JobPriority uint8
const (
	MinBucketPriority JobPriority = 1
	MaxBucketPriority JobPriority = 63

	BucketCount = 64
)
type MetricsPolicy ¶
type MetricsPolicy interface {
// IncExecuted increments the executed jobs counter.
IncExecuted()
// IncQueued increments the queued jobs counter.
IncQueued()
// BatchDecQueued decrements the queued counter by n.
//
// This is typically used when a batch of jobs is removed
// from the scheduling queue.
BatchDecQueued(n int64)
}
MetricsPolicy defines hooks used by the worker pool to report queueing and execution activity.
Implementations must be safe for concurrent use. All methods are expected to be lightweight and non-blocking.
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics is a MetricsPolicy implementation that discards all metric updates.
It can be used when metrics collection is disabled and zero overhead is desired.
func (*NoopMetrics) BatchDecQueued ¶
func (m *NoopMetrics) BatchDecQueued(n int64)
func (*NoopMetrics) IncExecuted ¶
func (m *NoopMetrics) IncExecuted()
func (*NoopMetrics) IncQueued ¶
func (m *NoopMetrics) IncQueued()
type Option ¶
type Option func(*Options)
Option configures a worker Pool.
func WithPinnedWorkers ¶
WithPinnedWorkers enables CPU pinning for workers.
func WithSegmentCount ¶
WithSegmentCount sets the queue initial segment count.
func WithSegmentSize ¶
WithSegmentSize sets the queue segment size.
type Options ¶
type Options struct {
// Workers is the number of worker goroutines.
//
// Defaults to runtime.GOMAXPROCS(0).
Workers int
// SegmentSize is the number of jobs stored in a single queue segment.
//
// Larger values reduce segment churn but increase batch scan cost.
SegmentSize uint32
// SegmentCount is the number of queue segments preallocated on startup.
//
// Increasing this value reduces allocations under load at the cost
// of higher baseline memory usage.
SegmentCount uint32
// PoolCapacity limits the number of reusable segments kept
// in the internal segment pool.
//
// Larger values trade memory for fewer allocations.
PoolCapacity uint32
// QT selects the scheduler queue implementation.
QT QueueType
// PinWorkers enables CPU pinning for worker goroutines.
//
// When enabled, workers may be locked to OS threads to reduce
// migration and improve cache locality.
PinWorkers bool
}
Options configure the behavior of a worker Pool.
Any zero-value fields are replaced with sensible defaults when FillDefaults is called.
func (*Options) FillDefaults ¶
func (o *Options) FillDefaults()
FillDefaults replaces zero-value fields with default settings.
It is called internally by the Pool constructor and may also be used by callers who construct Options manually.
type Pool ¶
type Pool[T any, M MetricsPolicy] struct {
	// OnInternalError is called when the pool encounters
	// an unexpected internal error.
	OnInternalError ErrorHandler

	// OnJobError is called when a job function returns an error.
	OnJobError ErrorHandler
	// contains filtered or unexported fields
}
Pool is a high-performance worker pool with batched scheduling.
It combines:
- a lock-free / low-contention queue
- explicit worker wake-ups
- batching to amortize scheduling overhead
The pool is safe for concurrent use.
func NewPool ¶
func NewPool[M MetricsPolicy, T any](metrics M, opts ...Option) *Pool[T, M]
NewPool creates a new Pool using the provided metrics implementation and optional configuration options.
func NewPoolFromOptions ¶
func NewPoolFromOptions[M MetricsPolicy, T any](metrics M, opts Options) *Pool[T, M]
NewPoolFromOptions creates a new Pool from a fully specified Options struct.
func (*Pool[T, M]) ActiveWorkers ¶
ActiveWorkers returns the number of workers currently marked as active.
A worker is considered active if it has been started and not yet exited. This does not necessarily mean the worker is currently executing a job.
func (*Pool[T, M]) GetIdleLen ¶
GetIdleLen returns the number of currently idle workers.
func (*Pool[T, M]) Metrics ¶
func (p *Pool[T, M]) Metrics() *M
Metrics returns a snapshot of the current pool metrics.
The returned value should be treated as read-only. Metrics collection is implementation-defined by the MetricsPolicy.
func (*Pool[T, M]) Shutdown ¶
Shutdown gracefully stops the pool, waiting for workers to finish or until the provided context is canceled.
func (*Pool[T, M]) StatSnapshot ¶
type QueueType ¶
type QueueType int
QueueType defines the scheduling strategy used by the worker pool.
Different queue types determine how jobs are ordered, grouped, and selected for execution by the scheduler.
type RevolvingBucketOptions ¶
type RevolvingBucketQ ¶
type RevolvingBucketQ[T any] struct { // contains filtered or unexported fields }
func NewRevolvingBucketQ ¶
func NewRevolvingBucketQ[T any](opts Options) *RevolvingBucketQ[T]
func (*RevolvingBucketQ[T]) BatchPop ¶
func (rq *RevolvingBucketQ[T]) BatchPop() (Batch[T], bool)
func (*RevolvingBucketQ[T]) Len ¶
func (rq *RevolvingBucketQ[T]) Len() int
Len returns an approximate number of jobs in the queue. Currently unimplemented.
func (*RevolvingBucketQ[T]) MaybeHasWork ¶
func (rq *RevolvingBucketQ[T]) MaybeHasWork() bool
MaybeHasWork performs a fast, approximate check for available work.
func (*RevolvingBucketQ[T]) OnBatchDone ¶
func (rq *RevolvingBucketQ[T]) OnBatchDone(b Batch[T])
func (*RevolvingBucketQ[T]) Push ¶
func (rq *RevolvingBucketQ[T]) Push(job Job[T]) error
func (*RevolvingBucketQ[T]) StatSnapshot ¶
func (rq *RevolvingBucketQ[T]) StatSnapshot() string
type WakeupWorker ¶
type WakeupWorker chan struct{}
WakeupWorker is a lightweight signal channel used to wake an idle worker.