worker

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package worker provides a bounded worker pool using errgroup. Designed for all-or-nothing fetch operations where any failure should cancel all other workers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MapPartitions

func MapPartitions[T any](ctx context.Context, limit int, partitions []Partition[T], fn func(context.Context, Partition[T]) error) error

MapPartitions executes a function for each partition with bounded concurrency. Returns on first error, cancelling remaining work.

Example:

partitions := []worker.Partition[TimeRange]{
    {ID: "2023", Data: TimeRange{Start: "2023-01-01", End: "2023-12-31"}},
    {ID: "2024", Data: TimeRange{Start: "2024-01-01", End: "2024-12-31"}},
}
err := worker.MapPartitions(ctx, 10, partitions, func(ctx context.Context, p Partition[TimeRange]) error {
    return fetchPartition(ctx, p.Data)
})

func RunAll

func RunAll(ctx context.Context, limit int, tasks []Task) error

RunAll executes all tasks with bounded concurrency. Returns on first error, cancelling remaining tasks.

Example:

tasks := []worker.Task{
    func(ctx context.Context) error { return fetchYear(ctx, 2023) },
    func(ctx context.Context) error { return fetchYear(ctx, 2024) },
}
err := worker.RunAll(ctx, 10, tasks)

Types

type Partition

type Partition[T any] struct {
	ID   string // Human-readable identifier (e.g., "2023", "page-5")
	Data T      // Partition-specific data
}

Partition represents a work partition with metadata.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a bounded set of concurrent workers.

func NewPool

func NewPool(ctx context.Context, limit int) *Pool

NewPool creates a new worker pool with the given concurrency limit.

The pool uses errgroup.WithContext, which means:

  • If any task returns an error, all other tasks are cancelled
  • All tasks share the same context for cancellation
  • Wait() returns the first error encountered

func (*Pool) Context

func (p *Pool) Context() context.Context

Context returns the pool's context. Use this to check for cancellation within tasks.

func (*Pool) Go

func (p *Pool) Go(task func() error)

Go submits a task to the pool. Blocks if all workers are busy. Returns immediately if the context is already cancelled.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait blocks until all tasks complete and returns the first error (if any).

type Task

type Task func(ctx context.Context) error

Task represents a unit of work with context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL