taskara

package module
v0.0.3-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: MIT Imports: 0 Imported by: 0

README

taskara

lightweight task scheduler with priority and worker pool

taskara is a simple and fast library for managing tasks in go. it allows you to run many tasks concurrently using a controlled number of workers. it is designed to be minimal, having zero external dependencies.


[!IMPORTANT]

alpha-build

this is an early alpha version of the project. it does not include detailed instructions or many examples yet.
status: under active development.
future: we plan to add more features, better documentation, and complex examples soon.
we do not guarantee backward compatibility of various releases until the first stable version is available.


navigation


features

  • worker pool: limit the number of active goroutines to save resources.
  • priority queue: important tasks are executed first.
  • scheduling: start tasks immediately or at a specific time.
  • timeouts: automatically cancel tasks that take too long.
  • individual control: cancel a specific task by its id without stopping others.
  • easy integration: get results through go channels.
  • panic recovery: automatically catches panics inside tasks, reporting them as errors without crashing the worker pool.

architecture

the system consists of three main parts:

  1. scheduler: manages the waiting and ready queues using a priority heap.
  2. executor: runs a fixed pool of workers that process tasks.
  3. cluster: provides a simple interface to submit and manage tasks.

methods and functions

task (package functions)

These functions are called directly from the task package.

NewTask(id string, fn TaskFunc) TaskInterface - function (constructor), creates a new task instance.

[!NOTE] If you leave the id empty, a unique ID will be automatically generated by the cluster during the Submit() call.

TaskInterface (methods)

These methods are available on a task instance.

ID() string - Returns the task's unique identifier.
SetID(id string) - Updates the task ID manually.

[!CAUTION] Do not call SetID after the task has been added to a cluster. Doing so will break the internal mapping and the task may become unmanageable.

Fn() TaskFunc - Returns the underlying task function.

cluster (package functions)

NewCluster(workers int, ctx context.Context) ClusterInterface - function (constructor), creates a new cluster instance.

ClusterInterface (methods)

Run() - Starts the cluster's execution engine. Workers begin listening to the queue and processing tasks.

[!NOTE] Tasks will remain in the queue and won't start until Run() is called.

AddTask(t task.TaskInterface) *clusterTaskBuilder - The entry point for submitting a task. It returns a builder that allows you to configure scheduling, timeouts, and metadata. You must call .Submit() at the end of the chain to queue the task.
Subscribe(id string) (<-chan Result, error)- Returns a channel that receives the task's result (val and err).

[!WARNING] Timing is key: By default, you must subscribe before the task completes. To retrieve results for already finished tasks, the task must be marked as .IsCacheable(true) during submission.

[!IMPORTANT] Avoid deadlocks: If your task logic (the TaskFunc) blocks and waits for a result from Subscribe, and all workers in the pool are occupied by similar tasks, you will hit a deadlock. Ensure that result consumption happens in a separate goroutine or that the worker pool is large enough.

[!NOTE] Channel safety: Always check if the returned channel is not nil and handle the error returned by the method. A nil channel is returned only if the error is not nil.

CancelTask(id string) - Targets and cancels a specific task by its ID. This triggers the cancelled channel inside the TaskFunc and closes the task's context.
Stop(timeout time.Duration) error - Graceful shutdown. The cluster stops accepting new tasks and waits for active workers to finish. If the timeout is reached before tasks finish, it returns an error.
Cancel() - Immediate shutdown. Instantly kills all workers and cancels all active task contexts.

[!CAUTION] Use this only when a graceful shutdown is not possible, as it may leave tasks in an incomplete state.

clusterTaskBuilder (methods)

These methods allow you to configure a task's behavior before adding it to the queue. They support method chaining.

WithStartTime(st time.Time) *clusterTaskBuilder - Schedules the task to run at a specific time. If the time is in the past or time.Now(), the task will be executed as soon as a worker is available.
WithTimeout(tm time.Duration) *clusterTaskBuilder - Sets a maximum execution time for the task. If the task exceeds this duration, its ctx will be cancelled, and the task will be marked as timed out.
WithPriority(p int) *clusterTaskBuilder - Sets the task's priority. Higher values (or lower, depending on your heap logic—usually higher) will move the task to the front of the queue.

[!TIP] How scheduling priority works?

Time first: The scheduler primarily looks at the StartTime. A task scheduled for "now" will always beat a task scheduled for "in 5 minutes," regardless of priority.
Priority as a tie-breaker: If two tasks have the same StartTime, the one with the higher priority will be executed first.
Order of submission: If both StartTime and Priority are identical, the task that was submitted first (FIFO) will typically take precedence.

IsCacheable(v bool) *clusterTaskBuilder - Determines if the task result should be stored in memory after completion.

[!TIP] Retention Policy: Currently, cached results are stored for 5 minutes after the task completes. After this period, the result is purged from memory to prevent leaks. (Note: This duration may become configurable in future releases).

Submit() (string, error) - The final method in the chain. It validates the task, generates an ID (if empty), and pushes the task into the scheduler.

[!WARNING] Returns an error if a task with the same ID is already running or managed by the cluster.


core concepts & usage

installation

go get github.com/bozylik/taskara@v0.0.2-alpha

Usage:

import (
"github.com/bozylik/taskara/task"
"github.com/bozylik/taskara/cluster"
)

part 1: TaskFunc (job)

TaskFunc - is a type from the taskara/task package that defines the function (job) to be executed:
type TaskFunc func(id string, workerCtx context.Context, cancelled <-chan struct{}, report Reporter)

Argument Type Description
id string The unique identifier of the task
ctx context.Context General context. Triggers on manual cancel, timeout, and cluster shutdown
cancelled <-chan struct{} Special channel. Triggers only on manual user cancellation via CancelTask()
report Reporter Callback function to send results and errors back to the cluster

type Reporter func(id string, val any, err error) - reporter is a callback used to send results back to the cluster.

Argument Type Description
id string The unique task identifier (passed from TaskFunc)
val any Any data you want to return as a result (interface{})
err error An error object if the task failed, otherwise nil
job1 := func(id string, ctx context.Context, cancelled <-chan struct{}, report task.Reporter) {

select {
case <-cancelled:
return
case <-ctx.Done():
fmt.Println("timeout")
return
case <-time.After(4 * time.Second):
fmt.Printf("[%s] Working job1...\n", id)
}

report(id, "Data from task-1", nil)
}

[!TIP] Why two signals? Use ctx.Done() for general cleanup. Use cancelled only if you need to distinguish a manual user "Abort" from a timeout. If you return without calling report(), the cluster will automatically finalize the task with nil results.

part 2: task and TaskInterface

A task is a base struct used within clusters. It contains a unique ID and the function to be executed.
task - struct with id string and fn TaskFunc fields.
TaskInterface - interface that provides encapsulation for task methods.

task1 := task.NewTask("", job1)
// or specify a custom ID
task1 := task.NewTask("task-1", job1)

part 3: cluster and ClusterInterface

The Cluster manages task execution. It contains all necessary methods for running, cancelling tasks, and the cluster itself.
cluster - struct that contains executor, scheduler, and state.
ClusterInterface - interface that provides encapsulation for cluster methods.

// Cluster context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create cluster with 2 active workers
myCluster := cluster.NewCluster(2, ctx)
myCluster.Run()

part 4: clusterTask and clusterTaskBuilder

Result is what you get from the subscription channel.

type Result struct {
Result any   // Data from the task
Err    error // Error from the task, timeout, or panic
}

clusterTask - is an internal task struct that represents a task within the cluster's lifecycle.
clusterTaskBuilder - is a helper that allows you to use chaining while adding a new task.

clusterTaskID, err := myCluster.AddTask(task1).
WithStartTime(time.Now().Add(5 * time.Second)).
WithTimeout(5 * time.Second).
IsCacheable(true).
Submit()

if err != nil {
// Panic for example
panic(err)
}

resChan, err := myCluster.Subscribe(clusterTaskID)
if err != nil {
// Panic for example
panic(err)
}

fmt.Println("Results:", <-resChan)

examples

Check out our usage examples, to see more complex use cases and implementation details.

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL