Documentation
¶
Overview ¶
Package tasker provides a robust and flexible task management system.
Package tasker provides a robust and flexible task management system.
Package tasker provides a robust and flexible task management system with dynamic worker scaling, resource pooling, and priority queuing. It is designed for concurrent execution of tasks, offering control over worker lifecycles, resource allocation, and graceful shutdown.
Index ¶
- type Config
- type Logger
- type Manager
- func (r *Manager[R, E]) Kill() error
- func (r *Manager[R, E]) Metrics() TaskMetrics
- func (r *Manager[R, E]) QueueTask(taskFunc func(R) (E, error)) (E, error)
- func (r *Manager[R, E]) QueueTaskOnce(taskFunc func(R) (E, error)) (E, error)
- func (r *Manager[R, E]) QueueTaskWithPriority(taskFunc func(R) (E, error)) (E, error)
- func (r *Manager[R, E]) QueueTaskWithPriorityOnce(taskFunc func(R) (E, error)) (E, error)
- func (r *Manager[R, E]) RunTask(taskFunc func(R) (E, error)) (E, error)
- func (r *Manager[R, E]) Stats() TaskStats
- func (r *Manager[R, E]) Stop() error
- type MetricsCollector
- type Task
- type TaskLifecycleTimestamps
- type TaskManager
- type TaskMetrics
- type TaskStats
- type Tasker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config[R any] struct { // OnCreate is a function that creates and initializes a new resource of type R. // This function is called when a worker starts or when RunTask needs a temporary resource. // It must return a new resource or an error if resource creation fails. OnCreate func() (R, error) // OnDestroy is a function that performs cleanup or deallocation for a resource of type R. // This function is called when a worker shuts down or a temporary resource from RunTask is no longer needed. // It should handle any necessary resource finalization and return an error if cleanup fails. OnDestroy func(R) error // WorkerCount specifies the initial and minimum number of base workers. // This many workers will always be running, ready to pick up tasks. Must be greater than 0. WorkerCount int // Ctx is the parent context for the Runner. All worker contexts will be derived from this. // Cancelling this context will initiate a graceful shutdown of the Runner. Ctx context.Context // CheckHealth is an optional function that determines if a given error // indicates an "unhealthy" state for a worker or resource. // If a task returns an error and CheckHealth returns false, the worker processing // that task might be considered faulty and potentially shut down, and the task re-queued. // If nil, all errors are considered healthy (i.e., not cause for worker termination). CheckHealth func(error) bool // MaxWorkerCount is the maximum total number of workers (base + burst) allowed. // If 0, it defaults to WorkerCount * 2. MaxWorkerCount int // BurstInterval is the frequency at which the burst manager checks queue sizes // and adjusts the number of burst workers. // If 0, a default of 100 milliseconds is used. BurstInterval time.Duration // MaxRetries specifies the maximum number of times a task will be re-queued // if it fails and CheckHealth indicates an unhealthy state. // If 0, a default of 3 retries is used. MaxRetries int // ResourcePoolSize defines the number of resources to pre-allocate and maintain // in the internal pool for `RunTask` operations. // If 0, a default of `WorkerCount` is used. ResourcePoolSize int // Optional: Custom logger interface for internal messages Logger Logger // Optional: Custom metrics collector Collector MetricsCollector // Deprecated BurstTaskThreshold int // Deprecated BurstWorkerCount int }
Config holds configuration parameters for creating a new Runner instance. These parameters control worker behavior, resource management, and scaling policies. R is the resource type (e.g., *sql.DB, *http.Client).
type Logger ¶ added in v1.2.0
type Logger interface { // Debugf logs a message at the debug level. Debugf(format string, args ...any) // Infof logs a message at the info level. Infof(format string, args ...any) // Warnf logs a message at the warning level. Warnf(format string, args ...any) // Errorf logs a message at the error level. Errorf(format string, args ...any) }
Logger defines the interface for logging messages from the TaskManager. This allows users to integrate their own preferred logging library.
type Manager ¶ added in v1.2.0
Manager implements the TaskManager interface, providing a highly concurrent and scalable task execution environment. It manages a pool of workers, handles task queuing (main and priority), supports dynamic worker scaling (bursting), and includes a resource pool for immediate task execution. R is the type of resource used by tasks (e.g., *sql.DB, *http.Client). E is the expected result type of the tasks.
func (*Manager[R, E]) Kill ¶ added in v1.2.0
Kill immediately terminates the task manager without draining queues.
func (*Manager[R, E]) Metrics ¶ added in v1.2.0
func (r *Manager[R, E]) Metrics() TaskMetrics
Metrics returns a snapshot of the currently aggregated performance metrics.
func (*Manager[R, E]) QueueTask ¶ added in v1.2.0
QueueTask adds a task to the main queue for asynchronous processing.
func (*Manager[R, E]) QueueTaskOnce ¶ added in v1.2.0
QueTaskOnce adds a task to the main queue that will NOT be retried by the task manager's internal retry mechanism if it fails and CheckHealth indicates an unhealthy state. This is suitable for non-idempotent operations. It returns the result and error of the task once it completes. If the task manager is shutting down, it returns an error immediately.
func (*Manager[R, E]) QueueTaskWithPriority ¶ added in v1.2.0
QueueTaskWithPriority adds a high-priority task to the priority queue.
func (*Manager[R, E]) QueueTaskWithPriorityOnce ¶ added in v1.2.0
QueueTaskWithPriority adds a high-priority task to the priority queue, but the task will not be retried.
func (*Manager[R, E]) RunTask ¶ added in v1.2.0
RunTask executes a task immediately using a resource from the pool.
type MetricsCollector ¶ added in v1.2.0
type MetricsCollector interface { // RecordArrival is called by the TaskManager each time a new task is queued. // This provides the necessary data to calculate the task arrival rate. RecordArrival() // RecordCompletion is called by the TaskManager whenever a task completes // successfully. The collector should use the provided timestamps to update // its internal metrics for latency and throughput. // // The `stamps` parameter contains the timing information for the completed task, // allowing the collector to calculate wait and execution times. RecordCompletion(stamps TaskLifecycleTimestamps) // RecordFailure is called by the TaskManager whenever a task fails permanently // (i.e., all retries are exhausted). The collector should update its // failure and error rate metrics. // // The `stamps` parameter provides the timing information for the failed task. RecordFailure(stamps TaskLifecycleTimestamps) // RecordRetry is called by the TaskManager each time a task is queued for a // retry attempt. This allows the collector to track the overall reliability // and health of the tasks and their underlying resources. RecordRetry() // Metrics returns a snapshot of the currently aggregated performance metrics. // This method allows users to periodically fetch the latest metrics to // monitor the system's health and performance. Metrics() TaskMetrics }
MetricsCollector defines the interface for collecting and calculating performance and reliability metrics for the TaskManager. Implementations of this interface are responsible for processing task lifecycle events and aggregating the data into the TaskMetrics struct.
func NewCollector ¶ added in v1.2.0
func NewCollector() MetricsCollector
NewCollector creates and initializes a new collector instance.
type Task ¶
Task represents a unit of work to be executed by a worker. It encapsulates the task function, channels for returning results and errors, and a retry counter for handling transient failures.
type TaskLifecycleTimestamps ¶ added in v1.2.0
type TaskLifecycleTimestamps struct { // QueuedAt is the time when the task was first added to a queue. QueuedAt time.Time // StartedAt is the time when a worker began executing the task. StartedAt time.Time // FinishedAt is the time when the task execution completed (successfully or not). FinishedAt time.Time }
TaskLifecycleTimestamps holds critical timestamps from a task's journey. This data is passed to a MetricsCollector to calculate performance metrics.
type TaskManager ¶
type TaskManager[R any, E any] interface { // QueueTask adds a task to the main queue for asynchronous execution. // The task will be picked up by an available worker. // It returns the result and error of the task once it completes. // If the task manager is shutting down, it returns an error immediately. QueueTask(task func(R) (E, error)) (E, error) // RunTask executes a task immediately, bypassing the main and priority queues. // It attempts to acquire a resource from the internal pool. If no resource // is immediately available, it temporarily creates a new one for the task. // This method is suitable for urgent tasks that should not be delayed by queueing. // It returns the result and error of the task. // If the task manager is shutting down, it returns an error immediately. RunTask(task func(R) (E, error)) (E, error) // QueueTaskWithPriority adds a high priority task to a dedicated queue. // Tasks in the priority queue are processed before tasks in the main queue. // It returns the result and error of the task once it completes. // If the task manager is shutting down, it returns an error immediately. QueueTaskWithPriority(task func(R) (E, error)) (E, error) // QueueTaskOnce adds a task to the main queue for asynchronous execution. // This task will NOT be re-queued by the task manager's internal retry mechanism // if it fails and CheckHealth indicates an unhealthy state. This is suitable // for non-idempotent operations where "at-most-once" execution is desired // from the task manager's perspective. // It returns the result and error of the task once it completes. // If the task manager is shutting down, it returns an error immediately. QueueTaskOnce(task func(R) (E, error)) (E, error) // QueueTaskWithPriorityOnce adds a high priority task to a dedicated queue. // This task will NOT be re-queued by the task manager's internal retry mechanism // if it fails and CheckHealth indicates an unhealthy state. This is suitable // for non-idempotent high-priority operations where "at-most-once" execution // is desired from the task manager's perspective. // It returns the result and error of the task once it completes. // If the task manager is shutting down, it returns an error immediately. QueueTaskWithPriorityOnce(task func(R) (E, error)) (E, error) // Stop gracefully shuts down the task manager. // It stops accepting new tasks, waits for all queued tasks to be completed, // and then releases all managed resources. Stop() error // Kill immediately shuts down the task manager. // It cancels all running tasks, drops all queued tasks, and releases resources. // It does not wait for tasks to complete. Kill() error // Stats returns current statistics about the task manager's operational state. // This includes information on worker counts, queued tasks, and resource availability. Stats() TaskStats // Metrics returns a snapshot of the currently aggregated performance metrics. Metrics() TaskMetrics }
TaskManager defines the interface for managing asynchronous and synchronous task execution within a pool of workers and resources. Generic types R and E represent the Resource type and the Task execution Result type, respectively.
func NewRunner ¶
func NewRunner[R any, E any](config Config[R]) (TaskManager[R, E], error)
Deprecated. Use NewTaskManager instead
func NewTaskManager ¶ added in v1.2.0
func NewTaskManager[R any, E any](config Config[R]) (TaskManager[R, E], error)
NewTaskManager creates and initializes a new Runner instance based on the provided configuration. It sets up the resource pool, starts the base workers, and kicks off the burst manager. Returns a TaskManager interface and an error if initialization fails (e.g., invalid config, resource creation error).
type TaskMetrics ¶ added in v1.2.0
type TaskMetrics struct { // AverageExecutionTime is the average time spent executing a task, from the moment // a worker picks it up until it completes. (Unit: time.Duration) AverageExecutionTime time.Duration // MinExecutionTime is the shortest execution time recorded for any single task. // Useful for understanding the best-case performance scenario. (Unit: time.Duration) MinExecutionTime time.Duration // MaxExecutionTime is the longest execution time recorded for any single task. // Useful for identifying outliers and potential long-running task issues. (Unit: time.Duration) MaxExecutionTime time.Duration // P95ExecutionTime is the 95th percentile of task execution time. 95% of tasks // completed in this time or less. This is a key indicator of tail latency. (Unit: time.Duration) P95ExecutionTime time.Duration // P99ExecutionTime is the 99th percentile of task execution time. 99% of tasks // completed in this time or less. Helps in understanding the performance for the // vast majority of tasks, excluding extreme outliers. (Unit: time.Duration) P99ExecutionTime time.Duration // AverageWaitTime is the average time a task spends in a queue before being // picked up by a worker. High values may indicate that the system is under-provisioned. // (Unit: time.Duration) AverageWaitTime time.Duration // TaskArrivalRate is the number of new tasks being added to the queues per second, // calculated over a recent time window. TaskArrivalRate float64 // TaskCompletionRate is the number of tasks being successfully completed per second, // calculated over a recent time window. This is a primary measure of system throughput. TaskCompletionRate float64 // TotalTasksCompleted is the total count of tasks that have completed successfully // since the TaskManager started. TotalTasksCompleted uint64 // TotalTasksFailed is the total count of tasks that have failed after all retry // attempts have been exhausted. TotalTasksFailed uint64 // TotalTasksRetried is the total number of times any task has been retried due to // recoverable errors (e.g., unhealthy resource state). A high value may indicate // instability in resources or downstream services. TotalTasksRetried uint64 // SuccessRate is the ratio of successfully completed tasks to the total number of // terminal tasks (completed + failed). (Value: 0.0 to 1.0) SuccessRate float64 // FailureRate is the ratio of failed tasks to the total number of terminal tasks // (completed + failed). (Value: 0.0 to 1.0) FailureRate float64 }
TaskMetrics provides a comprehensive snapshot of performance, throughput, and reliability metrics for a TaskManager instance. These metrics offer deep insights into the behavior of the task execution system over time.
type TaskStats ¶
type TaskStats struct { BaseWorkers int32 // Number of permanently active workers. ActiveWorkers int32 // Total number of currently active workers (base + burst). BurstWorkers int32 // Number of dynamically scaled-up workers. QueuedTasks int32 // Number of tasks currently in the main queue. PriorityTasks int32 // Number of tasks currently in the priority queue. AvailableResources int32 // Number of resources currently available in the resource pool. }
TaskStats provides insight into the task manager's current state and performance.
type Tasker ¶
type Tasker interface { // Execute runs a task with the provided resource. // The context can be used for cancellation or passing deadlines to the task. // It returns the task's result and any error encountered during execution. Execute(ctx context.Context, resource any) (any, error) }
Tasker defines the core interface for task execution. Implementations of Tasker are responsible for running a single task with a given resource and returning a result or an error.