README ¶

Tasker: A Robust Concurrent Task Management Library for Go

Tasker is a powerful and flexible Go library designed for efficient management of concurrent tasks. It provides a highly customizable worker pool, dynamic scaling (bursting), priority queuing, and robust resource lifecycle management, making it ideal for processing background jobs, handling I/O-bound operations, or managing CPU-intensive computations with controlled concurrency.


Overview & Features

In modern applications, efficiently managing concurrent tasks and shared resources is critical. tasker addresses this by providing a comprehensive solution that abstracts away the complexities of goroutine management, worker pools, and resource lifecycles. It allows developers to define tasks that operate on specific resources (e.g., database connections, external API clients, custom compute units) and then queue these tasks for execution, letting tasker handle the underlying concurrency, scaling, and error recovery.

This library is particularly useful for applications that:

  • Need to process a high volume of background jobs reliably.
  • Perform operations on limited or expensive shared resources.
  • Require dynamic adjustment of processing capacity based on load.
  • Demand prioritization of certain critical tasks over others.
Key Features
  • Concurrent Task Execution: Manages a pool of workers to execute tasks concurrently, ensuring optimal utilization of system resources.
  • Generic Resource Management: Define custom OnCreate and OnDestroy functions for any resource type (R), guaranteeing proper setup and cleanup of external dependencies or expensive objects.
  • Rate-Based Dynamic Worker Scaling: Automatically scales the number of workers up or down based on the real-time task arrival and completion rates. This ensures that the system's throughput dynamically matches the incoming workload, optimizing resource utilization and responsiveness without manual tuning.
  • Priority Queues: Supports both standard (QueueTask) and high-priority (QueueTaskWithPriority) task queues, allowing critical operations to bypass regular tasks and get processed faster.
  • Immediate Task Execution with Resource Pooling (RunTask): Execute tasks synchronously, either by acquiring a resource from a pre-allocated pool or by temporarily creating a new one. This is ideal for urgent, low-latency operations that should not be delayed by queuing.
  • Customizable Health Checks & Retries: Define custom logic (CheckHealth) to determine if a given error indicates an "unhealthy" state for a worker or its associated resource, enabling automatic worker replacement and configurable task retries for transient failures.
  • "At-Most-Once" Task Execution: Provides QueueTaskOnce and QueueTaskWithPriorityOnce methods for non-idempotent operations, preventing re-queuing on health-related failures.
  • Graceful & Immediate Shutdown: Offers both Stop() for graceful shutdown (waiting for active tasks to complete) and Kill() for immediate termination (cancelling all tasks), giving control over shutdown behavior.
  • Real-time Performance Metrics: Access live statistics (Stats()) and comprehensive performance metrics (Metrics()), including task arrival/completion rates, average/min/max/percentile execution times, average wait times, and success/failure rates, for robust monitoring and debugging.
  • Custom Logging: Integrate with your preferred logging solution by providing an implementation of the tasker.Logger interface.

Installation & Setup

Prerequisites
  • Go 1.24.3 or higher
Installation Steps

To add tasker to your Go project, use go get:

go get github.com/asaidimu/tasker
Verification

You can verify the installation and see tasker in action by cloning the repository and running the provided examples:

# Clone the repository to get the runnable examples
git clone https://github.com/asaidimu/tasker.git
cd tasker/examples/basic
go run main.go

cd ../intermediate
go run main.go

cd ../advanced
go run main.go

Usage Documentation

tasker is designed to be highly configurable and flexible. All interactions happen through the tasker.NewTaskManager constructor and the returned TaskManager interface.

Core Concepts
  • Resource (R): This is the generic type of resource your tasks will operate on. It could be anything: a database connection, an HTTP client, a custom processing struct, a CPU/GPU compute unit, or any other external dependency or expensive object that needs managed lifecycle. tasker manages the creation, use, and destruction of these resources.
  • Task Result (E): This is the generic type of value your tasks will return upon successful completion. This allows tasker to be type-safe for various task outputs.
  • Task Function: Your actual work logic is defined as a func(resource R) (result E, err error). This function receives an instance of your defined R resource type. It's expected to return the result E or an error.
  • tasker.Config[R]: A struct used to configure the TaskManager with essential parameters such as resource lifecycle functions (OnCreate, OnDestroy), initial worker counts, dynamic scaling parameters, optional health check logic, and custom logging/metrics.
  • tasker.Manager[R, E]: The concrete implementation of the TaskManager[R, E] interface, providing the core task management capabilities. You instantiate this via tasker.NewTaskManager.
  • QueueTask(func(R) (E, error)) (E, error): Adds a task to the standard queue for asynchronous processing. The call blocks until the task completes and returns its result or error. Suitable for background processing.
  • QueueTaskWithPriority(func(R) (E, error)) (E, error): Adds a task to a dedicated high-priority queue. Tasks in this queue are processed before tasks in the main queue, ensuring faster execution for critical operations. This call also blocks until completion.
  • QueueTaskOnce(func(R) (E, error)) (E, error): Similar to QueueTask, but if the task fails and CheckHealth indicates an unhealthy worker, this task will not be retried. Use for non-idempotent operations where "at-most-once" processing by tasker is desired.
  • QueueTaskWithPriorityOnce(func(R) (E, error)) (E, error): Combines high priority with "at-most-once" execution semantics.
  • RunTask(func(R) (E, error)) (E, error): Executes a task immediately. It attempts to acquire a resource from a pre-allocated pool first. If the pool is empty, it temporarily creates a new resource for the task. This is a synchronous call, blocking until the task finishes. Ideal for urgent, low-latency operations that should not be delayed by queuing.
  • Stop() error: Initiates a graceful shutdown of the manager. It stops accepting new tasks and waits for all currently queued and executing tasks to complete before releasing resources.
  • Kill() error: Initiates an immediate shutdown of the manager. It cancels all running tasks and drops all queued tasks without waiting for them to complete, then releases resources.
  • Stats() TaskStats: Returns a TaskStats struct containing real-time operational statistics, such as active worker counts, queued task counts, and available resources.
  • Metrics() TaskMetrics: Returns a TaskMetrics struct providing comprehensive performance metrics, including task arrival/completion rates, various execution time percentiles (P95, P99), average wait times, and success/failure rates.

Basic Usage: Simple Calculator

This example demonstrates the fundamental setup for tasker using a simple CalculatorResource with two base workers.

// examples/basic/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/asaidimu/tasker"
)

// CalculatorResource represents a simple resource,
// in this case, just a placeholder.
type CalculatorResource struct{}

// onCreate for CalculatorResource - no actual setup needed
func createCalcResource() (*CalculatorResource, error) {
	fmt.Println("INFO: Creating CalculatorResource")
	return &CalculatorResource{}, nil
}

// onDestroy for CalculatorResource - no actual cleanup needed
func destroyCalcResource(r *CalculatorResource) error {
	fmt.Println("INFO: Destroying CalculatorResource")
	return nil
}

func main() {
	fmt.Println("--- Basic Usage: Simple Calculator ---")

	ctx := context.Background()

	// Configure the tasker for our CalculatorResource
	config := tasker.Config[*CalculatorResource]{
		OnCreate:    createCalcResource,
		OnDestroy:   destroyCalcResource,
		WorkerCount: 2, // Two base workers
		Ctx:         ctx,
		// No specific health check or burst settings for this basic example
	}

	// Create a new task manager
	manager, err := tasker.NewTaskManager[*CalculatorResource, int](config) // Tasks will return an int result
	if err != nil {
		log.Fatalf("Error creating task manager: %v", err)
	}
	defer manager.Stop() // Ensure the manager is stopped gracefully

	fmt.Println("Queuing a simple addition task...")
	task1Start := time.Now()
	// Queue a task to perform addition
	go func() {
		sum, err := manager.QueueTask(func(r *CalculatorResource) (int, error) {
			// In a real scenario, 'r' could be a connection to a math service
			time.Sleep(50 * time.Millisecond) // Simulate some work
			a, b := 10, 25
			fmt.Printf("Worker processing: %d + %d\n", a, b)
			return a + b, nil
		})

		if err != nil {
			fmt.Printf("Task 1 failed: %v\n", err)
		} else {
			fmt.Printf("Task 1 (Addition) Result: %d (took %s)\n", sum, time.Since(task1Start))
		}
	}()

	fmt.Println("Queuing another subtraction task...")
	task2Start := time.Now()
	// Queue another task for subtraction
	go func() {
		difference, err := manager.QueueTask(func(r *CalculatorResource) (int, error) {
			time.Sleep(70 * time.Millisecond) // Simulate some work
			a, b := 100, 40
			fmt.Printf("Worker processing: %d - %d\n", a, b)
			return a - b, nil
		})

		if err != nil {
			fmt.Printf("Task 2 failed: %v\n", err)
		} else {
			fmt.Printf("Task 2 (Subtraction) Result: %d (took %s)\n", difference, time.Since(task2Start))
		}
	}()

	// Allow some time for tasks to complete
	time.Sleep(500 * time.Millisecond)

	stats := manager.Stats()
	fmt.Printf("\n--- Current Stats ---\n")
	fmt.Printf("Active Workers: %d\n", stats.ActiveWorkers)
	fmt.Printf("Queued Tasks: %d\n", stats.QueuedTasks)
	fmt.Printf("Available Resources: %d\n", stats.AvailableResources)
	fmt.Println("----------------------")

	fmt.Println("Basic usage example finished.")
}

Expected Output (Illustrative, timings may vary due to concurrency):

--- Basic Usage: Simple Calculator ---
INFO: Creating CalculatorResource
INFO: Creating CalculatorResource
Queuing a simple addition task...
Queuing another subtraction task...
Worker processing: 100 - 40
Worker processing: 10 + 25
Task 2 (Subtraction) Result: 60 (took 70.xxxms)
Task 1 (Addition) Result: 35 (took 70.xxxms)

--- Current Stats ---
Active Workers: 2
Queued Tasks: 0
Available Resources: 0
----------------------
Basic usage example finished.
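
The same manager can also serve an urgent, synchronous request through RunTask, which bypasses both queues and draws on the resource pool. The snippet below is an illustrative fragment that reuses the manager and CalculatorResource from the example above:

// Illustrative fragment: synchronous execution that bypasses the queues,
// reusing the manager from the example above.
product, err := manager.RunTask(func(r *CalculatorResource) (int, error) {
	return 6 * 7, nil
})
if err != nil {
	log.Printf("RunTask failed: %v", err)
} else {
	fmt.Printf("RunTask (Multiplication) Result: %d\n", product)
}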

Project Architecture

tasker is designed with modularity and extensibility in mind, built around goroutines and channels for efficient concurrency management.

Core Components
  • Manager[R, E]: The central orchestrator, implementing the TaskManager interface. It holds the Config, manages task queues, synchronizes operations, and oversees the lifecycle of workers and resources. It's the primary interface for users to interact with tasker.
  • Worker Goroutines: These are the workhorses of tasker. Each worker runs in its own goroutine, typically holding a single long-lived resource (R). Workers continuously pull tasks from the priorityQueue or mainQueue, execute them, handle errors, and manage retries based on the CheckHealth function.
  • Task Queues (mainQueue, priorityQueue): These are Go channels (chan *Task[R, E]) acting as FIFO queues. The priorityQueue is always checked first by workers, ensuring high-priority tasks are processed ahead of standard tasks.
  • Resource Pool (resourcePool): A buffered channel holding pre-allocated R type resources. This pool is primarily used by RunTask operations to provide immediate resource access for synchronous, low-latency task execution, avoiding the main queues.
  • Burst Manager Goroutine: This background goroutine is responsible for dynamic worker scaling. It periodically fetches performance Metrics (specifically TaskArrivalRate and TaskCompletionRate). If the arrival rate significantly exceeds the completion rate, it instructs the Manager to start new burst workers. Conversely, if the system is over-provisioned (completion rate much higher than arrival rate), it signals idle burst workers to shut down.
  • Task[R, E]: An internal struct that encapsulates a task's executable function (run), channels for returning its result (result chan E) and any error (err chan error) to the caller, and a retry counter (retries) for fault tolerance. It also stores queuedAt for metrics calculations.
  • Config[R]: A comprehensive struct that defines the operational parameters for creating a Manager instance. This includes essential functions for resource lifecycle management (OnCreate, OnDestroy), WorkerCount, MaxWorkerCount, dynamic scaling parameters (BurstInterval), retry policy (MaxRetries), and optional custom logging (Logger) and metrics collection (Collector).
  • TaskStats & TaskMetrics: These structs provide real-time snapshots of the Manager's current operational state (Stats()) and in-depth performance statistics (Metrics()), respectively. TaskMetrics includes values like average, min, max, P95, and P99 execution times, wait times, arrival/completion rates, and success/failure rates.
  • Logger and MetricsCollector Interfaces: These interfaces allow for flexible integration with external logging and monitoring systems.
Data Flow
  1. Initialization: Upon calling NewTaskManager, tasker creates the specified number of base workers, initializes the resource pool by calling OnCreate for each pooled resource, and starts the dedicated burst manager goroutine.
  2. Task Submission:
    • Calls to QueueTask (and QueueTaskOnce) send *Task[R, E] instances to the mainQueue.
    • Calls to QueueTaskWithPriority (and QueueTaskWithPriorityOnce) send *Task[R, E] instances to the priorityQueue.
    • Calls to RunTask first attempt to acquire a resource from the resourcePool. If successful, the task is executed directly with that resource. If the pool is empty, a temporary resource is created via OnCreate, used for the task, and then destroyed via OnDestroy.
    • All task submissions are recorded by the MetricsCollector to track arrival rates.
  3. Task Execution: Worker goroutines continuously select tasks from the priorityQueue (preferentially) or the mainQueue. Once a task is acquired, the worker executes the task's run function with its dedicated resource. The MetricsCollector records task start and completion times.
  4. Health Checks & Retries: If a task execution returns an error, tasker invokes the CheckHealth function (if provided). If CheckHealth returns false, indicating an unhealthy state of the worker or its resource, the worker goroutine processing that task is terminated, its resource destroyed (OnDestroy), and a new worker is created to replace it. The original task that caused the unhealthy error will then be re-queued (up to MaxRetries) to be processed by a newly healthy worker. Tasks submitted via *Once methods are not re-queued. Failures and retries are recorded by the MetricsCollector.
  5. Dynamic Scaling: The burst manager periodically analyzes the TaskArrivalRate versus the TaskCompletionRate from the MetricsCollector. If demand outstrips capacity, it dynamically starts additional "burst" workers. If demand subsides, it signals idle burst workers to gracefully shut down, optimizing resource usage.
  6. Graceful Shutdown (Stop): When Stop() is called, the Manager transitions to a "stopping" state, preventing new tasks from being queued. It then cancels its primary context.Context, signaling all worker goroutines and the burst manager to stop processing new tasks. Workers in "stopping" mode will first finish processing any tasks remaining in their queues before exiting. tasker then waits for all goroutines to complete and drains/destroys all managed resources via OnDestroy.
  7. Immediate Shutdown (Kill): When Kill() is called, the Manager transitions to a "killed" state. It immediately cancels its primary context.Context, causing all active worker goroutines to terminate without waiting for tasks to complete. All queued tasks are dropped, and all resources are destroyed.
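
The scaling, retry, and shutdown behaviour described in this flow is driven entirely by Config. The following sketch shows how those knobs might be set; the apiClient resource type and all numeric values are assumptions for illustration, not recommendations.

package main

import (
	"context"
	"log"
	"time"

	"github.com/asaidimu/tasker"
)

// apiClient is a stand-in resource type for this sketch.
type apiClient struct{}

func newAPIClient() (*apiClient, error)  { return &apiClient{}, nil }
func closeAPIClient(c *apiClient) error  { return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	config := tasker.Config[*apiClient]{
		OnCreate:         newAPIClient,
		OnDestroy:        closeAPIClient,
		Ctx:              ctx,
		WorkerCount:      4,                      // base workers, always running
		MaxWorkerCount:   16,                     // cap on base + burst workers
		BurstInterval:    250 * time.Millisecond, // how often arrival/completion rates are re-evaluated
		MaxRetries:       5,                      // re-queue attempts for health-related failures
		ResourcePoolSize: 4,                      // resources pre-allocated for RunTask
	}

	manager, err := tasker.NewTaskManager[*apiClient, string](config)
	if err != nil {
		log.Fatalf("failed to create task manager: %v", err)
	}
	defer manager.Stop() // graceful: drains queues before releasing resources
}
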
Extension Points

tasker is designed to be highly pluggable through its function-based configuration, allowing you to seamlessly integrate it into various application contexts:

  • OnCreate func() (R, error): This essential function defines how to instantiate your specific resource R. This could involve connecting to a database, initializing an external SDK client, setting up a specialized compute object, or any other resource provisioning logic.
  • OnDestroy func(R) error: This crucial function defines how to clean up your resource R when a worker shuts down, a temporary resource is no longer needed, or the TaskManager itself is shutting down. It's vital for releasing connections, closing files, deallocating memory, or performing other necessary resource finalization.
  • CheckHealth func(error) bool: Implement custom logic here to determine if a task's returned error indicates an underlying problem with the worker or its resource. Returning false from this function will cause tasker to consider the worker unhealthy, leading to its replacement and a potential retry of the task, significantly enhancing system resilience.
  • Logger Logger: Provide your own implementation of the tasker.Logger interface to integrate tasker's internal logging messages with your application's preferred logging framework (e.g., logrus, zap). The default is a no-op logger.
  • Collector MetricsCollector: Supply a custom implementation of the tasker.MetricsCollector interface to integrate tasker's rich performance metrics with your existing monitoring and observability stack (e.g., Prometheus, Datadog). A default internal collector is provided if none is specified.
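
As a concrete illustration of the Logger and CheckHealth hooks, the sketch below adapts the standard library's log package to the tasker.Logger interface and treats a hypothetical errConnectionLost as a sign of an unhealthy worker. The error value and the conn resource type are assumptions made for this example.

package main

import (
	"context"
	"errors"
	"log"

	"github.com/asaidimu/tasker"
)

// stdLogger adapts the standard library logger to the tasker.Logger interface.
type stdLogger struct{ l *log.Logger }

func (s stdLogger) Debugf(format string, args ...any) { s.l.Printf("DEBUG "+format, args...) }
func (s stdLogger) Infof(format string, args ...any)  { s.l.Printf("INFO  "+format, args...) }
func (s stdLogger) Warnf(format string, args ...any)  { s.l.Printf("WARN  "+format, args...) }
func (s stdLogger) Errorf(format string, args ...any) { s.l.Printf("ERROR "+format, args...) }

// errConnectionLost is a hypothetical error that should trigger worker replacement.
var errConnectionLost = errors.New("connection lost")

// conn is a stand-in resource type for this sketch.
type conn struct{}

func main() {
	config := tasker.Config[*conn]{
		OnCreate:    func() (*conn, error) { return &conn{}, nil },
		OnDestroy:   func(*conn) error { return nil },
		WorkerCount: 2,
		Ctx:         context.Background(),
		Logger:      stdLogger{l: log.Default()},
		// Return false when an error means the worker's resource is unusable,
		// so tasker replaces the worker and (for non-Once tasks) retries the task.
		CheckHealth: func(err error) bool {
			return !errors.Is(err, errConnectionLost)
		},
	}

	manager, err := tasker.NewTaskManager[*conn, string](config)
	if err != nil {
		log.Fatalf("failed to create task manager: %v", err)
	}
	defer manager.Stop()
}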

Development & Contributing

We welcome contributions to tasker! Whether it's bug reports, feature requests, or code contributions, your input is valuable.

Development Setup

To set up your local development environment:

  1. Clone the repository:
    git clone https://github.com/asaidimu/tasker.git
    cd tasker
    
  2. Ensure Go Modules are tidy:
    go mod tidy
    
  3. Build the project:
    go build -v ./...
    
Scripts

The project includes a Makefile for common development tasks:

  • make build: Compiles the main package and its dependencies.
  • make test: Runs all unit tests with verbose output.
  • make clean: Removes generated executable files.
Testing

To run the test suite:

go test -v ./...

All contributions are expected to pass existing tests and maintain a high level of code coverage. New features or bug fixes should come with appropriate tests.

Contributing Guidelines

We appreciate your interest in contributing! To ensure a smooth contribution process, please follow these guidelines:

  1. Fork the repository and create your branch from main.
  2. Keep your code clean and well-documented. Follow Go's idiomatic style and best practices.
  3. Write clear, concise commit messages that describe your changes. We generally follow Conventional Commits (e.g., feat: add new feature, fix: resolve bug).
  4. Ensure your changes are tested. New features require new tests, and bug fixes should include a test that reproduces the bug.
  5. Open a Pull Request against the main branch. Provide a detailed description of your changes, including context, problem solved, and how it was solved.
Issue Reporting

If you encounter any bugs or have feature requests, please open an issue on the GitHub Issues page.

When reporting a bug, please include:

  • A clear, concise description of the problem.
  • Steps to reproduce the behavior.
  • Expected behavior.
  • Actual behavior.
  • Any relevant error messages or logs.
  • Your Go version and operating system.

Additional Information

Troubleshooting
  • "Task manager is shutting down: cannot queue task": This error occurs if you attempt to QueueTask, QueueTaskWithPriority, QueueTaskOnce, QueueTaskWithPriorityOnce, or RunTask after manager.Stop() or manager.Kill() has been called, or if the Ctx provided during NewTaskManager initialization has been cancelled. Ensure tasks are only submitted while the manager is actively running.
  • Resource Creation Failure: If your OnCreate function returns an error, tasker will log it, and the associated worker will not start (or a RunTask call will fail). Ensure your resource creation logic is robust and handles transient issues gracefully.
  • Workers Not Starting/Stopping as Expected:
    • Verify your WorkerCount and MaxWorkerCount settings in tasker.Config.
    • For dynamic scaling (bursting), ensure BurstInterval is configured and not set to 0.
    • Check that your main context.Context (passed as Ctx in Config) and its cancel function are managed correctly, especially for graceful shutdown scenarios.
    • Review your CheckHealth logic; an incorrect implementation might lead to workers constantly restarting (thrashing) or not restarting when they should.
  • Deadlocks/Goroutine Leaks: While tasker is designed to prevent these within its core logic, improper usage (e.g., blocking indefinitely within your OnCreate, OnDestroy, or task functions, or not using buffered channels for task results outside the library) can lead to such issues. Always ensure your custom functions (OnCreate, OnDestroy, taskFunc) do not block indefinitely.
FAQ
  • When should I use QueueTask vs. RunTask?
    • Use QueueTask (or QueueTaskWithPriority) for asynchronous, background tasks that can wait in a queue for an available worker. This is ideal for high-throughput, batch processing, or any operation where immediate synchronous completion isn't strictly necessary.
    • Use RunTask for synchronous, immediate tasks that require low latency and might need a resource right away, bypassing the queues. It's suitable for user-facing requests or critical operations that can't afford any queuing delay, such as generating a quick preview.
  • How does CheckHealth affect workers and tasks?
    • If CheckHealth returns false for an error returned by a task, tasker considers the worker (or its underlying resource) to be in an unhealthy state. This unhealthy worker will be gracefully shut down, its resource destroyed (OnDestroy), and a new worker will be created to replace it. The original task that caused the unhealthy error will also be re-queued (up to MaxRetries) to be processed by a newly healthy worker. If CheckHealth returns true (or is nil), the error is considered a task-specific failure, not a worker health issue, and the worker continues operating.
  • What happens if a task panics?
    • It is generally recommended that your task functions (the func(R) (E, error) you pass to QueueTask etc.) internally recover from panics and convert them into errors. If a panic occurs and is not recovered within the task function itself, it will crash the specific worker goroutine that was executing it. While tasker will detect the worker's exit and attempt to replace it, unhandled panics can lead to unexpected behavior, lost task results, and potential resource leaks if OnDestroy is not called due to an abrupt exit. A recovery wrapper along these lines is sketched after this FAQ.
  • Is tasker suitable for long-running tasks?
    • Yes, tasker can handle long-running tasks. However, be mindful of the parent context.Context passed to NewTaskManager. If that context is cancelled, all workers will attempt to gracefully shut down, potentially interrupting very long tasks. For indefinite tasks, consider managing them using separate Goroutines outside of tasker or ensure a very long-lived context for your TaskManager instance.
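
For the panic question above, a common application-side pattern is to wrap task functions so that a panic is converted into an ordinary error before tasker sees it. This is a minimal sketch of such a wrapper, not part of the library; the package name is arbitrary.

package taskutil

import "fmt"

// safeTask wraps a task function so that a panic becomes a normal error,
// keeping the worker goroutine alive. It is an application-side helper,
// not part of tasker itself.
func safeTask[R any, E any](task func(R) (E, error)) func(R) (E, error) {
	return func(r R) (result E, err error) {
		defer func() {
			if rec := recover(); rec != nil {
				err = fmt.Errorf("task panicked: %v", rec)
			}
		}()
		return task(r)
	}
}

The wrapper can be applied at every submission point, for example manager.QueueTask(safeTask(myTaskFunc)).
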
Changelog / Roadmap
  • CHANGELOG.md: See the project's history of changes and version releases.
License

tasker is distributed under the MIT License. See LICENSE.md for the full text.

Acknowledgments

This project is inspired by common worker pool patterns and the need for robust, flexible concurrency management in modern Go applications. It aims to provide a reliable foundation for building scalable backend services.

Documentation ¶

Overview ¶

Package tasker provides a robust and flexible task management system with dynamic worker scaling, resource pooling, and priority queuing. It is designed for concurrent execution of tasks, offering control over worker lifecycles, resource allocation, and graceful shutdown.

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type Config ¶

type Config[R any] struct {
	// OnCreate is a function that creates and initializes a new resource of type R.
	// This function is called when a worker starts or when RunTask needs a temporary resource.
	// It must return a new resource or an error if resource creation fails.
	OnCreate func() (R, error)

	// OnDestroy is a function that performs cleanup or deallocation for a resource of type R.
	// This function is called when a worker shuts down or a temporary resource from RunTask is no longer needed.
	// It should handle any necessary resource finalization and return an error if cleanup fails.
	OnDestroy func(R) error

	// WorkerCount specifies the initial and minimum number of base workers.
	// This many workers will always be running, ready to pick up tasks. Must be greater than 0.
	WorkerCount int

	// Ctx is the parent context for the Runner. All worker contexts will be derived from this.
	// Cancelling this context will initiate a graceful shutdown of the Runner.
	Ctx context.Context

	// CheckHealth is an optional function that determines if a given error
	// indicates an "unhealthy" state for a worker or resource.
	// If a task returns an error and CheckHealth returns false, the worker processing
	// that task might be considered faulty and potentially shut down, and the task re-queued.
	// If nil, all errors are considered healthy (i.e., not cause for worker termination).
	CheckHealth func(error) bool

	// MaxWorkerCount is the maximum total number of workers (base + burst) allowed.
	// If 0, it defaults to WorkerCount * 2.
	MaxWorkerCount int

	// BurstInterval is the frequency at which the burst manager checks queue sizes
	// and adjusts the number of burst workers.
	// If 0, a default of 100 milliseconds is used.
	BurstInterval time.Duration

	// MaxRetries specifies the maximum number of times a task will be re-queued
	// if it fails and CheckHealth indicates an unhealthy state.
	// If 0, a default of 3 retries is used.
	MaxRetries int

	// ResourcePoolSize defines the number of resources to pre-allocate and maintain
	// in the internal pool for `RunTask` operations.
	// If 0, a default of `WorkerCount` is used.
	ResourcePoolSize int

	// Optional: Custom logger interface for internal messages
	Logger Logger

	// Optional: Custom metrics collector
	Collector MetricsCollector
	// Deprecated
	BurstTaskThreshold int
	// Deprecated
	BurstWorkerCount int
}

Config holds configuration parameters for creating a new Runner instance. These parameters control worker behavior, resource management, and scaling policies. R is the resource type (e.g., *sql.DB, *http.Client).
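
For example, a Config managing *sql.DB resources might look like the sketch below; the driver, DSN, and worker counts are placeholders, not recommendations.

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // hypothetical driver choice for this sketch
	"github.com/asaidimu/tasker"
)

func main() {
	cfg := tasker.Config[*sql.DB]{
		// OnCreate opens and verifies a database handle for a worker.
		OnCreate: func() (*sql.DB, error) {
			db, err := sql.Open("postgres", "postgres://user:pass@localhost/app?sslmode=disable")
			if err != nil {
				return nil, err
			}
			if err := db.Ping(); err != nil {
				db.Close()
				return nil, err
			}
			return db, nil
		},
		// OnDestroy releases the handle when a worker or temporary resource is torn down.
		OnDestroy:   func(db *sql.DB) error { return db.Close() },
		WorkerCount: 4,
		Ctx:         context.Background(),
	}

	manager, err := tasker.NewTaskManager[*sql.DB, int](cfg)
	if err != nil {
		log.Fatalf("creating task manager: %v", err)
	}
	defer manager.Stop()
}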

type Logger ¶ added in v1.2.0

type Logger interface {
	// Debugf logs a message at the debug level.
	Debugf(format string, args ...any)
	// Infof logs a message at the info level.
	Infof(format string, args ...any)
	// Warnf logs a message at the warning level.
	Warnf(format string, args ...any)
	// Errorf logs a message at the error level.
	Errorf(format string, args ...any)
}

Logger defines the interface for logging messages from the TaskManager. This allows users to integrate their own preferred logging library.

type Manager ¶ added in v1.2.0

type Manager[R any, E any] struct {
	// contains filtered or unexported fields
}

Manager implements the TaskManager interface, providing a highly concurrent and scalable task execution environment. It manages a pool of workers, handles task queuing (main and priority), supports dynamic worker scaling (bursting), and includes a resource pool for immediate task execution. R is the type of resource used by tasks (e.g., *sql.DB, *http.Client). E is the expected result type of the tasks.

func (*Manager[R, E]) Kill ¶ added in v1.2.0

func (r *Manager[R, E]) Kill() error

Kill immediately terminates the task manager without draining queues.

func (*Manager[R, E]) Metrics ¶ added in v1.2.0

func (r *Manager[R, E]) Metrics() TaskMetrics

Metrics returns a snapshot of the currently aggregated performance metrics.

func (*Manager[R, E]) QueueTask ¶ added in v1.2.0

func (r *Manager[R, E]) QueueTask(taskFunc func(R) (E, error)) (E, error)

QueueTask adds a task to the main queue for asynchronous processing.

func (*Manager[R, E]) QueueTaskOnce ¶ added in v1.2.0

func (r *Manager[R, E]) QueueTaskOnce(taskFunc func(R) (E, error)) (E, error)

QueueTaskOnce adds a task to the main queue that will NOT be retried by the task manager's internal retry mechanism if it fails and CheckHealth indicates an unhealthy state. This is suitable for non-idempotent operations. It returns the result and error of the task once it completes. If the task manager is shutting down, it returns an error immediately.

func (*Manager[R, E]) QueueTaskWithPriority ¶ added in v1.2.0

func (r *Manager[R, E]) QueueTaskWithPriority(taskFunc func(R) (E, error)) (E, error)

QueueTaskWithPriority adds a high-priority task to the priority queue.

func (*Manager[R, E]) QueueTaskWithPriorityOnce ¶ added in v1.2.0

func (r *Manager[R, E]) QueueTaskWithPriorityOnce(taskFunc func(R) (E, error)) (E, error)

QueueTaskWithPriorityOnce adds a high-priority task to the priority queue; the task will not be retried if it fails and CheckHealth indicates an unhealthy state.

func (*Manager[R, E]) RunTask ¶ added in v1.2.0

func (r *Manager[R, E]) RunTask(taskFunc func(R) (E, error)) (E, error)

RunTask executes a task immediately using a resource from the pool.

func (*Manager[R, E]) Stats ¶ added in v1.2.0

func (r *Manager[R, E]) Stats() TaskStats

Stats returns a snapshot of the runner's current operational state.

func (*Manager[R, E]) Stop ¶ added in v1.2.0

func (r *Manager[R, E]) Stop() error

Stop gracefully shuts down the task manager by draining the task queues.

type MetricsCollector ¶ added in v1.2.0

type MetricsCollector interface {
	// RecordArrival is called by the TaskManager each time a new task is queued.
	// This provides the necessary data to calculate the task arrival rate.
	RecordArrival()

	// RecordCompletion is called by the TaskManager whenever a task completes
	// successfully. The collector should use the provided timestamps to update
	// its internal metrics for latency and throughput.
	//
	// The `stamps` parameter contains the timing information for the completed task,
	// allowing the collector to calculate wait and execution times.
	RecordCompletion(stamps TaskLifecycleTimestamps)

	// RecordFailure is called by the TaskManager whenever a task fails permanently
	// (i.e., all retries are exhausted). The collector should update its
	// failure and error rate metrics.
	//
	// The `stamps` parameter provides the timing information for the failed task.
	RecordFailure(stamps TaskLifecycleTimestamps)

	// RecordRetry is called by the TaskManager each time a task is queued for a
	// retry attempt. This allows the collector to track the overall reliability
	// and health of the tasks and their underlying resources.
	RecordRetry()

	// Metrics returns a snapshot of the currently aggregated performance metrics.
	// This method allows users to periodically fetch the latest metrics to
	// monitor the system's health and performance.
	Metrics() TaskMetrics
}

MetricsCollector defines the interface for collecting and calculating performance and reliability metrics for the TaskManager. Implementations of this interface are responsible for processing task lifecycle events and aggregating the data into the TaskMetrics struct.
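
A custom collector only needs to satisfy the five methods above and can then be supplied via Config.Collector. The following is a minimal counting sketch; the package and type names are arbitrary, and it ignores the timestamp data that a production collector would use for latency and rate metrics.

package metrics

import (
	"sync/atomic"

	"github.com/asaidimu/tasker"
)

// counterCollector is a minimal MetricsCollector sketch that tracks only
// terminal counts and derived success/failure rates.
type counterCollector struct {
	completed, failed, retried uint64
}

func NewCounterCollector() tasker.MetricsCollector { return &counterCollector{} }

func (c *counterCollector) RecordArrival() {}

func (c *counterCollector) RecordCompletion(stamps tasker.TaskLifecycleTimestamps) {
	atomic.AddUint64(&c.completed, 1)
}

func (c *counterCollector) RecordFailure(stamps tasker.TaskLifecycleTimestamps) {
	atomic.AddUint64(&c.failed, 1)
}

func (c *counterCollector) RecordRetry() {
	atomic.AddUint64(&c.retried, 1)
}

func (c *counterCollector) Metrics() tasker.TaskMetrics {
	completed := atomic.LoadUint64(&c.completed)
	failed := atomic.LoadUint64(&c.failed)
	m := tasker.TaskMetrics{
		TotalTasksCompleted: completed,
		TotalTasksFailed:    failed,
		TotalTasksRetried:   atomic.LoadUint64(&c.retried),
	}
	if total := completed + failed; total > 0 {
		m.SuccessRate = float64(completed) / float64(total)
		m.FailureRate = float64(failed) / float64(total)
	}
	return m
}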

func NewCollector ¶ added in v1.2.0

func NewCollector() MetricsCollector

NewCollector creates and initializes a new collector instance.

type Task ¶

type Task[R any, E any] struct {
	// contains filtered or unexported fields
}

Task represents a unit of work to be executed by a worker. It encapsulates the task function, channels for returning results and errors, and a retry counter for handling transient failures.

type TaskLifecycleTimestamps ¶ added in v1.2.0

type TaskLifecycleTimestamps struct {
	// QueuedAt is the time when the task was first added to a queue.
	QueuedAt time.Time
	// StartedAt is the time when a worker began executing the task.
	StartedAt time.Time
	// FinishedAt is the time when the task execution completed (successfully or not).
	FinishedAt time.Time
}

TaskLifecycleTimestamps holds critical timestamps from a task's journey. This data is passed to a MetricsCollector to calculate performance metrics.
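
The two durations most collectors aggregate follow directly from these timestamps. A small helper might look like this (the package and function names are illustrative):

package metrics

import (
	"time"

	"github.com/asaidimu/tasker"
)

// splitDurations derives the queue wait and execution time from a task's
// lifecycle timestamps.
func splitDurations(stamps tasker.TaskLifecycleTimestamps) (wait, execution time.Duration) {
	wait = stamps.StartedAt.Sub(stamps.QueuedAt)        // time spent in a queue
	execution = stamps.FinishedAt.Sub(stamps.StartedAt) // time spent running
	return wait, execution
}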

type TaskManager ¶

type TaskManager[R any, E any] interface {
	// QueueTask adds a task to the main queue for asynchronous execution.
	// The task will be picked up by an available worker.
	// It returns the result and error of the task once it completes.
	// If the task manager is shutting down, it returns an error immediately.
	QueueTask(task func(R) (E, error)) (E, error)

	// RunTask executes a task immediately, bypassing the main and priority queues.
	// It attempts to acquire a resource from the internal pool. If no resource
	// is immediately available, it temporarily creates a new one for the task.
	// This method is suitable for urgent tasks that should not be delayed by queueing.
	// It returns the result and error of the task.
	// If the task manager is shutting down, it returns an error immediately.
	RunTask(task func(R) (E, error)) (E, error)

	// QueueTaskWithPriority adds a high priority task to a dedicated queue.
	// Tasks in the priority queue are processed before tasks in the main queue.
	// It returns the result and error of the task once it completes.
	// If the task manager is shutting down, it returns an error immediately.
	QueueTaskWithPriority(task func(R) (E, error)) (E, error)

	// QueueTaskOnce adds a task to the main queue for asynchronous execution.
	// This task will NOT be re-queued by the task manager's internal retry mechanism
	// if it fails and CheckHealth indicates an unhealthy state. This is suitable
	// for non-idempotent operations where "at-most-once" execution is desired
	// from the task manager's perspective.
	// It returns the result and error of the task once it completes.
	// If the task manager is shutting down, it returns an error immediately.
	QueueTaskOnce(task func(R) (E, error)) (E, error)

	// QueueTaskWithPriorityOnce adds a high priority task to a dedicated queue.
	// This task will NOT be re-queued by the task manager's internal retry mechanism
	// if it fails and CheckHealth indicates an unhealthy state. This is suitable
	// for non-idempotent high-priority operations where "at-most-once" execution
	// is desired from the task manager's perspective.
	// It returns the result and error of the task once it completes.
	// If the task manager is shutting down, it returns an error immediately.
	QueueTaskWithPriorityOnce(task func(R) (E, error)) (E, error)

	// Stop gracefully shuts down the task manager.
	// It stops accepting new tasks, waits for all queued tasks to be completed,
	// and then releases all managed resources.
	Stop() error

	// Kill immediately shuts down the task manager.
	// It cancels all running tasks, drops all queued tasks, and releases resources.
	// It does not wait for tasks to complete.
	Kill() error

	// Stats returns current statistics about the task manager's operational state.
	// This includes information on worker counts, queued tasks, and resource availability.
	Stats() TaskStats

	// Metrics returns a snapshot of the currently aggregated performance metrics.
	Metrics() TaskMetrics
}

TaskManager defines the interface for managing asynchronous and synchronous task execution within a pool of workers and resources. Generic types R and E represent the Resource type and the Task execution Result type, respectively.

func NewRunner ¶

func NewRunner[R any, E any](config Config[R]) (TaskManager[R, E], error)

Deprecated: Use NewTaskManager instead.

func NewTaskManager ¶ added in v1.2.0

func NewTaskManager[R any, E any](config Config[R]) (TaskManager[R, E], error)

NewTaskManager creates and initializes a new Runner instance based on the provided configuration. It sets up the resource pool, starts the base workers, and kicks off the burst manager. Returns a TaskManager interface and an error if initialization fails (e.g., invalid config, resource creation error).

type TaskMetrics ¶ added in v1.2.0

type TaskMetrics struct {

	// AverageExecutionTime is the average time spent executing a task, from the moment
	// a worker picks it up until it completes. (Unit: time.Duration)
	AverageExecutionTime time.Duration

	// MinExecutionTime is the shortest execution time recorded for any single task.
	// Useful for understanding the best-case performance scenario. (Unit: time.Duration)
	MinExecutionTime time.Duration

	// MaxExecutionTime is the longest execution time recorded for any single task.
	// Useful for identifying outliers and potential long-running task issues. (Unit: time.Duration)
	MaxExecutionTime time.Duration

	// P95ExecutionTime is the 95th percentile of task execution time. 95% of tasks
	// completed in this time or less. This is a key indicator of tail latency. (Unit: time.Duration)
	P95ExecutionTime time.Duration

	// P99ExecutionTime is the 99th percentile of task execution time. 99% of tasks
	// completed in this time or less. Helps in understanding the performance for the
	// vast majority of tasks, excluding extreme outliers. (Unit: time.Duration)
	P99ExecutionTime time.Duration

	// AverageWaitTime is the average time a task spends in a queue before being
	// picked up by a worker. High values may indicate that the system is under-provisioned.
	// (Unit: time.Duration)
	AverageWaitTime time.Duration

	// TaskArrivalRate is the number of new tasks being added to the queues per second,
	// calculated over a recent time window.
	TaskArrivalRate float64

	// TaskCompletionRate is the number of tasks being successfully completed per second,
	// calculated over a recent time window. This is a primary measure of system throughput.
	TaskCompletionRate float64

	// TotalTasksCompleted is the total count of tasks that have completed successfully
	// since the TaskManager started.
	TotalTasksCompleted uint64

	// TotalTasksFailed is the total count of tasks that have failed after all retry
	// attempts have been exhausted.
	TotalTasksFailed uint64

	// TotalTasksRetried is the total number of times any task has been retried due to
	// recoverable errors (e.g., unhealthy resource state). A high value may indicate
	// instability in resources or downstream services.
	TotalTasksRetried uint64

	// SuccessRate is the ratio of successfully completed tasks to the total number of
	// terminal tasks (completed + failed). (Value: 0.0 to 1.0)
	SuccessRate float64

	// FailureRate is the ratio of failed tasks to the total number of terminal tasks
	// (completed + failed). (Value: 0.0 to 1.0)
	FailureRate float64
}

TaskMetrics provides a comprehensive snapshot of performance, throughput, and reliability metrics for a TaskManager instance. These metrics offer deep insights into the behavior of the task execution system over time.
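
A typical consumer polls these metrics periodically and forwards a few headline values to its logging or monitoring system. The helper below is an illustrative sketch; the interval and the fields logged are arbitrary choices.

package monitor

import (
	"log"
	"time"

	"github.com/asaidimu/tasker"
)

// Monitor periodically logs headline metrics from a running manager.
// Run it in its own goroutine, e.g. go Monitor(manager, 30*time.Second).
func Monitor[R any, E any](m tasker.TaskManager[R, E], interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		metrics := m.Metrics()
		log.Printf("completed=%d failed=%d arrival=%.2f/s completion=%.2f/s p95=%s",
			metrics.TotalTasksCompleted, metrics.TotalTasksFailed,
			metrics.TaskArrivalRate, metrics.TaskCompletionRate, metrics.P95ExecutionTime)
	}
}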

type TaskStats ¶

type TaskStats struct {
	BaseWorkers        int32 // Number of permanently active workers.
	ActiveWorkers      int32 // Total number of currently active workers (base + burst).
	BurstWorkers       int32 // Number of dynamically scaled-up workers.
	QueuedTasks        int32 // Number of tasks currently in the main queue.
	PriorityTasks      int32 // Number of tasks currently in the priority queue.
	AvailableResources int32 // Number of resources currently available in the resource pool.
}

TaskStats provides insight into the task manager's current state and performance.

type Tasker ¶

type Tasker interface {
	// Execute runs a task with the provided resource.
	// The context can be used for cancellation or passing deadlines to the task.
	// It returns the task's result and any error encountered during execution.
	Execute(ctx context.Context, resource any) (any, error)
}

Tasker defines the core interface for task execution. Implementations of Tasker are responsible for running a single task with a given resource and returning a result or an error.
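
A type satisfying this interface can be small. The sketch below is illustrative; pingTask and its behaviour are assumptions, not part of the library.

package tasks

import (
	"context"
	"fmt"

	"github.com/asaidimu/tasker"
)

// pingTask is an illustrative Tasker implementation: it expects the resource
// to be a fmt.Stringer and returns a short description of it.
type pingTask struct{}

// Compile-time check that pingTask satisfies tasker.Tasker.
var _ tasker.Tasker = pingTask{}

func (pingTask) Execute(ctx context.Context, resource any) (any, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honour cancellation before doing any work
	}
	s, ok := resource.(fmt.Stringer)
	if !ok {
		return nil, fmt.Errorf("unexpected resource type %T", resource)
	}
	return "pinged " + s.String(), nil
}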

Directories ¶

Path Synopsis
examples
