concurrent

v1.0.1
Published: Mar 9, 2026 | License: MIT | Imports: 4 | Imported by: 0

README

concurrent

A modern take on structured concurrency in Go.

Motivation

Go provides excellent low-level concurrency primitives (goroutines, channels, sync package), but composing them into higher-level patterns—fan-out pipelines, bounded worker pools, ordered result collection—requires repetitive boilerplate and careful coordination.

The concurrent package provides a small set of composable building blocks that handle goroutine lifecycle, ordering, error propagation, and panic safety. All concurrent operations respect a context-based concurrency limit, making it easy to control resource usage across an entire call tree.

Results are returned as iter.Seq / iter.Seq2 iterators, so they compose naturally with the standard library and range loops.

Usage

concurrent.WithLimit

Control parallelism via context. The limit propagates through the call tree and can only be decreased, never increased.

ctx := concurrent.WithLimit(ctx, 4) // at most 4 concurrent operations
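
The "can only be decreased" rule can be pictured with a plain context value. The following is a minimal sketch using only the standard library; withLimit, limitOf, limitKey and defaultLimit are hypothetical names, and both the clamping and the fallback to a default are assumptions about how such a limit might behave, not the package's actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// limitKey is an unexported key type, so other packages cannot collide with it.
type limitKey struct{}

// defaultLimit mirrors the documented DefaultLimit value of 10.
const defaultLimit = 10

// limitOf returns the limit stored in ctx.
// Assumption: when no limit is stored, the default applies.
func limitOf(ctx context.Context) int {
	if n, ok := ctx.Value(limitKey{}).(int); ok {
		return n
	}
	return defaultLimit
}

// withLimit returns a child context carrying n. If a parent already set a
// smaller limit, the smaller value is kept, so a limit is never raised.
func withLimit(ctx context.Context, n int) context.Context {
	if cur, ok := ctx.Value(limitKey{}).(int); ok && n > cur {
		n = cur
	}
	return context.WithValue(ctx, limitKey{}, n)
}

// demoLimits reports the effective limit at three points of a call tree.
func demoLimits() (root, lowered, raised int) {
	ctx := withLimit(context.Background(), 4)
	root = limitOf(ctx)
	lowered = limitOf(withLimit(ctx, 2)) // a callee may lower the limit
	raised = limitOf(withLimit(ctx, 8))  // an attempt to raise it is clamped
	return root, lowered, raised
}

func main() {
	root, lowered, raised := demoLimits()
	fmt.Println(root, lowered, raised) // 4 2 4
}
```

Because the value lives in the context, every function further down the call tree sees the tightest limit set by any of its callers.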
concurrent.Pipeline

The core primitive — use when you have an iter.Seq2 stream and a transform function. All other APIs are convenience wrappers built on top of Pipeline.

results := concurrent.Pipeline(ctx, inputSeq, func(ctx context.Context, in T) (Out, error) {
    ...
})

for out, err := range results {
    // results arrive in input order
    ...
}
concurrent.Run / concurrent.RunTasks

Convenience wrappers for when your input is a []T slice. Run yields results as an iterator; RunTasks is for functions that have side effects and return only an error, with no result value.

for result, err := range concurrent.Run(ctx, urls,
    func(ctx context.Context, url string) (Result, error) {
        ...
    },
) {
    ...
}
err := concurrent.RunTasks(ctx, items, func(ctx context.Context, item Item) error {
    ...
})
concurrent.Exec / concurrent.Query

Use when you have a small, fixed set of independent functions rather than a homogeneous slice. Exec for error-only tasks, Query when each task returns a value.

for err := range concurrent.Exec(ctx, task1, task2, task3) {
    if err != nil {
        ...
    }
}
for result, err := range concurrent.Query(ctx, query1, query2) {
    ...
}
concurrent.Queue / concurrent.Process

Use for producer-consumer patterns where jobs arrive dynamically over time rather than being known upfront.

q := concurrent.NewQueue[Result]()

// Producer goroutine
go func() {
    for job := range jobs {
        q.Push(func(ctx context.Context, yield func(Result, error) bool) {
            yield(process(ctx, job))
        })
    }
}()

// Consumer: blocks until q.Done() is called and all jobs are processed
for result, err := range concurrent.Process(ctx, q) {
    // ...
}

Contributing

Contributions are welcome! To get started:

  1. Ensure you have Go 1.25+ installed
  2. Run go test ./... to verify tests pass

Please report bugs and feature requests via GitHub Issues.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package concurrent provides structured concurrency primitives for Go, built on iterators (iter.Seq / iter.Seq2).

All concurrent operations respect a context-based concurrency limit set via WithLimit and propagate panics safely across goroutine boundaries. Results are returned in input order regardless of completion order.
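
One common way to carry a panic across a goroutine boundary is to recover it in the worker and raise it again in the goroutine that is waiting for the result. The sketch below uses only the standard library and a hypothetical helper named runAndRepanic; it illustrates the general technique and is not taken from this package's source. Note that re-raising this way loses the worker's original stack trace.

```go
package main

import "fmt"

// runAndRepanic runs f in a new goroutine and waits for it to finish. If f
// panics, the panic value is captured there and re-raised in the caller.
func runAndRepanic(f func()) {
	caught := make(chan any, 1)
	go func() {
		// recover returns nil when f completed normally.
		defer func() { caught <- recover() }()
		f()
	}()
	if p := <-caught; p != nil {
		panic(p)
	}
}

// demoPanic shows that the caller can recover a panic raised in the worker.
func demoPanic() (recovered string) {
	defer func() {
		if p := recover(); p != nil {
			recovered = fmt.Sprint(p)
		}
	}()
	runAndRepanic(func() { panic("boom") })
	return "no panic"
}

func main() {
	fmt.Println(demoPanic()) // boom
}
```

Without the recover in the worker, an unhandled panic in any goroutine would terminate the whole program, and the caller would have no chance to handle it.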

The core primitive is Pipeline, which applies a transform function to an iterator stream using a bounded worker pool. Higher-level helpers (Exec, Query, Run, Run2, RunTasks, RunTasks2) cover common patterns like processing slices, maps, or fixed sets of independent tasks.

For producer-consumer patterns where jobs arrive dynamically, use Queue with Process.

Constants

const DefaultLimit = 10

DefaultLimit is the default maximum number of operations that the functions in this package run concurrently.

const DefaultQueueCapacity = 1_000_000

DefaultQueueCapacity is the default maximum number of jobs that can be queued when using NewQueue. This provides a reasonable default for most use cases while preventing unbounded memory growth.

Variables

var ErrFull = errors.New("job queue is full")

ErrFull is returned when a job queue has reached its capacity limit. When the queue is full, additional jobs will be replaced with error-emitting jobs that yield this error when processed.

Functions

func Exec

func Exec(ctx context.Context, tasks ...func(context.Context) error) iter.Seq[error]

Exec executes multiple tasks concurrently and returns an iterator of errors.

This is a special case of Pipeline where each task is a simple function that returns only an error. All tasks are executed concurrently up to the context's concurrency limit.

Parameters:

  • ctx: Context that controls concurrency limits and cancellation
  • tasks: Variable number of functions to execute concurrently

Returns:

An iterator sequence that yields error values, one for each task.
The order of errors corresponds to the order of tasks provided.

Example:

for err := range Exec(ctx, task1, task2, task3) {
    if err != nil {
        // handle error
    }
}
Example

Example demonstrating Exec usage

package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 3)

	task1 := func(ctx context.Context) error {
		// do work
		return nil
	}

	task2 := func(ctx context.Context) error {
		// do work
		return nil
	}

	task3 := func(ctx context.Context) error {
		// do work
		return nil
	}

	// Execute all tasks concurrently
	for err := range concurrent.Exec(ctx, task1, task2, task3) {
		if err != nil {
			// handle error
			_ = err
		}
	}
}

func Limit

func Limit(ctx context.Context) int

Limit retrieves the maximum concurrency value from the context.

func Pipeline

func Pipeline[Out, In any](
	ctx context.Context,
	seq iter.Seq2[In, error],
	transform func(context.Context, In) (Out, error),
) iter.Seq2[Out, error]

Pipeline provides concurrent processing of iterator sequences with a transform function.

This function takes an input iterator sequence and applies a transformation function to each element concurrently, respecting the context's concurrency limits. It maintains ordering and provides proper error handling and cancellation support.

Type Parameters:

  • Out: The output type after transformation
  • In: The input type before transformation

Parameters:

  • ctx: Context that controls concurrency limits and cancellation
  • seq: Input iterator sequence of In and error pairs
  • transform: Function that transforms input items to output items

Returns:

An iterator sequence that yields Out and error pairs, maintaining the
order of the input sequence while processing items concurrently.

The function:

  • Respects context concurrency limits from concurrent.Limit(ctx)
  • Maintains proper ordering of results
  • Handles context cancellation gracefully
  • Uses goroutines for concurrent processing
  • Propagates errors from both input sequence and transform function
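
To make the ordering guarantee concrete, here is one way ordered, bounded processing could be built: each input gets a single-use result channel, and those channels are consumed in input order. This is a minimal standard-library sketch, not the package's implementation; orderedPipeline and result are hypothetical names, the element type is fixed to int for brevity, and the iterator is invoked as a plain function rather than through a range loop.

```go
package main

import (
	"fmt"
	"time"
)

// result pairs a transformed value with its error.
type result struct {
	val int
	err error
}

// orderedPipeline applies transform to each item produced by seq, keeping at
// most limit items in flight, and passes results to yield in input order.
func orderedPipeline(
	limit int,
	seq func(yield func(int, error) bool),
	transform func(int) (int, error),
	yield func(int, error) bool,
) {
	if limit < 1 {
		limit = 1
	}
	// One single-use channel per item, queued in input order. The consumer
	// holds one of them and the buffer holds up to limit-1 more.
	futures := make(chan chan result, limit-1)
	stop := make(chan struct{})
	defer close(stop)

	go func() {
		defer close(futures)
		seq(func(in int, err error) bool {
			fut := make(chan result, 1)
			select {
			case futures <- fut: // blocks while limit items are in flight
			case <-stop:
				return false // the consumer stopped early
			}
			if err != nil {
				fut <- result{err: err} // pass input errors through
				return true
			}
			go func() {
				v, err := transform(in)
				fut <- result{v, err}
			}()
			return true
		})
	}()

	for fut := range futures {
		r := <-fut // wait for the oldest item, even if later ones are done
		if !yield(r.val, r.err) {
			return
		}
	}
}

// demoPipeline squares 1..5 with a limit of 3. Earlier items sleep longer,
// so they finish last, yet results still arrive in input order.
func demoPipeline() []int {
	numbers := func(yield func(int, error) bool) {
		for i := 1; i <= 5; i++ {
			if !yield(i, nil) {
				return
			}
		}
	}
	square := func(n int) (int, error) {
		time.Sleep(time.Duration(6-n) * 5 * time.Millisecond)
		return n * n, nil
	}
	var out []int
	orderedPipeline(3, numbers, square, func(v int, err error) bool {
		if err == nil {
			out = append(out, v)
		}
		return true
	})
	return out
}

func main() {
	fmt.Println(demoPipeline()) // [1 4 9 16 25]
}
```

The trade-off of strict ordering is head-of-line blocking: a slow early item delays delivery of later items that have already finished.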
Example

Example demonstrating Pipeline usage

package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 5)

	// Create a sequence of numbers
	numbers := func(yield func(int, error) bool) {
		for i := 1; i <= 10; i++ {
			if !yield(i, nil) {
				return
			}
		}
	}

	// Transform function that squares each number
	square := func(ctx context.Context, n int) (int, error) {
		return n * n, nil
	}

	// Process the pipeline
	for result, err := range concurrent.Pipeline(ctx, numbers, square) {
		if err != nil {
			panic(err)
		}
		_ = result // use result
	}
}

func Process

func Process[T any](ctx context.Context, queue *Queue[T]) iter.Seq2[T, error]

Process executes jobs from the queue concurrently and returns an iterator of their results.

The function creates a pool of worker goroutines (controlled by the context's concurrency limit) that pull jobs from the queue and execute them. Results are collected and yielded through the returned iterator.

The concurrency level is determined by Limit(ctx). Workers respect context cancellation and will stop processing when the context is cancelled.

The iterator terminates when:

  • All jobs in the queue have been processed
  • The context is cancelled
  • The consumer stops the iterator by returning false

Process automatically calls queue.Done() when all jobs have been consumed, preventing new jobs from being added and allowing graceful shutdown.

Type parameter T represents the type of values that jobs produce.
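
The worker-pool shape described above can be sketched with channels alone. The example below is an illustration under assumptions, not the package's code: processAll is a hypothetical helper, jobs are plain functions returning int, and results come back in completion order. It also records the peak number of jobs running at once to show that the pool size bounds concurrency.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// processAll runs jobs on a pool of `workers` goroutines. It returns the
// results in completion order and the peak number of jobs running at once.
func processAll(workers int, jobs []func() int) (results []int, peak int) {
	jobCh := make(chan func() int)
	resCh := make(chan int)

	var mu sync.Mutex // guards running and peak
	running := 0

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				mu.Lock()
				running++
				if running > peak {
					peak = running
				}
				mu.Unlock()

				v := job()

				mu.Lock()
				running--
				mu.Unlock()
				resCh <- v
			}
		}()
	}

	// Feed the pool, then close the results channel once every worker exits.
	go func() {
		for _, job := range jobs {
			jobCh <- job
		}
		close(jobCh)
		wg.Wait()
		close(resCh)
	}()

	for v := range resCh {
		results = append(results, v)
	}
	return results, peak
}

// demoProcess runs three short jobs on two workers.
func demoProcess() ([]int, int) {
	var jobs []func() int
	for i := 1; i <= 3; i++ {
		value := i
		jobs = append(jobs, func() int {
			time.Sleep(10 * time.Millisecond)
			return value * 10
		})
	}
	results, peak := processAll(2, jobs)
	sort.Ints(results) // completion order varies, so sort for display
	return results, peak
}

func main() {
	results, peak := demoProcess()
	fmt.Println(results)   // [10 20 30]
	fmt.Println(peak <= 2) // true
}
```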

Example

Example demonstrates using Process to handle jobs with concurrency control.

ctx := WithLimit(context.Background(), 2) // Limit to 2 concurrent workers
queue := NewQueue[int]()

// Add jobs that simulate work
for i := 1; i <= 3; i++ {
	value := i
	queue.Push(func(ctx context.Context, yield func(int, error) bool) {
		// Simulate some work
		time.Sleep(10 * time.Millisecond)
		yield(value*10, nil)
	})
}

// Process all jobs and collect results
var results []int
for result, err := range Process(ctx, queue) {
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		results = append(results, result)
	}
}

// Sort for deterministic output
sort.Ints(results)
fmt.Printf("Results: %v\n", results)
Output:
Results: [10 20 30]

func ProcessTasks

func ProcessTasks(ctx context.Context, tasks *TaskQueue) error

ProcessTasks executes all tasks in the queue concurrently and returns the first error encountered, if any.

Tasks are processed by a pool of workers (controlled by the context's concurrency limit). If any task returns an error, processing stops and that error is returned.

This function is useful for scenarios where you want fail-fast behavior and don't need to collect result values from successful tasks.
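
Fail-fast behavior is usually paired with cancellation, so that tasks still running can stop once the outcome is decided. The following standard-library sketch shows that general pattern; failFast is a hypothetical helper that starts every task at once (it does not apply a concurrency limit) and is not a description of how ProcessTasks is implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// failFast runs all tasks concurrently and returns the first error to
// arrive. Returning cancels the shared context so other tasks can stop.
func failFast(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so that tasks finishing after we return never block.
	errc := make(chan error, len(tasks))
	for _, task := range tasks {
		go func(task func(context.Context) error) {
			errc <- task(ctx)
		}(task)
	}
	for range tasks {
		if err := <-errc; err != nil {
			return err
		}
	}
	return nil
}

// demoFailFast pairs a slow, cancellable task with one that fails at once.
func demoFailFast() string {
	slow := func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // stops early once another task has failed
		case <-time.After(time.Second):
			return nil
		}
	}
	broken := func(ctx context.Context) error {
		return errors.New("boom")
	}
	return fmt.Sprint(failFast(context.Background(), slow, broken))
}

func main() {
	fmt.Println(demoFailFast()) // boom
}
```

This only works well when tasks check their context, which is why the Task documentation asks tasks to return promptly on cancellation.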

func Query

func Query[R any](ctx context.Context, tasks ...func(context.Context) (R, error)) iter.Seq2[R, error]

Query executes multiple query tasks concurrently and returns an iterator of results and errors.

This is a special case of Pipeline where each task is a function that returns a result and an error. All tasks are executed concurrently up to the context's concurrency limit.

Type Parameters:

  • R: The result type returned by each task

Parameters:

  • ctx: Context that controls concurrency limits and cancellation
  • tasks: Variable number of query functions to execute concurrently

Returns:

An iterator sequence that yields result and error pairs, one for each task.
The order of results corresponds to the order of tasks provided.

Example:

for result, err := range Query(ctx, query1, query2, query3) {
    if err != nil {
        // handle error
    } else {
        // use result
    }
}
Example

Example demonstrating Query usage

package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 5)

	query1 := func(ctx context.Context) (string, error) {
		return "result1", nil
	}

	query2 := func(ctx context.Context) (string, error) {
		return "result2", nil
	}

	// Execute all queries concurrently
	for result, err := range concurrent.Query(ctx, query1, query2) {
		if err != nil {
			// handle error
			continue
		}
		_ = result // use result
	}
}

func Run

func Run[R, T any](ctx context.Context, jobs []T, process func(context.Context, T) (R, error)) iter.Seq2[R, error]

Run is a convenience function that processes each item in the jobs slice concurrently using the provided process function.

Results are returned in the same order as the input jobs, regardless of which jobs complete first. This is achieved by using Pipeline internally.

The concurrency level is controlled by the context's concurrency limit (see WithLimit).

func Run2

func Run2[R any, K comparable, V any](ctx context.Context, jobs map[K]V, process func(context.Context, K, V) (R, error)) iter.Seq2[R, error]

Run2 is like Run but it takes its input jobs as a map.

For each key-value pair in the jobs map, the process function is called concurrently. Note that map iteration order in Go is not deterministic, so while results maintain the iteration order, that order itself varies between runs.
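
When a stable order matters, one option is to flatten the map into a slice sorted by key and process that slice instead. The sketch below uses only the standard library; entry and sortedEntries are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// entry holds one key-value pair taken from the map.
type entry struct {
	key   string
	value int
}

// sortedEntries flattens m into a slice ordered by key, giving a
// deterministic order that map iteration alone does not provide.
func sortedEntries(m map[string]int) []entry {
	entries := make([]entry, 0, len(m))
	for k, v := range m {
		entries = append(entries, entry{k, v})
	}
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].key < entries[j].key
	})
	return entries
}

func main() {
	sizes := map[string]int{"c.txt": 30, "a.txt": 10, "b.txt": 20}
	for _, e := range sortedEntries(sizes) {
		fmt.Println(e.key, e.value)
	}
	// a.txt 10
	// b.txt 20
	// c.txt 30
}
```

A slice built this way matches the []T input that Run accepts, so results would then follow key order.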

The concurrency level is controlled by the context's concurrency limit (see WithLimit).

func RunTasks

func RunTasks[T any](ctx context.Context, tasks []T, process func(context.Context, T) error) error

RunTasks processes each item in the tasks slice concurrently using the provided process function, returning the first error encountered in input order.

Tasks are executed concurrently, but errors are checked in the order of the input slice. This means if tasks[0] and tasks[2] both fail, the error from tasks[0] will be returned even if tasks[2] completed first.

The concurrency level is controlled by the context's concurrency limit (see WithLimit).

Type parameter T represents the type of input items to process.
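
The difference between "first error in input order" and "first error to occur" is easiest to see in code. This sketch stores each task's error at the task's own index and then scans the slots in order. It is a simplified illustration: firstErrorInOrder is a hypothetical helper that waits for every task and applies no concurrency limit, which may differ from what RunTasks does.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// firstErrorInOrder runs all tasks concurrently and returns the error of the
// lowest-indexed task that failed, regardless of which task failed first.
func firstErrorInOrder(tasks []func() error) error {
	errs := make([]error, len(tasks)) // one slot per task, indexed by position
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			errs[i] = task() // each goroutine writes only its own slot
		}(i, task)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

// demoFirstError makes task 0 fail slowly and task 2 fail immediately.
func demoFirstError() string {
	tasks := []func() error{
		func() error {
			time.Sleep(20 * time.Millisecond)
			return errors.New("task 0 failed")
		},
		func() error { return nil },
		func() error { return errors.New("task 2 failed") },
	}
	return firstErrorInOrder(tasks).Error()
}

func main() {
	fmt.Println(demoFirstError()) // task 0 failed
}
```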

Example

Example demonstrates RunTasks for processing a slice of data.

data := []int{1, 2, 3, 4, 5}

// Collect results for deterministic output
var results []string
var mu sync.Mutex

// Process each item, doubling the values
err := RunTasks(context.Background(), data, func(ctx context.Context, item int) error {
	result := item * 2
	mu.Lock()
	results = append(results, fmt.Sprintf("Processed %d -> %d", item, result))
	mu.Unlock()
	return nil
})

if err != nil {
	fmt.Printf("Error: %v\n", err)
} else {
	// Sort for deterministic output since task order may vary
	sort.Strings(results)
	for _, result := range results {
		fmt.Println(result)
	}
}
Output:
Processed 1 -> 2
Processed 2 -> 4
Processed 3 -> 6
Processed 4 -> 8
Processed 5 -> 10

func RunTasks2

func RunTasks2[K comparable, V any](ctx context.Context, tasks map[K]V, process func(context.Context, K, V) error) error

RunTasks2 is like RunTasks but it takes its input tasks as a map.

Note that map iteration order in Go is not deterministic, so while errors are returned in iteration order, that order itself varies between runs.

func WithLimit

func WithLimit(ctx context.Context, maxConcurrency int) context.Context

WithLimit creates a new context with a specified maximum concurrency value.

Types

type Job

type Job[T any] func(context.Context, func(T, error) bool)

Job represents a generic job function that processes data of type T. The job receives a context for cancellation and a yield function to emit results. The yield function returns true if the caller wants more results, or false to stop processing.

Jobs can yield multiple values by calling the yield function multiple times. They should respect context cancellation and stop processing when the context is done.
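
A job that yields several values and honors both stop signals might look like the sketch below. The Job type is redeclared locally with the documented signature so the example needs only the standard library; countTo is a hypothetical helper, and yielding the context error before returning is an assumed convention rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// Job mirrors the documented type: a context plus a yield function.
type Job[T any] func(context.Context, func(T, error) bool)

// countTo returns a job that yields 1..n. It stops early if the context is
// done or if the consumer declines further values.
func countTo(n int) Job[int] {
	return func(ctx context.Context, yield func(int, error) bool) {
		for i := 1; i <= n; i++ {
			if err := ctx.Err(); err != nil {
				yield(0, err) // assumption: surface cancellation as an error
				return
			}
			if !yield(i, nil) {
				return // the consumer asked for no more values
			}
		}
	}
}

// demoJob consumes a five-value job but stops after three values.
func demoJob() []int {
	var got []int
	job := countTo(5)
	job(context.Background(), func(v int, err error) bool {
		if err != nil {
			return false
		}
		got = append(got, v)
		return len(got) < 3
	})
	return got
}

func main() {
	fmt.Println(demoJob()) // [1 2 3]
}
```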

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is a thread-safe, bounded queue for processing jobs concurrently. It supports generic job types and provides capacity limits, graceful shutdown, and synchronization primitives for coordinating between producers and consumers.

The queue uses a condition variable to efficiently block consumers when empty and wake them when jobs become available. It maintains a wait group to track outstanding jobs for synchronization.

When the queue reaches its capacity limit, additional jobs are dropped and replaced with error-emitting jobs that yield ErrFull when processed.

Type parameter T represents the type of values that jobs will produce.
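
The blocking behavior described above follows the usual sync.Cond pattern: consumers wait in a loop while the queue is empty and not closed, and producers signal after each push. The sketch below is a stripped-down illustration with hypothetical names (condQueue, push, markDone, pull) and int items; it leaves out capacity limits and the wait group.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// condQueue is a minimal FIFO queue guarded by a mutex and condition variable.
type condQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
	done  bool
}

func newCondQueue() *condQueue {
	q := &condQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends an item and wakes one waiting consumer.
func (q *condQueue) push(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.done {
		panic("push after done")
	}
	q.items = append(q.items, v)
	q.cond.Signal()
}

// markDone closes the queue and wakes every waiting consumer.
func (q *condQueue) markDone() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.done = true
	q.cond.Broadcast()
}

// pull blocks until an item is available. It reports false once the queue
// has been marked done and fully drained.
func (q *condQueue) pull() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.done {
		q.cond.Wait() // releases the lock while waiting
	}
	if len(q.items) == 0 {
		return 0, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}

// demoCondQueue drains a queue that a producer fills over time.
func demoCondQueue() []int {
	q := newCondQueue()
	go func() {
		for i := 1; i <= 3; i++ {
			q.push(i)
			time.Sleep(time.Millisecond)
		}
		q.markDone()
	}()
	var got []int
	for {
		v, ok := q.pull()
		if !ok {
			break
		}
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demoCondQueue()) // [1 2 3]
}
```

The wait sits inside a for loop rather than an if because a woken consumer must re-check the condition: another consumer may have taken the item first.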

Example

Example demonstrates basic queue usage for processing jobs concurrently.

// Create a queue with capacity for 100 jobs
queue := NewQueueWithCapacity[string](100)

// Add some jobs to the queue
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("processed job 1", nil)
})
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("processed job 2", nil)
})

// Signal that no more jobs will be added
queue.Done()

// Process jobs using the Pull iterator
var results []string
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			results = append(results, result)
		}
		return true
	})
}

// Sort for deterministic output since job order may vary
sort.Strings(results)
for _, result := range results {
	fmt.Println(result)
}
Output:
processed job 1
processed job 2
Example (CapacityLimit)

Example demonstrates queue capacity limits and error handling.

// Create a queue with very small capacity
queue := NewQueueWithCapacity[string](1)

// Add jobs up to and beyond capacity
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("job 1", nil)
})
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("job 2", nil) // This will be replaced with an error job
})

queue.Done()

// Process jobs and handle capacity errors
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if errors.Is(err, ErrFull) {
			fmt.Println("Queue was full")
		} else if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			fmt.Printf("Result: %s\n", result)
		}
		return true
	})
}
Output:
Result: job 1
Queue was full
Example (Shutdown)

Example demonstrates graceful shutdown with queue coordination.

queue := NewQueue[string]()

// Collect results for deterministic output
var results []string
var mu sync.Mutex

// Start a producer goroutine
go func() {
	for i := 1; i <= 3; i++ {
		value := i // Capture loop variable
		queue.Push(func(ctx context.Context, yield func(string, error) bool) {
			yield(fmt.Sprintf("work item %d", value), nil)
		})
		time.Sleep(10 * time.Millisecond)
	}
	queue.Done() // Signal no more work
}()

// Process work as it arrives
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			mu.Lock()
			results = append(results, result)
			mu.Unlock()
		}
		return true
	})
}

// Wait ensures all jobs are finished before proceeding
queue.Wait()

// Sort for deterministic output since job completion order may vary
sort.Strings(results)
for _, result := range results {
	fmt.Printf("Completed: %s\n", result)
}
fmt.Println("All work completed")
Output:
Completed: work item 1
Completed: work item 2
Completed: work item 3
All work completed

func NewQueue

func NewQueue[T any]() *Queue[T]

NewQueue creates a new job queue with the default capacity. The queue will accept up to DefaultQueueCapacity jobs before dropping additional jobs and replacing them with error-emitting jobs that yield ErrFull.

Type parameter T specifies the type of values that jobs will produce.

func NewQueueWithCapacity

func NewQueueWithCapacity[T any](capacity int) *Queue[T]

NewQueueWithCapacity creates a new job queue with the specified capacity. The capacity determines the maximum number of jobs that can be queued before additional jobs are dropped.

When the queue reaches capacity, additional jobs are dropped and replaced with error-emitting jobs that yield ErrFull when processed.

Type parameter T specifies the type of values that jobs will produce.

func (*Queue[T]) Done

func (q *Queue[T]) Done()

Done marks the queue as closed for new jobs and signals all blocked consumers to stop pulling jobs once the queue is empty.

After Done() is called:

  • Push() will panic if called
  • Pull() will terminate once all remaining jobs are consumed
  • New Pull() calls will return immediately if the queue is empty

Done is safe for concurrent use and is typically called by producers to signal that no more work will be submitted.

func (*Queue[T]) Flush

func (q *Queue[T]) Flush()

Flush removes all pending jobs from the queue without processing them.

All flushed jobs are marked as done in the wait group, allowing Wait() to proceed without waiting for the flushed jobs to be processed.

This method is useful for graceful shutdown scenarios where you want to stop processing new work but still wait for already-started jobs to complete.

Flush is safe for concurrent use.
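
The interaction between Flush and Wait comes down to wait-group bookkeeping: every pushed job adds one to the counter, and a job leaves the counter either by being processed or by being flushed. The sketch below models that accounting with hypothetical names (trackedQueue, push, take, flush) and string job names; it is an assumption-based illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// trackedQueue counts outstanding jobs with a wait group.
type trackedQueue struct {
	mu      sync.Mutex
	pending []string
	wg      sync.WaitGroup
}

// push enqueues a job name and counts it as outstanding.
func (q *trackedQueue) push(name string) {
	q.wg.Add(1)
	q.mu.Lock()
	q.pending = append(q.pending, name)
	q.mu.Unlock()
}

// take removes the oldest pending job. The caller must call q.wg.Done()
// once it has finished processing that job.
func (q *trackedQueue) take() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return "", false
	}
	name := q.pending[0]
	q.pending = q.pending[1:]
	return name, true
}

// flush discards all pending jobs and marks each of them as done, so that
// waiting only covers jobs that were already taken.
func (q *trackedQueue) flush() int {
	q.mu.Lock()
	n := len(q.pending)
	q.pending = nil
	q.mu.Unlock()
	for i := 0; i < n; i++ {
		q.wg.Done()
	}
	return n
}

// demoFlush starts one job, flushes the rest, then waits.
func demoFlush() (processed []string, flushed int) {
	q := &trackedQueue{}
	for _, name := range []string{"a", "b", "c"} {
		q.push(name)
	}
	name, _ := q.take() // job "a" has started
	go func() {
		processed = append(processed, name)
		q.wg.Done()
	}()
	flushed = q.flush() // "b" and "c" are dropped unprocessed
	q.wg.Wait()         // returns once "a" has finished
	return processed, flushed
}

func main() {
	processed, flushed := demoFlush()
	fmt.Println(processed, flushed) // [a] 2
}
```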

func (*Queue[T]) Pull

func (q *Queue[T]) Pull() iter.Seq[Job[T]]

Pull returns an iterator that yields jobs from the queue in FIFO order.

The iterator blocks when the queue is empty and waits for new jobs to be pushed. It terminates when Done() is called and all remaining jobs have been consumed.

Each job yielded by the iterator is automatically marked as done in the wait group when the iterator advances, enabling Wait() to track completion.

Pull is safe for concurrent use by multiple consumers. Jobs are distributed among consumers in a round-robin fashion.

The iterator can be stopped early by returning false from the yield function.

func (*Queue[T]) Push

func (q *Queue[T]) Push(job Job[T])

Push adds a job to the queue for processing.

If the queue has reached its capacity, the job will be dropped and replaced with an error-emitting job that yields ErrFull when processed. This prevents unbounded memory growth without blocking the caller.

Push panics if called on a queue that has been marked as done via Done(). This prevents adding jobs to a queue that consumers have stopped processing.

Push is safe for concurrent use and will wake up blocked consumers.
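
The replace-on-overflow behavior can be illustrated with a small single-goroutine model. In this sketch, boundedQueue, push, job and errFull are hypothetical stand-ins for the real types, jobs produce strings, and the substitute job is simply appended in place of the rejected one. How the real queue bounds the number of substitute jobs is not stated in the documentation, so that detail is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull stands in for the documented ErrFull sentinel.
var errFull = errors.New("job queue is full")

// job is a simplified job type that yields string results.
type job func(yield func(string, error) bool)

// boundedQueue is a non-concurrent model of a capacity-limited queue.
type boundedQueue struct {
	capacity int
	jobs     []job
}

// push never blocks. Once the queue holds `capacity` jobs, the incoming job
// is dropped and a small job that yields errFull takes its place.
func (q *boundedQueue) push(j job) {
	if len(q.jobs) >= q.capacity {
		j = func(yield func(string, error) bool) { yield("", errFull) }
	}
	q.jobs = append(q.jobs, j)
}

// demoCapacity pushes two jobs into a queue of capacity one and records
// what the consumer observes for each.
func demoCapacity() []string {
	q := &boundedQueue{capacity: 1}
	q.push(func(yield func(string, error) bool) { yield("job 1", nil) })
	q.push(func(yield func(string, error) bool) { yield("job 2", nil) })

	var log []string
	for _, j := range q.jobs {
		j(func(v string, err error) bool {
			if errors.Is(err, errFull) {
				log = append(log, "full")
			} else {
				log = append(log, v)
			}
			return true
		})
	}
	return log
}

func main() {
	fmt.Printf("%q\n", demoCapacity()) // ["job 1" "full"]
}
```

The benefit of this design is that the producer never blocks and the consumer still learns, in order, that a job was lost.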

func (*Queue[T]) Wait

func (q *Queue[T]) Wait()

Wait blocks until all jobs that have been pushed to the queue have been processed by consumers via Pull().

Wait tracks job completion through an internal wait group that is incremented when jobs are pushed and decremented when jobs are consumed from Pull().

This method is useful for ensuring all work has completed before proceeding or shutting down the application.

type Task

type Task func(context.Context) error

Task represents a simple function that performs work and may return an error. Tasks are used with TaskQueue for scenarios where you only care about success/failure rather than producing specific result values.

Tasks should respect context cancellation and return promptly when the context is cancelled.

type TaskQueue

type TaskQueue Queue[struct{}]

TaskQueue is a specialized queue for processing Task functions. It's a convenience wrapper around Queue[struct{}] for scenarios where you only need to track success/failure of operations rather than collecting specific result values.

Example

Example demonstrates TaskQueue for simple success/failure operations.

taskQueue := NewTaskQueue()

// Collect task completions for deterministic output
var completed []string
var mu sync.Mutex

// Add tasks that perform operations
taskQueue.Push(func(ctx context.Context) error {
	mu.Lock()
	completed = append(completed, "Task 1 completed")
	mu.Unlock()
	return nil
})
taskQueue.Push(func(ctx context.Context) error {
	mu.Lock()
	completed = append(completed, "Task 2 completed")
	mu.Unlock()
	return nil
})

// Process all tasks
if err := ProcessTasks(context.Background(), taskQueue); err != nil {
	fmt.Printf("Error: %v\n", err)
} else {
	// Sort for deterministic output since task order may vary
	sort.Strings(completed)
	for _, msg := range completed {
		fmt.Println(msg)
	}
	fmt.Println("All tasks completed successfully")
}
Output:
Task 1 completed
Task 2 completed
All tasks completed successfully

func NewTaskQueue

func NewTaskQueue() *TaskQueue

NewTaskQueue creates a new task queue with the default capacity. Task queues are optimized for scenarios where you only need to track success/failure of operations.

func NewTaskQueueWithCapacity

func NewTaskQueueWithCapacity(capacity int) *TaskQueue

NewTaskQueueWithCapacity creates a new task queue with the specified capacity. The capacity determines how many tasks can be queued before additional tasks are dropped and ErrFull is emitted.

func (*TaskQueue) Push

func (q *TaskQueue) Push(task Task)

Push adds a task to the queue for processing. The task will be executed by workers created via ProcessTasks().

Tasks are converted to jobs that yield empty struct{} values on success or error values on failure.

func (*TaskQueue) Queue

func (q *TaskQueue) Queue() *Queue[struct{}]

Queue returns the underlying generic queue that backs this TaskQueue. This allows access to the full Queue API when needed.
