Documentation ¶
Overview ¶
Package concurrent provides structured concurrency primitives for Go, built on iterators (iter.Seq / iter.Seq2).
All concurrent operations respect a context-based concurrency limit set via WithLimit and propagate panics safely across goroutine boundaries. Results are returned in input order regardless of completion order.
The core primitive is Pipeline, which applies a transform function to an iterator stream using a bounded worker pool. Higher-level helpers (Exec, Query, Run, RunTasks) cover common patterns like processing slices, maps, or fixed sets of independent tasks.
For producer-consumer patterns where jobs arrive dynamically, use Queue with Process.
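To make the shape of the API concrete, here is a minimal sketch of the most common flow, bounding concurrency with WithLimit and then fanning out over a slice with Run; the transform here is placeholder logic, and error handling is elided:

```go
// Cap concurrency for everything derived from this context.
ctx := concurrent.WithLimit(context.Background(), 4)

// Square each input concurrently; results come back in input order.
inputs := []int{1, 2, 3, 4, 5}
for result, err := range concurrent.Run(ctx, inputs, func(ctx context.Context, n int) (int, error) {
	return n * n, nil
}) {
	if err != nil {
		// handle error
		continue
	}
	_ = result
}
```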
Index ¶
- Constants
- Variables
- func Exec(ctx context.Context, tasks ...func(context.Context) error) iter.Seq[error]
- func Limit(ctx context.Context) int
- func Pipeline[Out, In any](ctx context.Context, seq iter.Seq2[In, error], ...) iter.Seq2[Out, error]
- func Process[T any](ctx context.Context, queue *Queue[T]) iter.Seq2[T, error]
- func ProcessTasks(ctx context.Context, tasks *TaskQueue) error
- func Query[R any](ctx context.Context, tasks ...func(context.Context) (R, error)) iter.Seq2[R, error]
- func Run[R, T any](ctx context.Context, jobs []T, process func(context.Context, T) (R, error)) iter.Seq2[R, error]
- func Run2[R any, K comparable, V any](ctx context.Context, jobs map[K]V, ...) iter.Seq2[R, error]
- func RunTasks[T any](ctx context.Context, tasks []T, process func(context.Context, T) error) error
- func RunTasks2[K comparable, V any](ctx context.Context, tasks map[K]V, process func(context.Context, K, V) error) error
- func WithLimit(ctx context.Context, maxConcurrency int) context.Context
- type Job
- type Queue
- type Task
- type TaskQueue
Examples ¶
Constants ¶
const DefaultLimit = 10
DefaultLimit is the default maximum number of concurrent operations for functions that use goroutines to manage concurrency, applied when no limit has been set via WithLimit.
const DefaultQueueCapacity = 1_000_000
DefaultQueueCapacity is the default maximum number of jobs that can be queued when using NewQueue. This provides a reasonable default for most use cases while preventing unbounded memory growth.
Variables ¶
var ErrFull = errors.New("job queue is full")
ErrFull is returned when a job queue has reached its capacity limit. When the queue is full, additional jobs will be replaced with error-emitting jobs that yield this error when processed.
Functions ¶
func Exec ¶
func Exec(ctx context.Context, tasks ...func(context.Context) error) iter.Seq[error]
Exec executes multiple tasks concurrently and returns an iterator of errors.
This is a special case of Pipeline where each task is a simple function that returns only an error. All tasks are executed concurrently up to the context's concurrency limit.
Parameters:
- ctx: Context that controls concurrency limits and cancellation
- tasks: Variable number of functions to execute concurrently
Returns:
An iterator sequence that yields error values, one for each task. The order of errors corresponds to the order of tasks provided.
Example:
for err := range Exec(ctx, task1, task2, task3) {
	if err != nil {
		// handle error
	}
}
Example ¶
Example demonstrating Exec usage
package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 3)

	task1 := func(ctx context.Context) error {
		// do work
		return nil
	}
	task2 := func(ctx context.Context) error {
		// do work
		return nil
	}
	task3 := func(ctx context.Context) error {
		// do work
		return nil
	}

	// Execute all tasks concurrently
	for err := range concurrent.Exec(ctx, task1, task2, task3) {
		if err != nil {
			// handle error
			_ = err
		}
	}
}
Output:
func Pipeline ¶
func Pipeline[Out, In any](
	ctx context.Context,
	seq iter.Seq2[In, error],
	transform func(context.Context, In) (Out, error),
) iter.Seq2[Out, error]
Pipeline provides concurrent processing of iterator sequences with a transform function.
This function takes an input iterator sequence and applies a transformation function to each element concurrently, respecting the context's concurrency limits. It maintains ordering and provides proper error handling and cancellation support.
Type Parameters:
- Out: The output type after transformation
- In: The input type before transformation
Parameters:
- ctx: Context that controls concurrency limits and cancellation
- seq: Input iterator sequence of In and error pairs
- transform: Function that transforms input items to output items
Returns:
An iterator sequence that yields Out and error pairs, maintaining the order of the input sequence while processing items concurrently.
The function:
- Respects context concurrency limits from concurrent.Limit(ctx)
- Maintains proper ordering of results
- Handles context cancellation gracefully
- Uses goroutines for concurrent processing
- Propagates errors from both input sequence and transform function
Example ¶
Example demonstrating Pipeline usage
package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 5)

	// Create a sequence of numbers
	numbers := func(yield func(int, error) bool) {
		for i := 1; i <= 10; i++ {
			if !yield(i, nil) {
				return
			}
		}
	}

	// Transform function that squares each number
	square := func(ctx context.Context, n int) (int, error) {
		return n * n, nil
	}

	// Process the pipeline
	for result, err := range concurrent.Pipeline(ctx, numbers, square) {
		if err != nil {
			panic(err)
		}
		_ = result // use result
	}
}
Output:
func Process ¶
func Process[T any](ctx context.Context, queue *Queue[T]) iter.Seq2[T, error]
Process executes jobs from the queue concurrently and returns an iterator of their results.
The function creates a pool of worker goroutines (controlled by the context's concurrency limit) that pull jobs from the queue and execute them. Results are collected and yielded through the returned iterator.
The concurrency level is determined by Limit(ctx). Workers respect context cancellation and will stop processing when the context is cancelled.
The iterator terminates when:
- All jobs in the queue have been processed
- The context is cancelled
- The consumer stops the iterator by returning false
Process automatically calls queue.Done() when all jobs have been consumed, preventing new jobs from being added and allowing graceful shutdown.
Type parameter T represents the type of values that jobs produce.
Example ¶
Example demonstrates using Process to handle jobs with concurrency control.
ctx := WithLimit(context.Background(), 2) // Limit to 2 concurrent workers
queue := NewQueue[int]()

// Add jobs that simulate work
for i := 1; i <= 3; i++ {
	value := i
	queue.Push(func(ctx context.Context, yield func(int, error) bool) {
		// Simulate some work
		time.Sleep(10 * time.Millisecond)
		yield(value*10, nil)
	})
}

// Process all jobs and collect results
var results []int
for result, err := range Process(ctx, queue) {
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		results = append(results, result)
	}
}

// Sort for deterministic output
sort.Ints(results)
fmt.Printf("Results: %v\n", results)
Output:

Results: [10 20 30]
func ProcessTasks ¶
func ProcessTasks(ctx context.Context, tasks *TaskQueue) error
ProcessTasks executes all tasks in the queue concurrently and returns the first error encountered, if any.
Tasks are processed by a pool of workers (controlled by the context's concurrency limit). If any task returns an error, processing stops and that error is returned.
This function is useful for scenarios where you want fail-fast behavior and don't need to collect result values from successful tasks.
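Combining NewTaskQueue with ProcessTasks, the fail-fast pattern can be sketched as follows (inline-example style, as elsewhere on this page; the failing task is illustrative):

```go
taskQueue := NewTaskQueue()
taskQueue.Push(func(ctx context.Context) error {
	return nil // succeeds
})
taskQueue.Push(func(ctx context.Context) error {
	return errors.New("boom") // illustrative failure
})

if err := ProcessTasks(ctx, taskQueue); err != nil {
	// processing stopped at the first error encountered
	fmt.Printf("failed: %v\n", err)
}
```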
func Query ¶
func Query[R any](ctx context.Context, tasks ...func(context.Context) (R, error)) iter.Seq2[R, error]
Query executes multiple query tasks concurrently and returns an iterator of results and errors.
This is a special case of Pipeline where each task is a function that returns a result and an error. All tasks are executed concurrently up to the context's concurrency limit.
Type Parameters:
- R: The result type returned by each task
Parameters:
- ctx: Context that controls concurrency limits and cancellation
- tasks: Variable number of query functions to execute concurrently
Returns:
An iterator sequence that yields result and error pairs, one for each task. The order of results corresponds to the order of tasks provided.
Example:
for result, err := range Query(ctx, query1, query2, query3) {
	if err != nil {
		// handle error
	} else {
		// use result
	}
}
Example ¶
Example demonstrating Query usage
package main

import (
	"context"

	"github.com/firetiger-oss/concurrent"
)

func main() {
	ctx := concurrent.WithLimit(context.Background(), 5)

	query1 := func(ctx context.Context) (string, error) {
		return "result1", nil
	}
	query2 := func(ctx context.Context) (string, error) {
		return "result2", nil
	}

	// Execute all queries concurrently
	for result, err := range concurrent.Query(ctx, query1, query2) {
		if err != nil {
			// handle error
			continue
		}
		_ = result // use result
	}
}
Output:
func Run ¶
func Run[R, T any](ctx context.Context, jobs []T, process func(context.Context, T) (R, error)) iter.Seq2[R, error]
Run is a convenience function that processes each item in the jobs slice concurrently using the provided process function.
Results are returned in the same order as the input jobs, regardless of which jobs complete first. This is achieved by using Pipeline internally.
The concurrency level is controlled by the context's concurrency limit (see WithLimit).
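A sketch of Run over a slice, in the page's inline-example style (the lookup body is placeholder logic):

```go
ctx := WithLimit(context.Background(), 4)
ids := []int{101, 102, 103}

for name, err := range Run(ctx, ids, func(ctx context.Context, id int) (string, error) {
	// placeholder for a real lookup, e.g. a database or API call
	return fmt.Sprintf("user-%d", id), nil
}) {
	if err != nil {
		// errors occupy the same position as their input
		continue
	}
	_ = name
}
```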
func Run2 ¶
func Run2[R any, K comparable, V any](ctx context.Context, jobs map[K]V, process func(context.Context, K, V) (R, error)) iter.Seq2[R, error]
Run2 is like Run but it takes its input jobs as a map.
For each key-value pair in the jobs map, the process function is called concurrently. Note that map iteration order in Go is not deterministic, so while results maintain the iteration order, that order itself varies between runs.
The concurrency level is controlled by the context's concurrency limit (see WithLimit).
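A sketch of Run2 over a map, in the page's inline-example style:

```go
ctx := WithLimit(context.Background(), 4)
prices := map[string]float64{"apple": 1.25, "pear": 2.50}

for line, err := range Run2(ctx, prices, func(ctx context.Context, name string, price float64) (string, error) {
	return fmt.Sprintf("%s costs $%.2f", name, price), nil
}) {
	if err != nil {
		continue
	}
	fmt.Println(line) // map iteration order varies between runs
}
```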
func RunTasks ¶
func RunTasks[T any](ctx context.Context, tasks []T, process func(context.Context, T) error) error
RunTasks processes each item in the tasks slice concurrently using the provided process function, returning the first error encountered in input order.
Tasks are executed concurrently, but errors are checked in the order of the input slice. This means if tasks[0] and tasks[2] both fail, the error from tasks[0] will be returned even if tasks[2] completed first.
The concurrency level is controlled by the context's concurrency limit (see WithLimit).
Type parameter T represents the type of input items to process.
Example ¶
Example demonstrates RunTasks for processing a slice of data.
data := []int{1, 2, 3, 4, 5}

// Collect results for deterministic output
var results []string
var mu sync.Mutex

// Process each item, doubling the values
err := RunTasks(context.Background(), data, func(ctx context.Context, item int) error {
	result := item * 2
	mu.Lock()
	results = append(results, fmt.Sprintf("Processed %d -> %d", item, result))
	mu.Unlock()
	return nil
})
if err != nil {
	fmt.Printf("Error: %v\n", err)
} else {
	// Sort for deterministic output since task order may vary
	sort.Strings(results)
	for _, result := range results {
		fmt.Println(result)
	}
}
Output:

Processed 1 -> 2
Processed 2 -> 4
Processed 3 -> 6
Processed 4 -> 8
Processed 5 -> 10
func RunTasks2 ¶
func RunTasks2[K comparable, V any](ctx context.Context, tasks map[K]V, process func(context.Context, K, V) error) error
RunTasks2 is like RunTasks but it takes its input tasks as a map.
Note that map iteration order in Go is not deterministic, so while errors are returned in iteration order, that order itself varies between runs.
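A sketch of RunTasks2 over a map (the validation logic is a placeholder):

```go
configs := map[string]int{"timeout": 30, "retries": -1}

err := RunTasks2(ctx, configs, func(ctx context.Context, key string, value int) error {
	if value < 0 {
		return fmt.Errorf("%s: must be non-negative, got %d", key, value)
	}
	return nil
})
if err != nil {
	// first error in map iteration order, which varies between runs
	fmt.Printf("invalid config: %v\n", err)
}
```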
Types ¶
type Job ¶
Job represents a generic job function that processes data of type T. The job receives a context for cancellation and a yield function to emit results. The yield function returns true if the caller wants more results, or false to stop processing.
Jobs can yield multiple values by calling the yield function multiple times. They should respect context cancellation and stop processing when the context is done.
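Judging from the Push examples later on this page, a Job[T] has the shape func(ctx context.Context, yield func(T, error) bool). A sketch of a job that yields several values and stops on cancellation or when the consumer returns false:

```go
queue := NewQueue[int]()
queue.Push(func(ctx context.Context, yield func(int, error) bool) {
	for i := 0; i < 3; i++ {
		select {
		case <-ctx.Done():
			yield(0, ctx.Err()) // report cancellation and stop
			return
		default:
		}
		if !yield(i, nil) { // consumer asked us to stop
			return
		}
	}
})
```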
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
Queue is a thread-safe, bounded queue for processing jobs concurrently. It supports generic job types and provides capacity limits, graceful shutdown, and synchronization primitives for coordinating between producers and consumers.
The queue uses a condition variable to efficiently block consumers when empty and wake them when jobs become available. It maintains a wait group to track outstanding jobs for synchronization.
When the queue reaches its capacity limit, additional jobs are dropped and replaced with error-emitting jobs that yield ErrFull when processed.
Type parameter T represents the type of values that jobs will produce.
Example ¶
Example demonstrates basic queue usage for processing jobs concurrently.
// Create a queue with capacity for 100 jobs
queue := NewQueueWithCapacity[string](100)

// Add some jobs to the queue
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("processed job 1", nil)
})
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("processed job 2", nil)
})

// Signal that no more jobs will be added
queue.Done()

// Process jobs using the Pull iterator
var results []string
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			results = append(results, result)
		}
		return true
	})
}

// Sort for deterministic output since job order may vary
sort.Strings(results)
for _, result := range results {
	fmt.Println(result)
}
Output:

processed job 1
processed job 2
Example (CapacityLimit) ¶
Example demonstrates queue capacity limits and error handling.
// Create a queue with very small capacity
queue := NewQueueWithCapacity[string](1)

// Add jobs up to and beyond capacity
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("job 1", nil)
})
queue.Push(func(ctx context.Context, yield func(string, error) bool) {
	yield("job 2", nil) // This will be replaced with an error job
})
queue.Done()

// Process jobs and handle capacity errors
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if errors.Is(err, ErrFull) {
			fmt.Println("Queue was full")
		} else if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			fmt.Printf("Result: %s\n", result)
		}
		return true
	})
}
Output:

Result: job 1
Queue was full
Example (Shutdown) ¶
Example demonstrates graceful shutdown with queue coordination.
queue := NewQueue[string]()

// Collect results for deterministic output
var results []string
var mu sync.Mutex

// Start a producer goroutine
go func() {
	for i := 1; i <= 3; i++ {
		value := i // Capture loop variable
		queue.Push(func(ctx context.Context, yield func(string, error) bool) {
			yield(fmt.Sprintf("work item %d", value), nil)
		})
		time.Sleep(10 * time.Millisecond)
	}
	queue.Done() // Signal no more work
}()

// Process work as it arrives
for job := range queue.Pull() {
	job(context.Background(), func(result string, err error) bool {
		if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			mu.Lock()
			results = append(results, result)
			mu.Unlock()
		}
		return true
	})
}

// Wait ensures all jobs are finished before proceeding
queue.Wait()

// Sort for deterministic output since job completion order may vary
sort.Strings(results)
for _, result := range results {
	fmt.Printf("Completed: %s\n", result)
}
fmt.Println("All work completed")
Output:

Completed: work item 1
Completed: work item 2
Completed: work item 3
All work completed
func NewQueue ¶
NewQueue creates a new job queue with the default capacity. The queue will accept up to DefaultQueueCapacity jobs before dropping additional jobs and replacing them with error-emitting jobs that yield ErrFull.
Type parameter T specifies the type of values that jobs will produce.
func NewQueueWithCapacity ¶
NewQueueWithCapacity creates a new job queue with the specified capacity. The capacity determines the maximum number of jobs that can be queued before additional jobs are dropped.
When the queue reaches capacity, additional jobs are dropped and replaced with error-emitting jobs that yield ErrFull when processed.
Type parameter T specifies the type of values that jobs will produce.
func (*Queue[T]) Done ¶
func (q *Queue[T]) Done()
Done marks the queue as closed for new jobs and signals all blocked consumers to stop pulling jobs once the queue is empty.
After Done() is called:
- Push() will panic if called
- Pull() will terminate once all remaining jobs are consumed
- New Pull() calls will return immediately if the queue is empty
Done is safe for concurrent use and is typically called by producers to signal that no more work will be submitted.
func (*Queue[T]) Flush ¶
func (q *Queue[T]) Flush()
Flush removes all pending jobs from the queue without processing them.
All flushed jobs are marked as done in the wait group, allowing Wait() to proceed without waiting for the flushed jobs to be processed.
This method is useful for graceful shutdown scenarios where you want to stop processing new work but still wait for already-started jobs to complete.
Flush is safe for concurrent use.
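A sketch of the shutdown sequence this enables; the ordering of Done and Flush shown here is one plausible arrangement, not prescribed by the package:

```go
queue := NewQueue[string]()
// ... producers Push jobs; workers consume via Pull or Process ...

queue.Done()  // stop accepting new jobs
queue.Flush() // drop pending jobs that have not been started
queue.Wait()  // returns once in-flight jobs finish (flushed jobs are already marked done)
```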
func (*Queue[T]) Pull ¶
Pull returns an iterator that yields jobs from the queue in FIFO order.
The iterator blocks when the queue is empty and waits for new jobs to be pushed. It terminates when Done() is called and all remaining jobs have been consumed.
Each job yielded by the iterator is automatically marked as done in the wait group when the iterator advances, enabling Wait() to track completion.
Pull is safe for concurrent use by multiple consumers. Jobs are distributed among consumers in a round-robin fashion.
The iterator can be stopped early by returning false from the yield function.
func (*Queue[T]) Push ¶
Push adds a job to the queue for processing.
If the queue has reached its capacity, the job will be dropped and replaced with an error-emitting job that yields ErrFull when processed. This prevents unbounded memory growth without blocking the caller.
Push panics if called on a queue that has been marked as done via Done(). This prevents adding jobs to a queue that consumers have stopped processing.
Push is safe for concurrent use and will wake up blocked consumers.
func (*Queue[T]) Wait ¶
func (q *Queue[T]) Wait()
Wait blocks until all jobs that have been pushed to the queue have been processed by consumers via Pull().
Wait tracks job completion through an internal wait group that is incremented when jobs are pushed and decremented when jobs are consumed from Pull().
This method is useful for ensuring all work has completed before proceeding or shutting down the application.
type Task ¶
Task represents a simple function that performs work and may return an error. Tasks are used with TaskQueue for scenarios where you only care about success/failure rather than producing specific result values.
Tasks should respect context cancellation and return promptly when the context is cancelled.
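Assuming Task's underlying type is func(context.Context) error, as the TaskQueue.Push calls in the example below suggest, a cancellation-aware task might look like:

```go
var slowTask Task = func(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // simulated slow work
		return nil
	case <-ctx.Done():
		return ctx.Err() // return promptly when cancelled
	}
}
```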
type TaskQueue ¶
type TaskQueue Queue[struct{}]
TaskQueue is a specialized queue for processing Task functions. It's a convenience wrapper around Queue[struct{}] for scenarios where you only need to track success/failure of operations rather than collecting specific result values.
Example ¶
Example demonstrates TaskQueue for simple success/failure operations.
taskQueue := NewTaskQueue()

// Collect task completions for deterministic output
var completed []string
var mu sync.Mutex

// Add tasks that perform operations
taskQueue.Push(func(ctx context.Context) error {
	mu.Lock()
	completed = append(completed, "Task 1 completed")
	mu.Unlock()
	return nil
})
taskQueue.Push(func(ctx context.Context) error {
	mu.Lock()
	completed = append(completed, "Task 2 completed")
	mu.Unlock()
	return nil
})

// Process all tasks
if err := ProcessTasks(context.Background(), taskQueue); err != nil {
	fmt.Printf("Error: %v\n", err)
} else {
	// Sort for deterministic output since task order may vary
	sort.Strings(completed)
	for _, msg := range completed {
		fmt.Println(msg)
	}
	fmt.Println("All tasks completed successfully")
}
Output:

Task 1 completed
Task 2 completed
All tasks completed successfully
func NewTaskQueue ¶
func NewTaskQueue() *TaskQueue
NewTaskQueue creates a new task queue with the default capacity. Task queues are optimized for scenarios where you only need to track success/failure of operations.
func NewTaskQueueWithCapacity ¶
NewTaskQueueWithCapacity creates a new task queue with the specified capacity. The capacity determines how many tasks can be queued before additional tasks are dropped and ErrFull is emitted.