taskara
lightweight task scheduler with priority and worker pool
taskara is a simple and fast library for managing tasks in Go. it lets you run many tasks concurrently with a controlled number of workers. it is designed to be minimal, with zero external dependencies.
[!IMPORTANT]
alpha-build
this is an early alpha version of the project. it does not include detailed instructions or many examples yet.
status: under active development.
future: we plan to add more features, better documentation, and complex examples soon.
we do not guarantee backward compatibility between releases until the first stable version is available.
features
- worker pool: limit the number of active goroutines to save resources.
- priority queue: important tasks are executed first.
- scheduling: start tasks immediately or at a specific time.
- timeouts: automatically cancel tasks that take too long.
- individual control: cancel a specific task by its id without stopping others.
- easy integration: get results through Go channels.
- panic recovery: automatically catches panics inside tasks, reporting them as errors without crashing the worker pool.
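The panic recovery feature can be illustrated with a small standalone sketch. It is not taskara's code: `safeRun` is a hypothetical helper that shows the general Go technique of turning a panic into an error with `recover`, so the goroutine that ran the task survives.

```go
package main

import (
	"errors"
	"fmt"
)

// safeRun is a hypothetical helper (not part of taskara). It calls fn and
// converts a panic into an ordinary error, so the calling goroutine keeps running.
func safeRun(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return fn()
}

func main() {
	// A panicking task is reported as an error.
	err := safeRun(func() error {
		panic("boom")
	})
	fmt.Println(err) // task panicked: boom

	// A regular error passes through unchanged.
	err = safeRun(func() error { return errors.New("plain failure") })
	fmt.Println(err) // plain failure
}
```

A worker loop that wraps every task call this way can report the error and move on to the next task.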
architecture
the system consists of three main parts:
- scheduler: manages the waiting and ready queues using a priority heap.
- executor: runs a fixed pool of workers that process tasks.
- cluster: provides a simple interface to submit and manage tasks.
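To make the executor's role concrete, here is a minimal sketch of a fixed worker pool built only on the standard library. The names (`runPool`, `indexes`) are illustrative assumptions, and taskara's actual executor may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is a hypothetical helper. It executes jobs on exactly `workers`
// goroutines and returns the results indexed by job position.
// workers must be at least 1, otherwise nothing drains the channel.
func runPool(workers int, jobs []func() int) []int {
	results := make([]int, len(jobs))
	indexes := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls job indexes until the channel is closed.
			for i := range indexes {
				results[i] = jobs[i]()
			}
		}()
	}
	for i := range jobs {
		indexes <- i
	}
	close(indexes)
	wg.Wait() // all writes to results happen before Wait returns
	return results
}

func main() {
	jobs := []func() int{
		func() int { return 1 },
		func() int { return 2 },
		func() int { return 3 },
	}
	fmt.Println(runPool(2, jobs)) // [1 2 3]
}
```

However many jobs are queued, only two goroutines run them here, which is the resource bound the worker pool is meant to provide.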
methods and functions
task (package functions)
These functions are called directly from the task package.
NewTask(id string, fn TaskFunc) TaskInterface - function (constructor), creates a new task instance.
[!NOTE] If you leave the id empty, a unique ID will be automatically generated by the cluster during the Submit() call.
TaskInterface (methods)
These methods are available on a task instance.
ID() string - Returns the task's unique identifier.
SetID(id string) - Updates the task ID manually.
[!CAUTION] Do not call SetID after the task has been added to a cluster. Doing so will break the internal mapping and the task may become unmanageable.
Fn() TaskFunc - Returns the underlying task function.
cluster (package functions)
NewCluster(workers int, ctx context.Context) ClusterInterface - function (constructor), creates a new cluster instance.
ClusterInterface (methods)
Run() - Starts the cluster's execution engine. Workers begin listening to the queue and processing tasks.
[!NOTE] Tasks will remain in the queue and won't start until Run() is called.
AddTask(t task.TaskInterface) *clusterTaskBuilder - The entry point for submitting a task. It returns a builder that allows you to configure scheduling, timeouts, and metadata. You must call .Submit() at the end of the chain to queue the task.
Subscribe(id string) (<-chan Result, error) - Returns a channel that receives the task's Result (its value and error).
[!WARNING] Timing is key: By default, you must subscribe before the task completes. To retrieve results for already finished tasks, the task must be marked as .IsCacheable(true) during submission.
[!IMPORTANT] Avoid deadlocks: If your task logic (the TaskFunc) blocks and waits for a result from Subscribe, and all workers in the pool are occupied by similar tasks, you will hit a deadlock. Ensure that result consumption happens in a separate goroutine or that the worker pool is large enough.
[!NOTE] Channel safety: Always check that the returned channel is not nil and handle the error returned by the method. A nil channel is returned only if the error is not nil.
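The channel-safety rule can be sketched without the library. The `Result` struct and the `Subscribe` signature below are copied from this README. The `subscriber` interface, `fakeCluster`, and `await` are hypothetical stand-ins added only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
)

// Result mirrors the struct documented in this README.
type Result struct {
	Result any
	Err    error
}

// subscriber is a hypothetical local interface with the documented
// Subscribe signature. It is not part of taskara.
type subscriber interface {
	Subscribe(id string) (<-chan Result, error)
}

// fakeCluster is a stand-in used only to make the sketch runnable.
type fakeCluster struct{ results map[string]Result }

func (f fakeCluster) Subscribe(id string) (<-chan Result, error) {
	res, ok := f.results[id]
	if !ok {
		return nil, errors.New("unknown task: " + id)
	}
	ch := make(chan Result, 1)
	ch <- res
	close(ch)
	return ch, nil
}

// await checks the error before touching the channel, then waits for one result.
func await(s subscriber, id string) (any, error) {
	ch, err := s.Subscribe(id)
	if err != nil {
		return nil, err // ch is nil here; receiving from it would block forever
	}
	res := <-ch
	return res.Result, res.Err
}

func main() {
	c := fakeCluster{results: map[string]Result{"task-1": {Result: "done"}}}
	fmt.Println(await(c, "task-1"))  // done <nil>
	fmt.Println(await(c, "missing")) // <nil> unknown task: missing
}
```

The error check matters because receiving from a nil channel blocks forever in Go.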
CancelTask(id string) - Targets and cancels a specific task by its ID. This triggers the cancelled channel inside the TaskFunc and cancels the task's context.
Stop(timeout time.Duration) error - Graceful shutdown. The cluster stops accepting new tasks and waits for active workers to finish. If the timeout is reached before tasks finish, it returns an error.
Cancel() - Immediate shutdown. Instantly kills all workers and cancels all active task contexts.
[!CAUTION] Use this only when a graceful shutdown is not possible, as it may leave tasks in an incomplete state.
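The "wait for workers, but only up to a timeout" behavior described for Stop can be sketched with a WaitGroup and a timer. This is a generic pattern and an assumption about the approach, not taskara's implementation. `waitTimeout` and `simulate` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg to finish, or gives up after d.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		return errors.New("shutdown timed out")
	}
}

// simulate starts one worker that runs for workMs milliseconds and
// waits up to timeoutMs milliseconds for it to finish.
func simulate(workMs, timeoutMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond) // in-flight task
	}()
	return waitTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(simulate(5, 1000)) // <nil>
	fmt.Println(simulate(200, 10)) // shutdown timed out
}
```

When the timeout wins, the in-flight work is still running. A caller that needs it to stop would follow up with an immediate cancellation such as Cancel().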
clusterTaskBuilder (methods)
These methods allow you to configure a task's behavior before adding it to the queue. They support method chaining.
WithStartTime(st time.Time) *clusterTaskBuilder - Schedules the task to run at a specific time. If the time is in the past or time.Now(), the task will be executed as soon as a worker is available.
WithTimeout(tm time.Duration) *clusterTaskBuilder - Sets a maximum execution time for the task. If the task exceeds this duration, its ctx will be cancelled, and the task will be marked as timed out.
WithPriority(p int) *clusterTaskBuilder - Sets the task's priority. Among tasks with the same start time, a higher value moves the task closer to the front of the queue.
[!TIP] How does scheduling priority work?
- Time first: The scheduler primarily looks at the StartTime. A task scheduled for "now" will always beat a task scheduled for "in 5 minutes", regardless of priority.
- Priority as a tie-breaker: If two tasks have the same StartTime, the one with the higher priority will be executed first.
- Order of submission: If both StartTime and Priority are identical, the task that was submitted first (FIFO) will typically take precedence.
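The three ordering rules above can be expressed as a single comparison function on a heap. The sketch below uses the standard container/heap package. The `item` type and its field names are assumptions for illustration and do not come from taskara's source.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item is a hypothetical queue entry; the field names are assumptions.
type item struct {
	id       string
	start    time.Time
	priority int
	seq      int // submission order, used as the final tie-breaker
}

type queue []item

func (q queue) Len() int      { return len(q) }
func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q queue) Less(i, j int) bool {
	a, b := q[i], q[j]
	if !a.start.Equal(b.start) {
		return a.start.Before(b.start) // 1. earlier start time wins
	}
	if a.priority != b.priority {
		return a.priority > b.priority // 2. then higher priority
	}
	return a.seq < b.seq // 3. then first submitted (FIFO)
}
func (q *queue) Push(x any) { *q = append(*q, x.(item)) }
func (q *queue) Pop() any {
	old := *q
	it := old[len(old)-1]
	*q = old[:len(old)-1]
	return it
}

// demo pushes four tasks and returns their ids in execution order.
func demo() []string {
	now := time.Unix(0, 0)
	q := &queue{}
	heap.Push(q, item{"later-urgent", now.Add(5 * time.Minute), 100, 0})
	heap.Push(q, item{"now-low", now, 1, 1})
	heap.Push(q, item{"now-high", now, 9, 2})
	heap.Push(q, item{"now-high-2", now, 9, 3})

	var ids []string
	for q.Len() > 0 {
		ids = append(ids, heap.Pop(q).(item).id)
	}
	return ids
}

func main() {
	fmt.Println(demo()) // [now-high now-high-2 now-low later-urgent]
}
```

The task with priority 100 still runs last because its start time is later, which matches the "time first" rule.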
IsCacheable(v bool) *clusterTaskBuilder - Determines if the task result should be stored in memory after completion.
[!TIP] Retention Policy: Currently, cached results are stored for 5 minutes after the task completes. After this period, the result is purged from memory to prevent leaks. (Note: This duration may become configurable in future releases).
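A retention policy of this kind can be sketched as a small map with expiry timestamps. Only the 5-minute duration comes from this README. `ttlCache` and its methods are hypothetical, and the sketch purges lazily on lookup, which may differ from how taskara purges.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	val     any
	expires time.Time
}

// ttlCache is a hypothetical result cache; it is not taskara's implementation.
type ttlCache struct {
	mu   sync.Mutex
	ttl  time.Duration
	data map[string]entry
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{ttl: ttl, data: make(map[string]entry)}
}

// put stores val under id with an expiry relative to now.
func (c *ttlCache) put(id string, val any, now time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[id] = entry{val: val, expires: now.Add(c.ttl)}
}

// get returns the value if it has not expired and purges it otherwise.
func (c *ttlCache) get(id string, now time.Time) (any, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[id]
	if !ok {
		return nil, false
	}
	if !now.Before(e.expires) {
		delete(c.data, id)
		return nil, false
	}
	return e.val, true
}

// cachedAfter reports whether a result stored with a 5-minute TTL is
// still available the given number of minutes later.
func cachedAfter(minutes int) bool {
	c := newTTLCache(5 * time.Minute)
	t0 := time.Unix(0, 0)
	c.put("task-1", "result", t0)
	_, ok := c.get("task-1", t0.Add(time.Duration(minutes)*time.Minute))
	return ok
}

func main() {
	fmt.Println(cachedAfter(4), cachedAfter(6)) // true false
}
```

Passing the clock in as a parameter keeps the sketch deterministic and avoids sleeping in examples.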
Submit() (string, error) - The final method in the chain. It validates the task, generates an ID (if empty), and pushes the task into the scheduler.
[!WARNING] Returns an error if a task with the same ID is already running or managed by the cluster.
core concepts & usage
installation
go get github.com/bozylik/taskara@v0.0.2-alpha
Usage:
import (
"github.com/bozylik/taskara/task"
"github.com/bozylik/taskara/cluster"
)
part 1: TaskFunc (job)
TaskFunc is a type from the taskara/task package that defines the function (job) to be executed:
type TaskFunc func(id string, workerCtx context.Context, cancelled <-chan struct{}, report Reporter)
| Argument | Type | Description |
|---|---|---|
| id | string | The unique identifier of the task |
| ctx | context.Context | General context. Triggers on manual cancel, timeout, and cluster shutdown |
| cancelled | <-chan struct{} | Special channel. Triggers only on manual user cancellation via CancelTask() |
| report | Reporter | Callback function to send results and errors back to the cluster |
type Reporter func(id string, val any, err error) - reporter is a callback used to send results back to the cluster.
| Argument | Type | Description |
|---|---|---|
| id | string | The unique task identifier (passed from TaskFunc) |
| val | any | Any data you want to return as a result (interface{}) |
| err | error | An error object if the task failed, otherwise nil |
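job1 := func(id string, ctx context.Context, cancelled <-chan struct{}, report task.Reporter) {
	select {
	case <-cancelled:
		// Manual cancellation via CancelTask(): exit without reporting.
		return
	case <-ctx.Done():
		// Fires on manual cancel, timeout, or cluster shutdown.
		fmt.Printf("[%s] stopped: %v\n", id, ctx.Err())
		return
	case <-time.After(4 * time.Second):
		// Simulated work finished.
		fmt.Printf("[%s] Working job1...\n", id)
	}
	report(id, "Data from task-1", nil)
}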
[!TIP] Why two signals? Use ctx.Done() for general cleanup. Use cancelled only if you need to distinguish a manual user "Abort" from a timeout. If you return without calling report(), the cluster will automatically finalize the task with nil results.
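One way to tell the two signals apart is to wait on the context and then inspect the cancelled channel without blocking. The sketch below is standalone and rests on two assumptions that this README does not confirm: the cancelled channel is closed no later than the context is cancelled, and timeouts surface as context.DeadlineExceeded. `stopReason` and the demo helpers are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopReason blocks until ctx is done, then classifies why.
// The cancelled channel stands in for the one passed to a TaskFunc.
func stopReason(ctx context.Context, cancelled <-chan struct{}) string {
	<-ctx.Done()
	select {
	case <-cancelled:
		return "manual cancel"
	default:
		// cancelled has not fired: the stop came from somewhere else.
	}
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return "timeout"
	}
	return "shutdown"
}

// demoManual simulates a user abort: the channel closes, then the context.
func demoManual() string {
	ctx, cancel := context.WithCancel(context.Background())
	cancelled := make(chan struct{})
	close(cancelled)
	cancel()
	return stopReason(ctx, cancelled)
}

// demoTimeout simulates a task that exceeds its deadline.
func demoTimeout() string {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return stopReason(ctx, make(chan struct{}))
}

func main() {
	fmt.Println(demoManual())  // manual cancel
	fmt.Println(demoTimeout()) // timeout
}
```

If the distinction does not matter to a task, waiting on ctx.Done() alone is enough.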
part 2: task and TaskInterface
A task is a base struct used within clusters. It contains a unique ID and the function to be executed.
task - struct with id string and fn TaskFunc fields.
TaskInterface - interface that provides encapsulation for task methods.
// Let the cluster generate an ID during Submit()
task1 := task.NewTask("", job1)
// or specify a custom ID
task2 := task.NewTask("task-1", job1)
part 3: cluster and ClusterInterface
The Cluster manages task execution. It provides the methods for running and cancelling tasks and for controlling the cluster itself.
cluster - struct that contains executor, scheduler, and state.
ClusterInterface - interface that provides encapsulation for cluster methods.
// Cluster context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create cluster with 2 active workers
myCluster := cluster.NewCluster(2, ctx)
myCluster.Run()
part 4: clusterTask and clusterTaskBuilder
Result is what you get from the subscription channel.
type Result struct {
Result any // Data from the task
Err error // Error from the task, timeout, or panic
}
clusterTask is an internal struct that represents a task within the cluster's lifecycle.
clusterTaskBuilder is a helper that lets you chain configuration calls while adding a new task.
clusterTaskID, err := myCluster.AddTask(task1).
	WithStartTime(time.Now().Add(5 * time.Second)).
	WithTimeout(5 * time.Second).
	IsCacheable(true).
	Submit()
if err != nil {
	// panic is used here only to keep the example short
	panic(err)
}
// Subscribe returns a nil channel together with a non-nil error
resChan, err := myCluster.Subscribe(clusterTaskID)
if err != nil {
	panic(err)
}
// Blocks until the task delivers its Result
fmt.Println("Results:", <-resChan)
examples
Check out the usage examples to see more complex use cases and implementation details.