Published: Feb 13, 2026 License: MIT Imports: 10 Imported by: 0

README

workerpool — High-Performance Batch Worker Pool for Go

workerpool is a high-throughput, bounded-concurrency worker pool for Go. It is built around a lock-free segmented FIFO queue and batch-based scheduling to minimize contention and allocation overhead under load.

It is designed for workloads where:

  • job execution is cheap,
  • submission rate is high,
  • contention must be minimized,
  • memory allocation must be predictable.

Module:

github.com/azargarov/wpool

Performance at a Glance

Measured on: AMD Ryzen 7 8845HS, Go 1.22, Linux
Configuration: Workers = GOMAXPROCS, SegmentSize = 4096, SegmentCount = auto

Scenario                              Result
Steady-state throughput               ~19.7 M jobs/sec
Scheduler + queue + minimal job cost  ~50 ns/op
Allocations per operation             0 allocs/op
Queue type                            Lock-free segmented FIFO
Producers / consumers                 MPMC safe

Benchmark excerpt:

BenchmarkPool/W=GOMAX_S,C=128-16
22668182 ops · 50.7 ns/op · 19.73 Mjobs/sec · 0 allocs/op

These numbers reflect scheduler + queue overhead only with minimal job bodies. Actual throughput depends on job cost, segment sizing, and CPU topology.


Core Design

At its core, workerpool combines:

  • A lock-free segmented queue (MPMC)
  • Batch draining instead of per-job wakeups
  • Bounded concurrency with a fixed worker set
  • Minimal synchronization between producers and consumers
  • Explicit memory reuse via a segment pool

The queue is optimized to keep producers and consumers mostly independent, reducing cache-line contention and CAS pressure.


Features

  • Bounded concurrency
    • Fixed number of worker goroutines
    • No unbounded goroutine spawning
  • Lock-free segmented FIFO queue
    • Multiple producers
    • Batch-based consumption
  • Batch scheduling
    • Workers process jobs in batches for cache efficiency
    • Reduces wakeups and atomic traffic
  • Explicit memory reuse
    • Preallocated queue segments
    • Segment recycling with generation counters
  • Context-aware jobs
    • Submission respects context.Context
  • Panic-safe execution
    • Workers are isolated from job panics
  • Graceful shutdown
    • Deadline-aware draining
  • Low-overhead metrics hook
    • Metrics policy is injected, not hardcoded
  • Optional CPU pinning
    • Linux-only, workload-dependent

Installation

go get github.com/azargarov/wpool

Quick Start

package main

import (
	"context"
	"fmt"

	wp "github.com/azargarov/wpool"
)

func main() {
	pool := wp.NewPool[*wp.NoopMetrics, int](
		&wp.NoopMetrics{},
		wp.WithWorkers(4),
		wp.WithSegmentSize(4096),
		wp.WithSegmentCount(64),
	)

	defer pool.Stop()

	_ = pool.Submit(wp.Job[int]{
		Payload: 42,
		Fn: func(n int) error {
			fmt.Println("processing", n)
			return nil
		},
		Meta: &wp.JobMeta{Ctx: context.Background()},
	})
}

Job Model

type Job[T any] struct {
	Payload T
	Fn      JobFunc[T]
	Flags   uint64
	Meta    *JobMeta
}

type JobMeta struct {
	Ctx         context.Context
	CleanupFunc func()
}
  • Meta.Ctx, if set, is checked before execution
  • Meta.CleanupFunc, if set, is guaranteed to run after execution
  • Jobs are dequeued in FIFO order within the scheduler queue

Note: job priority is currently unused by the default segmented queue and exists as a forward-compatible hook for future schedulers (see JobPriority).
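The context, cleanup, and panic guarantees can be illustrated with a standalone sketch (the `job` struct and `runJob` below are hypothetical stand-ins, not the package's internals):

```go
package main

import (
	"context"
	"fmt"
)

// job is a simplified stand-in for wpool's Job type.
type job struct {
	ctx     context.Context
	fn      func() error
	cleanup func()
}

// runJob mirrors the documented guarantees: the context is checked
// before execution, cleanup always runs, and panics become errors.
func runJob(j job) (err error) {
	if j.cleanup != nil {
		defer j.cleanup() // guaranteed to run, even on panic
	}
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panic: %v", r)
		}
	}()
	if j.ctx != nil {
		if cerr := j.ctx.Err(); cerr != nil {
			return cerr // canceled before execution
		}
	}
	return j.fn()
}

func main() {
	err := runJob(job{
		ctx:     context.Background(),
		fn:      func() error { panic("boom") },
		cleanup: func() { fmt.Println("cleanup ran") },
	})
	fmt.Println("worker survived, err:", err)
}
```

This is the shape of guarantee the pool provides: a panicking job never takes a worker down with it, and cleanup still fires.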


Queue Implementation

Segmented Queue
  • Queue consists of linked segments
  • Each segment contains:
    • job buffer
    • readiness bitmap
    • producer / consumer cursors
  • Producers append using CAS on a per-segment reserve index
  • Consumers drain contiguous ready ranges as batches

Key properties:

  • No global locks
  • No per-job wakeups
  • Minimal false sharing
  • Segment reuse via generation counters (ABA-safe)
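The producer/consumer protocol can be sketched with a minimal single-segment model (hypothetical names; the real queue links many segments, recycles them, and supports multiple consumers):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const segSize = 8

// segment holds a fixed-size job buffer. Producers reserve a slot
// with one atomic add, write the job, then publish it via a ready
// flag. A consumer drains the contiguous ready prefix as a batch.
type segment struct {
	buf     [segSize]int
	ready   [segSize]atomic.Bool
	reserve atomic.Uint32 // next free slot index
	head    uint32        // consumer cursor (single consumer here)
}

func (s *segment) push(job int) bool {
	slot := s.reserve.Add(1) - 1
	if slot >= segSize {
		return false // segment full; the real queue links a new segment
	}
	s.buf[slot] = job
	s.ready[slot].Store(true) // publish after the write
	return true
}

// drain returns the contiguous run of ready jobs starting at head.
func (s *segment) drain() []int {
	start := s.head
	for s.head < segSize && s.ready[s.head].Load() {
		s.head++
	}
	return s.buf[start:s.head]
}

func main() {
	var s segment
	var wg sync.WaitGroup
	for i := 0; i < segSize; i++ {
		wg.Add(1)
		go func(v int) { defer wg.Done(); s.push(v) }(i)
	}
	wg.Wait()
	fmt.Println("batch size:", len(s.drain()))
}
```

Producers contend only on one per-segment counter, and consumers take whole contiguous ranges — this is what keeps the two sides mostly independent.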

Batch Processing

Workers wake up only when:

  • enough jobs are pending, or
  • a batch timer fires

This allows:

  • amortized synchronization cost
  • better cache locality
  • predictable throughput under load

Shutdown Semantics

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := pool.Shutdown(ctx); err != nil {
	// deadline exceeded
}
  • Prevents new submissions
  • Drains queued work
  • Waits for workers to finish or deadline to expire

Metrics

Metrics are policy-driven:

type MetricsPolicy interface {
	IncExecuted()
	IncQueued()
	BatchDecQueued(n int64)
}

This keeps the hot path free of unnecessary overhead.
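A minimal atomic implementation of such a policy might look like this (a sketch with the interface declared locally and a hypothetical CountingMetrics type; the package ships AtomicMetrics and NoopMetrics for the same purpose):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MetricsPolicy mirrors the package's interface: hot-path hooks
// that must be lightweight, non-blocking, and concurrency-safe.
type MetricsPolicy interface {
	IncExecuted()
	IncQueued()
	BatchDecQueued(n int64)
}

// CountingMetrics is a lock-free implementation backed by atomics.
type CountingMetrics struct {
	queued   atomic.Int64
	executed atomic.Uint64
}

func (m *CountingMetrics) IncExecuted()           { m.executed.Add(1) }
func (m *CountingMetrics) IncQueued()             { m.queued.Add(1) }
func (m *CountingMetrics) BatchDecQueued(n int64) { m.queued.Add(-n) }

func main() {
	var m MetricsPolicy = &CountingMetrics{}
	for i := 0; i < 10; i++ {
		m.IncQueued()
	}
	m.BatchDecQueued(8) // a worker drained a batch of 8
	cm := m.(*CountingMetrics)
	fmt.Println("still queued:", cm.queued.Load())
}
```

Because the policy is injected, a NoopMetrics-style implementation compiles down to empty calls, keeping the hot path free when metrics are disabled.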


Options

type Options struct {
	Workers       int
	SegmentSize   uint32
	SegmentCount  uint32
	PoolCapacity  uint32
	QT            QueueType
	PinWorkers    bool
}

Defaults are applied automatically via FillDefaults().


QueueType

Currently implemented:

  • SegmentedQueue — lock-free FIFO queue

QueueType exists as an extension point. Other schedulers (bucketed, priority, aging) are not yet wired in.


What This Is (and Isn’t)

This is:

  • a high-performance execution engine
  • suitable for internal systems, pipelines, schedulers
  • ideal when you care about ns/op and cache lines

This is not:

  • a feature-rich task framework
  • a priority scheduler (yet)
  • a general-purpose job system

Roadmap (Explicitly Non-Promissory)

Planned directions (not yet implemented):

  • Bucket-based priority scheduler
  • Aging via queue rotation
  • Adaptive segment provisioning
  • NUMA-aware worker placement

These are design directions, not guarantees.

Documentation

Overview

Package workerpool provides high-performance concurrency primitives for building scalable worker pools and schedulers.

Design goals

The package is designed around the following principles:

  • Minimize allocations and garbage collection pressure
  • Avoid locks on hot paths
  • Reduce scheduler and wake-up overhead
  • Provide predictable throughput under high contention

Rather than optimizing for minimal latency of a single task, workerpool optimizes for sustained throughput and stability when handling large volumes of short-lived jobs.

Architecture overview

The worker pool is composed of three loosely coupled layers:

  1. Scheduling (schedQueue) Responsible for ordering, batching, and dequeuing jobs. Different queue implementations may be plugged in without modifying the pool or worker logic.

  2. Execution (Pool / workers) Workers fetch batches of jobs and execute them sequentially. Parallelism is achieved across workers, not within a batch.

  3. Job lifecycle Jobs carry their payload, execution function, optional context, and optional cleanup logic.

Batching model

Jobs are dequeued in batches to amortize scheduling overhead such as atomic operations, cache misses, and worker wake-ups.

Important: batching amortizes scheduling, not execution.

Jobs within a batch are executed sequentially by a single worker. This preserves cache locality, avoids goroutine churn, and keeps execution costs predictable.

Parallelism is achieved by running multiple workers concurrently, each processing its own batches.

Queue design

The default scheduler uses a lock-free segmented FIFO queue. Jobs are stored in fixed-size segments linked together dynamically.

Key properties of the segmented queue:

  • Multiple producers can enqueue concurrently
  • Consumers dequeue contiguous batches
  • Memory is aggressively reused via segment recycling
  • Generation counters prevent ABA issues without clearing buffers

The queue design is optimized for workloads with many producers and relatively small, fast jobs.
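Generation counters make that recycling safe without zeroing buffers: a recycled segment bumps its generation, so any stale reference from a previous "life" of the segment is detectable. A minimal sketch of the idea (hypothetical names):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// segment carries a generation that is bumped on every recycle.
type segment struct {
	gen atomic.Uint64
	buf []int // contents are NOT cleared between uses
}

// handle is a reference captured by a consumer: the segment plus
// the generation observed at capture time.
type handle struct {
	seg *segment
	gen uint64
}

func capture(s *segment) handle { return handle{seg: s, gen: s.gen.Load()} }

// valid reports whether the handle still refers to the same
// "life" of the segment — this is what defeats ABA.
func (h handle) valid() bool { return h.seg.gen.Load() == h.gen }

// recycle returns the segment to the pool for reuse.
func recycle(s *segment) { s.gen.Add(1) }

func main() {
	s := &segment{buf: make([]int, 4)}
	h := capture(s)
	fmt.Println("before recycle:", h.valid())
	recycle(s) // segment reused for new jobs; buffer not cleared
	fmt.Println("after recycle: ", h.valid())
}
```

A stale handle fails the generation check instead of silently reading another batch's data, which is why the buffers never need clearing.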

Error handling

The pool distinguishes between two classes of errors:

  • Job errors: returned by job functions or produced by panic recovery
  • Internal errors: unexpected failures inside the pool itself

Errors are reported via user-provided handlers and do not stop worker execution. Panics inside jobs are recovered to prevent worker termination.

CPU pinning

On Linux, workers may optionally be pinned to specific CPUs. When enabled, workers are locked to OS threads and restricted to run on a single CPU core.

This can improve cache locality and reduce scheduler-induced migration for CPU-bound workloads, but is not universally beneficial.
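The mechanism can be approximated portably: lock the goroutine to an OS thread, then (on Linux) restrict that thread's CPU affinity. The affinity call is only sketched in a comment below, since it needs Linux-specific syscalls; `runPinned` is a hypothetical helper, and only `runtime.LockOSThread` is portable:

```go
package main

import (
	"fmt"
	"runtime"
)

// runPinned executes fn on a goroutine locked to a single OS
// thread. On Linux, a real implementation would additionally call
// sched_setaffinity (e.g. golang.org/x/sys/unix.SchedSetaffinity)
// to bind that thread to the given CPU core.
func runPinned(cpu int, fn func()) {
	done := make(chan struct{})
	go func() {
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()
		// Linux-only (sketch): unix.SchedSetaffinity(0, &cpuSet)
		fn()
		close(done)
	}()
	<-done
}

func main() {
	runPinned(0, func() { fmt.Println("running on a locked OS thread") })
}
```

Without the affinity step the OS may still migrate the thread between cores, which is why pinning is optional, Linux-only, and worth benchmarking per workload.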

Intended use cases

workerpool is well suited for:

  • High-throughput task execution
  • Fan-in / fan-out pipelines
  • CPU-bound or cache-sensitive workloads
  • Systems where allocation behavior matters

It is not intended as a general-purpose goroutine replacement or for workloads dominated by blocking I/O.

Extensibility

The scheduling layer is intentionally abstracted to allow experimentation with alternative queue designs, such as:

  • Priority queues
  • Bucket-based schedulers
  • Time-sliced or aging queues

New queue types can be introduced without changing the worker execution model or public API.

Index

Constants

const (
	// DefaultSegmentSize is the default number of jobs per segment.
	// It should be large enough to amortize allocation costs but
	// small enough to fit comfortably in cache.
	DefaultSegmentSize = 4096

	DefaultFastPutGet = 1024
)

Variables

var (
	ErrInvalidPriority = errors.New("bucket queue: invalid priority")
	ErrPushToActive    = errors.New("bucket queue: push to active bucket")
	ErrPushSegmentedQ  = errors.New("bucket queue: failed to push into segmented queue")
)
var (
	// ErrQueueFull is returned when the underlying queue
	// cannot accept more jobs.
	ErrQueueFull = errors.New("queue: queue is full")

	// ErrNilFunc is returned when a submitted Job has a nil Fn.
	ErrNilFunc = errors.New("queue: job func is nil")
)
var DefaultSegmentCount uint32 = uint32(runtime.GOMAXPROCS(0) * 16)

DefaultSegmentCount defines the default number of preallocated segments. It scales with GOMAXPROCS to reduce contention under load.

var (
	// ErrClosed is returned when submitting a job to a pool
	// that has already been shut down.
	ErrClosed = errors.New("workerpool: pool is closed")
)
var ErrPoolPanic = errors.New("workerpool: job panic. ")

ErrPoolPanic is returned when a job function panics.

Panics are recovered to prevent worker termination and are converted into regular errors.

Functions

func NewSegmentPool

func NewSegmentPool[T any](pageSize uint32, prefill int, maxKeep int, fastPut int, fastGet int) *segmentPool[T]

func NewSegmentedQ

func NewSegmentedQ[T any](opts Options, spool segmentPoolProvider[T]) *segmentedQ[T]

NewSegmentedQ initializes a segmented queue with preallocated segments.

func PinToCPU

func PinToCPU(cpu int) error

PinToCPU pins the current OS thread to a specific CPU.

It restricts the calling thread to run only on the given CPU core. This is typically used in conjunction with runtime.LockOSThread to improve cache locality and reduce scheduler-induced migration.

This function is Linux-specific and has no effect on other platforms.

func ShedDumpStats

func ShedDumpStats()

Types

type AtomicMetrics

type AtomicMetrics struct {
	// contains filtered or unexported fields
}

AtomicMetrics is a lock-free metrics implementation backed by atomics.

Writes are optimized for hot paths. Reads are intended for cold-path observation.

func (*AtomicMetrics) BatchDecQueued

func (m *AtomicMetrics) BatchDecQueued(n int64)

BatchDecQueued decrements the queued jobs counter by n.

func (*AtomicMetrics) Executed

func (m *AtomicMetrics) Executed() uint64

Executed returns the total number of executed jobs. Intended for cold-path observation.

func (*AtomicMetrics) IncExecuted

func (m *AtomicMetrics) IncExecuted()

IncExecuted increments the executed jobs counter by one.

func (*AtomicMetrics) IncQueued

func (m *AtomicMetrics) IncQueued()

IncQueued increments the queued jobs counter by one.

func (*AtomicMetrics) Queued

func (m *AtomicMetrics) Queued() int64

Queued returns the current number of queued jobs. Intended for cold-path observation.

type Batch

type Batch[T any] struct {
	Jobs []Job[T]
	Seg  *segment[T]
	End  uint32

	// Meta is an optional, queue-private field.
	// It is opaque to the pool and interpreted only by the queue implementation.
	Meta any
}

Batch represents a contiguous group of jobs dequeued from a schedQueue.

The batch must be completed by calling OnBatchDone on the originating queue to allow proper resource reclamation.

type ErrorHandler

type ErrorHandler func(e error)

ErrorHandler is a user-provided callback invoked on internal or job-level errors.

type Job

type Job[T any] struct {
	Payload T
	Fn      JobFunc[T]
	Flags   uint64
	Meta    *JobMeta
}

Job represents a single unit of work submitted to the pool.

Payload is passed to Fn when executed. Meta.Ctx, if set, controls cancellation before execution. Meta.CleanupFunc, if set, is executed after job completion.

func (Job[T]) GetPriority

func (j Job[T]) GetPriority() JobPriority

func (*Job[T]) SetPriority

func (j *Job[T]) SetPriority(p JobPriority)

type JobFunc

type JobFunc[T any] func(T) error

JobFunc is the function executed by a worker for a given job payload.

type JobMeta

type JobMeta struct {
	Ctx         context.Context
	CleanupFunc func()
}

type JobPriority

type JobPriority uint8
const (
	MinBucketPriority JobPriority = 1
	MaxBucketPriority JobPriority = 63
	BucketCount                   = 64
)

type MetricsPolicy

type MetricsPolicy interface {

	// IncExecuted increments the executed jobs counter.
	IncExecuted()

	// IncQueued increments the queued jobs counter.
	IncQueued()

	// BatchDecQueued decrements the queued counter by n.
	//
	// This is typically used when a batch of jobs is removed
	// from the scheduling queue.
	BatchDecQueued(n int64)
}

MetricsPolicy defines hooks used by the worker pool to report queueing and execution activity.

Implementations must be safe for concurrent use. All methods are expected to be lightweight and non-blocking.

type NoopMetrics

type NoopMetrics struct{}

NoopMetrics is a MetricsPolicy implementation that discards all metric updates.

It can be used when metrics collection is disabled and zero overhead is desired.

func (*NoopMetrics) BatchDecQueued

func (m *NoopMetrics) BatchDecQueued(n int64)

func (*NoopMetrics) IncExecuted

func (m *NoopMetrics) IncExecuted()

func (*NoopMetrics) IncQueued

func (m *NoopMetrics) IncQueued()

type Option

type Option func(*Options)

Option configures a worker Pool.

func WithPinnedWorkers

func WithPinnedWorkers(enabled bool) Option

WithPinnedWorkers enables CPU pinning for workers.

func WithQT

func WithQT(qt QueueType) Option

WithQT sets the scheduler queue type.

func WithSegmentCount

func WithSegmentCount(n int) Option

WithSegmentCount sets the queue initial segment count.

func WithSegmentSize

func WithSegmentSize(n uint32) Option

WithSegmentSize sets the queue segment size.

func WithWorkers

func WithWorkers(n int) Option

WithWorkers sets the number of worker goroutines.

type Options

type Options struct {
	// Workers is the number of worker goroutines.
	//
	// Defaults to runtime.GOMAXPROCS(0).
	Workers int

	// SegmentSize is the number of jobs stored in a single queue segment.
	//
	// Larger values reduce segment churn but increase batch scan cost.
	SegmentSize uint32

	// SegmentCount is the number of queue segments preallocated on startup.
	//
	// Increasing this value reduces allocations under load at the cost
	// of higher baseline memory usage.
	SegmentCount uint32

	// PoolCapacity limits the number of reusable segments kept
	// in the internal segment pool.
	//
	// Larger values trade memory for fewer allocations.
	PoolCapacity uint32

	// QT selects the scheduler queue implementation.
	QT QueueType

	// PinWorkers enables CPU pinning for worker goroutines.
	//
	// When enabled, workers may be locked to OS threads to reduce
	// migration and improve cache locality.
	PinWorkers bool
}

Options configure the behavior of a worker Pool.

Any zero-value fields are replaced with sensible defaults when FillDefaults is called.

func (*Options) FillDefaults

func (o *Options) FillDefaults()

FillDefaults replaces zero-value fields with default settings.

It is called internally by the Pool constructor and may also be used by callers who construct Options manually.

type Pool

type Pool[T any, M MetricsPolicy] struct {

	// OnInternalError is called when the pool encounters
	// an unexpected internal error.
	OnInternalError ErrorHandler

	// OnJobError is called when a job function returns an error.
	OnJobError ErrorHandler
	// contains filtered or unexported fields
}

Pool is a high-performance worker pool with batched scheduling.

It combines:

  • a lock-free / low-contention queue
  • explicit worker wake-ups
  • batching to amortize scheduling overhead

The pool is safe for concurrent use.

func NewPool

func NewPool[M MetricsPolicy, T any](metrics M, opts ...Option) *Pool[T, M]

NewPool creates a new Pool using the provided metrics implementation and optional configuration options.

func NewPoolFromOptions

func NewPoolFromOptions[M MetricsPolicy, T any](metrics M, opts Options) *Pool[T, M]

NewPoolFromOptions creates a new Pool from a fully specified Options struct.

func (*Pool[T, M]) ActiveWorkers

func (p *Pool[T, M]) ActiveWorkers() int

ActiveWorkers returns the number of workers currently marked as active.

A worker is considered active if it has been started and not yet exited. This does not necessarily mean the worker is currently executing a job.

func (*Pool[T, M]) GetIdleLen

func (p *Pool[T, M]) GetIdleLen() int64

GetIdleLen returns the number of currently idle workers.

func (*Pool[T, M]) Metrics

func (p *Pool[T, M]) Metrics() *M

Metrics returns a snapshot of the current pool metrics.

The returned value should be treated as read-only. Metrics collection is implementation-defined by the MetricsPolicy.

func (*Pool[T, M]) Shutdown

func (p *Pool[T, M]) Shutdown(ctx context.Context) error

Shutdown gracefully stops the pool, waiting for workers to finish or until the provided context is canceled.

func (*Pool[T, M]) StatSnapshot

func (p *Pool[T, M]) StatSnapshot() string

func (*Pool[T, M]) Stop

func (p *Pool[T, M]) Stop()

Stop shuts down the pool using a background context.

func (*Pool[T, M]) Submit

func (p *Pool[T, M]) Submit(job Job[T]) error

Submit enqueues a job for execution.

It may trigger a worker wake-up depending on batching state. Submit is non-blocking and safe for concurrent use.

type QueueType

type QueueType int

QueueType defines the scheduling strategy used by the worker pool.

Different queue types determine how jobs are ordered, grouped, and selected for execution by the scheduler.

const (
	// SegmentedQueue uses a lock-free segmented FIFO queue.
	//
	// It is optimized for high-throughput workloads with many producers
	// and batch-oriented consumption.
	SegmentedQueue QueueType = iota

	RevolvingBucketQueue
)

func (QueueType) String

func (qt QueueType) String() string

String returns the human-readable name of the queue type.

type RevolvingBucketOptions

type RevolvingBucketOptions struct {
	SegmentSize  uint32
	SegmentCount uint32
	PoolCapacity int
}

type RevolvingBucketQ

type RevolvingBucketQ[T any] struct {
	// contains filtered or unexported fields
}

func NewRevolvingBucketQ

func NewRevolvingBucketQ[T any](opts Options) *RevolvingBucketQ[T]

func (*RevolvingBucketQ[T]) BatchPop

func (rq *RevolvingBucketQ[T]) BatchPop() (Batch[T], bool)

func (*RevolvingBucketQ[T]) Len

func (rq *RevolvingBucketQ[T]) Len() int

Len returns an approximate number of jobs in the queue. Currently unimplemented.

func (*RevolvingBucketQ[T]) MaybeHasWork

func (rq *RevolvingBucketQ[T]) MaybeHasWork() bool

MaybeHasWork performs a fast, approximate check for available work.

func (*RevolvingBucketQ[T]) OnBatchDone

func (rq *RevolvingBucketQ[T]) OnBatchDone(b Batch[T])

func (*RevolvingBucketQ[T]) Push

func (rq *RevolvingBucketQ[T]) Push(job Job[T]) error

func (*RevolvingBucketQ[T]) StatSnapshot

func (rq *RevolvingBucketQ[T]) StatSnapshot() string

type WakeupWorker

type WakeupWorker chan struct{}

WakeupWorker is a lightweight signal channel used to wake an idle worker.
