Documentation ¶
Overview ¶
rungroup manages the lifecycle of concurrent background services.
Services are supervised independently: panics are recovered, errors are aggregated, and each service carries its own restart policy and backoff.
A *Group satisfies Service, so rungroups can be nested. ErrShutdownAll propagates to the parent by default; use WithIsolateShutdown to absorb it.
Example ¶
Example demonstrates the minimal setup: register a service and run until the parent context is cancelled.
package main
import (
"context"
"fmt"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
g.Add(rungroup.ServiceFunc(func(ctx context.Context) error {
<-ctx.Done()
return nil
}), rungroup.WithName("worker"))
ctx, cancel := context.WithCancel(context.Background())
cancel() // stop immediately
if err := g.Run(ctx); err != nil {
fmt.Println("error:", err)
} else {
fmt.Println("ok")
}
}
Output: ok
Index ¶
- Variables
- type Event
- type EventType
- type Group
- type GroupOption
- type IntervalService
- type Option
- func WithBackoff(fn func(attempt int) time.Duration) Option
- func WithIsolateShutdown() Option
- func WithName(name string) Option
- func WithRestartPolicy(p RestartPolicy) Option
- func WithServiceEventHandler(fn func(Event)) Option
- func WithServiceShutdownTimeout(d time.Duration) Option
- func WithStabilityWindow(d time.Duration) Option
- type RestartPolicy
- type Service
- type ServiceFunc
Examples ¶
- Package
- Group.Add
- Group.Run (ZeroServices)
- IntervalService.Run
- New
- NewIntervalService
- ServiceFunc
- WithBackoff
- WithEventHandler
- WithIsolateShutdown
- WithName
- WithRestartPolicy (RestartAlways)
- WithRestartPolicy (RestartNever)
- WithRestartPolicy (RestartOnFailure)
- WithServiceEventHandler
- WithServiceShutdownTimeout
- WithShutdownTimeout
- WithStabilityWindow
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrDoNotRestart is a sentinel error. If a Service's Run method returns this
	// error, or an error that wraps it, the rungroup will not restart it,
	// regardless of the configured restart policy.
	ErrDoNotRestart = errors.New("do not restart")

	// ErrAlreadyRunning is returned if Run is called while the rungroup is
	// already active.
	ErrAlreadyRunning = errors.New("already running")

	// ErrRestartLimitExceeded is wrapped when a service is permanently abandoned
	// because its backoff function returned a negative duration.
	ErrRestartLimitExceeded = errors.New("restart limit exceeded")

	// ErrIntentionalHalt is wrapped when a service explicitly returns
	// ErrDoNotRestart, causing the rungroup to permanently abandon it.
	ErrIntentionalHalt = errors.New("service halted intentionally")

	// ErrPolicyHalt is wrapped when a service exits with an error and is not
	// restarted because its configured RestartPolicy prevents it.
	ErrPolicyHalt = errors.New("halted by restart policy")

	// ErrShutdownTimeout is wrapped when a service did not exit within its
	// configured shutdown timeout after the rungroup context was cancelled.
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")

	// ErrServicePanic is wrapped in the error returned when a managed service
	// panics during execution. The underlying panic value and stack trace are
	// included in the formatted error string.
	ErrServicePanic = errors.New("service panicked")

	// ErrShutdownAll is a sentinel error. If a Service's Run method returns this
	// error, or an error that wraps it, the rungroup cancels its internal
	// context and initiates a graceful shutdown of all other running services.
	// The error is recorded and returned by Run. Use this when a service detects
	// an application-wide failure state that makes continued operation impossible.
	//
	// When used in a nested rungroup, the shutdown signal propagates to the
	// parent by default. Use [WithIsolateShutdown] on the child rungroup's
	// service entry to absorb the signal at that boundary.
	ErrShutdownAll = errors.New("shutdown all services")
)
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
Type EventType
ServiceName string
Attempt int // populated for EventServiceRestarting
Delay time.Duration // populated for EventServiceRestarting
Err error // the triggering error; nil for a clean halt
}
Event is the value passed to event handlers registered via WithEventHandler or WithServiceEventHandler.
type EventType ¶
type EventType int
EventType identifies the kind of event emitted by the rungroup.
const (
	// EventServiceRestarting is emitted when a service has failed and will be
	// restarted after a backoff delay. Err, Attempt, and Delay are populated.
	EventServiceRestarting EventType = iota

	// EventServiceHalted is emitted when a service has permanently stopped for
	// any reason (policy, sentinel error, backoff limit, or clean exit).
	// Err is nil for a clean exit.
	EventServiceHalted

	// EventShutdownTimeout is emitted when a service did not exit within its
	// configured shutdown timeout. ServiceName identifies the hung service.
	EventShutdownTimeout
)
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group manages a group of concurrent services.
func New ¶
func New(opts ...GroupOption) *Group
New creates a new Group.
Example ¶
ExampleNew shows creating a Group with a global shutdown timeout option.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
// Zero services: Run returns nil without blocking.
g := rungroup.New(rungroup.WithShutdownTimeout(5 * time.Second))
fmt.Println(g.Run(context.Background()))
}
Output: <nil>
func (*Group) Add ¶
Add registers a service to be managed. All services must be added before calling Run. Calling Add after Run has started returns ErrAlreadyRunning.
Example ¶
ExampleGroup_Add shows that Add returns nil for a valid service registration.
package main
import (
"context"
"fmt"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
err := g.Add(
rungroup.ServiceFunc(func(ctx context.Context) error {
<-ctx.Done()
return nil
}),
rungroup.WithName("worker"),
rungroup.WithRestartPolicy(rungroup.RestartNever),
)
fmt.Println(err)
}
Output: <nil>
func (*Group) Run ¶
Run starts all registered services concurrently. It blocks until the provided context is cancelled AND all services have exited (or their individual shutdown timeouts are exceeded).
Run returns an aggregation of any terminal errors produced by the services. If zero services are registered, it returns nil immediately.
Example (ZeroServices) ¶
ExampleGroup_Run_zeroServices shows that Run returns nil immediately when no services have been registered.
package main
import (
"context"
"fmt"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
fmt.Println(g.Run(context.Background()))
}
Output: <nil>
type GroupOption ¶
type GroupOption func(*Group)
GroupOption configures the rungroup instance itself.
func WithEventHandler ¶
func WithEventHandler(fn func(Event)) GroupOption
WithEventHandler registers a rungroup-level event handler that fires for all events from all services. It runs after any service-level handler registered via WithServiceEventHandler.
Example ¶
ExampleWithEventHandler shows receiving lifecycle events for every service in the group.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New(
rungroup.WithEventHandler(func(e rungroup.Event) {
switch e.Type {
case rungroup.EventServiceRestarting:
fmt.Printf("%s: restarting (attempt %d)\n", e.ServiceName, e.Attempt)
case rungroup.EventServiceHalted:
fmt.Printf("%s: halted\n", e.ServiceName)
}
}),
)
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
return rungroup.ErrDoNotRestart
}),
rungroup.WithName("worker"),
)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
g.Run(ctx) //nolint:errcheck
}
Output: worker: halted
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) GroupOption
WithShutdownTimeout sets the maximum duration the rungroup will wait for all services to exit after the Run context is cancelled. It acts as a global ceiling; per-service timeouts set via WithServiceShutdownTimeout cannot exceed this value.
Example ¶
ExampleWithShutdownTimeout shows that a service which does not exit within the global deadline is abandoned; Run wraps ErrShutdownTimeout.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New(rungroup.WithShutdownTimeout(50 * time.Millisecond))
started := make(chan struct{})
g.Add(rungroup.ServiceFunc(func(_ context.Context) error {
close(started)
time.Sleep(10 * time.Second) // simulate a stuck service
return nil
}), rungroup.WithName("stuck"))
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-started
cancel()
}()
fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrShutdownTimeout))
}
Output: true
type IntervalService ¶
type IntervalService struct {
// contains filtered or unexported fields
}
IntervalService is a Service that calls a handler on a fixed interval. The handler is called once immediately and then again on every interval tick until the context is cancelled. If the handler returns a non-nil error, Run returns that error immediately.
func NewIntervalService ¶
func NewIntervalService(interval time.Duration, handler func(ctx context.Context) error) *IntervalService
NewIntervalService creates an IntervalService that calls handler every interval. The handler signature matches ServiceFunc.
Example ¶
ExampleNewIntervalService shows an IntervalService that stops itself by returning ErrDoNotRestart from the handler.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var calls int
svc := rungroup.NewIntervalService(50*time.Millisecond, func(_ context.Context) error {
calls++
if calls >= 3 {
return rungroup.ErrDoNotRestart
}
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
svc.Run(ctx) //nolint:errcheck
fmt.Println(calls)
}
Output: 3
func (*IntervalService) Run ¶
func (s *IntervalService) Run(ctx context.Context) error
Run calls the handler immediately, then on every interval tick, until ctx is cancelled or the handler returns a non-nil error.
Example ¶
ExampleIntervalService_Run shows that an already-cancelled context causes the handler to be invoked exactly once before Run returns.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var ticks int
svc := rungroup.NewIntervalService(time.Second, func(_ context.Context) error {
ticks++
return nil
})
ctx, cancel := context.WithCancel(context.Background())
cancel()
svc.Run(ctx) //nolint:errcheck
fmt.Println(ticks)
}
Output: 1
type Option ¶
type Option func(*managedService)
Option configures how a Service is supervised.
func WithBackoff ¶
func WithBackoff(fn func(attempt int) time.Duration) Option
WithBackoff provides a function to calculate the delay before a restart, based on the number of consecutive restarts.
If the function returns a negative duration, the rungroup treats it as a hard limit and permanently stops the service.
Example ¶
ExampleWithBackoff shows a backoff function that enforces a maximum retry count; Run wraps ErrRestartLimitExceeded once the limit is reached.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var calls int
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
calls++
return errors.New("failing")
}),
rungroup.WithRestartPolicy(rungroup.RestartAlways),
rungroup.WithBackoff(func(attempt int) time.Duration {
if attempt >= 3 {
return -1 // negative → give up
}
return 0
}),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
err := g.Run(ctx)
fmt.Println(errors.Is(err, rungroup.ErrRestartLimitExceeded))
fmt.Println(calls)
}
Output:
true
3
func WithIsolateShutdown ¶
func WithIsolateShutdown() Option
WithIsolateShutdown prevents ErrShutdownAll from propagating to the parent rungroup when this service (typically a nested *Group) triggers one. The shutdown is absorbed at this service boundary and treated as a normal policy halt.
Example ¶
ExampleWithIsolateShutdown shows that ErrShutdownAll from a nested Group is absorbed at the service boundary; the parent only sees ErrPolicyHalt.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
inner := rungroup.New()
inner.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
return rungroup.ErrShutdownAll
}),
rungroup.WithRestartPolicy(rungroup.RestartNever),
)
outer := rungroup.New()
outer.Add(inner,
rungroup.WithName("inner"),
rungroup.WithIsolateShutdown(),
)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
err := outer.Run(ctx)
fmt.Println(errors.Is(err, rungroup.ErrShutdownAll)) // absorbed at boundary
fmt.Println(errors.Is(err, rungroup.ErrPolicyHalt)) // surfaced instead
}
Output:
false
true
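func WithName ¶
func WithName(name string) Option
WithName assigns a name to the service. Naming services is highly recommended for observability: the name is reported as ServiceName in emitted events.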
Example ¶
ExampleWithName shows that WithName labels events so restarts and halts are identifiable in the event stream.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var halted string
g := rungroup.New(rungroup.WithEventHandler(func(e rungroup.Event) {
if e.Type == rungroup.EventServiceHalted {
halted = e.ServiceName
}
}))
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
return rungroup.ErrDoNotRestart
}),
rungroup.WithName("database"),
)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
g.Run(ctx) //nolint:errcheck
fmt.Println(halted)
}
Output: database
func WithRestartPolicy ¶
func WithRestartPolicy(p RestartPolicy) Option
WithRestartPolicy sets the restart behaviour. Defaults to RestartAlways.
Example (RestartAlways) ¶
ExampleWithRestartPolicy_restartAlways shows that RestartAlways restarts the service even after a clean (nil) return.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var calls int
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
calls++
if calls >= 3 {
return rungroup.ErrDoNotRestart
}
return nil // clean exit: RestartAlways restarts anyway
}),
rungroup.WithRestartPolicy(rungroup.RestartAlways),
rungroup.WithBackoff(func(_ int) time.Duration { return 0 }),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
g.Run(ctx) //nolint:errcheck
fmt.Println(calls)
}
Output: 3
Example (RestartNever) ¶
ExampleWithRestartPolicy_restartNever shows that a service is not restarted after an error; Run wraps ErrPolicyHalt in the returned error.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
return errors.New("fatal")
}),
rungroup.WithRestartPolicy(rungroup.RestartNever),
rungroup.WithName("worker"),
)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrPolicyHalt))
}
Output: true
Example (RestartOnFailure) ¶
ExampleWithRestartPolicy_restartOnFailure shows that a service is restarted only after a non-nil return; a clean nil exit stops the loop.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
var calls int
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
calls++
if calls < 3 {
return errors.New("transient")
}
return rungroup.ErrDoNotRestart
}),
rungroup.WithRestartPolicy(rungroup.RestartOnFailure),
rungroup.WithBackoff(func(_ int) time.Duration { return 0 }),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
g.Run(ctx) //nolint:errcheck
fmt.Println(calls)
}
Output: 3
func WithServiceEventHandler ¶
func WithServiceEventHandler(fn func(Event)) Option
WithServiceEventHandler registers an event handler that fires only for events originating from this service. It runs before the rungroup-level handler registered via WithEventHandler.
Example ¶
ExampleWithServiceEventHandler shows a per-service event handler that fires independently of any group-level handler.
package main
import (
"context"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
return rungroup.ErrDoNotRestart
}),
rungroup.WithName("cache"),
rungroup.WithServiceEventHandler(func(e rungroup.Event) {
if e.Type == rungroup.EventServiceHalted {
fmt.Printf("%s halted, has err: %v\n", e.ServiceName, e.Err != nil)
}
}),
)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
g.Run(ctx) //nolint:errcheck
}
Output: cache halted, has err: true
func WithServiceShutdownTimeout ¶
func WithServiceShutdownTimeout(d time.Duration) Option
WithServiceShutdownTimeout sets the maximum duration the rungroup will wait for this specific service to exit after the rungroup context is cancelled. If exceeded, an EventShutdownTimeout event is emitted and the goroutine is abandoned.
This timeout applies to this service alongside the rungroup-level WithShutdownTimeout, which remains a global ceiling. If both are set, whichever expires first wins.
Example ¶
ExampleWithServiceShutdownTimeout shows a per-service deadline that only affects the service it is attached to.
package main
import (
"context"
"errors"
"fmt"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
started := make(chan struct{})
g.Add(
rungroup.ServiceFunc(func(_ context.Context) error {
close(started)
time.Sleep(10 * time.Second)
return nil
}),
rungroup.WithName("stuck"),
rungroup.WithServiceShutdownTimeout(50*time.Millisecond),
)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-started
cancel()
}()
fmt.Println(errors.Is(g.Run(ctx), rungroup.ErrShutdownTimeout))
}
Output: true
func WithStabilityWindow ¶
func WithStabilityWindow(d time.Duration) Option
WithStabilityWindow sets the minimum duration a service must run continuously before its restart attempt counter is reset to zero. This prevents a service that runs stably for a long time from being penalised with a large backoff delay after an eventual crash.
Example ¶
ExampleWithStabilityWindow shows configuring a window that resets the restart attempt counter for services that run stably for a long period. This example is compiled but not executed (no Output comment) because the behaviour is timing-dependent.
package main
import (
"context"
"time"
"lowbit.dev/rungroup"
)
func main() {
g := rungroup.New()
g.Add(
rungroup.ServiceFunc(func(ctx context.Context) error {
<-ctx.Done()
return nil
}),
rungroup.WithRestartPolicy(rungroup.RestartAlways),
// Reset restart counter after the service has run for 30 s without crashing.
rungroup.WithStabilityWindow(30*time.Second),
// Linear backoff (500 ms per attempt); give up after 5 consecutive crashes.
rungroup.WithBackoff(func(attempt int) time.Duration {
if attempt > 5 {
return -1
}
return time.Duration(attempt) * 500 * time.Millisecond
}),
)
}
type RestartPolicy ¶
type RestartPolicy int
RestartPolicy defines when a service should be restarted after exiting.
const (
	// RestartAlways restarts the service unconditionally, even if it returns nil.
	RestartAlways RestartPolicy = iota

	// RestartOnFailure restarts the service only if it returns a non-nil error.
	RestartOnFailure

	// RestartNever means the service is never restarted, regardless of its
	// return value.
	RestartNever
)
type Service ¶
type Service interface {
// Run executes the service. It must respect ctx cancellation for clean
// shutdowns. If it panics, the rungroup catches it, converts the panic to
// an error, and evaluates it against the restart policy.
Run(ctx context.Context) error
}
Service represents a managed background operation.
type ServiceFunc ¶
ServiceFunc is an adapter to allow the use of ordinary functions as rungroup Services. If f is a function with the appropriate signature, ServiceFunc(f) is a Service that calls f.
Example ¶
ExampleServiceFunc shows the function adapter, which lets an ordinary function satisfy the Service interface.
package main
import (
"context"
"fmt"
"lowbit.dev/rungroup"
)
func main() {
svc := rungroup.ServiceFunc(func(ctx context.Context) error {
<-ctx.Done()
return nil
})
ctx, cancel := context.WithCancel(context.Background())
cancel()
fmt.Println(svc.Run(ctx))
}
Output: <nil>