taskrunner

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: MIT Imports: 5 Imported by: 0

README

Go Task Runner

⚠️ Disclaimer: Experimental / Educational Use Only

This library is an experimental implementation for educational and testing purposes. It is NOT intended for production environments.

A multi-threaded programming architecture for Go, inspired by Chromium's Threading and Tasks design.

Reference: Threading and Tasks in Chrome

Design Philosophy

This library implements a threading model where developers post tasks to virtual threads (Task Runners) rather than managing raw goroutines or channels manually. This decoupling allows the underlying system to manage concurrency details while application code focuses on logic.

The core concepts derived from Chromium are:

  • Task Runners over Threads: You rarely create raw goroutines. Instead, you ask for a TaskRunner and post tasks to it.
  • Sequential Consistency (Strands): The SequencedTaskRunner acts like a single logical thread. Tasks posted to it are guaranteed to run sequentially, allowing you to write lock-free code for resources owned by that sequence.
  • Task Traits: You describe what the task is (e.g., UserBlocking, BestEffort) rather than how to run it. The system decides the appropriate priority.
  • Concurrency Safety: Runtime assertions ensure that sequential rules are strictly followed, preventing common race conditions.

Features

  • Goroutine Thread Pool: Efficient worker pool backing the execution model.
  • Sequenced Task Runner: Strict FIFO execution order for tasks within a stream.
  • Single Thread Task Runner: Guaranteed thread affinity for blocking IO and TLS.
  • Delayed Tasks: Scheduling tasks in the future.
  • Repeating Tasks: Execute tasks repeatedly at fixed intervals with easy stop control.
  • Task and Reply Pattern: Execute task on one runner, reply on another with type-safe return values.
  • Task Traits: Priority-aware task scheduling.

Installation

go get github.com/Swind/go-task-runner

Usage

1. Initialize the Global Thread Pool
package main

import (
    "context"
    "time"

    taskrunner "github.com/Swind/go-task-runner"
)

func main() {
    // Initialize the global thread pool with 4 workers
    taskrunner.InitGlobalThreadPool(4)
    defer taskrunner.ShutdownGlobalThreadPool()

    // Create a sequenced runner (like a logical thread)
    runner := taskrunner.CreateTaskRunner(taskrunner.DefaultTaskTraits())

    // ...
}
2. Using SequencedTaskRunner

The SequencedTaskRunner is the recommended way to execute tasks. It ensures that tasks posted to the same runner are executed sequentially, removing the need for mutexes for state protected by that runner.

    // Task 1
    runner.PostTask(func(ctx context.Context) {
        // This runs safely without locks relative to other tasks on this runner
        println("Doing work...")
    })

    // Task 2 (Delayed)
    runner.PostDelayedTask(func(ctx context.Context) {
        println("Runs 1 second later...")
    }, 1 * time.Second)
2.1 Using SingleThreadTaskRunner

The SingleThreadTaskRunner guarantees that all tasks execute on the same dedicated goroutine (thread affinity). This is useful for:

  • Blocking IO operations (e.g., NetworkReceiver with blocking reads)
  • CGO calls that require Thread Local Storage
  • UI Thread simulation where tasks must run on a specific thread
    // Create a single-threaded runner
    runner := taskrunner.NewSingleThreadTaskRunner()
    defer runner.Stop()

    // All tasks run on the same dedicated goroutine
    runner.PostTask(func(ctx context.Context) {
        // Blocking IO operation - safe on dedicated thread
        data := blockingRead()
        process(data)
    })

    // Delayed task - still on same goroutine
    runner.PostDelayedTask(func(ctx context.Context) {
        println("Runs on same goroutine after delay")
    }, 1 * time.Second)

See examples/single_thread for more examples.

2.2 Using PostTaskAndReply Pattern

The PostTaskAndReply pattern allows you to execute a task on one runner, then automatically post a reply to another runner when the task completes. This is perfect for UI/background work patterns.

    uiRunner := taskrunner.CreateTaskRunner(taskrunner.DefaultTaskTraits())
    bgRunner := taskrunner.CreateTaskRunner(taskrunner.TraitsBestEffort())

    uiRunner.PostTask(func(ctx context.Context) {
        me := taskrunner.GetCurrentTaskRunner(ctx)

        // Execute task on background runner, reply back to UI runner
        bgRunner.PostTaskAndReply(
            func(ctx context.Context) {
                // Heavy work on background thread
                loadDataFromServer()
            },
            func(ctx context.Context) {
                // Update UI on UI thread
                updateUI()
            },
            me, // Reply back to UI runner
        )
    })

With Return Values (Generic):

Use PostTaskAndReplyWithResult to pass data from task to reply:

    core.PostTaskAndReplyWithResult(
        bgRunner,
        func(ctx context.Context) (*UserData, error) {
            // Returns data and error
            return fetchUserFromDB(ctx)
        },
        func(ctx context.Context, user *UserData, err error) {
            // Receives data and error
            if err != nil {
                showError(err)
                return
            }
            updateUserUI(user)
        },
        uiRunner,
    )

See examples/task_and_reply for more examples.

3. Using Task Traits (Priorities)
    runner.PostTaskWithTraits(func(ctx context.Context) {
        println("High priority work!")
    }, taskrunner.TaskTraits{
        Priority: taskrunner.TaskPriorityUserBlocking,
    })
4. Repeating Tasks

Execute tasks repeatedly at fixed intervals:

    // Simple repeating task
    handle := runner.PostRepeatingTask(func(ctx context.Context) {
        println("Runs every second")
    }, 1*time.Second)

    // Stop when done
    defer handle.Stop()

    // With initial delay
    handle := runner.PostRepeatingTaskWithInitialDelay(
        task,
        2*time.Second,  // Start after 2 seconds
        1*time.Second,  // Then repeat every second
        taskrunner.DefaultTaskTraits(),
    )

See examples/repeating_task for more examples.

5. Shutdown and Cleanup

Gracefully shutdown a runner to stop all tasks:

    runner := taskrunner.CreateTaskRunner(taskrunner.DefaultTaskTraits())

    // Add tasks and repeating tasks...

    // Shutdown when done
    runner.Shutdown()  // Prevents new tasks, clears pending queue, stops all repeating tasks

    // Check if closed
    if runner.IsClosed() {
        println("Runner is closed")
    }

See examples/shutdown for more examples.

Architecture

See DESIGN.md for a deep dive into the internal architecture, including how the TaskScheduler, DelayManager, and TaskQueue interact.

License

MIT License. See LICENSE for details.

Documentation

Overview

Package taskrunner provides a Chromium-inspired task scheduling architecture for Go.

This library implements a threading model where developers post tasks to virtual threads (TaskRunners) rather than managing goroutines directly. The core design is inspired by Chromium's Threading and Tasks system.

Quick Start

Initialize the global thread pool at application startup:

taskrunner.InitGlobalThreadPool(4) // 4 workers
defer taskrunner.ShutdownGlobalThreadPool()

Create a SequencedTaskRunner for sequential task execution:

runner := taskrunner.CreateTaskRunner(taskrunner.DefaultTaskTraits())
runner.PostTask(func(ctx context.Context) {
	// Your code here - guaranteed sequential execution
})

Key Concepts

TaskRunner: Interface for posting tasks. Tasks posted to a SequencedTaskRunner execute sequentially, eliminating the need for locks on resources owned by that runner.

TaskTraits: Describes task attributes including priority (BestEffort, UserVisible, UserBlocking). Priority determines when the sequence gets scheduled, not the order within a sequence.

GoroutineThreadPool: The execution engine managing worker goroutines that pull and execute tasks from the scheduler.

Thread Safety

SequencedTaskRunner provides strict FIFO execution guarantees with runtime assertions. Tasks within a sequence never run concurrently, allowing lock-free programming for resources owned by that sequence.

Example

import (
	"context"
	taskrunner "github.com/Swind/go-task-runner"
)

func main() {
	taskrunner.InitGlobalThreadPool(4)
	defer taskrunner.ShutdownGlobalThreadPool()

	runner := taskrunner.CreateTaskRunner(taskrunner.DefaultTaskTraits())

	// Tasks execute sequentially
	runner.PostTask(func(ctx context.Context) {
		println("Task 1")
	})
	runner.PostTask(func(ctx context.Context) {
		println("Task 2")
	})

	// Delayed task
	runner.PostDelayedTask(func(ctx context.Context) {
		println("Task 3 - delayed")
	}, 1*time.Second)
}

For more details, see https://github.com/Swind/go-task-runner

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultTaskTraits  = core.DefaultTaskTraits
	TraitsUserBlocking = core.TraitsUserBlocking
	TraitsBestEffort   = core.TraitsBestEffort
	TraitsUserVisible  = core.TraitsUserVisible
)

Convenience functions for creating TaskTraits

View Source
var GetCurrentTaskRunner = core.GetCurrentTaskRunner

GetCurrentTaskRunner retrieves the current TaskRunner from context

Functions

func InitGlobalThreadPool

func InitGlobalThreadPool(workers int)

InitGlobalThreadPool initializes the global thread pool with specified number of workers. It starts the pool immediately.

func ShutdownGlobalThreadPool

func ShutdownGlobalThreadPool()

ShutdownGlobalThreadPool stops the global thread pool.

Types

type GoroutineThreadPool

type GoroutineThreadPool struct {
	// contains filtered or unexported fields
}

GoroutineThreadPool manages a set of worker goroutines Responsible for pulling tasks from WorkSource and executing them

func GetGlobalThreadPool

func GetGlobalThreadPool() *GoroutineThreadPool

GetGlobalThreadPool returns the global thread pool instance. It panics if InitGlobalThreadPool has not been called.

func NewGoroutineThreadPool

func NewGoroutineThreadPool(id string, workers int) *GoroutineThreadPool

NewGoroutineThreadPool creates a new GoroutineThreadPool

func NewPriorityGoroutineThreadPool

func NewPriorityGoroutineThreadPool(id string, workers int) *GoroutineThreadPool

func (*GoroutineThreadPool) ActiveTaskCount

func (tg *GoroutineThreadPool) ActiveTaskCount() int

func (*GoroutineThreadPool) DelayedTaskCount

func (tg *GoroutineThreadPool) DelayedTaskCount() int

func (*GoroutineThreadPool) ID

func (tg *GoroutineThreadPool) ID() string

ID returns the ID of the thread pool

func (*GoroutineThreadPool) IsRunning

func (tg *GoroutineThreadPool) IsRunning() bool

IsRunning returns whether the thread pool is running

func (*GoroutineThreadPool) Join

func (tg *GoroutineThreadPool) Join()

Join waits for all worker goroutines to finish

func (*GoroutineThreadPool) PostDelayedInternal

func (tg *GoroutineThreadPool) PostDelayedInternal(task core.Task, delay time.Duration, traits core.TaskTraits, target core.TaskRunner)

func (*GoroutineThreadPool) PostInternal

func (tg *GoroutineThreadPool) PostInternal(task core.Task, traits core.TaskTraits)

func (*GoroutineThreadPool) QueuedTaskCount

func (tg *GoroutineThreadPool) QueuedTaskCount() int

func (*GoroutineThreadPool) Start

func (tg *GoroutineThreadPool) Start(ctx context.Context)

Start starts all worker goroutines

func (*GoroutineThreadPool) Stop

func (tg *GoroutineThreadPool) Stop()

Stop stops the thread pool

func (*GoroutineThreadPool) StopGraceful added in v0.2.0

func (tg *GoroutineThreadPool) StopGraceful(timeout time.Duration) error

StopGraceful stops the thread pool gracefully, waiting for queued tasks to complete Returns error if timeout is exceeded before tasks complete

func (*GoroutineThreadPool) WorkerCount

func (tg *GoroutineThreadPool) WorkerCount() int

WorkerCount returns the number of workers

type RepeatingTaskHandle

type RepeatingTaskHandle = core.RepeatingTaskHandle

RepeatingTaskHandle controls the lifecycle of a repeating task

type ReplyWithResult

type ReplyWithResult[T any] = core.ReplyWithResult[T]

type SequencedTaskRunner

type SequencedTaskRunner = core.SequencedTaskRunner

SequencedTaskRunner ensures sequential execution of tasks

func CreateTaskRunner

func CreateTaskRunner(traits TaskTraits) *SequencedTaskRunner

CreateTaskRunner creates a new SequencedTaskRunner using the global thread pool. This is the recommended way to get a new TaskRunner.

func NewSequencedTaskRunner

func NewSequencedTaskRunner(pool ThreadPool) *SequencedTaskRunner

NewSequencedTaskRunner creates a new SequencedTaskRunner with the given thread pool. This is re-exported for advanced users who want to create runners with custom pools.

type SingleThreadTaskRunner

type SingleThreadTaskRunner = core.SingleThreadTaskRunner

SingleThreadTaskRunner ensures all tasks execute on the same dedicated goroutine

func NewSingleThreadTaskRunner

func NewSingleThreadTaskRunner() *SingleThreadTaskRunner

NewSingleThreadTaskRunner creates a new SingleThreadTaskRunner with a dedicated goroutine. Use this for blocking IO operations, CGO calls with thread-local storage, or UI thread simulation.

type Task

type Task = core.Task

Task is the unit of work (Closure)

type TaskPriority

type TaskPriority = core.TaskPriority

TaskPriority defines the priority levels for tasks

const (
	TaskPriorityBestEffort   TaskPriority = core.TaskPriorityBestEffort
	TaskPriorityUserVisible  TaskPriority = core.TaskPriorityUserVisible
	TaskPriorityUserBlocking TaskPriority = core.TaskPriorityUserBlocking
)

Priority constants

type TaskRunner

type TaskRunner = core.TaskRunner

TaskRunner is the interface for posting tasks

type TaskTraits

type TaskTraits = core.TaskTraits

TaskTraits defines task attributes (priority, blocking behavior, etc.)

type TaskWithResult

type TaskWithResult[T any] = core.TaskWithResult[T]

TaskWithResult and ReplyWithResult for generic PostTaskAndReply pattern

type ThreadPool

type ThreadPool = core.ThreadPool

ThreadPool is re-exported for type compatibility

Directories

Path Synopsis
examples
basic_sequence command
delayed_task command
mixed_priority command
repeating_task command
shutdown command
single_thread command
task_and_reply command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL