async

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2025 License: MIT Imports: 8 Imported by: 1

README

kelindar/async
Go Version PkgGoDev Go Report Card License Coverage

Concurrent Task Orchestration

This library provides fast, type-safe task orchestration for Go, designed for efficient concurrent processing and asynchronous operations. It simplifies complex orchestration patterns while maintaining excellent performance and memory efficiency.

  • Type-Safe Generics: Full compile-time type safety with Go generics, eliminating runtime type assertions
  • High Performance: Optimized for minimal allocations (2 allocs/task) and maximum throughput
  • Flexible Patterns: Support for fork/join, throttling, worker pools, and repeating tasks
  • Context Aware: Full context propagation with cancellation and timeout support
  • Thread-Safe: Safe for concurrent use across multiple goroutines
  • Zero Dependencies: Pure Go implementation with no external dependencies

Use When:

  • ✅ Building concurrent data processing pipelines
  • ✅ Orchestrating multiple API calls or I/O operations
  • ✅ Implementing worker pools with controlled concurrency
  • ✅ Creating reactive systems with task composition
  • ✅ Managing background jobs with cancellation support

Quick Start

// Create and run a task
task := async.Invoke(context.TODO(), func(ctx context.Context) (string, error) {
    time.Sleep(100 * time.Millisecond)
    return "Hello, World!", nil
})

// Wait for the result
result, err := task.Outcome()
if err != nil {
    panic(err)
}

fmt.Println(result) // Output: Hello, World!
fmt.Printf("Duration: %v\n", task.Duration())

The library supports several common concurrency patterns out of the box:

  • Worker Pools](#worker-pools)** - Controlled concurrency with Consume and InvokeAll
  • Fork/Join - Parallel task execution with result aggregation
  • Throttling - Rate limiting with Consume and custom concurrency
  • Repeating - Periodic execution with Repeat

Introduction

Task is the fundamental building block, similar to Java's Future or JavaScript's Promise. It represents an asynchronous operation with full type safety using Go generics. Tasks are lightweight (only 2 allocations) and provide a clean abstraction over goroutines and channels, handling synchronization details while exposing a simple API for concurrent execution.

// Create a task with type safety
task := async.NewTask(func(ctx context.Context) (int, error) {
    return 42, nil
})

// Check if completed (non-blocking)
if task.State() == async.IsCompleted {
    result, err := task.Outcome() // Won't block
}

// Cancel if needed
task.Cancel()

Tasks follow a well-defined state machine with atomic operations for thread safety. They progress from IsCreatedIsRunningIsCompleted/IsCancelled. State transitions are irreversible and prevent common concurrency bugs like double-execution or race conditions during cancellation.

const (
    IsCreated   State = iota // Newly created task
    IsRunning                // Currently executing  
    IsCompleted              // Finished successfully or with error
    IsCancelled              // Cancelled before or during execution
)

The library provides deep integration with Go's context package for cancellation, timeout, and deadline management. Tasks automatically respect context cancellation at all stages of execution, with proper error propagation for timeouts and shutdown scenarios. This enables sophisticated patterns like graceful shutdown and hierarchical task cancellation.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

task := async.Invoke(ctx, func(ctx context.Context) (string, error) {
    select {
    case <-time.After(10*time.Second):
        return "too slow", nil
    case <-ctx.Done():
        return "", ctx.Err() // Will return timeout error
    }
})

result, err := task.Outcome()
// err will be context.DeadlineExceeded

Fork/Join Pattern

The Fork/Join pattern is ideal for decomposing a larger problem into independent subtasks that can run concurrently. This pattern shines when you have multiple operations that don't depend on each other but whose results you need to combine. Using InvokeAll with concurrency set to 0 provides unlimited parallelism, making it perfect for scenarios like fetching data from multiple APIs, processing independent files, or performing parallel computations.

tasks := []async.Task[string]{
    async.NewTask(func(ctx context.Context) (string, error) {
        return "user data", nil
    }),
    async.NewTask(func(ctx context.Context) (string, error) {
        return "user preferences", nil
    }),
    async.NewTask(func(ctx context.Context) (string, error) {
        return "user history", nil
    }),
}

// Run all tasks concurrently (unlimited concurrency)
result := async.InvokeAll(context.Background(), 0, tasks)
_, err := result.Outcome()

Throttled Execution

Throttled execution prevents resource exhaustion by limiting the number of concurrent operations. This pattern is essential when dealing with rate-limited APIs, database connections with limited pools, or any scenario where unbounded concurrency could overwhelm system resources. The library uses a batch processing approach that processes tasks in groups, ensuring predictable resource usage while maintaining high throughput.

// Process 1000 tasks with max 10 concurrent
var tasks []async.Task[string]
for i := 0; i < 1000; i++ {
    i := i // capture loop variable
    tasks = append(tasks, async.NewTask(func(ctx context.Context) (string, error) {
        // Process item (placeholder function)
        return fmt.Sprintf("processed item %d", i), nil
    }))
}

result := async.InvokeAll(context.Background(), 10, tasks)
_, err := result.Outcome()

Worker Pool Pattern

The worker pool pattern efficiently processes a stream of tasks using a fixed number of worker goroutines. This pattern is perfect for scenarios where tasks arrive dynamically and you want to maintain consistent resource usage. The Consume function creates dedicated workers that pull tasks from a channel, providing excellent performance for high-throughput scenarios while maintaining bounded resource consumption.

// Create a channel of tasks
taskQueue := make(chan async.Task[string], 100)

// Add tasks to the queue
go func() {
    defer close(taskQueue)
    for i := 0; i < 50; i++ {
        task := async.NewTask(func(ctx context.Context) (string, error) {
            return fmt.Sprintf("Processed item %d", i), nil
        })
        taskQueue <- task
    }
}()

// Process with 3 concurrent workers
consumer := async.Consume(context.Background(), 3, taskQueue)
_, err := consumer.Outcome()

Repeating Tasks

Repeating tasks enable periodic execution of operations at regular intervals. This pattern is useful for implementing heartbeats, health checks, periodic data synchronization, or any recurring background operations. The implementation uses Go's ticker mechanism and properly handles context cancellation, making it suitable for long-running services that need graceful shutdown capabilities.

// Heartbeat every 30 seconds
heartbeat := async.Repeat(context.Background(), 30*time.Second, 
    func(ctx context.Context) (string, error) {
        // Send heartbeat (placeholder function)
        return "heartbeat sent", nil
    })

// Stop after 5 minutes
time.Sleep(5 * time.Minute)
heartbeat.Cancel()

Benchmarks

The benchmarks demonstrate the library's excellent performance characteristics across different usage patterns.

cpu: 13th Gen Intel(R) Core(TM) i7-13700K
BenchmarkTask/Consume-24         	    4054	    309833 ns/op	  145127 B/op	    2014 allocs/op
BenchmarkTask/Invoke-24          	 2361956	       507.6 ns/op	     128 B/op	       2 allocs/op
BenchmarkTask/InvokeAll-24       	    4262	    303242 ns/op	  161449 B/op	    2015 allocs/op
BenchmarkTask/Completed-24       	89886966	        13.36 ns/op	      32 B/op	       1 allocs/op
BenchmarkTask/Errored-24         	89026714	        13.50 ns/op	      32 B/op	

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrPanic = errors.New("panic in async task")
)

Functions

func CancelAll

func CancelAll[T any](tasks []Task[T])

CancelAll cancels all specified tasks.

func WaitAll

func WaitAll[T any](tasks []Task[T])

WaitAll waits for all tasks to finish.

Types

type State

type State byte

State represents the state enumeration for a task.

const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)

Various task states

type Task

type Task[T any] interface {
	Run(ctx context.Context) Task[T]
	Cancel()
	State() State
	Outcome() (T, error)
	Duration() time.Duration
}

Task represents a unit of work to be done

func Completed added in v1.1.0

func Completed[T any](result T) Task[T]

Completed creates a completed task with the given result.

func Consume

func Consume[T any](ctx context.Context, concurrency int, tasks chan Task[T]) Task[T]

Consume runs the tasks with a specific max concurrency

func Failed added in v1.3.0

func Failed[T any](err error) Task[T]

Failed creates a failed task with the given error.

func Invoke

func Invoke[T any](ctx context.Context, action Work[T]) Task[T]

Invoke creates a new tasks and runs it asynchronously.

func InvokeAll

func InvokeAll[T any](ctx context.Context, concurrency int, tasks []Task[T]) Task[T]

InvokeAll runs the tasks with a specific max concurrency

Example
resChan := make(chan int, 6)
works := make([]Work[any], 6, 6)
for i := range works {
	j := i
	works[j] = func(context.Context) (any, error) {
		fmt.Println(j / 2)
		time.Sleep(time.Millisecond * 10)
		return nil, nil
	}
}
tasks := NewTasks(works...)
InvokeAll(context.Background(), 2, tasks)
WaitAll(tasks)
close(resChan)
res := []int{}
for r := range resChan {
	res = append(res, r)
}
Output:

0
0
1
1
2
2

func NewTask

func NewTask[T any](action Work[T]) Task[T]

NewTask creates a new task.

func NewTasks

func NewTasks[T any](actions ...Work[T]) []Task[T]

NewTasks creates a set of new tasks.

func Repeat

func Repeat[T any](ctx context.Context, interval time.Duration, action Work[T]) Task[T]

Repeat performs an action asynchronously on a predetermined interval.

type Work

type Work[T any] func(context.Context) (T, error)

Work represents a handler to execute

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL