wpool

Version: v1.1.0
Published: Jul 24, 2025 License: MIT Imports: 5 Imported by: 0

README

The wpool module implements a bounded worker pool. It provides safe concurrent processing by wrapping a channel and spawning a fixed number of workers that read from it. More specifically:

  • The channel payload is a generic type.
  • A fixed, configurable number of workers receive from the channel and invoke a configurable callback on each item.
  • The Submit(context.Context, Item) function lets external code enqueue items for processing.
  • Stop(context.Context) performs a graceful shutdown in the following steps:
    1. Stop blocks until all the following steps are done.
    2. The worker pool stops accepting new items. Any Submit call made after Stop has been called returns the error ErrWorkerPoolStopped.
    3. Once no new items can be sent to the main channel, the main channel is closed.
    4. Stop waits for all worker goroutines to finish in-flight processing and any items still pending in the channel.
    5. Stop unblocks and returns.
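
The shutdown steps above can be sketched with the standard library alone. This is a simplified illustration, not the wpool implementation: the pool type, newPool, submit, stop, runDemo and errStopped are hypothetical names, the payload is fixed to int, and context handling is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStopped is a hypothetical stand-in for ErrWorkerPoolStopped.
var errStopped = errors.New("pool is stopped")

// pool is a simplified, hypothetical pool used only for illustration.
type pool struct {
	mu      sync.RWMutex
	stopped bool
	ch      chan int
	wg      sync.WaitGroup
}

func newPool(workers, buffer int, handle func(int)) *pool {
	p := &pool{ch: make(chan int, buffer)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for item := range p.ch { // exits once ch is closed and drained
				handle(item)
			}
		}()
	}
	return p
}

// submit holds the read lock while sending, so stop cannot close the
// channel in the middle of a send.
func (p *pool) submit(item int) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.stopped {
		return errStopped
	}
	p.ch <- item
	return nil
}

// stop rejects new submissions, closes the channel, then waits for the workers.
func (p *pool) stop() {
	p.mu.Lock()
	if p.stopped {
		p.mu.Unlock()
		return
	}
	p.stopped = true
	close(p.ch)
	p.mu.Unlock()
	p.wg.Wait()
}

// runDemo submits three items, stops the pool, then tries a late submit.
func runDemo() (processed int, lateErr error) {
	var mu sync.Mutex
	p := newPool(2, 4, func(int) {
		mu.Lock()
		processed++
		mu.Unlock()
	})
	for i := 1; i <= 3; i++ {
		_ = p.submit(i)
	}
	p.stop()              // returns only after all three items are handled
	lateErr = p.submit(4) // rejected: the pool no longer accepts items
	return processed, lateErr
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err)
}
```

Taking the write lock in stop means the channel is closed only when no submit is in the middle of a send, which is what makes closing safe with many senders.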

Why and What

Use wpool when you have multiple senders (e.g. events from different services or sources) and need graceful shutdown, a capped number of workers receiving from the channel, or both.

Bounded / Capped workers approach

A non-capped approach: in some cases we can spawn a goroutine for each item received from the channel. That works well when each task is small or short-lived, when the overall number of tasks is bounded, or when we are sure the system can handle the maximum concurrency.

But when the task performed for each item is not small or short-lived, or when submissions arrive in sudden, large bursts rather than being spread smoothly over time, the bounded (capped) worker approach is much more efficient: it does not starve CPU and memory resources by spawning, and context switching between, a large number of goroutines.
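
As a rough illustration of the capped approach, the sketch below starts a fixed number of goroutines that all receive from one channel and records the highest concurrency observed. It uses only the standard library; sumBounded is a hypothetical helper, not part of wpool, and it assumes workers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sumBounded adds up items using exactly `workers` goroutines and reports
// the sum together with the peak number of items handled at the same time.
func sumBounded(items []int, workers int) (sum, peak int) {
	ch := make(chan int)
	var wg sync.WaitGroup
	var total, running, maxRunning atomic.Int64

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range ch {
				n := running.Add(1)
				// Record the peak with a compare-and-swap loop.
				for {
					m := maxRunning.Load()
					if n <= m || maxRunning.CompareAndSwap(m, n) {
						break
					}
				}
				total.Add(int64(item))
				running.Add(-1)
			}
		}()
	}

	for _, it := range items {
		ch <- it // blocks until a worker is free, which caps concurrency
	}
	close(ch) // there is a single sender here, so closing is safe
	wg.Wait()
	return int(total.Load()), int(maxRunning.Load())
}

func main() {
	sum, peak := sumBounded([]int{1, 2, 3, 4, 5, 6, 7, 8}, 3)
	fmt.Println("sum:", sum, "peak within cap:", peak <= 3)
}
```

However many items are submitted, the number of goroutines stays at the configured cap, so a burst waits in the channel instead of turning into a burst of goroutines.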

Closing channel and graceful shutdown

In Go, the doc comment of the built-in close function states:

The close built-in function closes a channel, which must be either bidirectional or send-only. It should be executed only by the sender, never the receiver, and has the effect of shutting down the channel after the last sent value is received.

This is, in general, the principle that should be followed: the sender should close the channel.

But in some cases (for example, an application-wide event queue) we want different parts of the code to be able to send events. No single sender owns the channel then, and closing it can cause a panic if another sender tries to send after the close.
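
The panic mentioned above is easy to reproduce. The following minimal sketch uses a hypothetical helper, trySend, that recovers from the panic so the result can be printed.

```go
package main

import "fmt"

// trySend attempts to send v on ch and reports whether the send panicked.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // false: the channel is open and has buffer space
	close(ch)
	fmt.Println(trySend(ch, 2)) // true: sending on a closed channel panics
}
```

Recovering like this is only for demonstration. With several independent senders, the usual remedy is to coordinate the close so that no send can follow it.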

This solution wraps a graceful shutdown process. It first stops accepting new submissions, so any Submit attempt after that point returns an error. It then closes the main channel and waits for all workers to finish in-flight processing and, for a buffered channel, to process the items already queued. Once all workers have returned, Stop unblocks and returns.

Shutdown Modes

The worker pool supports two shutdown modes that control how the pool behaves during shutdown:

ShutdownModeDrain (Default)
  • Uses read-write mutex locking to ensure thread safety during shutdown
  • Waits for all queued items to be processed before shutting down
  • Guarantees that all submitted items are processed
  • Provides the most graceful shutdown experience
ShutdownModeImmediate
  • Uses lock-free operations for better performance
  • Workers stop immediately when the shutdown is initiated
  • May leave queued items unprocessed once shutdown is initiated
  • Faster shutdown but potentially loses pending work
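
The observable difference between the two modes can be illustrated with a single worker loop. This sketch shows only the behaviour described above (drain handles everything that is queued, immediate stops as soon as shutdown is signalled). It does not reproduce wpool's internal locking or lock-free code, and consume and queued are hypothetical helpers.

```go
package main

import "fmt"

// consume receives from ch and returns how many items it handled.
// With drain=true it ignores done and runs until ch is closed and empty.
// With drain=false it returns as soon as done is closed.
func consume(ch <-chan int, done <-chan struct{}, drain bool) int {
	if drain {
		done = nil // receiving from a nil channel blocks forever, so done is ignored
	}
	handled := 0
	for {
		// Give the shutdown signal priority over queued items.
		select {
		case <-done:
			return handled
		default:
		}
		select {
		case <-done:
			return handled
		case _, ok := <-ch:
			if !ok {
				return handled
			}
			handled++ // a real worker would invoke the callback here
		}
	}
}

// queued returns a closed channel pre-filled with n items.
func queued(n int) chan int {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)
	return ch
}

func main() {
	done := make(chan struct{})
	close(done) // shutdown has already been requested

	fmt.Println("drain:", consume(queued(5), done, true))
	fmt.Println("immediate:", consume(queued(5), done, false))
}
```

With shutdown already signalled, the draining loop still handles all five queued items, while the immediate loop returns before handling any of them.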

You can configure the shutdown mode using the WithShutdownMode option:

// Use drain mode (default)
drainPool := wpool.NewWorkerPool(callback, wpool.WithShutdownMode(wpool.ShutdownModeDrain))

// Use immediate mode
immediatePool := wpool.NewWorkerPool(callback, wpool.WithShutdownMode(wpool.ShutdownModeImmediate))

Examples

package main

import (
	"context"
	"log/slog"

	"github.com/ifnotnil/wpool"
)

func Callback(ctx context.Context, item string) {
	slog.Default().Info("cb", slog.String("item", item))
}

func main() {
	logger := slog.Default()

	p := wpool.NewWorkerPool(
		Callback,
		wpool.WithChannelBufferSize(100),
		wpool.WithLogger(logger),
	)

	ctx := context.Background()

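	p.Start(ctx, 10) // start 10 workers

	// Submit returns ErrWorkerPoolStopped once Stop has been called.
	for _, item := range []string{"one", "two", "three"} {
		if err := p.Submit(ctx, item); err != nil {
			logger.Error("submit failed", slog.String("item", item), slog.Any("error", err))
		}
	}

	// Stop blocks until the workers have finished.
	p.Stop(context.Background())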
}

Documentation

Overview

Package wpool implements a bounded worker pool for safe, concurrent processing.

The wpool module provides a concurrent processing system that wraps a channel and spawns a fixed number of workers to process items. It features:

  • Generic type support for channel payloads
  • Configurable number of workers for bounded concurrency
  • Safe submission of items via Submit(context.Context, Item)
  • Graceful shutdown with Stop(context.Context) that waits for in-flight processing
  • Protection against resource starvation during high-concurrency bursts

Use wpool when you need multiple senders with graceful shutdown capabilities and want to cap worker concurrency over channel receiving, especially for tasks that are not small/short-lived or when facing sudden bursts of submissions.

Example usage:

p := wpool.NewWorkerPool[string](
	callback,
)

p.Start(ctx, 10) // start 10 workers
p.Submit(ctx, "item")
p.Stop(ctx)

Variables

var ErrWorkerPoolStopped = errors.New("worker pool is stopped")

Functions

func WithChannelBufferSize

func WithChannelBufferSize(s int) func(*config)

func WithLogger

func WithLogger(l *slog.Logger) func(*config)

func WithShutdownMode (added in v1.1.0)

func WithShutdownMode(m ShutdownMode) func(*config)
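
Each of the three functions above returns a func(*config), which is the functional options pattern. The sketch below shows how options of that shape are typically applied. The config struct, its bufferSize field, withBufferSize and buildConfig are hypothetical stand-ins, because wpool's config type is unexported and its fields are not documented here.

```go
package main

import "fmt"

// config is a hypothetical stand-in for wpool's unexported config type.
type config struct {
	bufferSize int
}

// withBufferSize mirrors the shape of WithChannelBufferSize: it returns a
// function that mutates the config it is given.
func withBufferSize(n int) func(*config) {
	return func(c *config) { c.bufferSize = n }
}

// buildConfig starts from defaults and applies each option in order,
// so a later option overrides an earlier one.
func buildConfig(opts ...func(*config)) config {
	c := config{bufferSize: 0} // assumed default for this sketch: unbuffered
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Println(buildConfig().bufferSize)
	fmt.Println(buildConfig(withBufferSize(100)).bufferSize)
}
```

The pattern keeps the constructor signature stable: new settings can be added as new option functions without changing existing callers.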

Types

type ShutdownMode (added in v1.1.0)

type ShutdownMode int

ShutdownMode defines how the worker pool behaves during shutdown.

const (
	// ShutdownModeDrain waits for all queued items to be processed before shutting down.
	// Uses read-write mutex locking to ensure thread safety during shutdown.
	// This is the default mode and provides the most graceful shutdown experience.
	ShutdownModeDrain ShutdownMode = iota

	// ShutdownModeImmediate stops workers immediately when shutdown is initiated.
	// Uses lock-free operations for better performance but may not process all queued items.
	// Provides faster shutdown but potentially loses pending work.
	ShutdownModeImmediate
)

type WorkerPool

type WorkerPool[T any] struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool[T any](callback func(ctx context.Context, item T), opts ...func(*config)) *WorkerPool[T]

func (*WorkerPool[T]) Start

func (p *WorkerPool[T]) Start(ctx context.Context, numOfWorkers int)

func (*WorkerPool[T]) Stop

func (p *WorkerPool[T]) Stop(ctx context.Context)

func (*WorkerPool[T]) Submit

func (p *WorkerPool[T]) Submit(ctx context.Context, item T) error
