workerpool

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2026 License: GPL-3.0 Imports: 5 Imported by: 0

README

Worker Pool

A thread-safe, context-aware worker pool implementation for concurrent task processing in Go.

Features

  • Bounded Parallelism: Configurable number of worker goroutines
  • Context-Aware: Full support for context cancellation and timeouts
  • Graceful Shutdown: Drains in-flight work before terminating
  • Thread-Safe: Safe concurrent submission from multiple goroutines
  • Channel-Based: No shared mutable state, communication via channels only
  • Error Coordination: Uses errgroup for worker lifecycle management

Installation

go get github.com/grhili/cd-operator/pkg/workerpool

Usage

Basic Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/grhili/cd-operator/pkg/workerpool"
)

func main() {
    // Define work processing function
    workerFunc := func(ctx context.Context, work workerpool.Work) error {
        task := work.(string)
        fmt.Printf("Processing: %s\n", task)
        return nil
    }

    // Create pool with 5 workers
    pool := workerpool.New(5, workerFunc)

    ctx := context.Background()

    // Start the pool
    if err := pool.Start(ctx); err != nil {
        panic(err)
    }

    // Submit work items
    for i := 0; i < 10; i++ {
        if err := pool.Submit(ctx, fmt.Sprintf("task-%d", i)); err != nil {
            panic(err)
        }
    }

    // Graceful shutdown with timeout
    shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    if err := pool.Shutdown(shutdownCtx); err != nil {
        panic(err)
    }
}
Context Cancellation
// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())

pool := workerpool.New(3, workerFunc)
pool.Start(ctx)

// Submit work
pool.Submit(ctx, "task-1")

// Cancel context to stop all workers
cancel()

// Graceful shutdown (keep the cancel func so the timeout's resources are released)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second)
defer shutdownCancel()
pool.Shutdown(shutdownCtx)
Concurrent Submissions
pool := workerpool.New(5, workerFunc)
pool.Start(ctx)

var wg sync.WaitGroup

// Multiple goroutines can safely submit work
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()
        pool.Submit(ctx, id)
    }(i)
}

wg.Wait()
pool.Shutdown(ctx)

API

Types
Work
type Work interface{}

Represents a unit of work. Can be any type.

WorkerFunc
type WorkerFunc func(ctx context.Context, work Work) error

Function signature for processing work items.

Pool
type Pool interface {
    Start(ctx context.Context) error
    Submit(ctx context.Context, work Work) error
    Shutdown(ctx context.Context) error
}
Functions
New
func New(size int, workerFunc WorkerFunc) Pool

Creates a new worker pool with the specified number of workers. The work channel is buffered to size * 2.

Methods
Start
Start(ctx context.Context) error

Spawns worker goroutines and begins processing. Must be called before Submit. Returns an error if called more than once or after shutdown.

Submit
Submit(ctx context.Context, work Work) error

Adds work to the pool's queue. Blocks if the queue is full until space is available or the context is cancelled.

Returns:

  • ErrPoolNotStarted if called before Start
  • ErrPoolShutdown if called after Shutdown
  • Context error if context is cancelled
Shutdown
Shutdown(ctx context.Context) error

Initiates graceful shutdown. Closes the work channel, waits for in-flight work to complete, and returns when all workers finish or the context times out.

Idempotent - safe to call multiple times.

Design

Architecture
  • Work Channel: Buffered channel (size * 2) for work queue
  • Worker Goroutines: Fixed number spawned at Start()
  • errgroup: Coordinates worker lifecycle and error propagation
  • Thread Safety: sync.RWMutex protects pool state
Lifecycle
  1. Creation: New() creates pool with work channel
  2. Start: Spawns worker goroutines that listen on work channel
  3. Submit: Pushes work items to channel (blocks if full)
  4. Processing: Workers pull from channel and call WorkerFunc
  5. Shutdown: Closes channel, workers drain and exit
Cancellation

Context cancellation propagates to:

  • Workers (stop processing new work)
  • Submit calls (fail immediately)
  • WorkerFunc (via context parameter)
Memory
  • Work channel buffer: workers * 2 items
  • Memory usage scales with: O(workers + buffer_size)
  • No unbounded queues - backpressure via blocking Submit

Error Handling

  • ErrPoolNotStarted: Submit/Shutdown called before Start
  • ErrPoolShutdown: Submit called after Shutdown
  • Context errors propagate from cancelled contexts
  • WorkerFunc errors are ignored (workers continue processing)

Testing

Run tests:

go test ./pkg/workerpool/...

Run with race detector:

go test -race ./pkg/workerpool/...

Performance

  • Throughput: Scales roughly with worker count for CPU-bound work, up to the number of available cores
  • Latency: Minimal overhead - channel operations only
  • Memory: Fixed allocation based on worker count and buffer size
  • Concurrency: Lock-free submission path when buffer has space

Best Practices

  1. Worker Count: Set to runtime.NumCPU() for CPU-bound work, higher for I/O-bound
  2. WorkerFunc: Keep lightweight, handle errors internally if needed
  3. Shutdown Timeout: Set generously based on max work item duration
  4. Context: Always pass context for cancellation support
  5. Work Type: Submit one concrete type per pool and recover it in WorkerFunc with a type assertion, rather than passing arbitrary interface{} values

Thread Safety

All methods are thread-safe:

  • Multiple goroutines can call Submit concurrently
  • Start, Shutdown protected by mutex
  • Channel operations are inherently thread-safe

Limitations

  • Fixed worker count (no dynamic scaling)
  • FIFO queue only (no priority support)
  • No work retry mechanism (implement in WorkerFunc if needed)
  • No work result collection (implement in WorkerFunc if needed)

Documentation

Example

Example demonstrates basic usage of the worker pool.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grhili/cd-operator/pkg/workerpool"
)

func main() {
	// Define work processing function.
	workerFunc := func(ctx context.Context, work workerpool.Work) error {
		item := work.(string)
		fmt.Printf("Processing: %s\n", item)
		return nil
	}

	// Create pool with 3 workers.
	pool := workerpool.New(3, workerFunc)

	ctx := context.Background()

	// Start the pool.
	if err := pool.Start(ctx); err != nil {
		panic(err)
	}

	// Submit work items.
	items := []string{"task1", "task2", "task3"}
	for _, item := range items {
		if err := pool.Submit(ctx, item); err != nil {
			panic(err)
		}
	}

	// Graceful shutdown with timeout.
	shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := pool.Shutdown(shutdownCtx); err != nil {
		panic(err)
	}

	fmt.Println("All work completed")
}
Example (Backpressure)

Example_backpressure demonstrates handling backpressure when the queue is full.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grhili/cd-operator/pkg/workerpool"
)

func main() {
	// Slow worker to create backpressure.
	workerFunc := func(ctx context.Context, work workerpool.Work) error {
		time.Sleep(200 * time.Millisecond)
		return nil
	}

	// Small pool with limited buffer.
	pool := workerpool.New(1, workerFunc)

	ctx := context.Background()
	if err := pool.Start(ctx); err != nil {
		panic(err)
	}

	// Fill buffer (size=1, buffer=2*size=2, so total capacity=3).
	// First submit starts processing, next 2 go in buffer, 4th will block.
	for i := 0; i < 4; i++ {
		submitCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
		err := pool.Submit(submitCtx, i)
		cancel()

		if err != nil {
			fmt.Printf("Submit %d: blocked\n", i)
		} else {
			fmt.Printf("Submit %d: queued\n", i)
		}
	}

	shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	if err := pool.Shutdown(shutdownCtx); err != nil {
		panic(err)
	}

}
Output:
Submit 0: queued
Submit 1: queued
Submit 2: queued
Submit 3: blocked
Example (ContextCancellation)

Example_contextCancellation demonstrates context cancellation.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grhili/cd-operator/pkg/workerpool"
)

func main() {
	workerFunc := func(ctx context.Context, work workerpool.Work) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
			fmt.Printf("Processed: %v\n", work)
			return nil
		}
	}

	pool := workerpool.New(2, workerFunc)

	// Create cancellable context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := pool.Start(ctx); err != nil {
		panic(err)
	}

	// Submit some work.
	_ = pool.Submit(ctx, "item1")

	// Cancel context to stop workers.
	cancel()

	// Wait for graceful shutdown.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second)
	defer shutdownCancel()

	_ = pool.Shutdown(shutdownCtx)

	fmt.Println("Pool stopped")
}
Example (GracefulShutdown)

Example_gracefulShutdown demonstrates proper shutdown handling.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grhili/cd-operator/pkg/workerpool"
)

func main() {
	completed := make(chan int, 5)

	workerFunc := func(ctx context.Context, work workerpool.Work) error {
		id := work.(int)
		time.Sleep(50 * time.Millisecond)
		completed <- id
		return nil
	}

	pool := workerpool.New(2, workerFunc)

	ctx := context.Background()
	if err := pool.Start(ctx); err != nil {
		panic(err)
	}

	// Submit work.
	for i := 0; i < 5; i++ {
		if err := pool.Submit(ctx, i); err != nil {
			panic(err)
		}
	}

	// Shutdown waits for all in-flight work to complete.
	shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	if err := pool.Shutdown(shutdownCtx); err != nil {
		panic(err)
	}
	close(completed)

	// All work completes before shutdown returns.
	count := 0
	for range completed {
		count++
	}

	fmt.Printf("Completed %d tasks before shutdown\n", count)
}
Output:
Completed 5 tasks before shutdown
Example (RealWorld)

Example_realWorld demonstrates a realistic use case processing tasks with error handling and metrics.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/grhili/cd-operator/pkg/workerpool"
)

// Task represents a unit of work with metadata.
type Task struct {
	ID       int
	Payload  string
	Priority int
}

func main() {
	var (
		processed atomic.Int64
		failed    atomic.Int64
	)

	// Worker function with error handling and metrics.
	workerFunc := func(ctx context.Context, work workerpool.Work) error {
		task := work.(Task)

		// Process task.
		if task.ID%10 == 9 {
			// Simulate occasional failure.
			failed.Add(1)
			return fmt.Errorf("task %d failed", task.ID)
		}

		processed.Add(1)
		return nil
	}

	// Create pool with 3 workers.
	pool := workerpool.New(3, workerFunc)

	ctx := context.Background()

	// Start pool.
	if err := pool.Start(ctx); err != nil {
		panic(err)
	}

	// Submit tasks.
	for i := 0; i < 5; i++ {
		task := Task{
			ID:       i,
			Payload:  fmt.Sprintf("data-%d", i),
			Priority: i % 3,
		}

		if err := pool.Submit(ctx, task); err != nil {
			fmt.Printf("Failed to submit task %d: %v\n", i, err)
		}
	}

	// Graceful shutdown with generous timeout.
	shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := pool.Shutdown(shutdownCtx); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
	}

	// Report metrics.
	fmt.Printf("Processed: %d, Failed: %d\n", processed.Load(), failed.Load())
}
Output:
Processed: 5, Failed: 0

Variables

var (
	// ErrPoolNotStarted is returned when Submit is called before Start.
	ErrPoolNotStarted = errors.New("pool not started")

	// ErrPoolShutdown is returned when Submit is called after Shutdown.
	ErrPoolShutdown = errors.New("pool is shut down")
)

Types

type Pool

type Pool interface {
	// Start spawns the worker goroutines and begins processing work.
	// It must be called before Submit. The pool will run until the context
	// is cancelled or Shutdown is called.
	Start(ctx context.Context) error

	// Submit adds work to the pool's queue for processing.
	// It blocks if the queue is full until space becomes available or the context is cancelled.
	// Returns an error if the context is cancelled or the pool is shut down.
	Submit(ctx context.Context, work Work) error

	// Shutdown initiates graceful shutdown of the pool.
	// It stops accepting new work, waits for in-flight work to complete,
	// and returns when all workers have finished or the context times out.
	Shutdown(ctx context.Context) error
}

Pool manages a fixed number of worker goroutines that process work items concurrently. It provides bounded parallelism and graceful shutdown capabilities.

func New

func New(size int, workerFunc WorkerFunc) Pool

New creates a new worker pool with the specified number of workers. The size parameter determines the number of concurrent workers. The workerFunc is called by each worker to process work items. The work channel is buffered to size * 2 to allow some queueing.

type Work

type Work interface{}

Work represents a unit of work to be processed by the worker pool. Implementations should be lightweight and contain only the data needed to perform the work. The actual processing logic is provided via WorkerFunc.

type WorkerFunc

type WorkerFunc func(ctx context.Context, work Work) error

WorkerFunc defines the function signature for processing work items. It receives a context for cancellation and the work item to process. The context will be cancelled if the pool is shutting down or if the parent context is cancelled.
