Documentation
Overview
Package parallel provides generic parallel processing utilities: a reusable worker pool, chunked processing, parallel for-each, map-reduce, and per-key aggregation.
Index
- func ForEach[T any](ctx context.Context, items []T, config PoolConfig, ...) (processed int64, firstError error)
- func MapReduce[T any, M any, R any](ctx context.Context, items []T, config PoolConfig, ...) R
- func ParallelAggregate[T any, K comparable, V any](ctx context.Context, items []T, config PoolConfig, ...) map[K]V
- type AggregateResult
- type ChunkProcessor
- type PoolConfig
- type PoolMetrics
- type ProgressTracker
- type Task
- type TaskFunc
- type TaskResult
- type WorkerPool
Functions
func ForEach
func ForEach[T any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	fn func(ctx context.Context, item T) error,
) (processed int64, firstError error)
ForEach calls fn for each item in parallel. It returns the number of items processed and the first error encountered, if any.
func MapReduce
func MapReduce[T any, M any, R any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	mapper func(ctx context.Context, item T) M,
	reducer func(mapped []M) R,
) R
MapReduce calls mapper on each item in parallel, then passes the collected mapped values to reducer and returns its result.
func ParallelAggregate
func ParallelAggregate[T any, K comparable, V any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	extractor func(item T) (key K, value V),
	merger func(existing, new V) V,
) map[K]V
ParallelAggregate aggregates items in parallel. extractor maps each item to a key and a value, and merger combines values that share a key. Each worker maintains its own local map, which avoids lock contention; the per-worker maps are merged at the end.
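The per-worker-map technique can be sketched as follows. aggregateSketch is a hypothetical name, and splitting items into contiguous slices (one per worker) is an assumption about how work is divided. Each goroutine writes only to its own map, so the parallel phase needs no mutex; the final merge runs on one goroutine after all workers finish.

```go
package main

import (
	"fmt"
	"sync"
)

// aggregateSketch is a hypothetical stand-in for parallel.ParallelAggregate.
func aggregateSketch[T any, K comparable, V any](items []T, workers int,
	extractor func(item T) (K, V), merger func(existing, new V) V) map[K]V {
	if workers < 1 {
		workers = 1
	}
	locals := make([]map[K]V, workers) // one private map per worker
	chunk := (len(items) + workers - 1) / workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		lo := w * chunk
		hi := lo + chunk
		if hi > len(items) {
			hi = len(items)
		}
		local := make(map[K]V)
		locals[w] = local
		if lo >= hi {
			continue // fewer items than workers
		}
		wg.Add(1)
		go func(part []T) {
			defer wg.Done()
			for _, item := range part {
				k, v := extractor(item)
				if old, ok := local[k]; ok {
					v = merger(old, v)
				}
				local[k] = v // only this goroutine touches local
			}
		}(items[lo:hi])
	}
	wg.Wait()

	// Sequential merge: the workers are done, so no locking is required.
	result := make(map[K]V)
	for _, local := range locals {
		for k, v := range local {
			if old, ok := result[k]; ok {
				v = merger(old, v)
			}
			result[k] = v
		}
	}
	return result
}

func main() {
	counts := aggregateSketch([]string{"a", "b", "a", "c", "b", "a"}, 3,
		func(w string) (string, int) { return w, 1 },
		func(existing, n int) int { return existing + n })
	fmt.Println(counts["a"], counts["b"], counts["c"]) // 3 2 1
}
```

Because values are merged both inside each worker and across workers, this approach gives split-independent results only when merger is associative (summing counts, taking a maximum, and so on).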
Types
type AggregateResult
type AggregateResult[K comparable, V any] struct {
	Data map[K]V
}
AggregateResult holds the result of parallel aggregation.
type ChunkProcessor
ChunkProcessor processes large datasets by splitting them into chunks and processing each chunk in parallel.
func NewChunkProcessor
func NewChunkProcessor[T any, R any](config PoolConfig) *ChunkProcessor[T, R]
NewChunkProcessor creates a chunk processor that uses the given configuration.
func (*ChunkProcessor[T, R]) ProcessChunks
func (p *ChunkProcessor[T, R]) ProcessChunks(
	ctx context.Context,
	items []T,
	processor func(ctx context.Context, chunk []T, workerID int) R,
	reducer func(results []R) R,
) R
ProcessChunks splits items into chunks and calls processor on each chunk in parallel, passing the ID of the worker handling that chunk. reducer then combines the per-chunk results into a single result.
type PoolConfig
type PoolConfig struct {
	// MaxWorkers is the maximum number of concurrent workers.
	// Default: min(runtime.NumCPU(), 8)
	MaxWorkers int

	// TaskBufferSize is the buffer size for the task channel.
	// Default: MaxWorkers * 2
	TaskBufferSize int

	// Timeout is the maximum time for the entire operation.
	// Default: 0 (no timeout)
	Timeout time.Duration

	// CollectMetrics enables collection of execution metrics.
	CollectMetrics bool
}
PoolConfig configures the worker pool behavior.
func DefaultPoolConfig
func DefaultPoolConfig() PoolConfig
DefaultPoolConfig returns a default pool configuration.
func (PoolConfig) WithMetrics
func (c PoolConfig) WithMetrics() PoolConfig
WithMetrics returns a copy of c with metrics collection enabled.
func (PoolConfig) WithTimeout
func (c PoolConfig) WithTimeout(d time.Duration) PoolConfig
WithTimeout returns a copy of c with Timeout set to d.
func (PoolConfig) WithWorkers
func (c PoolConfig) WithWorkers(n int) PoolConfig
WithWorkers returns a copy of c with the maximum number of workers set to n.
type PoolMetrics
type PoolMetrics struct {
	TotalTasks     int64
	CompletedTasks int64
	FailedTasks    int64
	TotalDuration  time.Duration
	AvgTaskTime    time.Duration
	MaxTaskTime    time.Duration
	MinTaskTime    time.Duration
}
PoolMetrics holds execution statistics for a worker pool. They are collected when PoolConfig.CollectMetrics is enabled.
type ProgressTracker
type ProgressTracker struct {
	// contains filtered or unexported fields
}
ProgressTracker tracks progress of parallel operations.
func NewProgressTracker
func NewProgressTracker(total int64, callback func(completed, total int64), interval time.Duration) *ProgressTracker
NewProgressTracker creates a progress tracker for total items. Once started, it reports progress by calling callback with the completed and total counts every interval.
func (*ProgressTracker) Add
func (pt *ProgressTracker) Add(n int64)
Add adds n to the completed count.
func (*ProgressTracker) Completed
func (pt *ProgressTracker) Completed() int64
Completed returns the current completed count.
func (*ProgressTracker) Increment
func (pt *ProgressTracker) Increment()
Increment adds 1 to the completed count.
func (*ProgressTracker) Start
func (pt *ProgressTracker) Start(ctx context.Context)
Start begins progress tracking in a background goroutine.
type Task
type Task[T any, R any] interface {
	// Execute performs the task and returns the result.
	Execute(ctx context.Context) (R, error)

	// Input returns the input data for this task.
	Input() T
}
Task represents a unit of work that can be executed by the worker pool.
type TaskFunc
TaskFunc is a function type that implements the Task interface.
func NewTask
func NewTask[T any, R any](input T, fn func(ctx context.Context, input T) (R, error)) *TaskFunc[T, R]
NewTask creates a task that, when executed, calls fn with input.
type TaskResult
TaskResult holds the result of a task execution.
type WorkerPool
WorkerPool manages a pool of workers for parallel task execution.
func NewWorkerPool
func NewWorkerPool[T any, R any](config PoolConfig) *WorkerPool[T, R]
NewWorkerPool creates a new worker pool with the given configuration.
func (*WorkerPool[T, R]) Execute
func (p *WorkerPool[T, R]) Execute(ctx context.Context, tasks []Task[T, R]) []TaskResult[T, R]
Execute runs tasks in parallel and returns one TaskResult per task. Results are in the same order as tasks, regardless of the order in which the tasks finish.
func (*WorkerPool[T, R]) ExecuteFunc
func (p *WorkerPool[T, R]) ExecuteFunc(ctx context.Context, inputs []T, fn func(ctx context.Context, input T) (R, error)) []TaskResult[T, R]
ExecuteFunc is a convenience method that creates one task from fn for each element of inputs and executes them.
func (*WorkerPool[T, R]) Metrics
func (p *WorkerPool[T, R]) Metrics() PoolMetrics
Metrics returns the current execution metrics.