rungroup

v0.0.0-...-c502711
Published: May 15, 2026 License: MIT Imports: 7 Imported by: 0

README

RunGroup

RunGroup is a concurrency primitive for managing long-lived background services in Go. It supervises goroutines independently, handling restarts, panic recovery, backoff, and clean shutdown. Run blocks until all services have exited, then returns any errors joined into a single aggregate.

  • Restart policies: always, on failure, or never; configured per service
  • Backoff: custom strategy with optional hard restart limits
  • Panic recovery: panics are caught and converted to errors, stack trace included
  • Sentinel errors: signal intentional halts or trigger group-wide shutdown
  • Shutdown timeouts: per-service and global, with graceful goroutine abandonment
  • Events: observe restarts, halts, and timeouts via callbacks
  • Composable: *Group implements Service, enabling supervision trees
  • Zero dependencies

Install

go get lowbit.dev/rungroup

Usage

g := rungroup.New()

g.Add(
    rungroup.ServiceFunc(func(ctx context.Context) error {
        return server.Serve(ctx)
    }),
    rungroup.WithName("http-server"),
    rungroup.WithRestartPolicy(rungroup.RestartOnFailure),
    rungroup.WithBackoff(func(attempt int) time.Duration {
        return time.Duration(attempt) * 500 * time.Millisecond
    }),
)

if err := g.Run(ctx); err != nil {
    log.Fatal(err)
}

Restart Policies

Policy                   On nil   On error
RestartAlways (default)  restart  restart
RestartOnFailure         halt     restart
RestartNever             halt     halt
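
The table can be read as a small decision function. The sketch below is a stdlib-only illustration, not rungroup's implementation: the policy type, its constants, and shouldRestart are hypothetical stand-ins, and errDoNotRestart stands in for the ErrDoNotRestart sentinel described under Sentinel Errors.

```go
package main

import (
	"errors"
	"fmt"
)

// policy is a local stand-in for rungroup.RestartPolicy.
type policy int

const (
	always policy = iota
	onFailure
	never
)

// Local stand-ins so the sketch runs without the package.
var (
	errDoNotRestart = errors.New("do not restart")
	errBoom         = errors.New("boom")
)

// shouldRestart mirrors the table: the sentinel always halts, otherwise the
// policy decides based on whether the service returned an error.
func shouldRestart(p policy, err error) bool {
	if errors.Is(err, errDoNotRestart) {
		return false
	}
	switch p {
	case always:
		return true
	case onFailure:
		return err != nil
	default:
		return false
	}
}

func main() {
	// One row per policy: (restart on nil, restart on error).
	for _, p := range []policy{always, onFailure, never} {
		fmt.Println(shouldRestart(p, nil), shouldRestart(p, errBoom))
	}
	// Prints:
	// true true
	// false true
	// false false
}
```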

WithBackoff accepts func(attempt int) time.Duration. Return a negative duration to permanently stop the service (ErrRestartLimitExceeded). Use WithStabilityWindow to reset the attempt counter after a service has run continuously for a given duration.
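
As a rough illustration of a function with that shape, the sketch below builds a capped exponential backoff with a hard restart limit. It uses only the standard library; cappedBackoff and its parameters are hypothetical, and whether rungroup numbers the first attempt 0 or 1 is an assumption (the sketch treats both the same way).

```go
package main

import (
	"fmt"
	"time"
)

// cappedBackoff returns a func(attempt int) time.Duration suitable for
// passing to an option like WithBackoff. The delay doubles from base up to
// max, and a negative duration is returned once attempt exceeds limit.
func cappedBackoff(base, max time.Duration, limit int) func(attempt int) time.Duration {
	return func(attempt int) time.Duration {
		if attempt > limit {
			return -1 // negative: ask the supervisor to stop restarting
		}
		d := base
		for i := 1; i < attempt && d < max; i++ {
			d *= 2
		}
		if d > max {
			d = max
		}
		return d
	}
}

// delayMillis reports the delay for an attempt in whole milliseconds.
func delayMillis(attempt int) int64 {
	return cappedBackoff(100*time.Millisecond, time.Second, 5)(attempt).Milliseconds()
}

// gaveUp reports whether the backoff signals a permanent stop.
func gaveUp(attempt int) bool {
	return cappedBackoff(100*time.Millisecond, time.Second, 5)(attempt) < 0
}

func main() {
	backoff := cappedBackoff(100*time.Millisecond, time.Second, 5)
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Println(attempt, backoff(attempt))
	}
}
```

With these parameters the delays are 100ms, 200ms, 400ms, 800ms, and 1s for attempts 1 through 5, and attempt 6 returns a negative duration.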

Sentinel Errors

Error            Effect
ErrDoNotRestart  Permanently halts the service regardless of policy.
ErrShutdownAll   Cancels the group context and shuts down all services.

Both are matched via errors.Is traversal.
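
Because matching goes through errors.Is, a service can wrap a sentinel with its own context and still be recognised. The sketch below shows this with the standard library only; errDoNotRestart is a local stand-in for rungroup.ErrDoNotRestart, and migrate is a hypothetical service body.

```go
package main

import (
	"errors"
	"fmt"
)

// errDoNotRestart is a local stand-in so the sketch runs without the package.
var errDoNotRestart = errors.New("do not restart")

// migrate is a hypothetical service body that wraps the sentinel with context.
func migrate() error {
	return fmt.Errorf("schema already current: %w", errDoNotRestart)
}

// isHalt reports whether err carries the sentinel anywhere in its chain.
func isHalt(err error) bool {
	return errors.Is(err, errDoNotRestart)
}

func main() {
	err := migrate()
	fmt.Println(err)         // schema already current: do not restart
	fmt.Println(isHalt(err)) // true: the wrapped sentinel still matches

	// An unrelated error with the same text is a different value.
	fmt.Println(isHalt(errors.New("do not restart"))) // false
}
```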

Shutdown Timeouts

g := rungroup.New(rungroup.WithShutdownTimeout(10 * time.Second))

g.Add(svc, rungroup.WithServiceShutdownTimeout(2 * time.Second))

Per-service and global timeouts race; whichever fires first wins. When a service exceeds its timeout, EventShutdownTimeout is emitted and the service's goroutine is abandoned.
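
One way to picture the race is a select over two timers and a "done" channel. This is a stdlib-only sketch of the idea, not rungroup's code; waitOrAbandon and the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// waitOrAbandon waits for done to close, but gives up as soon as either the
// per-service or the global timer fires. It reports whether the service
// exited in time. Both durations are expected to be positive.
func waitOrAbandon(done <-chan struct{}, perService, global time.Duration) bool {
	svcTimer := time.NewTimer(perService)
	defer svcTimer.Stop()
	globalTimer := time.NewTimer(global)
	defer globalTimer.Stop()

	select {
	case <-done:
		return true
	case <-svcTimer.C:
		return false
	case <-globalTimer.C:
		return false
	}
}

// fastExit models a service that has already returned.
func fastExit() bool {
	done := make(chan struct{})
	close(done)
	return waitOrAbandon(done, 50*time.Millisecond, 200*time.Millisecond)
}

// slowExit models a service that never returns; the 50ms timer wins.
func slowExit() bool {
	return waitOrAbandon(make(chan struct{}), 50*time.Millisecond, 200*time.Millisecond)
}

func main() {
	fmt.Println(fastExit()) // true
	fmt.Println(slowExit()) // false
}
```

Note that abandoning a goroutine does not stop it; it only stops waiting for it, which is why services should still honour context cancellation.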

Nested Groups

*Group implements Service. Add a group to another to build a supervision tree. By default ErrShutdownAll propagates upward; use WithIsolateShutdown to absorb it at a boundary.

outer.Add(inner,
    rungroup.WithName("inner-group"),
    rungroup.WithIsolateShutdown(),
)

Events

rungroup.New(rungroup.WithEventHandler(func(e rungroup.Event) {
    // e.Type: EventServiceRestarting | EventServiceHalted | EventShutdownTimeout
    // e.ServiceName, e.Attempt, e.Delay, e.Err
}))

Per-service handlers can be registered with WithServiceEventHandler and fire before the group-level handler.
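
The ordering can be sketched as a two-step dispatch. The event type and dispatch function below are simplified, hypothetical stand-ins written against the standard library only; they are not the package's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a local stand-in for rungroup.Event with only two fields.
type event struct {
	serviceName string
	kind        string
}

// dispatch delivers e to the per-service handler first, then to the
// group-level handler, skipping whichever is nil.
func dispatch(e event, serviceHandler, groupHandler func(event)) {
	if serviceHandler != nil {
		serviceHandler(e)
	}
	if groupHandler != nil {
		groupHandler(e)
	}
}

// callOrder records which handler saw the event first.
func callOrder() string {
	var seen []string
	dispatch(
		event{serviceName: "cache", kind: "halted"},
		func(e event) { seen = append(seen, "service:"+e.serviceName) },
		func(e event) { seen = append(seen, "group:"+e.serviceName) },
	)
	return strings.Join(seen, " ")
}

func main() {
	fmt.Println(callOrder()) // service:cache group:cache
}
```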

Interval Service

IntervalService runs a handler on a fixed schedule. The handler is called immediately on start, then on every tick. A non-nil return exits Run and hands control back to the group's restart policy.

g.Add(
    rungroup.NewIntervalService(30*time.Second, func(ctx context.Context) error {
        return cache.Refresh(ctx)
    }),
    rungroup.WithName("cache-refresh"),
    rungroup.WithRestartPolicy(rungroup.RestartOnFailure),
)
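
The described behaviour (call once immediately, then on every tick, exit on the first error) can be approximated with a time.Ticker. The sketch below is an illustration under stated assumptions, not the IntervalService source: runEvery is a hypothetical name, and returning nil on cancellation is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls handler once right away and then on every tick. It returns
// the handler's first non-nil error, or nil once ctx is cancelled (the nil
// return on cancellation is an assumption of this sketch).
func runEvery(ctx context.Context, interval time.Duration, handler func(context.Context) error) error {
	if err := handler(ctx); err != nil {
		return err
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := handler(ctx); err != nil {
				return err
			}
		}
	}
}

// countCalls runs a handler that fails on its third call and reports how
// many times it was invoked.
func countCalls() int {
	calls := 0
	_ = runEvery(context.Background(), 10*time.Millisecond, func(context.Context) error {
		calls++
		if calls == 3 {
			return fmt.Errorf("refresh failed on call %d", calls)
		}
		return nil
	})
	return calls
}

// callsWhenCancelled runs with an already-cancelled context.
func callsWhenCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	calls := 0
	_ = runEvery(ctx, time.Second, func(context.Context) error {
		calls++
		return nil
	})
	return calls
}

func main() {
	fmt.Println(countCalls())         // 3
	fmt.Println(callsWhenCancelled()) // 1
}
```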

Options

Service: WithName, WithRestartPolicy, WithBackoff, WithStabilityWindow, WithServiceShutdownTimeout, WithIsolateShutdown, WithServiceEventHandler

Group: WithShutdownTimeout, WithEventHandler

License

MIT

Documentation

Overview

rungroup manages the lifecycle of concurrent background services.

Services are supervised independently: panics are recovered, errors are aggregated, and each service carries its own restart policy and backoff.

A *Group satisfies Service, so rungroups can be nested. ErrShutdownAll propagates to the parent by default; use WithIsolateShutdown to absorb it.

Example

Example demonstrates the minimal setup: register a service and run until the parent context is cancelled.

package main

import (
	"context"
	"fmt"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	g.Add(rungroup.ServiceFunc(func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}), rungroup.WithName("worker"))

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // stop immediately

	if err := g.Run(ctx); err != nil {
		fmt.Println("error:", err)
	} else {
		fmt.Println("ok")
	}
}
Output:
ok

Constants

This section is empty.

Variables

var (
	// ErrDoNotRestart is a sentinel error. If a Service's Run method returns this
	// error, or an error that wraps it, the rungroup will not restart it,
	// regardless of the configured restart policy.
	ErrDoNotRestart = errors.New("do not restart")

	// ErrAlreadyRunning is returned if Run is called while the rungroup is
	// already active.
	ErrAlreadyRunning = errors.New("already running")

	// ErrRestartLimitExceeded is wrapped when a service is permanently abandoned
	// because its backoff function returned a negative duration.
	ErrRestartLimitExceeded = errors.New("restart limit exceeded")

	// ErrIntentionalHalt is wrapped when a service explicitly returns
	// ErrDoNotRestart, causing the rungroup to permanently abandon it.
	ErrIntentionalHalt = errors.New("service halted intentionally")

	// ErrPolicyHalt is wrapped when a service exits with an error and is not
	// restarted because its configured RestartPolicy prevents it.
	ErrPolicyHalt = errors.New("halted by restart policy")

	// ErrShutdownTimeout is wrapped when a service did not exit within its
	// configured shutdown timeout after the rungroup context was cancelled.
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")

	// ErrServicePanic is wrapped in the error returned when a managed service
	// panics during execution. The underlying panic value and stack trace are
	// included in the formatted error string.
	ErrServicePanic = errors.New("service panicked")

	// ErrShutdownAll is a sentinel error. If a Service's Run method returns this
	// error, or an error that wraps it, the rungroup cancels its internal
	// context and initiates a graceful shutdown of all other running services.
	// The error is recorded and returned by Run. Use this when a service detects
	// an application-wide failure state that makes continued operation impossible.
	//
	// When used in a nested rungroup, the shutdown signal propagates to the
	// parent by default. Use [WithIsolateShutdown] on the child rungroup's
	// service entry to absorb the signal at that boundary.
	ErrShutdownAll = errors.New("shutdown all services")
)

Functions

This section is empty.

Types

type Event

type Event struct {
	Type        EventType
	ServiceName string
	Attempt     int           // populated for EventServiceRestarting
	Delay       time.Duration // populated for EventServiceRestarting
	Err         error         // the triggering error; nil for a clean halt
}

Event is the value passed to event handlers registered via WithEventHandler or WithServiceEventHandler.

type EventType

type EventType int

EventType identifies the kind of event emitted by the rungroup.

const (
	// EventServiceRestarting is emitted when a service has failed and will be
	// restarted after a backoff delay. Err, Attempt, and Delay are populated.
	EventServiceRestarting EventType = iota

	// EventServiceHalted is emitted when a service has permanently stopped for
	// any reason (policy, sentinel error, backoff limit, or clean exit).
	// Err is nil for a clean exit.
	EventServiceHalted

	// EventShutdownTimeout is emitted when a service did not exit within its
	// configured shutdown timeout. ServiceName identifies the hung service.
	EventShutdownTimeout
)

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group manages a group of concurrent services.

func New

func New(opts ...GroupOption) *Group

New creates a new Group.

Example

ExampleNew shows creating a Group with a global shutdown timeout option.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	// Zero services: Run returns nil without blocking.
	g := rungroup.New(rungroup.WithShutdownTimeout(5 * time.Second))
	fmt.Println(g.Run(context.Background()))
}
Output:
<nil>

func (*Group) Add

func (s *Group) Add(svc Service, opts ...Option) error

Add registers a service to be managed. All services must be added before calling Run. Calling Add after Run has started returns ErrAlreadyRunning.

Example

ExampleGroup_Add shows that Add returns nil for a valid service registration.

package main

import (
	"context"
	"fmt"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	err := g.Add(
		rungroup.ServiceFunc(func(ctx context.Context) error {
			<-ctx.Done()
			return nil
		}),
		rungroup.WithName("worker"),
		rungroup.WithRestartPolicy(rungroup.RestartNever),
	)
	fmt.Println(err)
}
Output:
<nil>

func (*Group) Run

func (s *Group) Run(ctx context.Context) error

Run starts all registered services concurrently. It blocks until the provided context is cancelled AND all services have exited (or their individual shutdown timeouts are exceeded).

If zero services are registered, Run returns nil immediately. Otherwise, it returns an aggregation of any terminal errors produced by the services.
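
An aggregated error of this kind can still be inspected with errors.Is. The sketch below assumes the aggregate behaves like one built with the standard library's errors.Join (Go 1.20+); the sentinels are local stand-ins and the per-service message format is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for rungroup.ErrPolicyHalt and rungroup.ErrShutdownTimeout.
var (
	errPolicyHalt      = errors.New("halted by restart policy")
	errShutdownTimeout = errors.New("shutdown timeout exceeded")
)

// aggregate joins two hypothetical terminal errors into one value.
func aggregate() error {
	a := fmt.Errorf("service %q: %w", "worker", errPolicyHalt)
	b := fmt.Errorf("service %q: %w", "stuck", errShutdownTimeout)
	return errors.Join(a, b)
}

// hasBoth reports whether both sentinels are reachable from the aggregate.
func hasBoth() bool {
	err := aggregate()
	return errors.Is(err, errPolicyHalt) && errors.Is(err, errShutdownTimeout)
}

// emptyIsNil shows that joining nothing yields a nil error.
func emptyIsNil() bool {
	return errors.Join() == nil
}

func main() {
	fmt.Println(aggregate())
	fmt.Println(hasBoth())    // true
	fmt.Println(emptyIsNil()) // true
}
```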

Example (ZeroServices)

ExampleGroup_Run_zeroServices shows that Run returns nil immediately when no services have been registered.

package main

import (
	"context"
	"fmt"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	fmt.Println(g.Run(context.Background()))
}
Output:
<nil>

type GroupOption

type GroupOption func(*Group)

GroupOption configures the rungroup instance itself.

func WithEventHandler

func WithEventHandler(fn func(Event)) GroupOption

WithEventHandler registers a rungroup-level event handler that fires for all events from all services. It runs after any service-level handler registered via WithServiceEventHandler.

Example

ExampleWithEventHandler shows receiving lifecycle events for every service in the group.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New(
		rungroup.WithEventHandler(func(e rungroup.Event) {
			switch e.Type {
			case rungroup.EventServiceRestarting:
				fmt.Printf("%s: restarting (attempt %d)\n", e.ServiceName, e.Attempt)
			case rungroup.EventServiceHalted:
				fmt.Printf("%s: halted\n", e.ServiceName)
			}
		}),
	)
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			return rungroup.ErrDoNotRestart
		}),
		rungroup.WithName("worker"),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	g.Run(ctx) //nolint:errcheck
}
Output:
worker: halted

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) GroupOption

WithShutdownTimeout sets the maximum duration the rungroup will wait for all services to exit after the Run context is cancelled. It acts as a global ceiling: a per-service timeout set via WithServiceShutdownTimeout cannot extend a service's deadline beyond this value.

Example

ExampleWithShutdownTimeout shows that a service which does not exit within the global deadline is abandoned; Run wraps ErrShutdownTimeout.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New(rungroup.WithShutdownTimeout(50 * time.Millisecond))
	started := make(chan struct{})
	g.Add(rungroup.ServiceFunc(func(_ context.Context) error {
		close(started)
		time.Sleep(10 * time.Second) // simulate a stuck service
		return nil
	}), rungroup.WithName("stuck"))

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-started
		cancel()
	}()

	fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrShutdownTimeout))
}
Output:
true

type IntervalService

type IntervalService struct {
	// contains filtered or unexported fields
}

IntervalService is a Service that calls a handler on a fixed interval. The handler is called once immediately when Run starts and then once per interval until the context is cancelled. If the handler returns a non-nil error, Run returns that error immediately.

func NewIntervalService

func NewIntervalService(interval time.Duration, handler func(ctx context.Context) error) *IntervalService

NewIntervalService creates an IntervalService that calls handler every interval. The handler signature matches ServiceFunc.

Example

ExampleNewIntervalService shows an IntervalService that stops itself by returning ErrDoNotRestart from the handler.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var calls int
	svc := rungroup.NewIntervalService(50*time.Millisecond, func(_ context.Context) error {
		calls++
		if calls >= 3 {
			return rungroup.ErrDoNotRestart
		}
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	svc.Run(ctx) //nolint:errcheck
	fmt.Println(calls)
}
Output:
3

func (*IntervalService) Run

func (s *IntervalService) Run(ctx context.Context) error

Run calls the handler immediately, then on every interval tick, until ctx is cancelled or the handler returns a non-nil error.

Example

ExampleIntervalService_Run shows that an already-cancelled context causes the handler to be invoked exactly once before Run returns.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var ticks int
	svc := rungroup.NewIntervalService(time.Second, func(_ context.Context) error {
		ticks++
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	svc.Run(ctx) //nolint:errcheck
	fmt.Println(ticks)
}
Output:
1

type Option

type Option func(*managedService)

Option configures how a Service is supervised.

func WithBackoff

func WithBackoff(fn func(attempt int) time.Duration) Option

WithBackoff provides a function to calculate the delay before a restart, based on the number of consecutive restarts.

If the function returns a negative duration, the rungroup treats it as a hard limit and permanently stops the service.

Example

ExampleWithBackoff shows a backoff function that enforces a maximum retry count; Run wraps ErrRestartLimitExceeded once the limit is reached.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var calls int
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			calls++
			return errors.New("failing")
		}),
		rungroup.WithRestartPolicy(rungroup.RestartAlways),
		rungroup.WithBackoff(func(attempt int) time.Duration {
			if attempt >= 3 {
				return -1 // negative → give up
			}
			return 0
		}),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	err := g.Run(ctx)
	fmt.Println(errors.Is(err, rungroup.ErrRestartLimitExceeded))
	fmt.Println(calls)
}
Output:
true
3

func WithIsolateShutdown

func WithIsolateShutdown() Option

WithIsolateShutdown prevents ErrShutdownAll from propagating to the parent rungroup when this service (typically a nested *Group) triggers one. The shutdown is absorbed at this service boundary and treated as a normal policy halt.

Example

ExampleWithIsolateShutdown shows that ErrShutdownAll from a nested Group is absorbed at the service boundary; the parent only sees ErrPolicyHalt.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	inner := rungroup.New()
	inner.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			return rungroup.ErrShutdownAll
		}),
		rungroup.WithRestartPolicy(rungroup.RestartNever),
	)

	outer := rungroup.New()
	outer.Add(inner,
		rungroup.WithName("inner"),
		rungroup.WithIsolateShutdown(),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := outer.Run(ctx)
	fmt.Println(errors.Is(err, rungroup.ErrShutdownAll)) // absorbed at boundary
	fmt.Println(errors.Is(err, rungroup.ErrPolicyHalt))  // surfaced instead
}
Output:
false
true

func WithName

func WithName(name string) Option

WithName assigns an identity to the service. Naming every service is highly recommended for observability, because the name is what appears in events.

Example

ExampleWithName shows that WithName labels events so restarts and halts are identifiable in the event stream.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var halted string
	g := rungroup.New(rungroup.WithEventHandler(func(e rungroup.Event) {
		if e.Type == rungroup.EventServiceHalted {
			halted = e.ServiceName
		}
	}))
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			return rungroup.ErrDoNotRestart
		}),
		rungroup.WithName("database"),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	g.Run(ctx) //nolint:errcheck
	fmt.Println(halted)
}
Output:
database

func WithRestartPolicy

func WithRestartPolicy(p RestartPolicy) Option

WithRestartPolicy sets the restart behaviour. Defaults to RestartAlways.

Example (RestartAlways)

ExampleWithRestartPolicy_restartAlways shows that RestartAlways restarts the service even after a clean (nil) return.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var calls int
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			calls++
			if calls >= 3 {
				return rungroup.ErrDoNotRestart
			}
			return nil // clean exit: RestartAlways restarts anyway
		}),
		rungroup.WithRestartPolicy(rungroup.RestartAlways),
		rungroup.WithBackoff(func(_ int) time.Duration { return 0 }),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	g.Run(ctx) //nolint:errcheck
	fmt.Println(calls)
}
Output:
3
Example (RestartNever)

ExampleWithRestartPolicy_restartNever shows that a service is not restarted after an error; Run wraps ErrPolicyHalt in the returned error.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			return errors.New("fatal")
		}),
		rungroup.WithRestartPolicy(rungroup.RestartNever),
		rungroup.WithName("worker"),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrPolicyHalt))
}
Output:
true
Example (RestartOnFailure)

ExampleWithRestartPolicy_restartOnFailure shows that a service is restarted after each non-nil return. The third call returns ErrDoNotRestart to stop the loop; under this policy a clean nil exit would stop it as well.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	var calls int
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			calls++
			if calls < 3 {
				return errors.New("transient")
			}
			return rungroup.ErrDoNotRestart
		}),
		rungroup.WithRestartPolicy(rungroup.RestartOnFailure),
		rungroup.WithBackoff(func(_ int) time.Duration { return 0 }),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	g.Run(ctx) //nolint:errcheck
	fmt.Println(calls)
}
Output:
3

func WithServiceEventHandler

func WithServiceEventHandler(fn func(Event)) Option

WithServiceEventHandler registers an event handler that fires only for events originating from this service. It runs before the rungroup-level handler registered via WithEventHandler.

Example

ExampleWithServiceEventHandler shows a per-service event handler that fires independently of any group-level handler.

package main

import (
	"context"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			return rungroup.ErrDoNotRestart
		}),
		rungroup.WithName("cache"),
		rungroup.WithServiceEventHandler(func(e rungroup.Event) {
			if e.Type == rungroup.EventServiceHalted {
				fmt.Printf("%s halted, has err: %v\n", e.ServiceName, e.Err != nil)
			}
		}),
	)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	g.Run(ctx) //nolint:errcheck
}
Output:
cache halted, has err: true

func WithServiceShutdownTimeout

func WithServiceShutdownTimeout(d time.Duration) Option

WithServiceShutdownTimeout sets the maximum duration the rungroup will wait for this specific service to exit after the rungroup context is cancelled. If exceeded, an EventShutdownTimeout event is emitted and the goroutine is abandoned.

This timeout applies alongside the rungroup-level WithShutdownTimeout for this service. If both are set, whichever expires first wins.

Example

ExampleWithServiceShutdownTimeout shows a per-service deadline that only affects the service it is attached to.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	started := make(chan struct{})
	g.Add(
		rungroup.ServiceFunc(func(_ context.Context) error {
			close(started)
			time.Sleep(10 * time.Second)
			return nil
		}),
		rungroup.WithName("stuck"),
		rungroup.WithServiceShutdownTimeout(50*time.Millisecond),
	)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-started
		cancel()
	}()

	fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrShutdownTimeout))
}
Output:
true

func WithStabilityWindow

func WithStabilityWindow(d time.Duration) Option

WithStabilityWindow sets the minimum duration a service must run continuously before its restart attempt counter is reset to zero. This prevents a service that runs stably for a long time from being penalised with a large backoff delay after an eventual crash.
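
The reset rule can be sketched as a small pure function. This is an illustration only: nextAttempt is a hypothetical helper, and both the 1-based attempt numbering and the "greater than or equal" comparison are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// nextAttempt returns the attempt number to hand to the backoff function
// after a run that lasted ranFor. If the run lasted at least window, the
// counter is reset before being incremented. A zero window disables resets.
func nextAttempt(prev int, ranFor, window time.Duration) int {
	if window > 0 && ranFor >= window {
		prev = 0
	}
	return prev + 1
}

// attemptAfter applies a 30s window to a run that lasted ranSeconds.
func attemptAfter(prev, ranSeconds int) int {
	return nextAttempt(prev, time.Duration(ranSeconds)*time.Second, 30*time.Second)
}

func main() {
	fmt.Println(attemptAfter(4, 2))   // 5: crashed quickly, counter keeps growing
	fmt.Println(attemptAfter(4, 300)) // 1: ran stably, counter was reset first
}
```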

Example

ExampleWithStabilityWindow shows configuring a window that resets the restart attempt counter for services that run stably for a long period. This example is compiled but not executed (no Output comment) because the behaviour is timing-dependent.

package main

import (
	"context"
	"time"

	"lowbit.dev/rungroup"
)

func main() {
	g := rungroup.New()
	g.Add(
		rungroup.ServiceFunc(func(ctx context.Context) error {
			<-ctx.Done()
			return nil
		}),
		rungroup.WithRestartPolicy(rungroup.RestartAlways),
		// Reset restart counter after the service has run for 30 s without crashing.
		rungroup.WithStabilityWindow(30*time.Second),
		// Linear backoff; give up after 5 consecutive crashes.
		rungroup.WithBackoff(func(attempt int) time.Duration {
			if attempt > 5 {
				return -1
			}
			return time.Duration(attempt) * 500 * time.Millisecond
		}),
	)
}

type RestartPolicy

type RestartPolicy int

RestartPolicy defines when a service should be restarted after exiting.

const (
	// RestartAlways restarts the service unconditionally, even if it returns nil.
	RestartAlways RestartPolicy = iota
	// RestartOnFailure restarts the service only if it returns a non-nil error.
	RestartOnFailure
	// RestartNever means the service is never restarted, regardless of its
	// return value.
	RestartNever
)

type Service

type Service interface {
	// Run executes the service. It must respect ctx cancellation for clean
	// shutdowns. If it panics, the rungroup catches it, converts the panic to
	// an error, and evaluates it against the restart policy.
	Run(ctx context.Context) error
}

Service represents a managed background operation.
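
The panic handling described in the interface comment follows a common Go pattern: a deferred recover that turns the panic value into an error. The sketch below shows that pattern with the standard library only; safeRun is a hypothetical helper, errPanic is a local stand-in for rungroup.ErrServicePanic, and the error message format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// errPanic is a local stand-in so the sketch runs without the package.
var errPanic = errors.New("service panicked")

// safeRun calls run and converts a panic into an error that wraps errPanic
// and includes the panic value and the stack trace.
func safeRun(run func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%w: %v\n%s", errPanic, r, debug.Stack())
		}
	}()
	return run()
}

// recovered reports whether a panicking function yields a wrapped errPanic.
func recovered() bool {
	err := safeRun(func() error { panic("boom") })
	return errors.Is(err, errPanic)
}

func main() {
	fmt.Println(recovered())                            // true
	fmt.Println(safeRun(func() error { return nil })) // <nil>
}
```

Because the result is an ordinary error, it can then be evaluated against the restart policy like any other failure.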

type ServiceFunc

type ServiceFunc func(ctx context.Context) error

ServiceFunc is an adapter to allow the use of ordinary functions as rungroup Services. If f is a function with the appropriate signature, ServiceFunc(f) is a Service that calls f.

Example

ExampleServiceFunc shows the function adapter, which lets an ordinary function satisfy the Service interface.

package main

import (
	"context"
	"fmt"

	"lowbit.dev/rungroup"
)

func main() {
	svc := rungroup.ServiceFunc(func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	fmt.Println(svc.Run(ctx))
}
Output:
<nil>

func (ServiceFunc) Run

func (f ServiceFunc) Run(ctx context.Context) error

Run calls f(ctx).
