Documentation
¶
Overview ¶
Package wpool implements a bounded worker pool for safe, concurrent processing.
The wpool module provides a concurrent processing system that wraps a channel and spawns a fixed number of workers to process items. It features:
- Generic type support for channel payloads
- Configurable number of workers for bounded concurrency
- Safe submission of items via Submit(context.Context, Item)
- Graceful shutdown with Stop(context.Context) that waits for in-flight processing
- Protection against resource starvation during high-concurrency bursts
Use wpool when you need multiple senders with graceful shutdown capabilities and want to cap worker concurrency over channel receiving, especially for tasks that are not small/short-lived or when facing sudden bursts of submissions.
Example usage:
p := wpool.NewWorkerPool[string]( callback, ) p.Start(ctx, 10) // start 10 workers p.Submit(ctx, "item") p.Stop(ctx)
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrWorkerPoolStopped = errors.New("worker pool is stopped")
Functions ¶
func WithChannelBufferSize ¶
func WithChannelBufferSize(s int) func(*config)
func WithLogger ¶
func WithShutdownMode ¶ added in v1.1.0
func WithShutdownMode(m ShutdownMode) func(*config)
Types ¶
type ShutdownMode ¶ added in v1.1.0
type ShutdownMode int
ShutdownMode defines how the worker pool behaves during shutdown.
const ( // ShutdownModeDrain waits for all queued items to be processed before shutting down. // Uses read-write mutex locking to ensure thread safety during shutdown. // This is the default mode and provides the most graceful shutdown experience. ShutdownModeDrain ShutdownMode = iota // ShutdownModeImmediate stops workers immediately when shutdown is initiated. // Uses lock-free operations for better performance but may not process all queued items. // Provides faster shutdown but potentially loses pending work. ShutdownModeImmediate )
type WorkerPool ¶
type WorkerPool[T any] struct { // contains filtered or unexported fields }
func NewWorkerPool ¶
func NewWorkerPool[T any](callback func(ctx context.Context, item T), opts ...func(*config)) *WorkerPool[T]
func (*WorkerPool[T]) Stop ¶
func (p *WorkerPool[T]) Stop(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.