taskgroup

v1.1.1
Published: Apr 25, 2026 License: MIT Imports: 8 Imported by: 0

README

taskgroup: scoped lifecycles for long-running Go tasks


Starting goroutines in Go is easy. Getting them to stop together, in order, with cleanup that actually runs? That's the part where main.go quietly goes from neat to tangled. taskgroup handles that lifecycle for you.

Install

go get github.com/gokern/taskgroup

Requires Go 1.26+.

Example

A typical main.go assembles services and runs them as one unit:

func main() {
	ctx := context.Background()

	tasks := taskgroup.New()
	tasks.Add(taskgroup.SignalTask())
	tasks.Add(bootstrap.GRPCServerTask("api", apiGRPC, cfg.APIAddr()))
	tasks.Add(bootstrap.MetricsServerTask(cfg.Prometheus.Port))

	if err := tasks.Run(ctx); err != nil {
		log.Fatal(err)
	}
}

Every entry is a ready-made taskgroup.Task that knows how to start and stop itself, so main.go stays flat. A typical helper looks like this:

func GRPCServerTask(name string, srv *grpc.Server, addr string) taskgroup.Task {
	return taskgroup.NewTask(func(context.Context) error {
		lis, err := net.Listen("tcp", addr)
		if err != nil {
			return err
		}
		return srv.Serve(lis)
	}).Interrupt(func(error) {
		srv.GracefulStop()
	})
}

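When Ctrl-C or SIGTERM arrives, SignalTask returns, the group cancels its run context, and every registered interrupt fires. Once all tasks have exited, Run returns the primary error (here, the signal error from SignalTask), joined with any cleanup errors or recovered panics.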

API

Building tasks:

  • taskgroup.NewTask(fn).Interrupt(stop) — a task with an explicit stop hook.
  • taskgroup.SignalTask(sigs...) — a ready-made task that stops on shutdown signals.

Group lifecycle:

  • taskgroup.New() — create a group.
  • tg.Add(task) / tg.AddFunc(fn) — add a task.
  • tg.Defer(fn) — cleanup after every task exits, LIFO like Go defer.
  • tg.Run(ctx) — start the group; returns the first error.

Runnable examples are in example_test.go. Everything else is in the godoc.

Signals

SignalTask() listens for os.Interrupt on Windows and os.Interrupt + SIGTERM on Unix. Pass your own to override:

tg.Add(taskgroup.SignalTask(syscall.SIGHUP, syscall.SIGTERM))

Detect a signal shutdown with IsSignalError and extract the signal with SignalFromError.

Errors

Run returns the first task error, the run context error, or nil if the first task finished cleanly. Errors from defers are joined to it via errors.Join. Ordinary task errors that arrive after shutdown starts are dropped, so http.ErrServerClosed doesn't hide the real reason you stopped. Panics from tasks, interrupts, and defers are recovered, wrapped with ErrPanic, and joined in.

Scope

taskgroup is for application lifecycle: gluing together servers, workers, signal handlers, and cleanup in one place. It's not a worker pool or a replacement for errgroup when you want to fan out a batch of jobs and collect their results.

Documentation

Overview

Package taskgroup coordinates related long-running tasks that should live and stop as one unit: servers, workers, signal handlers, background loops, and cleanup steps.

A TaskGroup is run once. When the first task returns, or when the run context is canceled, the group interrupts every task concurrently, waits for all tasks to return, then runs deferred cleanup functions in last-in-first-out order.

Run returns the first task error, the run context error, or nil. Errors from Defer functions, and panics from any task, Interrupt function, or Defer function, are joined with the primary error; recovered panics are wrapped with ErrPanic.

See the Example functions for common patterns.

Variables

var ErrPanic = errors.New("panic")

ErrPanic is the sentinel that wraps every panic recovered by the package. Test with errors.Is(err, taskgroup.ErrPanic) to detect a recovered panic.

Functions

func IsSignalError

func IsSignalError(err error) bool

IsSignalError reports whether err contains an error returned by SignalTask.

func SignalFromError

func SignalFromError(err error) (os.Signal, bool)

SignalFromError returns the signal contained in err, and reports whether one was found. If err does not wrap a signal error, it returns nil, false.

Types

type DeferFunc

type DeferFunc func(error) error

DeferFunc runs after all tasks have returned.

It receives the primary shutdown error: the first task error or recovered panic, the run context error, or nil when the first task returned cleanly.

type ExecuteFunc

type ExecuteFunc func(context.Context) error

ExecuteFunc is the main body of a task.

type InterruptFunc

type InterruptFunc func(error)

InterruptFunc is called when the TaskGroup starts shutting down.

Interrupt functions must be safe to call after their task has already returned. They may run concurrently with other interrupt functions and should make the corresponding ExecuteFunc return promptly.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a unit of work scheduled by a TaskGroup.

func NewTask added in v1.1.0

func NewTask(execute ExecuteFunc) Task

NewTask creates a task from execute.

Example

A task with an explicit shutdown function. Interrupt runs when the group starts shutting down.

package main

import (
	"context"
	"fmt"

	"github.com/gokern/taskgroup"
)

func main() {
	task := taskgroup.NewTask(func(ctx context.Context) error {
		<-ctx.Done()

		return ctx.Err()
	}).Interrupt(func(error) {
		fmt.Println("stop")
	})

	tg := taskgroup.New()
	tg.Add(task)
	// Second task returns immediately, triggering shutdown of the first.
	tg.AddFunc(func(context.Context) error {
		fmt.Println("done")

		return nil
	})

	_ = tg.Run(context.Background())
}
Output:
done
stop

func SignalTask added in v1.1.0

func SignalTask(signals ...os.Signal) Task

SignalTask returns a task that exits when the context is canceled or a signal is received.

When no signals are provided, SignalTask listens for the platform's standard shutdown signals.

On signal the task returns an opaque error. Detect it with IsSignalError and extract the signal with SignalFromError.

Signals delivered before the task body runs get the OS default action. Programs that need to catch signals at startup should call signal.Notify or signal.Ignore before Run.

Example

SignalTask stops the group on shutdown signals. Detect the cause with IsSignalError and SignalFromError.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/gokern/taskgroup"
)

func main() {
	tg := taskgroup.New()
	tg.Add(taskgroup.SignalTask())

	// Immediately canceled context so the example terminates deterministically.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err := tg.Run(ctx)

	switch {
	case taskgroup.IsSignalError(err):
		sig, _ := taskgroup.SignalFromError(err)
		fmt.Println("stopped by signal:", sig)
	case errors.Is(err, context.Canceled):
		fmt.Println("canceled")
	}
}
Output:
canceled

func (Task) Interrupt

func (t Task) Interrupt(interrupt InterruptFunc) Task

Interrupt returns a copy of t with the interrupt function set. See InterruptFunc for required semantics.

type TaskGroup

type TaskGroup struct {
	// contains filtered or unexported fields
}

TaskGroup manages a collection of concurrent tasks.

A TaskGroup is run once. When the first task returns, or when the run context is canceled, the group interrupts every task concurrently, waits for all tasks to return, then runs deferred cleanup functions in last-in-first-out order.

Example

A zero-configuration group that runs one task to completion.

package main

import (
	"context"
	"fmt"

	"github.com/gokern/taskgroup"
)

func main() {
	tg := taskgroup.New()

	tg.AddFunc(func(context.Context) error {
		fmt.Println("working")

		return nil
	})

	err := tg.Run(context.Background())
	if err != nil {
		fmt.Println("error:", err)
	}
}
Output:
working

func New

func New() *TaskGroup

New creates an empty TaskGroup.

func (*TaskGroup) Add

func (g *TaskGroup) Add(task Task)

Add appends a task to the TaskGroup.

Add panics on an uninitialized Task; use NewTask (or helpers like SignalTask) to construct one.

func (*TaskGroup) AddFunc added in v1.1.0

func (g *TaskGroup) AddFunc(execute ExecuteFunc)

AddFunc appends a task created from execute to the TaskGroup.

func (*TaskGroup) Defer

func (g *TaskGroup) Defer(fn DeferFunc)

Defer appends a cleanup function to the TaskGroup.

Deferred functions run after all tasks have returned, in last-in-first-out order, like Go defer statements.

Example

Deferred cleanup runs after all tasks have returned, in LIFO order.

package main

import (
	"context"
	"fmt"

	"github.com/gokern/taskgroup"
)

func main() {
	tg := taskgroup.New()

	tg.Defer(func(error) error {
		fmt.Println("close db")

		return nil
	})
	tg.Defer(func(error) error {
		fmt.Println("flush metrics")

		return nil
	})

	tg.AddFunc(func(context.Context) error {
		fmt.Println("work")

		return nil
	})

	_ = tg.Run(context.Background())
}
Output:
work
flush metrics
close db

func (*TaskGroup) Run

func (g *TaskGroup) Run(ctx context.Context) error

Run executes all tasks in the group.

Run returns the first task error, the run context error, or nil when the first task returns cleanly. With no tasks Run returns nil and only runs deferred cleanup, even if ctx is already canceled. Tasks are started even when ctx is already canceled; each task sees ctx and decides how to handle it. If a task result and ctx cancellation arrive simultaneously, either one may become the primary error.

Panics from interrupt functions, and errors or panics from deferred cleanup, are joined with the primary error. Ordinary task errors after shutdown are dropped, but task panics after shutdown are recovered and joined. Every recovered panic is wrapped so errors.Is(err, ErrPanic) is true.
