conman

package module
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: GPL-2.0 Imports: 6 Imported by: 0

README

ConMan

ConMan is a concurrency manager for Go that allows setting a limit to the number of tasks that can run concurrently. It provides an intuitive interface for defining and concurrently running any type of tasks.

Usage

Define a Task, which is a stuct implementing the Execute function. Example:

type sum struct {
    op1 int
    op2 int
}

func (s *sum) Execute(ctx context.Context) (int, error) {
    return s.op1 + s.op2, nil
}

Then, create a new Concurrency Manager meant to run tasks that return an int value, with a concurrency limit. The concurrency limit must be at least 2. Example:

cm, err := conman.New[int](5) // concurrency limit of 5
if err != nil {
    // handle error - invalid concurrency limit
    log.Fatal(err)
}

Note: The concurrency limit must be at least 2. Values less than 2 will return an error.

Finally, run as many tasks as needed. Example:

var err error
err = cm.Run(ctx, &sum{op1: 234, op2: 987})
// handle error ...
err = cm.Run(ctx, &sum{op1: 3455, op2: 200})
// handle error ...
// ...
err = cm.Run(ctx, &sum{op1: 905, op2: 7329})
// handle error ...

You can wait for all the tasks to complete before moving on using cm.Wait().

The outputs from all the tasks are collected in cm.Outputs(), and errors can be retrieved via cm.Errors().

If the context ctx is cancelled for whatever reason, all subsequent calls to cm.Run() will return an error about context cancellation.

Retries

To automatically retry a task when it fails, the Execute function must return a pointer to a RetriableError object. This object contains the original error and supports configurable retry strategies.

Retry Strategies

ConMan provides three built-in retry strategies:

Exponential Backoff (Default)

Delays increase exponentially with each retry attempt, with jitter.

type flakyTask struct {
    runCount int
}

func (t *flakyTask) Execute(ctx context.Context) (int, error) {
    if t.runCount < 2 {
        t.runCount++
        // Retry with exponential backoff
        return -1, &RetriableError{Err: fmt.Errorf("Try again")}.WithExponentialBackoff()
    }
    return 42, nil
}
Linear Backoff

Delays increase linearly with each retry attempt.

err := &RetriableError{Err: fmt.Errorf("Try again")}
return -1, err.WithLinearBackoff()
No Backoff

Immediate retries without delays.

err := &RetriableError{Err: fmt.Errorf("Try again")}
return -1, err.WithNoBackoff()
Custom Retry Configuration

Each retry strategy can be customized through the RetryConfig:

type sum struct {
    op1 int
    op2 int
    runCount int
}

func (s *sum) Execute(ctx context.Context) (int, error) {
    if s.runCount < 2 {
        s.runCount++
        // Custom retry configuration
        err := &RetriableError{Err: fmt.Errorf("Try again")}
        err.RetryConfig = &RetryConfig{
            MaxAttempts:   3,
            InitialDelay:  100,  // milliseconds
            BackoffFactor: 2.0,
            MaxDelay:      5000, // milliseconds
            Jitter:        true,
        }
        return -1, err
    }
    return s.op1 + s.op2, nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cm, err := conman.New[int](5)
if err != nil {
    log.Fatal(err)
}
cm.Run(ctx, &sum{op1: 234, op2: 987})
cm.Run(ctx, &sum{op1: 3455, op2: 200})

Complete Example

Here's a complete example of running multiple Fibonacci calculations concurrently using ConMan with a concurrency limit of 2.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/bilyes/conman"
)

type slowFibo struct {
    operand int
}

func (s *slowFibo) fibonacci(i int) int {
    if i == 0 || i == 1 {
        return i
    }
    return s.fibonacci(i-1) + s.fibonacci(i-2)
}

func (s *slowFibo) Execute(ctx context.Context) (int, error) {
    // Long process...
    time.Sleep(2 * time.Second)
    switch {
    case <-ctx.Done():
        return -1, ctx.Err()
    default:
        return s.fibonacci(s.operand), nil
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Create a concurrency manager with a limit of 2.
    // This means that the total number of concurrently running
    // tasks will never exceed 2.
    cm, err := conman.New[int](2)
    if err != nil {
        log.Fatal(err)
    }

    for _, op := range []int{5, 8, 13, 16} {
        // Dispatch task executions with the context ctx
        if err := cm.Run(ctx, &slowFibo{operand: op}); err != nil {
            // There was an error with dispatching the task execution.
            // This is not an error caused by the execution itself. Those errors are handled
            // by ConMan internally and are accessible through the Errors() function.
            fmt.Printf("Error for operand %s: %v", err)
        }
    }

    // Wait until all tasks are completed
    if err := cm.Wait(ctx); err != nil {
		t.Fatalf("ConMan Wait returned an unexpected error: %v", err)
    }

    // Check if there were any errors
    if errs := cm.Errors(); len(errs) > 0 {
        log.Fatalf("There were calculation errors: %v", errs)
    }

    // Print the results
    fmt.Printf("Here are the results: %v", cm.Outputs())
}

Documentation

Overview

Package conman provides a concurrency manager that allows setting a limit to the number of tasks that can run concurrently. It provides an intuitive interface for defining and concurrently running any type of tasks.

Basic usage:

cm, err := conman.New[int](5) // concurrency limit of 5 (minimum is 2)
if err != nil {
	log.Fatal(err)
}

// Define a task
type myTask struct{}
func (t *myTask) Execute(ctx context.Context) (int, error) {
	return 42, nil
}

// Run the task
err = cm.Run(ctx, &myTask{})

// Wait for completion and get results
cm.Wait()
results := cm.Outputs()

The package also supports automatic retry mechanisms with configurable backoff strategies for tasks that may fail temporarily.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConMan

type ConMan[T any] struct {
	// contains filtered or unexported fields
}

ConMan a structure to manage multiple tasks running concurrently while ensuring the total number of running tasks doesn't exceed a certain concurrency limit

func New

func New[T any](concurrencyLimit int64) (*ConMan[T], error)

New creates a new ConMan instance with the specified concurrency limit.

The concurrency limit determines the maximum number of tasks that can run concurrently. The limit must be at least 2 to ensure meaningful concurrency.

Parameters:

  • concurrencyLimit: Maximum number of concurrent tasks (must be ≥ 2)

Returns:

  • *ConMan[T]: A new ConMan instance
  • error: An error if concurrencyLimit is less than 2

Example:

cm, err := conman.New[int](5) // Allow up to 5 concurrent tasks
if err != nil {
	return fmt.Errorf("failed to create ConMan: %w", err)
}

func (*ConMan[T]) Errors

func (c *ConMan[T]) Errors() []error

Errors returns a slice of all task execution errors.

Only errors from tasks that failed during execution are included. Errors are collected in the order they occur.

Returns:

  • []error: Slice of task execution errors

func (*ConMan[T]) Outputs

func (c *ConMan[T]) Outputs() []T

Outputs returns a slice of successful task results.

Only results from tasks that completed without errors are included. Results are collected in the order tasks complete, not submission order.

Returns:

  • []T: Slice of successful task results

func (*ConMan[T]) Run

func (c *ConMan[T]) Run(ctx context.Context, t Task[T]) error

Run executes a task concurrently, respecting the concurrency limit.

If the concurrency limit is reached, this method blocks until a slot becomes available. The task runs in a separate goroutine and results are collected automatically.

Parameters:

  • ctx: Context for cancellation and timeout control
  • t: Task implementing the Task[T] interface

Returns:

  • error: Context cancellation error if ctx is cancelled before task starts Returns nil if task is successfully dispatched

Note: This method only returns errors related to task dispatch.

Task execution errors are collected and accessible via Errors().

func (*ConMan[T]) Wait

func (c *ConMan[T]) Wait(ctx context.Context) error

Wait blocks until all previously dispatched tasks have completed.

This method should be called after all Run() calls to ensure all tasks have finished execution before accessing results or errors.

Parameters:

  • ctx: Context for cancellation and timeout control

Returns:

  • error: Context cancellation error if ctx is cancelled before all tasks complete Returns nil if all tasks complete successfully

Note: This method blocks until all tasks finish or context is cancelled. After calling Wait(), you can access results via Outputs() and errors via Errors().

type RetriableError added in v0.2.0

type RetriableError struct {
	Err         error
	RetryConfig *RetryConfig
}

RetriableError is an error type that indicates a task should be retried. It includes an embedded RetryConfig to specify the retry strategy.

func (*RetriableError) Error added in v0.2.0

func (e *RetriableError) Error() string

Error returns the error message of the underlying error.

func (*RetriableError) WithExponentialBackoff added in v0.3.0

func (e *RetriableError) WithExponentialBackoff() *RetriableError

WithExponentialBackoff configures the error to use exponential backoff retry strategy. Returns the RetriableError for method chaining.

func (*RetriableError) WithLinearBackoff added in v0.3.0

func (e *RetriableError) WithLinearBackoff() *RetriableError

WithLinearBackoff configures the error to use linear backoff retry strategy. Returns the RetriableError for method chaining.

func (*RetriableError) WithNoBackoff added in v0.3.0

func (e *RetriableError) WithNoBackoff() *RetriableError

WithNoBackoff configures the error to use immediate retries without delays. Returns the RetriableError for method chaining.

func (*RetriableError) WithRetryConfig added in v0.4.2

func (e *RetriableError) WithRetryConfig(config *RetryConfig) (*RetriableError, error)

type RetryConfig added in v0.3.0

type RetryConfig struct {
	MaxAttempts   int     // Maximum number of retry attempts
	InitialDelay  int64   // Initial delay in milliseconds
	BackoffFactor float64 // Multiplier for exponential backoff
	MaxDelay      int64   // Maximum delay in milliseconds
	Jitter        bool    // Whether to add random jitter to delays
}

RetryConfig defines the retry behavior for operations that may fail temporarily. It includes parameters for controlling the number of attempts, delays, and backoff strategy.

type Task

type Task[T any] interface {
	// Execute runs the task with the provided context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control
	//
	// Returns:
	//   - T: The result of the task execution
	//   - error: Any error encountered during execution
	//            Return *RetriableError to trigger retry logic
	Execute(ctx context.Context) (T, error)
}

Task defines the interface that all executable tasks must implement. Any type that implements this method can be run concurrently through the ConMan.

The Execute method should be context-aware and respect context cancellation. It can optionally return a *RetriableError to trigger automatic retry logic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL