parallel

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package parallel provides generic parallel processing utilities.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ForEach

func ForEach[T any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	fn func(ctx context.Context, item T) error,
) (processed int64, firstError error)

ForEach executes a function for each item in parallel. Returns the number of items processed and any error that occurred.

func MapReduce

func MapReduce[T any, M any, R any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	mapper func(ctx context.Context, item T) M,
	reducer func(mapped []M) R,
) R

MapReduce applies a map function to each item in parallel and reduces the results.

func ParallelAggregate

func ParallelAggregate[T any, K comparable, V any](
	ctx context.Context,
	items []T,
	config PoolConfig,
	extractor func(item T) (key K, value V),
	merger func(existing, new V) V,
) map[K]V

ParallelAggregate aggregates data in parallel using per-worker local maps. This avoids lock contention by having each worker maintain its own map, then merging results at the end.

Types

type AggregateResult

type AggregateResult[K comparable, V any] struct {
	Data map[K]V
}

AggregateResult holds the result of parallel aggregation.

type ChunkProcessor

type ChunkProcessor[T any, R any] struct {
	// contains filtered or unexported fields
}

ChunkProcessor processes large datasets by splitting them into chunks and processing each chunk in parallel.

func NewChunkProcessor

func NewChunkProcessor[T any, R any](config PoolConfig) *ChunkProcessor[T, R]

NewChunkProcessor creates a new chunk processor.

func (*ChunkProcessor[T, R]) ProcessChunks

func (p *ChunkProcessor[T, R]) ProcessChunks(
	ctx context.Context,
	items []T,
	processor func(ctx context.Context, chunk []T, workerID int) R,
	reducer func(results []R) R,
) R

ProcessChunks splits the input into chunks and processes each chunk in parallel. The reducer function combines results from all chunks into a single result.

type PoolConfig

type PoolConfig struct {
	// MaxWorkers is the maximum number of concurrent workers.
	// Default: min(runtime.NumCPU(), 8)
	MaxWorkers int

	// TaskBufferSize is the buffer size for the task channel.
	// Default: MaxWorkers * 2
	TaskBufferSize int

	// Timeout is the maximum time for the entire operation.
	// Default: 0 (no timeout)
	Timeout time.Duration

	// CollectMetrics enables collection of execution metrics.
	CollectMetrics bool
}

PoolConfig configures the worker pool behavior.

func DefaultPoolConfig

func DefaultPoolConfig() PoolConfig

DefaultPoolConfig returns a default pool configuration.

func (PoolConfig) WithMetrics

func (c PoolConfig) WithMetrics() PoolConfig

WithMetrics returns a new config with metrics collection enabled.

func (PoolConfig) WithTimeout

func (c PoolConfig) WithTimeout(d time.Duration) PoolConfig

WithTimeout returns a new config with the specified timeout.

func (PoolConfig) WithWorkers

func (c PoolConfig) WithWorkers(n int) PoolConfig

WithWorkers returns a new config with the specified number of workers.

type PoolMetrics

type PoolMetrics struct {
	TotalTasks     int64
	CompletedTasks int64
	FailedTasks    int64
	TotalDuration  time.Duration
	AvgTaskTime    time.Duration
	MaxTaskTime    time.Duration
	MinTaskTime    time.Duration
}

PoolMetrics holds execution statistics.

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker tracks progress of parallel operations.

func NewProgressTracker

func NewProgressTracker(total int64, callback func(completed, total int64), interval time.Duration) *ProgressTracker

NewProgressTracker creates a new progress tracker.

func (*ProgressTracker) Add

func (pt *ProgressTracker) Add(n int64)

Add adds n to the completed count.

func (*ProgressTracker) Completed

func (pt *ProgressTracker) Completed() int64

Completed returns the current completed count.

func (*ProgressTracker) Increment

func (pt *ProgressTracker) Increment()

Increment increments the completed count.

func (*ProgressTracker) Start

func (pt *ProgressTracker) Start(ctx context.Context)

Start begins progress tracking in a background goroutine.

func (*ProgressTracker) Stop

func (pt *ProgressTracker) Stop()

Stop stops progress tracking.

type Task

type Task[T any, R any] interface {
	// Execute performs the task and returns the result.
	Execute(ctx context.Context) (R, error)
	// Input returns the input data for this task.
	Input() T
}

Task represents a unit of work that can be executed by the worker pool.

type TaskFunc

type TaskFunc[T any, R any] struct {
	// contains filtered or unexported fields
}

TaskFunc is a function type that implements Task interface.

func NewTask

func NewTask[T any, R any](input T, fn func(ctx context.Context, input T) (R, error)) *TaskFunc[T, R]

NewTask creates a new task from a function.

func (*TaskFunc[T, R]) Execute

func (t *TaskFunc[T, R]) Execute(ctx context.Context) (R, error)

Execute implements Task interface.

func (*TaskFunc[T, R]) Input

func (t *TaskFunc[T, R]) Input() T

Input implements Task interface.

type TaskResult

type TaskResult[T any, R any] struct {
	Input    T
	Result   R
	Error    error
	Duration time.Duration
}

TaskResult holds the result of a task execution.

type WorkerPool

type WorkerPool[T any, R any] struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers for parallel task execution.

func NewWorkerPool

func NewWorkerPool[T any, R any](config PoolConfig) *WorkerPool[T, R]

NewWorkerPool creates a new worker pool with the given configuration.

func (*WorkerPool[T, R]) Execute

func (p *WorkerPool[T, R]) Execute(ctx context.Context, tasks []Task[T, R]) []TaskResult[T, R]

Execute runs all tasks in parallel and returns results. Results are returned in the same order as input tasks.

func (*WorkerPool[T, R]) ExecuteFunc

func (p *WorkerPool[T, R]) ExecuteFunc(ctx context.Context, inputs []T, fn func(ctx context.Context, input T) (R, error)) []TaskResult[T, R]

ExecuteFunc is a convenience method that creates tasks from a function.

func (*WorkerPool[T, R]) Metrics

func (p *WorkerPool[T, R]) Metrics() PoolMetrics

Metrics returns the current execution metrics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL