Documentation
¶
Overview ¶
Package stopper contains a utility for gracefully terminating long-running tasks within a Go program.
Example (Features) ¶
package main
import (
"context"
"os"
"os/signal"
"time"
"vawter.tech/stopper"
)
func main() {
// Create a stopper context from an existing context.
ctx := stopper.WithContext(context.Background())
// Respond to signals.
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
stopper.StopOnReceive(ctx, time.Second, ch)
// Do work, often in a loop.
ctx.Go(func(ctx *stopper.Context) error {
for !ctx.IsStopping() {
}
return nil
})
// Plays nicely with channels.
ctx.Go(func(ctx *stopper.Context) error {
for {
select {
case <-ctx.Stopping():
return nil
case work := <-sourceOfWork:
// Launches additional workers.
ctx.Go(func(ctx *stopper.Context) error {
return process(ctx, work)
})
}
}
})
subCtx := stopper.WithContext(ctx) // Nested contexts can be created.
subCtx.Stop(time.Second) // Won't affect the outer context.
// Blocks until all managed goroutines are done.
if err := ctx.Wait(); err != nil {
panic(err)
}
}
var sourceOfWork chan struct{}
// The stopper.Context type fits into existing context plumbing and can be
// retrieved later on.
func process[T any](ctxCtx context.Context, work T) error {
stopperCtx := stopper.From(ctxCtx)
stopperCtx.Go(func(ctx *stopper.Context) error {
return nil
})
return nil
}
Example (NetServer) ¶
A pattern for using a stopper with an accept/poll type of network listening API.
package main
import (
"bufio"
"context"
"fmt"
"log/slog"
"net"
"os"
"os/signal"
"time"
"vawter.tech/stopper"
)
// main runs a small TCP server whose lifecycle is managed by a
// stopper.Context. One task closes the listener when the context starts
// stopping, one task accepts connections, and one task connects as a client
// and sends a single line. The connection handler prints the received line
// and then simulates an OS interrupt so that the program shuts down.
func main() {
	ctx := stopper.WithContext(context.Background())

	// Respond to interrupts with a 30-second grace period.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	stopper.StopOnReceive(ctx, 30*time.Second, ch)

	// Open a network listener.
	const addr = "127.0.0.1:13013"
	l, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}

	// Close the listener when the context stops.
	ctx.Go(func(ctx *stopper.Context) error {
		<-ctx.Stopping()
		return l.Close()
	})

	// Accept connections.
	ctx.Go(func(ctx *stopper.Context) error {
		for {
			conn, err := l.Accept()
			if err != nil {
				// The listener is only closed after the context has begun
				// stopping, so an error at that point is the expected way
				// out of this loop.
				if ctx.IsStopping() {
					return nil
				}
				// Any other failure is unexpected. Returning it stops the
				// context and makes the error available from Wait, rather
				// than leaving the program running without an acceptor.
				return err
			}

			// Handle the connection in its own goroutine.
			ctx.Go(func(ctx *stopper.Context) error {
				defer func() { _ = conn.Close() }()

				s := bufio.NewScanner(conn)
				if s.Scan() {
					fmt.Println(s.Text())
				} else if err := s.Err(); err != nil {
					slog.ErrorContext(ctx, "could not read data", "error", err)
				}

				// Simulate an OS signal to stop the app.
				ch <- os.Interrupt
				return nil
			})
		}
	})

	// A task to send a message.
	ctx.Go(func(ctx *stopper.Context) error {
		conn, err := net.Dial("tcp", addr)
		if err != nil {
			// Return the error instead of panicking inside a managed
			// goroutine; the context is stopped and Wait reports it.
			return err
		}
		if _, err := conn.Write([]byte("Hello World!\n")); err != nil {
			// Best-effort close: the write error is the one worth reporting.
			_ = conn.Close()
			return err
		}
		if err := conn.Close(); err != nil {
			slog.ErrorContext(ctx, "could not send data", "error", err)
		}
		return nil
	})

	// Block until tasks are complete.
	if err := ctx.Wait(); err != nil {
		slog.ErrorContext(ctx, "internal task error", "error", err)
	}
}
Output: Hello World!
Example (Testing) ¶
This is a general pattern for constructing a stopper.Context for testing purposes. The specifics of error reporting, timeouts, and other administrivia will vary across projects, hence this not being part of the stopper module.
package main
import (
"context"
"testing"
"time"
"vawter.tech/stopper"
"vawter.tech/stopper/linger"
)
// NewStopperForTest returns a stopper.Context for use within a single test.
// The context is derived from a standard context with a 30-second timeout.
// Cleanups registered on t cancel that timeout context, stop the stopper
// with a 5-second grace period, wait for its tasks, report any task error
// through t.Errorf, and finally run linger.CheckClean against the recorder.
func NewStopperForTest(t *testing.T) *stopper.Context {
const grace = 5 * time.Second
// Impose a per-test timeout.
stdCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
// Add tracking for where goroutine tasks are started.
rec := linger.NewRecorder(10 /* depth */)
ctx := stopper.WithInvoker(stdCtx, rec.Invoke)
// Register a cleanup, which could be a deferred function, that will stop
// the context, wait for all tasks to exit, and then verify that there are
// no lingering goroutines associated with the context.
t.Cleanup(func() {
ctx.Stop(grace)
if err := ctx.Wait(); err != nil {
t.Errorf("task returned an error: %v", err)
}
linger.CheckClean(t, rec)
})
return ctx
}
// This is a general pattern for constructing a [stopper.Context] for testing
// purposes. The specifics of error reporting, timeouts, and other administrivia
// will vary across projects, hence this not being part of the stopper module.
func main() {}
Index ¶
- Variables
- func Harden(ctx *Context) context.Context
- func HardenFrom(ctx context.Context) context.Context
- func IsStopping(ctx context.Context) bool
- func StopOnReceive[T any](ctx *Context, gracePeriod time.Duration, ch <-chan T)
- type Adaptable
- type Context
- func (c *Context) Call(fn func(ctx *Context) error) error
- func (c *Context) Deadline() (deadline time.Time, ok bool)
- func (c *Context) Defer(fn func())
- func (c *Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) Go(fn func(ctx *Context) error) (accepted bool)
- func (c *Context) IsStopping() bool
- func (c *Context) Len() int
- func (c *Context) Stop(gracePeriod time.Duration)
- func (c *Context) StopOnIdle()
- func (c *Context) Stopping() <-chan struct{}
- func (c *Context) Value(key any) any
- func (c *Context) Wait() error
- func (c *Context) With(ctx context.Context) *Context
- type Func
- type Invoker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrGracePeriodExpired = errors.New("grace period expired")
ErrGracePeriodExpired will be returned from context.Cause when the Context has been stopped, but its goroutines did not exit before the grace period elapsed.
var ErrStopped = errors.New("stopped")
ErrStopped will be returned from context.Cause when the Context has been stopped.
Functions ¶
func Harden ¶ added in v1.2.0
Harden adapts the soft-stop behaviors of a stopper into a context.Context. This can be used whenever it is necessary to call other APIs that should be made aware of the soft-stop condition.
The returned context has the following behaviors:
- The context.Context.Done method returns Context.Stopping.
- The context.Context.Err method returns ErrStopped if the context has been stopped. Otherwise, it returns Context.Err.
- All other interface methods delegate to the provided stopper.
Example ¶
This shows how the soft-stop behavior can be propagated to other APIs via the context.Context interface.
package main
import (
"context"
"fmt"
"vawter.tech/stopper"
)
func main() {
soft := stopper.WithContext(context.Background())
soft.Go(func(ctx *stopper.Context) error {
hard := stopper.Harden(ctx)
// This is a stand-in for any call to an API that accepts a
// context.Context.
<-hard.Done()
fmt.Println("Done")
return nil
})
soft.Stop(0)
if err := soft.Wait(); err != nil {
panic(err)
}
}
Output: Done
func HardenFrom ¶ added in v1.2.0
HardenFrom returns a hardened context for any input context.
func IsStopping ¶
IsStopping is a convenience method to determine if a stopper is associated with a Context and if work should be stopped.
func StopOnReceive ¶ added in v1.0.1
StopOnReceive will gracefully stop the Context when a value is received from the channel or if the channel is closed.
This can be used, for example, with os/signal.Notify.
Example (Interrupt) ¶
package main
import (
"context"
"os"
"os/signal"
"time"
"vawter.tech/stopper"
)
func main() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
ctx := stopper.WithContext(context.Background())
stopper.StopOnReceive(ctx, time.Second, signals)
ctx.Go(func(ctx *stopper.Context) error {
// Do other work
return nil
})
if err := ctx.Wait(); err != nil {
os.Exit(1)
}
os.Exit(0)
}
Types ¶
type Adaptable ¶ added in v1.2.0
type Adaptable interface {
func() | func() error |
func(context.Context) | func(context.Context) error |
func(*Context) | Func
}
Adaptable is the set of function signatures accepted by Fn.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
A Context is conceptually similar to an errgroup.Group in that it manages a context.Context whose lifecycle is associated with some number of goroutines. Rather than canceling the associated context when a goroutine returns an error, it cancels the context after the Stop method is called and all associated goroutines have exited.
As an API convenience, the Context type implements context.Context so that it fits into idiomatic context-plumbing. The From function can be used to retrieve a Context from any context.Context.
Example (Nested) ¶
This example shows that contexts may be nested. Stop signals will propagate from enclosing to inner contexts, while the Len() and Wait() methods are aware of child contexts.
package main
import (
"context"
"fmt"
"time"
"vawter.tech/stopper"
)
func main() {
outer := stopper.WithContext(context.Background())
middle := stopper.WithContext(outer)
inner := stopper.WithContext(middle)
middle.Go(func(ctx *stopper.Context) error {
<-ctx.Stopping()
return nil
})
inner.Go(func(ctx *stopper.Context) error {
<-ctx.Stopping()
return nil
})
fmt.Println("outer", outer.Len())
fmt.Println("middle", middle.Len())
fmt.Println("inner", inner.Len())
// Stopping a parent context stops the child contexts.
outer.Stop(time.Second)
// Wait for all nested tasks.
if err := outer.Wait(); err != nil {
panic(err)
}
fmt.Println("outer", outer.Len())
}
Output: outer 2 middle 2 inner 1 outer 0
Example (Ticker) ¶
This example shows how a background task that should execute on a regular basis may be implemented.
package main
import (
"context"
"fmt"
"time"
"vawter.tech/stopper"
)
func main() {
ctx := stopper.WithContext(context.Background())
ctx.Go(func(ctx *stopper.Context) error {
for {
// Do some background task.
select {
case <-time.After(time.Second):
// Loop around.
case <-ctx.Stopping():
// This channel closes when Stop() is called. The
// context is not yet cancelled at this point.
return nil
case <-ctx.Done():
// This is a hard-stop condition because either the
// underlying context.Context was canceled or the task
// has outlived its graceful shutdown time.
return ctx.Err()
}
}
})
// Do other things.
fmt.Println("task count:", ctx.Len())
// Calling Stop() makes the Stopping channel close, allowing
// processes one second before the context is hard-cancelled.
ctx.Stop(time.Second)
// Callers can wait for all tasks to finish, similar to an ErrGroup.
if err := ctx.Wait(); err != nil {
panic(err)
}
fmt.Println("task count:", ctx.Len())
}
Output: task count: 1 task count: 0
func Background ¶
func Background() *Context
Background is analogous to context.Background. It returns a Context which cannot be stopped or canceled, but which is otherwise functional.
func From ¶
From returns a pre-existing Context from the Context chain. Use WithContext to construct a new Context.
If the chain is not associated with a Context, the Background instance will be returned.
func WithContext ¶
WithContext creates a new Context whose work will be immediately canceled when the parent context is canceled. If the provided context is already managed by a Context, a call to the enclosing Context.Stop method will also trigger a call to Stop in the newly-constructed Context.
func WithInvoker ¶ added in v1.1.0
WithInvoker is equivalent to WithContext, except that the given Invoker will be used to execute the functions passed to Context.Call and Context.Go. The Invoker of a newly-defined context will be composed with the Invoker defined in a parent context, if any.
Example (ObserveLifecycle) ¶
This shows the sequence of callbacks when nested contexts have Invokers defined. Note that the setup phase is bottom-up, while execution is top-down.
package main
import (
"context"
"fmt"
"time"
"vawter.tech/stopper"
)
func main() {
outer := stopper.WithInvoker(context.Background(),
func(fn stopper.Func) stopper.Func {
fmt.Println("outer setting up")
return func(ctx *stopper.Context) error {
fmt.Println("outer start")
defer fmt.Println("outer end")
return fn(ctx)
}
})
middle := stopper.WithInvoker(outer,
func(fn stopper.Func) stopper.Func {
fmt.Println("middle setting up")
return func(ctx *stopper.Context) error {
fmt.Println("middle start")
defer fmt.Println("middle end")
return fn(ctx)
}
})
inner := stopper.WithContext(middle)
inner.Go(func(ctx *stopper.Context) error {
fmt.Println("here")
return nil
})
outer.Stop(time.Second)
if err := outer.Wait(); err != nil {
panic(err)
}
}
Output: middle setting up outer setting up outer start middle start here middle end outer end
func (*Context) Call ¶
Call executes the given function within the current goroutine and monitors its lifecycle. That is, both Call and Wait will block until the function has returned.
Call returns any error from the function with no other side effects. Unlike the Go method, Call does not stop the Context if the function returns an error. If the Context has already been stopped, ErrStopped will be returned.
The function passed to Call should prefer the Context.Stopping channel to return instead of depending on Context.Done. This allows a soft-stop, rather than waiting for the grace period to expire when Context.Stop is called.
See Fn to create function signature adaptors.
Example (HttpServer) ¶
An example showing the use of stopper.Context.Call when a goroutine is already allocated by some other API. In this case, the HTTP server creates a goroutine per request, but we still want to be able to interact with a stopper hierarchy.
package main
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"os/signal"
"strings"
"time"
"vawter.tech/stopper"
)
// main demonstrates Context.Call for goroutines that are owned by another
// API; here, the per-request goroutines created by an HTTP server. A single
// client task posts a message and then simulates an OS interrupt so that the
// program shuts down.
func main() {
	ctx := stopper.WithContext(context.Background())

	// Respond to interrupts with a 30-second grace period.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	stopper.StopOnReceive(ctx, 30*time.Second, ch)

	// The HTTP server creates goroutines for us and has its own expectations
	// around goroutine/request lifecycles.
	svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// We use With() here so that a caller hangup can immediately stop any
		// work. This is optional, and might not be desirable in all
		// circumstances.
		err := ctx.With(r.Context()).Call(func(ctx *stopper.Context) error {
			data, err := io.ReadAll(r.Body)
			if err != nil {
				return err
			}
			fmt.Println(string(data))
			w.WriteHeader(http.StatusAccepted)
			return nil
		})
		if err == nil {
			return
		}

		// If Stop() has been called, Call() will immediately return ErrStopped.
		// We can respond to the client (or load-balancer) saying that this
		// instance of the server cannot fulfill the request.
		if errors.Is(err, stopper.ErrStopped) {
			w.Header().Add("Retry-After", "0")
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}

		slog.ErrorContext(ctx, "handler error", "error", err)
		w.WriteHeader(http.StatusInternalServerError)
	}))
	// Shut the test server down once main returns, which is after Wait() has
	// reported that every task is finished.
	defer svr.Close()

	// A task to make an HTTP request.
	ctx.Go(func(ctx *stopper.Context) error {
		resp, err := svr.Client().Post(svr.URL, "text/plain", strings.NewReader("Hello World!"))
		if err != nil {
			// Returning an error here will make the call to Wait() below fail
			// out. This makes sense for an example, but production code should
			// handle the error in a reasonable way and return nil.
			return err
		}
		// The caller is responsible for closing the response body. The close
		// error is ignored because the body content is not used here.
		defer func() { _ = resp.Body.Close() }()

		// Simulate an OS shutdown.
		ch <- os.Interrupt
		return nil
	})

	if err := ctx.Wait(); err != nil {
		slog.ErrorContext(ctx, "internal task error", "error", err)
	}
}
Output: Hello World!
func (*Context) Deadline ¶
Deadline implements context.Context.
func (*Context) Defer ¶
func (c *Context) Defer(fn func())
Defer registers a callback that will be executed after the Context.Done channel is closed. This method can be used to clean up resources that are used by goroutines associated with the Context (e.g. closing database connections). The Context will already have been canceled by the time the callback is run, so its behaviors should be associated with context.Background or similar. Callbacks will be executed in a LIFO manner. If the Context has already stopped, the callback will be executed immediately.
Calling this method on the Background context will panic, since that context can never be cancelled.
Example ¶
package main
import (
"context"
"fmt"
"time"
"vawter.tech/stopper"
)
func main() {
ctx := stopper.WithContext(context.Background())
// Deferred functions are executed in reverse order.
ctx.Defer(func() {
fmt.Println("defer 0")
})
ctx.Defer(func() {
fmt.Println("defer 1")
})
// This will run in a separate goroutine and then stop the context.
ctx.Go(func(ctx *stopper.Context) error {
fmt.Println("task")
ctx.Stop(time.Second)
return nil
})
// Wait for all tasks, including deferred callbacks to be complete.
if err := ctx.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println("finished")
}
Output: task defer 1 defer 0 finished
func (*Context) Done ¶
func (c *Context) Done() <-chan struct{}
Done implements context.Context. The channel that is returned will be closed when Stop has been called and all associated goroutines have exited. The returned channel will be closed immediately if the parent context (passed to WithContext) is canceled. Functions passed to Context.Go should prefer Context.Stopping instead.
func (*Context) Err ¶
Err implements context.Context. When the return value for this is context.Canceled, context.Cause will return ErrStopped if the context cancellation resulted from a call to Stop.
func (*Context) Go ¶
Go spawns a new goroutine to execute the given function and monitors its lifecycle.
If the function returns an error, the Stop method will be called. The returned error will be available from Wait once the remaining goroutines have exited.
If Stop has already been called, this method returns false without executing the function.
The function passed to Go should prefer the Context.Stopping channel to return instead of depending on Context.Done. This allows a soft-stop, rather than waiting for the grace period to expire when Context.Stop is called.
See Fn to create function signature adaptors.
func (*Context) IsStopping ¶
IsStopping returns true once Context.Stop has been called. See also Context.Stopping for a notification-based API.
func (*Context) Len ¶
Len returns the number of tasks being tracked by the Context. This includes tasks started by derived Contexts.
func (*Context) Stop ¶
Stop begins a graceful shutdown of the Context. When this method is called, the Stopping channel will be closed. Once all goroutines started by Go have exited, the associated Context will be cancelled, thus closing the Done channel. If the gracePeriod is non-zero, the context will be forcefully cancelled if the goroutines have not exited within the given timeframe.
func (*Context) StopOnIdle ¶ added in v1.2.0
func (c *Context) StopOnIdle()
StopOnIdle causes the Context to automatically call Context.Stop once Context.Len equals zero. This is useful for stoppers that represent a finite pool of tasks that will eventually conclude. All methods on the Context will continue to operate in their usual manners after calling this method. Calling this method on the Background Context has no effect. Calling this method on an idle Context behaves as if Context.Stop had been called instead.
Example ¶
package main
import (
"context"
"fmt"
"sync/atomic"
"vawter.tech/stopper"
)
func main() {
ctx := stopper.WithContext(context.Background())
var nestedDidAccept atomic.Bool
ctx.Go(func(ctx *stopper.Context) error {
// It's still valid to create additional tasks or additional
// nested stoppers.
ok := ctx.Go(func(ctx *stopper.Context) error {
// Do nested tasks...
return nil
})
nestedDidAccept.Store(ok)
return nil
})
// StopOnIdle shouldn't be called until all parent tasks have been
// started.
ctx.StopOnIdle()
// Wait doesn't return until all tasks are complete.
err := ctx.Wait()
fmt.Printf("OK: %t %t\n", err == nil, nestedDidAccept.Load())
}
Output: OK: true true
func (*Context) Stopping ¶
func (c *Context) Stopping() <-chan struct{}
Stopping returns a channel that is closed when a graceful shutdown has been requested or when a parent context has been stopped.
func (*Context) Wait ¶
Wait will block until Stop has been called and all associated goroutines have exited or the parent context has been cancelled. This method will return the first non-nil error from any of the callbacks passed to Go. If Wait is called on the Background instance, it will immediately return nil.
func (*Context) With ¶ added in v1.0.1
With returns a Context that is otherwise equivalent to the receiver, save that all context.Context behavior is delegated to the new context. This enables interaction with the runtime/trace package or other libraries that generate custom context.Context instances.
Since the With method does not create a new, nested stopper hierarchy, it is less expensive than calling WithContext in tracing scenarios.
Example (Tracing) ¶
This shows how the runtime/trace package, or any other package that creates custom context.Context instances, can be interoperated with.
package main
import (
"context"
"fmt"
"os"
"runtime/trace"
"time"
"vawter.tech/stopper"
)
func main() {
f, err := os.OpenFile("trace.out", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
panic(err)
}
defer func() {
if err := f.Close(); err != nil {
panic(err)
}
fmt.Println("trace written to", f.Name())
}()
if err := trace.Start(f); err != nil {
panic(err)
}
defer trace.Stop()
rootCtx, rootTask := trace.NewTask(context.Background(), "root task")
defer rootTask.End()
ctx := stopper.WithContext(rootCtx)
defer trace.StartRegion(ctx, "root region").End()
midCtx, midTask := trace.NewTask(ctx, "mid task")
ctx.With(midCtx).Go(func(ctx *stopper.Context) error {
defer midTask.End()
defer trace.StartRegion(ctx, "mid region").End()
trace.Log(ctx, "message", "middle task is here")
innerCtx, innerTask := trace.NewTask(ctx, "inner task")
ctx.With(innerCtx).Go(func(ctx *stopper.Context) error {
defer innerTask.End()
defer trace.StartRegion(ctx, "inner region").End()
trace.Log(ctx, "message", "inner task is here")
ctx.Stop(time.Second)
return nil
})
return nil
})
if err := ctx.Wait(); err != nil {
panic(err)
}
}
Output: trace written to trace.out
type Func ¶ added in v1.1.0
Func is a convenience type alias for the signature of functions invoked by a Context.
func Fn ¶ added in v1.2.0
Fn adapts various function signatures to be compatible with Context.
Example ¶
package main
import (
"context"
"vawter.tech/stopper"
)
type Thing struct{}
func (t *Thing) DoSomething() error { return nil }
func main() {
ctx := stopper.WithContext(context.Background())
t := &Thing{}
ctx.Go(stopper.Fn(t.DoSomething))
}
type Invoker ¶ added in v1.1.0
An Invoker arranges to call the provided Func. See WithInvoker for additional details.
