stopper

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: Apache-2.0 Imports: 4 Imported by: 6

README

Graceful Golang Task Lifecycle Management

Go Reference codecov

go get vawter.tech/stopper

This package contains a utility for gracefully terminating long-running tasks within a Go program. A stopper.Context extends the stdlib context.Context API with a soft-stop signal and includes task-launching APIs similar to WaitGroup or ErrGroup.

Supported use-cases:

API Use

There are a number of examples in the package docs showing a variety of usecase patterns for polling and callback-style network servers.

// Create a stopper context from an existing context.
ctx := stopper.WithContext(context.Background())

// Respond to signals.
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
stopper.StopOnReceive(ctx, time.Second, ch)

// Do work, often in a loop.
ctx.Go(func(ctx *stopper.Context) error {
  for !ctx.IsStopping() {
  }
  return nil
})

// Plays nicely with channels.
ctx.Go(func(ctx *stopper.Context) error {
  for {
    select {
    case <-ctx.Stopping():
      return nil
    case work := <-sourceOfWork:
      // Launches additional workers.
      ctx.Go(func(ctx *stopper.Context) error {
        return process(ctx, work)
      })
    }
  }
})

subCtx := stopper.WithContext(ctx) // Nested contexts can be created.
subCtx.Stop(time.Second)           // Won't affect the outer context.

// Blocks until all managed goroutines are done.
if err := ctx.Wait(); err != nil {
  panic(err)
}

Tracing

Stopper also interoperates with runtime/trace or other modules that create custom context.Context instances by way of the stopper.Context.With() method.

A view of nested golang trace regions

Project History

This repository was extracted from github.com/cockroachdb/field-eng-powertools using the command git filter-repo --subdirectory-filter stopper --path LICENSE by the code's original author.

Documentation

Overview

Package stopper contains a utility for gracefully terminating long-running tasks within a Go program.

Example (Features)
package main

import (
	"context"
	"os"
	"os/signal"
	"time"

	"vawter.tech/stopper"
)

func main() {
	// Create a stopper context from an existing context.
	ctx := stopper.WithContext(context.Background())

	// Respond to signals.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	stopper.StopOnReceive(ctx, time.Second, ch)

	// Do work, often in a loop.
	ctx.Go(func(ctx *stopper.Context) error {
		for !ctx.IsStopping() {
		}
		return nil
	})

	// Plays nicely with channels.
	ctx.Go(func(ctx *stopper.Context) error {
		for {
			select {
			case <-ctx.Stopping():
				return nil
			case work := <-sourceOfWork:
				// Launches additional workers.
				ctx.Go(func(ctx *stopper.Context) error {
					return process(ctx, work)
				})
			}
		}
	})

	subCtx := stopper.WithContext(ctx) // Nested contexts can be created.
	subCtx.Stop(time.Second)           // Won't affect the outer context.

	// Blocks until all managed goroutines are done.
	if err := ctx.Wait(); err != nil {
		panic(err)
	}
}

var sourceOfWork chan struct{}

// The stopper.Context type fits into existing context plumbing and can be
// retrieved later on.
func process[T any](ctxCtx context.Context, work T) error {
	stopperCtx := stopper.From(ctxCtx)
	stopperCtx.Go(func(ctx *stopper.Context) error {
		return nil
	})
	return nil
}
Example (NetServer)

A pattern for using a stopper with an accept/poll type of network listening API.

package main

import (
	"bufio"
	"context"
	"fmt"
	"log/slog"
	"net"
	"os"
	"os/signal"
	"time"

	"vawter.tech/stopper"
)

func main() {
	ctx := stopper.WithContext(context.Background())

	// Respond to interrupts with a 30-second grace period.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	stopper.StopOnReceive(ctx, 30*time.Second, ch)

	// Open a network listener.
	const addr = "127.0.0.1:13013"
	l, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}
	// Close the listener when the context stops.
	ctx.Go(func(ctx *stopper.Context) error {
		<-ctx.Stopping()
		return l.Close()
	})
	// Accept connections.
	ctx.Go(func(ctx *stopper.Context) error {
		for {
			conn, err := l.Accept()
			// This returns an error when the listener has been closed.
			if err != nil {
				return nil
			}
			// Handle the connection in its own goroutine.
			ctx.Go(func(ctx *stopper.Context) error {
				defer func() { _ = conn.Close() }()

				s := bufio.NewScanner(conn)
				s.Scan()
				fmt.Println(s.Text())

				// Simulate an OS signal to stop the app.
				ch <- os.Interrupt

				return nil
			})
		}
	})
	// A task to send a message.
	ctx.Go(func(ctx *stopper.Context) error {
		conn, err := net.Dial("tcp", addr)
		if err != nil {
			panic(err)
		}
		_, _ = conn.Write([]byte("Hello World!\n"))
		if err := conn.Close(); err != nil {
			slog.ErrorContext(ctx, "could not send data", "error", err)
		}
		return nil
	})
	// Block until tasks are complete.
	if err := ctx.Wait(); err != nil {
		slog.ErrorContext(ctx, "internal task error", "error", err)
	}
}
Output:

Hello World!
Example (Testing)

This is a general pattern for constructing a stopper.Context for testing purposes. The specifics of error reporting, timeouts, and other administrivia will vary across projects, hence this not being part of the stopper module.

package main

import (
	"context"
	"testing"
	"time"

	"vawter.tech/stopper"
	"vawter.tech/stopper/linger"
)

func NewStopperForTest(t *testing.T) *stopper.Context {
	const grace = 5 * time.Second

	// Impose a per-test timout.
	stdCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	t.Cleanup(cancel)

	// Add tracking for where goroutine tasks are started.
	rec := linger.NewRecorder(10 /* depth */)
	ctx := stopper.WithInvoker(stdCtx, rec.Invoke)

	// Register a cleanup, which could be a deferred function, that will stop
	// the context, wait for all tasks to exit, and then verify that there are
	// no lingering goroutines associated with the context.
	t.Cleanup(func() {
		ctx.Stop(grace)
		if err := ctx.Wait(); err != nil {
			t.Errorf("task returned an error: %v", err)
		}
		linger.CheckClean(t, rec)
	})

	return ctx
}

// This is a general pattern for constructing a [stopper.Context] for testing
// purposes. The specifics of error reporting, timeouts, and other administrivia
// will vary across projects, hence this not being part of the stopper module.
func main() {}

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrGracePeriodExpired = errors.New("grace period expired")

ErrGracePeriodExpired will be returned from context.Cause when the Context has been stopped, but the goroutines have not exited.

View Source
var ErrStopped = errors.New("stopped")

ErrStopped will be returned from context.Cause when the Context has been stopped.

Functions

func Harden added in v1.2.0

func Harden(ctx *Context) context.Context

Harden adapts the soft-stop behaviors of a stopper into a context.Context. This can be used whenever it is necessary to call other APIs that should be made aware of the soft-stop condition.

The returned context has the following behaviors:

Example

This shows how the soft-stop behavior can be propagated to other APIs via the context.Context interface.

package main

import (
	"context"
	"fmt"

	"vawter.tech/stopper"
)

func main() {
	soft := stopper.WithContext(context.Background())
	soft.Go(func(ctx *stopper.Context) error {
		hard := stopper.Harden(ctx)
		// This is a stand-in for any call to an API that accepts a
		// context.Context.
		<-hard.Done()
		fmt.Println("Done")
		return nil
	})
	soft.Stop(0)
	if err := soft.Wait(); err != nil {
		panic(err)
	}
}
Output:

Done

func HardenFrom added in v1.2.0

func HardenFrom(ctx context.Context) context.Context

HardenFrom returns a hardened context for any input context.

func IsStopping

func IsStopping(ctx context.Context) bool

IsStopping is a convenience method to determine if a stopper is associated with a Context and if work should be stopped.

func StopOnReceive added in v1.0.1

func StopOnReceive[T any](ctx *Context, gracePeriod time.Duration, ch <-chan T)

StopOnReceive will gracefully stop the Context when a value is received from the channel or if the channel is closed.

This can be used, for example, with os/signal.Notify.

Example (Interrupt)
package main

import (
	"context"
	"os"
	"os/signal"
	"time"

	"vawter.tech/stopper"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	ctx := stopper.WithContext(context.Background())
	stopper.StopOnReceive(ctx, time.Second, signals)

	ctx.Go(func(ctx *stopper.Context) error {
		// Do other work
		return nil
	})

	if err := ctx.Wait(); err != nil {
		os.Exit(1)
	}
	os.Exit(0)
}

Types

type Adaptable added in v1.2.0

type Adaptable interface {
	func() | func() error |
		func(context.Context) | func(context.Context) error |
		func(*Context) | Func
}

Adaptable is the set of function signatures accepted by Fn.

type Context

type Context struct {
	// contains filtered or unexported fields
}

A Context is conceptually similar to an [errgroup.Group] in that it manages a context.Context whose lifecycle is associated with some number of goroutines. Rather than canceling the associated context when a goroutine returns an error, it cancels the context after the Stop method is called and all associated goroutines have all exited.

As an API convenience, the Context type implements context.Context so that it fits into idiomatic context-plumbing. The From function can be used to retrieve a Context from any context.Context.

Example (Nested)

This example shows that contexts may be nested. Stop signals will propagate from enclosing to inner contexts, while the Len() and Wait() methods are aware of child contexts.

package main

import (
	"context"
	"fmt"
	"time"

	"vawter.tech/stopper"
)

func main() {
	outer := stopper.WithContext(context.Background())
	middle := stopper.WithContext(outer)
	inner := stopper.WithContext(middle)

	middle.Go(func(ctx *stopper.Context) error {
		<-ctx.Stopping()
		return nil
	})

	inner.Go(func(ctx *stopper.Context) error {
		<-ctx.Stopping()
		return nil
	})

	fmt.Println("outer", outer.Len())
	fmt.Println("middle", middle.Len())
	fmt.Println("inner", inner.Len())

	// Stopping a parent context stops the child contexts.
	outer.Stop(time.Second)

	// Wait for all nested tasks.
	if err := outer.Wait(); err != nil {
		panic(err)
	}

	fmt.Println("outer", outer.Len())

}
Output:

outer 2
middle 2
inner 1
outer 0
Example (Ticker)

This example shows how a background task that should execute on a regular basis may be implemented.

package main

import (
	"context"
	"fmt"
	"time"

	"vawter.tech/stopper"
)

func main() {
	ctx := stopper.WithContext(context.Background())

	ctx.Go(func(ctx *stopper.Context) error {
		for {
			// Do some background task.
			select {
			case <-time.After(time.Second):
				// Loop around.
			case <-ctx.Stopping():
				// This channel closes when Stop() is called. The
				// context is not yet cancelled at this point.
				return nil
			case <-ctx.Done():
				// This is a hard-stop condition because either the
				// underlying context.Context was canceled or the task
				// has outlived its graceful shutdown time.
				return ctx.Err()
			}
		}
	})

	// Do other things.
	fmt.Println("task count:", ctx.Len())

	// Calling Stop() makes the Stopping channel close, allowing
	// processes one second before the context is hard-cancelled.
	ctx.Stop(time.Second)

	// Callers can wait for all tasks to finish, similar to an ErrGroup.
	if err := ctx.Wait(); err != nil {
		panic(err)
	}
	fmt.Println("task count:", ctx.Len())

}
Output:

task count: 1
task count: 0

func Background

func Background() *Context

Background is analogous to context.Background. It returns a Context which cannot be stopped or canceled, but which is otherwise functional.

func From

func From(ctx context.Context) *Context

From returns a pre-existing Context from the Context chain. Use WithContext to construct a new Context.

If the chain is not associated with a Context, the Background instance will be returned.

func WithContext

func WithContext(ctx context.Context) *Context

WithContext creates a new Context whose work will be immediately canceled when the parent context is canceled. If the provided context is already managed by a Context, a call to the enclosing Context.Stop method will also trigger a call to Stop in the newly-constructed Context.

func WithInvoker added in v1.1.0

func WithInvoker(ctx context.Context, i Invoker) *Context

WithInvoker is equivalent to WithContext, except that the given Invoker will be used to execute the functions passed to Context.Call and Context.Go. The Invoker of a newly-defined context will be composed with the Invoker defined in a parent context, if any.

Example (ObserveLifecycle)

This shows the sequence of callbacks when nested contexts have Invokers defined. Note that the setup phase is bottom-up, while execution is top-down.

package main

import (
	"context"
	"fmt"
	"time"

	"vawter.tech/stopper"
)

func main() {
	outer := stopper.WithInvoker(context.Background(),
		func(fn stopper.Func) stopper.Func {
			fmt.Println("outer setting up")
			return func(ctx *stopper.Context) error {
				fmt.Println("outer start")
				defer fmt.Println("outer end")
				return fn(ctx)
			}
		})
	middle := stopper.WithInvoker(outer,
		func(fn stopper.Func) stopper.Func {
			fmt.Println("middle setting up")
			return func(ctx *stopper.Context) error {
				fmt.Println("middle start")
				defer fmt.Println("middle end")
				return fn(ctx)
			}
		})
	inner := stopper.WithContext(middle)
	inner.Go(func(ctx *stopper.Context) error {
		fmt.Println("here")
		return nil
	})
	outer.Stop(time.Second)
	if err := outer.Wait(); err != nil {
		panic(err)
	}
}
Output:

middle setting up
outer setting up
outer start
middle start
here
middle end
outer end

func (*Context) Call

func (c *Context) Call(fn func(ctx *Context) error) error

Call executes the given function within the current goroutine and monitors its lifecycle. That is, both Call and Wait will block until the function has returned.

Call returns any error from the function with no other side effects. Unlike the Go method, Call does not stop the Context if the function returns an error. If the Context has already been stopped, ErrStopped will be returned.

The function passed to Call should prefer the Context.Stopping channel to return instead of depending on Context.Done. This allows a soft-stop, rather than waiting for the grace period to expire when Context.Stop is called.

See Fn to create function signature adaptors.

Example (HttpServer)

An example showing the use of stopper.Context.Call when a goroutine is already allocated by some other API. In this case, the HTTP server creates a goroutine per request, but we still want to be able to interact with a stopper hierarchy.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/http/httptest"
	"os"
	"os/signal"
	"strings"
	"time"

	"vawter.tech/stopper"
)

func main() {
	ctx := stopper.WithContext(context.Background())

	// Respond to interrupts with a 30-second grace period.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	stopper.StopOnReceive(ctx, 30*time.Second, ch)

	// The HTTP server creates goroutines for us and has its own expectations
	// around goroutine/request lifecycles.
	svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// We use With() here so that a caller hangup can immediately stop any
		// work. This is optional, and might not be desirable in all
		// circumstances.
		err := ctx.With(r.Context()).Call(func(ctx *stopper.Context) error {
			data, err := io.ReadAll(r.Body)
			if err != nil {
				return err
			}
			fmt.Println(string(data))
			w.WriteHeader(http.StatusAccepted)
			return nil
		})

		if err == nil {
			return
		}

		// If Stop() has been called, Call() will immediately return ErrStopped.
		// We can respond to the client (or load-balancer) saying that this
		// instance of the server cannot fulfill the request.
		if errors.Is(err, stopper.ErrStopped) {
			w.Header().Add("Retry-After", "0")
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}

		slog.ErrorContext(ctx, "handler error", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
	}))

	// A task to make an HTTP request.
	ctx.Go(func(ctx *stopper.Context) error {
		_, err := svr.Client().Post(svr.URL, "text/plain", strings.NewReader("Hello World!"))
		if err != nil {
			// Returning an error here will make the call to Wait() below fail
			// out. This makes sense for an example, but production code should
			// handle the error in a reasonable way and return nil.
			return err
		}

		// Simulate an OS shutdown.
		ch <- os.Interrupt
		return nil
	})

	if err := ctx.Wait(); err != nil {
		slog.ErrorContext(ctx, "internal task error", "error", err)
	}
}
Output:

Hello World!

func (*Context) Deadline

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline implements context.Context.

func (*Context) Defer

func (c *Context) Defer(fn func())

Defer registers a callback that will be executed after the Context.Done channel is closed. This method can be used to clean up resources that are used by goroutines associated with the Context (e.g. closing database connections). The Context will already have been canceled by the time the callback is run, so its behaviors should be associated with context.Background or similar. Callbacks will be executed in a LIFO manner. If the Context has already stopped, the callback will be executed immediately.

Calling this method on the Background context will panic, since that context can never be cancelled.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"vawter.tech/stopper"
)

func main() {
	ctx := stopper.WithContext(context.Background())
	// Deferred functions are executed in reverse order.
	ctx.Defer(func() {
		fmt.Println("defer 0")
	})
	ctx.Defer(func() {
		fmt.Println("defer 1")
	})
	// This will run in a separate goroutine and then stop the context.
	ctx.Go(func(ctx *stopper.Context) error {
		fmt.Println("task")
		ctx.Stop(time.Second)
		return nil
	})
	// Wait for all tasks, including deferred callbacks to be complete.
	if err := ctx.Wait(); err != nil {
		fmt.Println(err)
	}
	fmt.Println("finished")
}
Output:

task
defer 1
defer 0
finished

func (*Context) Done

func (c *Context) Done() <-chan struct{}

Done implements context.Context. The channel that is returned will be closed when Stop has been called and all associated goroutines have exited. The returned channel will be closed immediately if the parent context (passed to WithContext) is canceled. Functions passed to Context.Go should prefer Context.Stopping instead.

func (*Context) Err

func (c *Context) Err() error

Err implements context.Context. When the return value for this is context.ErrCanceled, context.Cause will return ErrStopped if the context cancellation resulted from a call to Stop.

func (*Context) Go

func (c *Context) Go(fn func(ctx *Context) error) (accepted bool)

Go spawns a new goroutine to execute the given function and monitors its lifecycle.

If the function returns an error, the Stop method will be called. The returned error will be available from Wait once the remaining goroutines have exited.

This method will not execute the function and return false if Stop has already been called.

The function passed to Go should prefer the Context.Stopping channel to return instead of depending on Context.Done. This allows a soft-stop, rather than waiting for the grace period to expire when Context.Stop is called.

See Fn to create function signature adaptors.

func (*Context) IsStopping

func (c *Context) IsStopping() bool

IsStopping returns true once [Stop] has been called. See also [Stopping] for a notification-based API.

func (*Context) Len

func (c *Context) Len() int

Len returns the number of tasks being tracked by the Context. This includes tasks started by derived Contexts.

func (*Context) Stop

func (c *Context) Stop(gracePeriod time.Duration)

Stop begins a graceful shutdown of the Context. When this method is called, the Stopping channel will be closed. Once all goroutines started by Go have exited, the associated Context will be cancelled, thus closing the Done channel. If the gracePeriod is non-zero, the context will be forcefully cancelled if the goroutines have not exited within the given timeframe.

func (*Context) StopOnIdle added in v1.2.0

func (c *Context) StopOnIdle()

StopOnIdle causes the Context to automatically call Context.Stop once Context.Len equals zero. This is useful for stoppers that represent a finite pool of tasks that will eventually conclude. All methods on the Context will continue to operate in their usual manners after calling this method. Calling this method on the Background Context has no effect. Calling this method on an idle Context behaves as if Context.Stop had been called instead.

Example
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"vawter.tech/stopper"
)

func main() {
	ctx := stopper.WithContext(context.Background())
	var nestedDidAccept atomic.Bool
	ctx.Go(func(ctx *stopper.Context) error {
		// It's still valid to create additional tasks or additional
		// nested stoppers.
		ok := ctx.Go(func(ctx *stopper.Context) error {
			// Do nested tasks...
			return nil
		})
		nestedDidAccept.Store(ok)
		return nil
	})
	// StopOnIdle shouldn't be called until all parent tasks have been
	// started.
	ctx.StopOnIdle()
	// Wait doesn't return until all tasks are complete.
	err := ctx.Wait()
	fmt.Printf("OK: %t %t\n", err == nil, nestedDidAccept.Load())

}
Output:

OK: true true

func (*Context) Stopping

func (c *Context) Stopping() <-chan struct{}

Stopping returns a channel that is closed when a graceful shutdown has been requested or when a parent context has been stopped.

func (*Context) Value

func (c *Context) Value(key any) any

Value implements context.Context.

func (*Context) Wait

func (c *Context) Wait() error

Wait will block until Stop has been called and all associated goroutines have exited or the parent context has been cancelled. This method will return the first, non-nil error from any of the callbacks passed to Go. If Wait is called on the Background instance, it will immediately return nil.

func (*Context) With added in v1.0.1

func (c *Context) With(ctx context.Context) *Context

With returns a Context that is otherwise equivalent to the receiver, save that all context.Context behavior is delegated to the new context. This enables interaction with the runtime/trace package or other libraries that generate custom context.Context instances.

Since the With method does not create a new, nested stopper hierarchy, it is less expensive than calling WithContext in tracing scenarios.

Example (Tracing)

This shows how the runtime/trace package, or any other package that creates custom context.Context instances, can be interoperated with.

package main

import (
	"context"
	"fmt"
	"os"
	"runtime/trace"
	"time"

	"vawter.tech/stopper"
)

func main() {
	f, err := os.OpenFile("trace.out", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := f.Close(); err != nil {
			panic(err)
		}
		fmt.Println("trace written to", f.Name())
	}()

	if err := trace.Start(f); err != nil {
		panic(err)
	}
	defer trace.Stop()

	rootCtx, rootTask := trace.NewTask(context.Background(), "root task")
	defer rootTask.End()

	ctx := stopper.WithContext(rootCtx)
	defer trace.StartRegion(ctx, "root region").End()

	midCtx, midTask := trace.NewTask(ctx, "mid task")
	ctx.With(midCtx).Go(func(ctx *stopper.Context) error {
		defer midTask.End()
		defer trace.StartRegion(ctx, "mid region").End()
		trace.Log(ctx, "message", "middle task is here")

		innerCtx, innerTask := trace.NewTask(ctx, "inner task")
		ctx.With(innerCtx).Go(func(ctx *stopper.Context) error {
			defer innerTask.End()
			defer trace.StartRegion(ctx, "inner region").End()
			trace.Log(ctx, "message", "inner task is here")
			ctx.Stop(time.Second)
			return nil
		})

		return nil
	})

	if err := ctx.Wait(); err != nil {
		panic(err)
	}
}
Output:

trace written to trace.out

type Func added in v1.1.0

type Func = func(*Context) error

Func is a convenience type alias for the signature of functions invoked by a Context.

func Fn added in v1.2.0

func Fn[A Adaptable](fn A) Func

Fn adapts various function signatures to be compatible with Context.

Example
package main

import (
	"context"

	"vawter.tech/stopper"
)

type Thing struct{}

func (t *Thing) DoSomething() error { return nil }

func main() {
	ctx := stopper.WithContext(context.Background())
	t := &Thing{}
	ctx.Go(stopper.Fn(t.DoSomething))
}

type Invoker added in v1.1.0

type Invoker func(fn Func) Func

An Invoker arranges to call the provided Func. See WithInvoker for additional details.

Directories

Path Synopsis
Package linger contains a utility for reporting on where lingering tasks were originally started.
Package linger contains a utility for reporting on where lingering tasks were originally started.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL