Documentation
¶
Overview ¶
Package worker provides a bounded worker pool using errgroup. Designed for all-or-nothing fetch operations where any failure should cancel all other workers.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MapPartitions ¶
func MapPartitions[T any](ctx context.Context, limit int, partitions []Partition[T], fn func(context.Context, Partition[T]) error) error
MapPartitions executes a function for each partition with bounded concurrency. Returns on first error, cancelling remaining work.
Example:
partitions := []worker.Partition[TimeRange]{
{ID: "2023", Data: TimeRange{Start: "2023-01-01", End: "2023-12-31"}},
{ID: "2024", Data: TimeRange{Start: "2024-01-01", End: "2024-12-31"}},
}
err := worker.MapPartitions(ctx, 10, partitions, func(ctx context.Context, p Partition[TimeRange]) error {
return fetchPartition(ctx, p.Data)
})
func RunAll ¶
RunAll executes all tasks with bounded concurrency. Returns on first error, cancelling remaining tasks.
Example:
tasks := []worker.Task{
func(ctx context.Context) error { return fetchYear(ctx, 2023) },
func(ctx context.Context) error { return fetchYear(ctx, 2024) },
}
err := worker.RunAll(ctx, 10, tasks)
Types ¶
type Partition ¶
type Partition[T any] struct { ID string // Human-readable identifier (e.g., "2023", "page-5") Data T // Partition-specific data }
Partition represents a work partition with metadata.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a bounded set of concurrent workers.
func NewPool ¶
NewPool creates a new worker pool with the given concurrency limit.
The pool uses errgroup.WithContext, which means:
- If any task returns an error, all other tasks are cancelled
- All tasks share the same context for cancellation
- Wait() returns the first error encountered
func (*Pool) Context ¶
Context returns the pool's context. Use this to check for cancellation within tasks.