Documentation ¶
Overview ¶
Package conman provides a concurrency manager that limits the number of tasks that can run concurrently. It provides an intuitive interface for defining and concurrently running tasks of any type.
Basic usage:
ctx := context.Background()

cm, err := conman.New[int](5) // concurrency limit of 5 (minimum is 2)
if err != nil {
	log.Fatal(err)
}

// Define a task
type myTask struct{}

func (t *myTask) Execute(ctx context.Context) (int, error) {
	return 42, nil
}

// Run the task; the returned error covers dispatch only
if err := cm.Run(ctx, &myTask{}); err != nil {
	log.Fatal(err)
}

// Wait for completion, then get results
if err := cm.Wait(ctx); err != nil {
	log.Fatal(err)
}
results := cm.Outputs()
The package also supports automatic retry mechanisms with configurable backoff strategies for tasks that may fail temporarily.
Index ¶
- type ConMan
- type RetriableError
- func (e *RetriableError) Error() string
- func (e *RetriableError) WithExponentialBackoff() *RetriableError
- func (e *RetriableError) WithLinearBackoff() *RetriableError
- func (e *RetriableError) WithNoBackoff() *RetriableError
- func (e *RetriableError) WithRetryConfig(config *RetryConfig) (*RetriableError, error)
- type RetryConfig
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConMan ¶
type ConMan[T any] struct {
	// contains filtered or unexported fields
}
ConMan is a structure that manages multiple tasks running concurrently while ensuring that the total number of running tasks doesn't exceed a given concurrency limit.
func New ¶
New creates a new ConMan instance with the specified concurrency limit.
The concurrency limit determines the maximum number of tasks that can run concurrently. The limit must be at least 2 to ensure meaningful concurrency.
Parameters:
- concurrencyLimit: Maximum number of concurrent tasks (must be ≥ 2)
Returns:
- *ConMan[T]: A new ConMan instance
- error: An error if concurrencyLimit is less than 2
Example:
cm, err := conman.New[int](5) // Allow up to 5 concurrent tasks
if err != nil {
	return fmt.Errorf("failed to create ConMan: %w", err)
}
func (*ConMan[T]) Errors ¶
Errors returns a slice of all task execution errors.
Only errors from tasks that failed during execution are included. Errors are collected in the order they occur.
Returns:
- []error: Slice of task execution errors
func (*ConMan[T]) Outputs ¶
func (c *ConMan[T]) Outputs() []T
Outputs returns a slice of successful task results.
Only results from tasks that completed without errors are included. Results are collected in the order tasks complete, not submission order.
Returns:
- []T: Slice of successful task results
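Because outputs arrive in completion order, a caller that needs submission order has to carry that information in the result itself. The following is a minimal sketch of one way to do this, independent of conman: `indexed` and `restoreOrder` are hypothetical names introduced here for illustration, and the completed slice stands in for whatever Outputs() would return for a result type that records its own position.

```go
package main

import (
	"fmt"
	"sort"
)

// indexed is a hypothetical result type (not part of conman) that remembers
// the position at which its task was submitted.
type indexed struct {
	Index int
	Value string
}

// restoreOrder returns a copy of results sorted by submission index,
// leaving the input slice untouched.
func restoreOrder(results []indexed) []indexed {
	sorted := make([]indexed, len(results))
	copy(sorted, results)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Index < sorted[j].Index
	})
	return sorted
}

func main() {
	// Simulated completion order: task 2 finished first, then 0, then 1.
	completed := []indexed{
		{Index: 2, Value: "c"},
		{Index: 0, Value: "a"},
		{Index: 1, Value: "b"},
	}
	for _, r := range restoreOrder(completed) {
		fmt.Println(r.Index, r.Value)
	}
}
```

The same idea applies to any result type: each task stores its index when it is created and returns it alongside its value.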
func (*ConMan[T]) Run ¶
Run executes a task concurrently, respecting the concurrency limit.
If the concurrency limit is reached, this method blocks until a slot becomes available. The task runs in a separate goroutine and results are collected automatically.
Parameters:
- ctx: Context for cancellation and timeout control
- t: Task implementing the Task[T] interface
Returns:
- error: A context cancellation error if ctx is cancelled before the task starts; nil if the task is successfully dispatched
Note: This method only returns errors related to task dispatch.
Task execution errors are collected and accessible via Errors().
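The blocking dispatch described above (wait for a free slot, or give up when the context ends) can be illustrated with a buffered channel used as a counting semaphore. This is a sketch under assumptions, not conman's actual implementation: `limiter`, `newLimiter`, `acquire`, `release`, and `dispatchCount` are hypothetical names, and the package may enforce its limit differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// limiter is a hypothetical stand-in for a concurrency gate; it is not
// conman's implementation.
type limiter struct {
	slots chan struct{} // holds one element per task currently running
}

func newLimiter(limit int) *limiter {
	return &limiter{slots: make(chan struct{}, limit)}
}

// acquire blocks until a slot is free or ctx is done, whichever comes first.
func (l *limiter) acquire(ctx context.Context) error {
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release frees a slot; a real dispatcher would call it when a task ends.
func (l *limiter) release() { <-l.slots }

// dispatchCount tries to take `attempts` slots from a limiter of size `limit`
// without releasing any, and reports how many attempts succeeded before a
// short timeout expired.
func dispatchCount(limit, attempts int) int {
	l := newLimiter(limit)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ok := 0
	for i := 0; i < attempts; i++ {
		if err := l.acquire(ctx); err != nil {
			break // the dispatch failed because ctx ended while waiting
		}
		ok++
	}
	return ok
}

func main() {
	// Two slots, three attempts, nothing released: the third attempt blocks
	// until the timeout and counts as a dispatch error.
	fmt.Println(dispatchCount(2, 3)) // 2
}
```

In this sketch a dispatch error only says that no slot could be obtained in time, which mirrors the documented split between dispatch errors returned by Run and execution errors reported by Errors().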
func (*ConMan[T]) Wait ¶
Wait blocks until all previously dispatched tasks have completed.
This method should be called after all Run() calls to ensure all tasks have finished execution before accessing results or errors.
Parameters:
- ctx: Context for cancellation and timeout control
Returns:
- error: A context cancellation error if ctx is cancelled before all tasks complete; nil if all tasks complete first
Note: This method blocks until all tasks finish or context is cancelled. After calling Wait(), you can access results via Outputs() and errors via Errors().
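A wait that can be cut short by a context is commonly built by racing a "tasks finished" signal against ctx.Done(). The sketch below shows that pattern with a sync.WaitGroup; it is an illustration only, since the source does not say how Wait is implemented, and `waitWithContext` and `timedOut` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext returns nil once wg is done, or ctx.Err() if ctx ends first.
// If ctx ends first, the helper goroutine stays alive until the tasks finish.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut starts one simulated task lasting taskMillis and waits for it with
// a timeout of timeoutMillis, reporting whether the wait gave up first.
func timedOut(taskMillis, timeoutMillis int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(taskMillis) * time.Millisecond)
	}()

	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()
	return waitWithContext(ctx, &wg) != nil
}

func main() {
	fmt.Println(timedOut(10, 1000)) // false: the task finishes inside the timeout
	fmt.Println(timedOut(1000, 10)) // true: the timeout fires while the task runs
}
```

When a wait like this returns a context error, tasks may still be running, so any results read at that point can be incomplete.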
type RetriableError ¶ added in v0.2.0
type RetriableError struct {
	Err         error
	RetryConfig *RetryConfig
}
RetriableError is an error type that indicates a task should be retried. It carries a RetryConfig that specifies the retry strategy.
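On the caller's side, an error of this shape can be recognised with errors.As from the standard library. The sketch below uses local stand-in types that copy only the documented fields and the Error method, so that it compiles without the package; `isRetriable` and `errConnReset` are hypothetical names, and whether conman itself looks through wrapped errors is not stated in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// RetryConfig and RetriableError are local stand-ins that mirror the
// documented shapes; real code would use the types exported by conman.
type RetryConfig struct {
	MaxAttempts int
}

type RetriableError struct {
	Err         error
	RetryConfig *RetryConfig
}

// Error returns the message of the underlying error, as documented.
func (e *RetriableError) Error() string { return e.Err.Error() }

// errConnReset is a hypothetical transient failure used for the demo.
var errConnReset = errors.New("connection reset")

// isRetriable reports whether err is, or wraps, a *RetriableError.
func isRetriable(err error) bool {
	var re *RetriableError
	return errors.As(err, &re)
}

func main() {
	retriable := &RetriableError{
		Err:         errConnReset,
		RetryConfig: &RetryConfig{MaxAttempts: 3},
	}
	wrapped := fmt.Errorf("fetch failed: %w", retriable)

	fmt.Println(isRetriable(errConnReset)) // false
	fmt.Println(isRetriable(retriable))    // true
	fmt.Println(isRetriable(wrapped))      // true
	fmt.Println(retriable.Error())         // connection reset
}
```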
func (*RetriableError) Error ¶ added in v0.2.0
func (e *RetriableError) Error() string
Error returns the error message of the underlying error.
func (*RetriableError) WithExponentialBackoff ¶ added in v0.3.0
func (e *RetriableError) WithExponentialBackoff() *RetriableError
WithExponentialBackoff configures the error to use an exponential backoff retry strategy. Returns the RetriableError for method chaining.
func (*RetriableError) WithLinearBackoff ¶ added in v0.3.0
func (e *RetriableError) WithLinearBackoff() *RetriableError
WithLinearBackoff configures the error to use a linear backoff retry strategy. Returns the RetriableError for method chaining.
func (*RetriableError) WithNoBackoff ¶ added in v0.3.0
func (e *RetriableError) WithNoBackoff() *RetriableError
WithNoBackoff configures the error to use immediate retries without delays. Returns the RetriableError for method chaining.
func (*RetriableError) WithRetryConfig ¶ added in v0.4.2
func (e *RetriableError) WithRetryConfig(config *RetryConfig) (*RetriableError, error)
type RetryConfig ¶ added in v0.3.0
type RetryConfig struct {
	MaxAttempts   int     // Maximum number of retry attempts
	InitialDelay  int64   // Initial delay in milliseconds
	BackoffFactor float64 // Multiplier for exponential backoff
	MaxDelay      int64   // Maximum delay in milliseconds
	Jitter        bool    // Whether to add random jitter to delays
}
RetryConfig defines the retry behavior for operations that may fail temporarily. It includes parameters for controlling the number of attempts, delays, and backoff strategy.
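To make the interaction of these fields concrete, the sketch below computes a delay schedule under one common interpretation: the delay before retry n is InitialDelay multiplied by BackoffFactor raised to n-1, capped at MaxDelay. This formula is an assumption, not something the source confirms, and the package may compute delays differently. The struct is a local copy of the documented fields, `delayMillis` is a hypothetical helper, and Jitter is ignored so that the output stays deterministic.

```go
package main

import (
	"fmt"
	"math"
)

// RetryConfig is a local copy of the documented fields so the sketch
// compiles on its own; real code would use the type exported by conman.
type RetryConfig struct {
	MaxAttempts   int     // Maximum number of retry attempts
	InitialDelay  int64   // Initial delay in milliseconds
	BackoffFactor float64 // Multiplier for exponential backoff
	MaxDelay      int64   // Maximum delay in milliseconds
	Jitter        bool    // Ignored in this sketch
}

// delayMillis returns the assumed delay, in milliseconds, before retry
// number `attempt` (1-based): InitialDelay * BackoffFactor^(attempt-1),
// capped at MaxDelay.
func delayMillis(cfg RetryConfig, attempt int) int64 {
	ms := float64(cfg.InitialDelay) * math.Pow(cfg.BackoffFactor, float64(attempt-1))
	if ms > float64(cfg.MaxDelay) {
		return cfg.MaxDelay
	}
	return int64(ms)
}

func main() {
	cfg := RetryConfig{
		MaxAttempts:   4,
		InitialDelay:  100,
		BackoffFactor: 2,
		MaxDelay:      500,
	}
	for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ {
		fmt.Printf("retry %d: wait %d ms\n", attempt, delayMillis(cfg, attempt))
	}
}
```

Under these assumptions the four delays are 100, 200, 400, and 500 ms: the last one would be 800 ms without the cap.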
type Task ¶
type Task[T any] interface {
	// Execute runs the task with the provided context.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control
	//
	// Returns:
	//   - T: The result of the task execution
	//   - error: Any error encountered during execution
	//     Return *RetriableError to trigger retry logic
	Execute(ctx context.Context) (T, error)
}
Task defines the interface that all executable tasks must implement. Any type that implements this method can be run concurrently through the ConMan.
The Execute method should be context-aware and respect context cancellation. It can optionally return a *RetriableError to trigger automatic retry logic.
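A context-aware Execute usually selects between its own work and ctx.Done(). The sketch below declares a local copy of the documented Task interface so that it compiles without the package; `sleepTask` and `runWithTimeout` are hypothetical, and the timer merely simulates real work.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Task is a local copy of the documented interface.
type Task[T any] interface {
	Execute(ctx context.Context) (T, error)
}

// sleepTask is a hypothetical task that simulates work with a timer.
type sleepTask struct {
	work time.Duration
}

// Execute returns "done" after the simulated work, or the context error if
// ctx is cancelled or times out first.
func (t *sleepTask) Execute(ctx context.Context) (string, error) {
	select {
	case <-time.After(t.work):
		return "done", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// Compile-time check that *sleepTask satisfies Task[string].
var _ Task[string] = (*sleepTask)(nil)

// runWithTimeout executes a sleepTask lasting workMillis under a timeout of
// timeoutMillis and reports the output and whether the task succeeded.
func runWithTimeout(workMillis, timeoutMillis int) (string, bool) {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()

	task := &sleepTask{work: time.Duration(workMillis) * time.Millisecond}
	out, err := task.Execute(ctx)
	return out, err == nil
}

func main() {
	fmt.Println(runWithTimeout(10, 1000)) // done true
	fmt.Println(runWithTimeout(1000, 10)) // (empty output) false
}
```

A task written this way stops promptly when its context ends instead of holding a concurrency slot until its work would have finished.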