processkit

v0.0.0-...-4ce751e
Published: Jun 20, 2026 License: MIT Imports: 20 Imported by: 0

README

processkit-go

Kernel-backed, no-orphan child-process management for Go — a native implementation of the processkit model.

Every process you start — and everything it spawns — runs inside a kill-on-drop OS container (a Windows Job Object, a Linux cgroup v2 subtree, or a POSIX process group), so no descendant ever outlives your run. Capture output, read exit codes, set timeouts, and cancel through context.Context — with typed, errors.Is/errors.As-friendly errors.

Status: early (v0.x). The API is still taking shape and is not yet frozen.

Requirements

  • Go 1.25 or later
  • Windows or Unix (Linux, macOS, the BSDs)

Installation

go get github.com/ZelAnton/processkit-go

Platform support

Containment is kernel-backed and the active mechanism is observable (Group.Mechanism() / Result.Mechanism()) — never a silent downgrade. Where a platform can't honour an operation, processkit says so (ErrUnsupported / ErrResourceLimit) rather than skipping it silently.

| Capability | Windows | Linux | macOS / BSD |
| --- | --- | --- | --- |
| Containment (no-orphan teardown) | Job Object: KILL_ON_JOB_CLOSE, kernel-enforced even if the parent is hard-killed | cgroup v2 where available², else process group¹ | process group¹ |
| Run/capture, streaming, pipelines, supervision, retries, probes, CLI client, record/replay | ✅ | ✅ | ✅ |
| Resource stats (Group.Stats, RunningProcess.Profile) | ✅ CPU + peak memory + count | per-process CPU/memory (/proc); group = count | count only (no /proc) |
| Resource limits (WithMemoryMax / WithMaxProcesses / WithCPUQuota) | ✅ enforced (Job Object) | ✅ cgroup v2 where delegated², else ErrResourceLimit | ErrResourceLimit |
| Signal(SignalKill) (atomic whole-tree kill) | ✅ | ✅ | ✅ |
| Other signals, Suspend / Resume | ErrUnsupported | ✅ | ✅ |
| Adopt an external process | ✅ | ✅ | ✅ |

¹ A POSIX process group is weaker than a Job Object: a child that calls setsid escapes it, and teardown needs the parent to dispatch the kill (the defer group.Close() path), so it is best-effort rather than kernel-enforced.

² On Linux, processkit prefers a cgroup v2 subtree (atomic clone3 placement, kernel ≥ 5.7; cgroup.kill teardown) — Group.Mechanism() reports CgroupV2. Limit enforcement additionally needs the process at the real cgroup-v2 root (controllers delegated): under systemd or in an ordinary container the controllers can't be enabled, so a requested limit fails fast with ErrResourceLimit rather than going unenforced. Where no cgroup can be made at all (no v2, no delegation, read-only fs), processkit falls back to a process group (limits → ErrResourceLimit).

Teardown asymmetry worth knowing: only the Windows Job Object's KILL_ON_JOB_CLOSE survives a hard kill of the parent. On Unix the no-orphan guarantee degrades to "holds as long as you don't forget defer group.Close()" (plus context cancellation) — there is no RAII or finalizer to lean on, by design.
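
The process-group behaviour described above can be reproduced with the standard library alone. The following is a minimal Unix-only sketch of the general technique, not processkit's implementation; startInGroup and killGroup are hypothetical helper names, and it assumes sh and sleep are on the PATH.

```go
//go:build unix

package main

import (
	"fmt"
	"os/exec"
	"syscall"
)

// startInGroup starts the command as the leader of a new process group.
// Descendants stay in that group unless they call setsid.
func startInGroup(name string, args ...string) (*exec.Cmd, error) {
	cmd := exec.Command(name, args...)
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd, nil
}

// killGroup sends SIGKILL to every process in the group led by cmd.
// A negative pid addresses the whole process group.
func killGroup(cmd *exec.Cmd) error {
	return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}

func main() {
	// The shell spawns a background grandchild, then waits on it.
	cmd, err := startInGroup("sh", "-c", "sleep 30 & wait")
	if err != nil {
		fmt.Println("start:", err)
		return
	}
	// The parent has to dispatch the kill itself; nothing does it
	// automatically if the parent dies first.
	if err := killGroup(cmd); err != nil {
		fmt.Println("kill:", err)
	}
	fmt.Println("wait:", cmd.Wait())
}
```

Because the kill is an explicit call made by the parent, a parent that is hard-killed never reaches it, which is the asymmetry with KILL_ON_JOB_CLOSE noted above.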

Usage

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	// Run-and-capture: a non-zero exit is data, not an error.
	res, err := processkit.Command("git", "rev-parse", "HEAD").Output(ctx)
	if err != nil {
		log.Fatal(err) // spawn failure, cancelled context, …
	}
	fmt.Println(res.Stdout(), res.Outcome())

	// Require success and get trimmed stdout directly.
	version, err := processkit.Command("go", "version").Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(version)

	// Feed a buffer into a tool (no shell) with WithStdinString.
	out, err := processkit.Command("jq", ".name").
		WithStdinString(`{"name":"processkit"}`).Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out)
}

Pick the verb that fits: Output (the full Result), Run (trimmed stdout, must succeed), ExitCode (the code), or Probe (a yes/no predicate). Bound a run with .WithTimeout(d) and tear the whole tree down by cancelling the context.

For several processes that must die together — a server and its helpers — use a Group, a shared kill-on-drop container:

group, err := processkit.NewGroup()
if err != nil {
	log.Fatal(err)
}
defer group.Close() // reaps the whole tree, grandchildren included

server, err := group.Start(ctx, processkit.Command("my-server"))
if err != nil {
	log.Fatal(err)
}
_ = server
// ... use the server; group.Shutdown(ctx) ends it gracefully (SIGTERM → grace →
// SIGKILL on Unix; an atomic kill on Windows).

Streaming & interactive I/O

A group-started process can stream its output line by line. StreamLines enables a single merged channel of stdout and stderr lines, each tagged with its stream; it closes when the process has produced all its output (or when you cancel the context):

proc, err := group.Start(ctx, processkit.Command("journalctl", "-f"),
	processkit.StreamLines())
if err != nil {
	log.Fatal(err)
}
for line := range proc.Lines() {
	fmt.Printf("[%s] %s\n", line.Stream, line.Text)
}

The pieces compose — mix and match per start:

  • Callbacks: OnStdoutLine(fn) / OnStderrLine(fn) invoke fn per line.
  • Tees: WithStdout(w) / WithStderr(w) mirror the raw bytes to any io.Writer (pass a bytes.Buffer to capture the full output while streaming).
  • Interactive stdin: WithStdin(r) feeds input from any io.Reader; pass the read end of an io.Pipe to drive a child over time.
  • Backpressure: the channel is bounded (BufferLines(n)); by default a slow reader applies backpressure. OnOverflow(OverflowDropNewest) instead drops lines (counted by proc.DroppedLines()) so a slow reader never stalls the child.
  • Encoding: WithDecoder(fn) wraps each stream before line-splitting — plug in a golang.org/x/text/encoding reader for non-UTF-8 console output (no extra dependency is pulled into this module).
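
The drop-newest policy from the Backpressure bullet can be pictured as a bounded channel with a non-blocking send. This is a minimal stdlib sketch under that assumption, not processkit's implementation; lineSink and offer are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lineSink is a hypothetical bounded line buffer with a drop-newest policy.
type lineSink struct {
	ch      chan string
	dropped atomic.Int64
}

func newLineSink(capacity int) *lineSink {
	return &lineSink{ch: make(chan string, capacity)}
}

// offer enqueues line if there is room. Otherwise it drops the line and
// counts it, so the producer never blocks on a slow reader.
func (s *lineSink) offer(line string) bool {
	select {
	case s.ch <- line:
		return true
	default:
		s.dropped.Add(1)
		return false
	}
}

func main() {
	sink := newLineSink(2)
	for _, l := range []string{"a", "b", "c", "d"} {
		sink.offer(l)
	}
	close(sink.ch)
	for l := range sink.ch {
		fmt.Println(l)
	}
	fmt.Println("dropped:", sink.dropped.Load())
}
```

Replacing the default branch with a plain blocking send gives the other policy: the producer stalls until the reader catches up.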

Pipelines

Chain commands shell-free with Pipe — each stage's stdout feeds the next stage's stdin over a real OS pipe, and the whole chain runs in one kill-on-drop container:

// grep error log.txt | sort | uniq -c
counts, err := processkit.Pipe(
	processkit.Command("grep", "error", "log.txt"),
	processkit.Command("sort"),
	processkit.Command("uniq", "-c"),
).Run(ctx)

The verbs mirror a single command (Output / Run / ExitCode / Probe) and return one Result folded by pipefail attribution: the captured stdout is always the last stage's, while the program, stderr, and exit code are the first failing stage's. Feed the head of the chain with .WithStdin(r) and bound the whole chain with .WithTimeout(d). For the producer | head pattern — where the producer is killed by SIGPIPE once the consumer stops reading — mark the producer .WithUncheckedInPipe() so it doesn't fail the chain.
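
The attribution rule can be sketched as a fold over per-stage results. The types and field names below are hypothetical, and two points are assumptions rather than documented behaviour: the sketch treats only an explicit exemption flag as exempt, and it falls back to the last stage's values when no stage is blamed.

```go
package main

import "fmt"

// stage is a hypothetical per-stage result; field names are illustrative.
type stage struct {
	program   string
	stdout    string
	stderr    string
	exitCode  int
	unchecked bool // exempt from blame (the producer | head case)
}

// folded is the single result reported for the whole chain.
type folded struct {
	program, stdout, stderr string
	exitCode                int
}

// fold takes stdout from the last stage, and program, stderr, and exit
// code from the first failing stage that is not exempt.
func fold(stages []stage) folded {
	if len(stages) == 0 {
		return folded{}
	}
	last := stages[len(stages)-1]
	out := folded{program: last.program, stdout: last.stdout}
	if !last.unchecked {
		out.stderr, out.exitCode = last.stderr, last.exitCode
	}
	for _, s := range stages {
		if s.exitCode != 0 && !s.unchecked {
			out.program, out.stderr, out.exitCode = s.program, s.stderr, s.exitCode
			break
		}
	}
	return out
}

func main() {
	r := fold([]stage{
		{program: "grep", stderr: "log.txt: No such file", exitCode: 2},
		{program: "sort"},
		{program: "uniq", stdout: "counts"},
	})
	fmt.Printf("%s exited %d, stdout=%q\n", r.program, r.exitCode, r.stdout)
}
```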

Retry

Replay a failed run to success with WithRetry — you supply a classifier deciding which failures are worth another try (there is no default):

out, err := processkit.Command("curl", "-sf", "https://example.com").
	WithTimeout(10 * time.Second).
	WithRetry(5, time.Second, func(err error) bool { // up to 5 attempts, 1s apart
		return errors.Is(err, processkit.ErrTimeout) // only retry timeouts
	}).
	Run(ctx)

WithRetry(maxAttempts, backoff, retryIf) runs at most maxAttempts times in total, sleeping backoff between tries. It stops on the first success, on the first non-retryable failure, or when the attempt budget is exhausted; in the two failing cases it returns the last error. A cancelled context is terminal (never retried). It applies to the success-requiring verbs (Run / ExitCode / Probe). For transient low-level spawn failures, the IsTransient helper is a ready-made classifier. This is replay-to-success; to keep a long-running process alive across crashes, use Supervise instead.
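
The stopping rules can be written out as a plain loop. This is a stdlib sketch of the semantics described above, not the library's code; retry, errFlaky, and demo are hypothetical names, and clamping maxAttempts to at least one run is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errFlaky = errors.New("flaky failure") // hypothetical retryable error

// retry runs at most maxAttempts times, sleeping backoff between tries.
func retry(ctx context.Context, maxAttempts int, backoff time.Duration,
	retryIf func(error) bool, run func(context.Context) error) error {
	if maxAttempts < 1 {
		maxAttempts = 1 // assumption: always run at least once
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = run(ctx); err == nil {
			return nil
		}
		// A cancelled context is terminal; so is a non-retryable
		// failure or an exhausted budget.
		if ctx.Err() != nil || !retryIf(err) || attempt == maxAttempts {
			return err
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return err
		}
	}
	return err
}

// demo fails twice with a retryable error, then succeeds.
func demo() (attempts int, err error) {
	err = retry(context.Background(), 5, time.Millisecond,
		func(err error) bool { return errors.Is(err, errFlaky) },
		func(context.Context) error {
			attempts++
			if attempts < 3 {
				return errFlaky
			}
			return nil
		})
	return attempts, err
}

func main() {
	attempts, err := demo()
	fmt.Println("attempts:", attempts, "err:", err)
}
```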

Resource metrics

Read whole-tree resource usage from a Group, or profile one run:

st, _ := group.Stats()                          // a snapshot
fmt.Println(st.ActiveProcesses())               // live process count
if cpu, ok := st.CPUTime(); ok { … }            // cumulative CPU (Job Object backend)

prof, _ := proc.Profile(ctx, 50*time.Millisecond) // sample one run to exit
fmt.Println(prof.Duration())                    // wall-clock, always available
if mem, ok := prof.PeakMemoryBytes(); ok { … }  // every optional metric returns ok

Group.SampleStats(ctx, every) returns a channel of snapshots. A metric a platform can't read is reported as unavailable (the ok bool), never an error — and the backends differ: the Job Object (Windows) and a cgroup v2 group (Linux) report count + CPU + peak memory; the process-group fallback reports the count only. Per-process RunningProcess.CPUTime / PeakMemoryBytes work on Linux and Windows, not macOS.

Resource limits

Cap the whole tree's resources when you create the group:

group, err := processkit.NewGroup(
    processkit.WithMemoryMax(512*1024*1024), // 512 MiB across the tree
    processkit.WithMaxProcesses(64),         // at most 64 live processes
    processkit.WithCPUQuota(1.5),            // 1.5 cores' worth of CPU
)
if errors.Is(err, processkit.ErrResourceLimit) {
    // the active mechanism can't enforce a requested cap — handle, don't ignore
}

Limits are a Group facility — applied to the OS container at creation. There is deliberately no per-run (Cmd) or per-Pipe cap; to bound a command's tree, Start it into a limited group. Every cap bounds the whole tree, not one process. Enforcement needs a real container: a Windows Job Object honours all three. A mechanism with no whole-tree limit primitive does not silently ignore a cap — NewGroup returns a *ResourceLimitError (matching ErrResourceLimit) so you never get a group you believe is bounded but isn't. An invalid value (zero, negative, non-finite) is rejected the same way. The honesty matrix:

| Backend | WithMemoryMax / WithMaxProcesses / WithCPUQuota |
| --- | --- |
| Windows Job Object | ✅ enforced (job memory / active-process / CPU hard cap¹) |
| Linux cgroup v2 (real root) | ✅ enforced (memory.max / pids.max / cpu.max) |
| Linux cgroup v2 (container / systemd) | ErrResourceLimit (controllers can't be delegated) |
| Linux process group, macOS / BSD | ErrResourceLimit (no whole-tree cap primitive) |

¹ The Windows CPU cap is expressed against total system CPU, so WithCPUQuota is approximate (converted via the host core count) and saturates at 100%.
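
To make the footnote concrete: a Job Object hard cap takes a CpuRate in units of 1/100 of a percent of total system CPU (10000 means 100%). The sketch below shows one plausible way to convert a cores-based quota. The exact formula processkit uses is not shown in this document, so the function and its rounding are assumptions.

```go
package main

import (
	"fmt"
	"runtime"
)

// cpuRateFromCores converts a quota in cores into Job Object CpuRate
// units (cycles per 10,000 cycles of total system CPU), saturating at
// 100%. It assumes cores has already been validated as positive and finite.
func cpuRateFromCores(cores float64, hostCores int) uint32 {
	if hostCores < 1 {
		hostCores = 1
	}
	rate := cores / float64(hostCores) * 10000
	switch {
	case rate > 10000:
		rate = 10000 // cannot exceed the whole machine
	case rate < 1:
		rate = 1 // smallest expressible cap
	}
	return uint32(rate)
}

func main() {
	fmt.Println(cpuRateFromCores(1.5, 8)) // 1.5 of 8 cores is 18.75%
	fmt.Println(cpuRateFromCores(1.5, runtime.NumCPU()))
}
```

On a host with fewer cores than the requested quota the result pins at 10000, which is the saturation the footnote mentions.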

On Linux the caps are enforced by a cgroup v2 subtree (memory.max / pids.max / cpu.max) — but only where this process sits at the real cgroup-v2 root with the controllers delegated. Under systemd or in an ordinary container the controllers can't be enabled (cgroup v2's "no internal processes" rule), so a requested cap fails fast with ErrResourceLimit — never a silently-unbounded group. That fail-fast is the common path off bare metal.

Whole-tree process control

A Group can signal, pause, and adopt processes as a unit:

group.Signal(processkit.SignalHup)   // reload the whole tree (SIGHUP)
group.Suspend(); group.Resume()      // freeze and thaw the tree
group.Adopt(cmd.Process)             // pull an externally-started process in

Operations a platform can't honour return ErrUnsupported explicitly — never a silent no-op. The honesty matrix:

| Operation | Unix | Windows |
| --- | --- | --- |
| Signal(SignalKill) | ✅ killpg / atomic | ✅ TerminateJobObject |
| Signal(other / RawSignal) | ✅ killpg | ErrUnsupported |
| Suspend / Resume | ✅ SIGSTOP / SIGCONT | ErrUnsupported |
| Adopt | ✅ setpgid / solo | ✅ AssignProcessToJobObject |

Processes

Readiness probes

A group-started process can be probed for readiness — and a probe never kills the process, so if it isn't ready you decide what to do:

server, _ := group.Start(ctx, processkit.Command("my-server"), processkit.StreamLines())

// Any one of:
line, err := server.WaitForLine(ctx, func(s string) bool {     // a log line appears
	return strings.Contains(s, "listening")
}, 10*time.Second)
err = server.WaitForPort(ctx, "127.0.0.1:8080", 10*time.Second) // a TCP port accepts
err = server.WaitFor(ctx, func(ctx context.Context) bool {      // a custom check passes
	return healthCheck(ctx) == nil
}, 10*time.Second)

On the deadline (or if the process exits first) a probe returns a *NotReadyError (errors.Is(err, processkit.ErrNotReady)) — distinct from ErrTimeout, which is the run's own deadline that does tear the tree down. WaitForLine watches the merged stdout/stderr stream, so the process must be started with StreamLines().
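
A port probe of this kind boils down to polling a TCP dial until a deadline. The following is a stdlib sketch of that idea, not processkit's WaitForPort; waitForPort, errNotReady, and the polling intervals are assumptions. Like the probes described above, it never touches the process it is waiting on.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"
)

var errNotReady = errors.New("not ready") // hypothetical stand-in sentinel

// waitForPort polls addr until it accepts a TCP connection, the timeout
// elapses, or ctx is cancelled.
func waitForPort(ctx context.Context, addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
		if err == nil {
			conn.Close()
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("%w: %s: %v", errNotReady, addr, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // the bare context error
		case <-time.After(50 * time.Millisecond):
		}
	}
}

// demo probes a listening port, then the same port after it is closed.
func demo() (readyErr, closedErr error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err, nil
	}
	addr := ln.Addr().String()
	readyErr = waitForPort(context.Background(), addr, time.Second)
	ln.Close()
	closedErr = waitForPort(context.Background(), addr, 300*time.Millisecond)
	return readyErr, closedErr
}

func main() {
	ready, closed := demo()
	fmt.Println("listening:", ready)
	fmt.Println("closed:", closed)
}
```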

Supervision

Keep a command alive with Supervise — it re-runs the command on a crash with capped-exponential backoff (and jitter), until a stop condition is met:

outcome, err := processkit.Supervise(processkit.Command("my-worker")).
	WithMaxRestarts(10).               // give up after 10 restarts
	WithBackoff(time.Second, 2).       // 1s, 2s, 4s, … capped at WithMaxBackoff (30s)
	Run(ctx)
// outcome.Stopped is StoppedPolicySatisfied / StoppedByPredicate / StoppedRestartsExhausted;
// outcome.Final holds the last run's Result; outcome.Restarts counts the re-runs.

A crash is any non-success run (a non-zero/rejected exit, a timeout, a signal kill, or a spawn failure). The default policy (RestartOnCrash) stops on the first clean run; WithRestart(RestartAlways) keeps re-running and WithRestart(RestartNever) runs once. Stop on a custom condition with StopWhen(func(*Result) bool). For a flap-prone service, turn on the failure-storm guard with WithStormPause(d) so a tight crash loop pauses instead of hammering. Supervision is sequential — the whole process tree is reaped before each restart — and a cancelled context ends it promptly.
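
The backoff schedule in the comment above (1s, 2s, 4s, … capped at 30s) can be computed as below. This sketch leaves out the jitter the supervisor adds, and backoffDelay is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the delay before the given restart (0-based):
// base * factor^restart, capped at ceiling. Jitter is omitted.
func backoffDelay(base time.Duration, factor float64, ceiling time.Duration, restart int) time.Duration {
	d := base
	for i := 0; i < restart; i++ {
		d = time.Duration(float64(d) * factor)
		if d >= ceiling {
			return ceiling // stop early so the value cannot overflow
		}
	}
	if d > ceiling {
		return ceiling
	}
	return d
}

// delaysInSeconds lists the first n delays for a 1s base, factor 2,
// and a 30s cap.
func delaysInSeconds(n int) []int {
	out := make([]int, n)
	for i := range out {
		out[i] = int(backoffDelay(time.Second, 2, 30*time.Second, i) / time.Second)
	}
	return out
}

func main() {
	fmt.Println(delaysInSeconds(7)) // [1 2 4 8 16 30 30]
}
```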

Typed CLI wrappers & test doubles

To wrap one CLI tool (git, gh, jj, …), build a CliClient — it injects the program, defaults, and runner once, so your wrapper is just argument-building and output-parsing, and is mockable by construction:

type Git struct{ client *processkit.CliClient }

func NewGit() *Git {
	// WithEnv replaces the environment; AppendEnv adds to the inherited one:
	// .AppendEnv("GIT_TERMINAL_PROMPT=0")
	return &Git{client: processkit.NewClient("git")}
}
func (g *Git) CurrentBranch(ctx context.Context) (string, error) {
	return g.client.Run(ctx, "rev-parse", "--abbrev-ref", "HEAD")
}

The processkittest package gives you ready-made fakes for the ProcessRunner seam — no real subprocess in your tests:

import "github.com/ZelAnton/processkit-go/processkittest"

scripted := processkittest.NewScriptedRunner().
	On([]string{"git", "rev-parse", "--abbrev-ref", "HEAD"}, processkittest.OK("main")).
	Fallback(processkittest.Fail(1, "unexpected command"))
git := &Git{client: processkit.NewClient("git").WithRunner(scripted)}
// git.CurrentBranch(ctx) now returns "main" with no `git` on the machine.

ScriptedRunner answers commands with canned Replys (OK / Fail / TimedOut / Signalled / Err / Pending), and an unexpected command fails loudly. RecordingRunner records the invocations a wrapper builds so you can assert on them (rec.OnlyCall().Args). Writing your own runner? Build its results with processkit.NewResult.

Recording & replaying runs

RecordReplayRunner captures real runs to a JSON cassette once, then replays them hermetically — no subprocess in CI:

// Record once against the real tool:
rec := processkittest.Record("testdata/git.json", processkit.JobRunner{})
out, _ := processkit.Command("git", "--version").WithRunner(rec).Output(ctx)
_ = rec.Save() // writes the cassette (0600 on Unix)

// Replay everywhere else — identical results, no `git` needed:
rep, _ := processkittest.Replay("testdata/git.json")
out2, _ := processkit.Command("git", "--version").WithRunner(rep).Output(ctx)
// out2 matches out; an unrecorded command is a *CassetteMissError (errors.Is
// ErrCassetteMiss), distinct from a missing program, and never spawns anything.

A run is matched on program + args + working directory (environment is excluded, so an irrelevant env difference can't cause a miss). A command recorded twice replays in capture order, then repeats the last. Only runs that complete with a Result are recorded — a spawn failure, not-found, or cancellation records nothing (a non-zero exit, signal, or timeout is a result and is recorded). Replay is instant, not timing-faithful (Duration() is zero).
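
The matching and ordering rules can be illustrated with a small in-memory model. This is a sketch only: the real cassette is a JSON file whose layout is not shown here, and call, cassette, record, and replay are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// call identifies a run by program, args, and working directory.
// The environment is deliberately not part of the key.
type call struct {
	program string
	args    []string
	dir     string
}

func (c call) key() string {
	parts := append([]string{c.program, c.dir}, c.args...)
	return strings.Join(parts, "\x00")
}

type cassette struct {
	replies map[string][]string // captured stdout, in capture order
	next    map[string]int      // replay cursor per key
}

func newCassette() *cassette {
	return &cassette{replies: map[string][]string{}, next: map[string]int{}}
}

func (c *cassette) record(k call, stdout string) {
	c.replies[k.key()] = append(c.replies[k.key()], stdout)
}

// replay returns recordings in capture order, then repeats the last one.
// The bool is false on a miss (nothing recorded for this call).
func (c *cassette) replay(k call) (string, bool) {
	list := c.replies[k.key()]
	if len(list) == 0 {
		return "", false
	}
	i := c.next[k.key()]
	if i >= len(list) {
		i = len(list) - 1
	} else {
		c.next[k.key()] = i + 1
	}
	return list[i], true
}

func main() {
	c := newCassette()
	k := call{program: "git", args: []string{"--version"}, dir: "/repo"}
	c.record(k, "v1")
	c.record(k, "v2")
	for i := 0; i < 3; i++ {
		out, _ := c.replay(k)
		fmt.Println(out)
	}
	_, ok := c.replay(call{program: "git", args: []string{"status"}, dir: "/repo"})
	fmt.Println("hit:", ok)
}
```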

Secrets: a cassette redacts environment values (it stores variable names only), but program, args, cwd, stdout, and stderr are stored verbatim and can carry secrets — a --password=… flag, a token echoed to output. Review a fixture before committing it. On Unix the file is written owner-only (0600) and the write refuses to follow a symlink; on Windows it inherits the directory ACL, so keep the fixture directory restricted.

Observability

Attach a log/slog logger with WithLogger — on a Cmd, a Pipeline, a Supervisor, a CliClient, or a Group (the WithLogger option). It is off by default.

logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))
out, _ := processkit.Command("ffmpeg", "-i", "in.mov", "out.mp4").
    WithLogger(logger).Output(ctx)
// {"level":"DEBUG","msg":"child spawned","program":"ffmpeg","pid":4242,"mechanism":"JobObject"}
// {"level":"DEBUG","msg":"process exited","program":"ffmpeg","outcome":"exited(0)","elapsed_ms":1830}

Events cover spawn/exit, timeout and cancellation, group teardown and graceful shutdown, retries, and supervisor restarts / failure-storm pauses — at Debug for ordinary lifecycle and Warn for anomalies. They carry the program name, pid, mechanism, outcome, and durations — but never the command's arguments, environment, working directory, or output, which routinely carry secrets. That rule is structural: those values can't reach a log record.

The logger flows with the value it's set on: a Cmd.WithLogger command logs each run even through OutputAll (set it per command — there is no batch-level logger), and a Group.WithLogger group's started processes inherit it.

Changelog

See CHANGELOG.md for the version history.

Contributing

See CONTRIBUTING.md for build/test instructions and conventions. To report a security issue, follow SECURITY.md — please do not open a public issue.

License

This project is licensed under the MIT License.

Documentation

Overview

Package processkit runs and supervises child processes with a kernel-backed, no-orphan guarantee: every process you start, and everything it spawns, lives in a kill-on-drop OS container (a Windows Job Object, a Linux cgroup v2 subtree, or a POSIX process group), so no descendant outlives your program.

Start every run with Command; the verb you finish with decides what you get back:

  • Cmd.Output — the full Result; a non-zero exit is data, never an error.
  • Cmd.Run — trimmed stdout as a string; a non-zero exit / timeout errors.
  • Cmd.ExitCode — the exit code (a timeout or signal kill errors, not -1).
  • Cmd.Probe — a bool: exit 0 → true, 1 → false, anything else errors.
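
The Probe mapping is the least obvious of the four, so here it is as a plain function. This is a sketch of the rule as stated in the list above; probeFromExit and its error text are hypothetical, not the library's.

```go
package main

import "fmt"

// probeFromExit maps an exit code to a yes/no answer: 0 is true, 1 is
// false, and any other code is an error rather than a silent "no".
func probeFromExit(code int) (bool, error) {
	switch code {
	case 0:
		return true, nil
	case 1:
		return false, nil
	default:
		return false, fmt.Errorf("probe: unexpected exit code %d", code)
	}
}

func main() {
	for _, code := range []int{0, 1, 2} {
		ok, err := probeFromExit(code)
		fmt.Println(code, ok, err)
	}
}
```

Keeping exit code 2 and above as errors matters for tools in the grep -q style, where 1 means "no match" but 2 means the tool itself failed.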

A run is bounded by Cmd.WithTimeout (captured in the Result as Outcome.TimedOut) and by the context.Context passed to every verb (cancelling it, or its own deadline elapsing, is an error — see ErrCancelled). Errors are typed: match the sentinels with errors.Is (ErrTimeout, ErrCancelled, ErrNotFound, …) and the rich *ExitError / *NotFoundError with errors.As.

For several processes that must die together — a server and its helpers — use a Group: a shared kill-on-drop container. Group.Start runs each process into one OS container; Group.Close (use `defer`) reaps the whole tree. Group.Shutdown ends it gracefully (SIGTERM → grace → SIGKILL on Unix; an immediate atomic kill on Windows). WaitAny / WaitAll race or join started processes, and OutputAll runs a batch of commands with a concurrency cap.

A group-started process can stream its output: pass StreamLines and range over RunningProcess.Lines, a merged stdout/stderr channel of tagged [Line]s that closes at EOF. Per-line callbacks (OnStdoutLine / OnStderrLine), raw tees (WithStdout / WithStderr), interactive WithStdin, a bounded-buffer policy (BufferLines / OnOverflow), and an encoding WithDecoder all compose as [StartOption]s on Group.Start. To feed a *captured* run its standard input, use Cmd.WithStdin (e.g. piping a buffer into `jq`); a live Group.Start uses the WithStdin start option, and a Pipe wires each stage's stdin from the previous (its head from Pipeline.WithStdin).

To chain commands shell-free — a | b | c — use Pipe: each stage's stdout feeds the next stage's stdin over a real OS pipe, and the whole chain runs in one kill-on-drop container, so it lives and dies together. The verbs mirror Cmd and return one Result folded by *pipefail attribution*: the captured stdout is always the last stage's, while the program, stderr, and exit code are the first failing (non-exempt) stage's — preferring a real culprit over an upstream stage killed by SIGPIPE. A stage marked Cmd.WithUncheckedInPipe is exempt from blame (the `producer | head` pattern). Pipeline.WithTimeout bounds the whole chain.

A Group also gives whole-tree process control: Group.Signal sends a Signal to every member (only SignalKill is portable — the others are Unix-only and return ErrUnsupported on Windows), Group.Suspend / Group.Resume freeze and thaw the tree (Unix only), and Group.Adopt pulls an externally-started process into the group's containment. Operations a platform can't honour return ErrUnsupported explicitly — never a silent no-op.

Resource usage is available too: Group.Stats (and the Group.SampleStats channel) report a whole-tree snapshot — live process count, and, on the Job Object backend, cumulative CPU and peak memory; RunningProcess.Profile samples one run over its lifetime into a RunProfile. Metrics a platform can't read are reported as unavailable (an ok bool), never an error.

A group can also cap the whole tree's resources at creation: NewGroup takes WithMemoryMax, WithMaxProcesses, and WithCPUQuota (a Group-only facility — there is no per-run or per-Pipe cap; Start a command into a limited group to bound it). A Windows Job Object and a Linux cgroup v2 subtree enforce all three; but cgroup enforcement needs the process at the real cgroup-v2 root (not under systemd or in a container), and macOS/BSD have no whole-tree cap, so where a cap can't be enforced NewGroup fails with a *ResourceLimitError (matching ErrResourceLimit) rather than hand back a silently-unbounded group. An unenforced limit is no protection.

A group-started process can be probed for readiness: RunningProcess.WaitForLine waits for a line of its output to match, RunningProcess.WaitForPort waits for a TCP address to accept connections, and RunningProcess.WaitFor polls a custom predicate. These wait for the process to become *ready* (and leave it running) — not for it to *exit*, which is RunningProcess.Wait. A probe never kills the process: if it isn't ready by the deadline you get a *NotReadyError (matching ErrNotReady, distinct from ErrTimeout) and decide what to do next.

To keep a command alive, wrap it in a Supervisor: it re-runs the command on a crash with capped-exponential backoff (plus jitter, and an optional failure-storm guard) until a stop condition is met — a clean run, a Supervisor.StopWhen predicate, or an exhausted restart budget. Supervision is sequential and single-flight, so a command's whole tree is always reaped before a restart. Supervisor.Run returns a SupervisionOutcome describing why it stopped and how many restarts it took. To replay one run to *success* — rather than keep a process alive — attach Cmd.WithRetry to a verb instead: it retries a failed run, with a classifier deciding which failures are worth another try.

The ProcessRunner interface is the dependency-injection and test seam: swap the real JobRunner for a fake to test command-running code with no subprocess. The Cmd verbs and Supervisor run through it; Group.Start and Pipe always run real processes (their containment is the point), so they can't be faked this way.

For observability, attach an optional log/slog logger with WithLogger — on a Cmd, a Pipeline, a Supervisor, a CliClient, or a Group (the WithLogger option). It is off by default. Events cover spawn and exit, timeout and cancellation, group teardown and graceful shutdown, retries, supervisor restarts and failure-storm pauses — at Debug for ordinary lifecycle and Warn for anomalies. They carry the program name, pid, mechanism, outcome, and durations, but NEVER the command's arguments, environment, working directory, or output, which routinely carry secrets.

To build a reusable typed wrapper around one CLI tool (git, gh, …), use a CliClient: it injects the program, defaults, and runner once, so the wrapper is just argument-building and output-parsing — and is mockable by construction. The processkittest package provides ready-made fakes for the seam (a scripted runner, a recording runner, and a record/replay runner that captures real runs to a JSON cassette and replays them hermetically); a hand-written custom runner builds its [Result]s with NewResult.

Context cancellation is reported two ways, by design: every verb that owns the run — the Cmd, Pipeline, and Supervisor verbs — wraps it in a *CancelError (a rich typed error), while the live-handle observers (RunningProcess.Wait, WaitAny, WaitAll, and the readiness probes) return the bare context error, so you can errors.Is it against context.Canceled / context.DeadlineExceeded without unwrapping. Note also that the stream options (WithStdin, WithStdout, StreamLines, …) and the group limit options (WithMemoryMax, …) are package-level functions, not Cmd methods, because they configure a live Group.Start / NewGroup rather than the capture builder.

Only Windows and Unix (Linux, macOS, the BSDs) are supported.

Variables

var (
	// ErrCancelled means the run was abandoned via its context. Unlike a timeout
	// (which is captured in the [Result]), a cancellation is always an error and
	// carries no output. It wins over a co-occurring timeout.
	ErrCancelled = errors.New("processkit: run cancelled")

	// ErrTimeout means the run exceeded its deadline and was killed. A timed-out
	// [*ExitError] matches this via errors.Is.
	ErrTimeout = errors.New("processkit: run timed out")

	// ErrUnsupported means the operation is not available on this platform (e.g.
	// a non-kill [Group.Signal], or [Group.Suspend] / [Group.Resume], on Windows).
	// Never a silent skip.
	ErrUnsupported = errors.New("processkit: operation not supported on this platform")

	// ErrNotReady means a readiness probe did not pass within its deadline (or can
	// no longer pass). Distinct from ErrTimeout, which is the run's own deadline.
	ErrNotReady = errors.New("processkit: readiness probe did not pass")

	// ErrResourceLimit means a requested whole-tree resource cap could not be
	// enforced — never a silently-unbounded group.
	ErrResourceLimit = errors.New("processkit: resource limit could not be enforced")

	// ErrNotFound means the program could not be found. A [*NotFoundError] matches
	// this via errors.Is.
	ErrNotFound = errors.New("processkit: program not found")

	// ErrStart means the process could not be started (spawned or contained). A
	// [*StartError] matches this via errors.Is; unwrap it for the underlying OS cause.
	ErrStart = errors.New("processkit: failed to start the process")

	// ErrTooFewStages means a [Pipeline] was run with fewer than two stages. A
	// pipeline needs at least two commands to chain.
	ErrTooFewStages = errors.New("processkit: a pipeline needs at least two stages")
)

Sentinel errors for use with errors.Is. The data-carrying error types below match the relevant sentinels through their Is methods.

var (
	SignalTerm = Signal{/* contains filtered or unexported fields */} // graceful stop (SIGTERM)
	SignalKill = Signal{/* contains filtered or unexported fields */} // hard kill (SIGKILL; the only portable signal)
	SignalInt  = Signal{/* contains filtered or unexported fields */} // interrupt (SIGINT)
	SignalHup  = Signal{/* contains filtered or unexported fields */} // hang up / reload (SIGHUP)
	SignalQuit = Signal{/* contains filtered or unexported fields */} // quit with core (SIGQUIT)
	SignalUsr1 = Signal{/* contains filtered or unexported fields */} // user-defined 1 (SIGUSR1)
	SignalUsr2 = Signal{/* contains filtered or unexported fields */} // user-defined 2 (SIGUSR2)
)

The curated signals. They map to the platform's matching syscall signal on Unix; on Windows only SignalKill is deliverable (it routes to the atomic job kill).

Functions

func IsTransient

func IsTransient(err error) bool

IsTransient reports whether err is a transient, low-level failure worth retrying — an interrupted or would-block syscall, a busy executable or file, or (on Windows) a sharing/lock violation. It is meant as a Cmd.WithRetry classifier for spawn hiccups.

It deliberately does NOT treat a non-zero exit code or a timeout as transient: those are domain-specific (a "git exited 128" is not generically retryable). Classify those yourself, e.g. errors.Is(err, ErrTimeout) to retry timeouts.

Types

type BatchOutput

type BatchOutput struct {
	Result *Result
	Err    error
}

BatchOutput is one command's independent result from OutputAll: either a captured Result (any exit code) or an error (spawn failure, cancellation).

func OutputAll

func OutputAll(ctx context.Context, cmds []*Cmd, concurrency int) []BatchOutput

OutputAll runs every command to completion and captures each result, with at most concurrency runs in flight at once (so fanning out hundreds of commands can't exhaust file descriptors or the process table). It is collect-all: a non-zero exit never short-circuits the batch — each element is independent, in input order.
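
The collect-all, bounded-concurrency shape can be sketched with a semaphore channel. This is a generic stdlib illustration, not OutputAll itself; runAll, outcome, and demo are hypothetical names, and the jobs are plain functions rather than commands.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// outcome is one job's independent result: a value or an error.
type outcome[T any] struct {
	Value T
	Err   error
}

// runAll runs every job with at most concurrency in flight and returns
// the outcomes in input order. A failure never short-circuits the batch.
func runAll[T any](ctx context.Context, jobs []func(context.Context) (T, error), concurrency int) []outcome[T] {
	if concurrency < 1 {
		concurrency = 1
	}
	out := make([]outcome[T], len(jobs))
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while the cap is reached
		go func(i int, job func(context.Context) (T, error)) {
			defer wg.Done()
			defer func() { <-sem }()
			v, err := job(ctx)
			out[i] = outcome[T]{Value: v, Err: err}
		}(i, job)
	}
	wg.Wait()
	return out
}

// demo runs six jobs two at a time and records the peak in-flight count.
func demo() ([]outcome[int], int32) {
	var inFlight, peak atomic.Int32
	jobs := make([]func(context.Context) (int, error), 6)
	for i := range jobs {
		i := i
		jobs[i] = func(context.Context) (int, error) {
			cur := inFlight.Add(1)
			defer inFlight.Add(-1)
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			if i == 3 {
				return 0, errors.New("job 3 failed")
			}
			return i * i, nil
		}
	}
	return runAll(context.Background(), jobs, 2), peak.Load()
}

func main() {
	results, peak := demo()
	for i, r := range results {
		fmt.Println(i, r.Value, r.Err)
	}
	fmt.Println("peak in flight:", peak)
}
```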

type CancelError

type CancelError struct {
	Program string
	Cause   error // context.Canceled or context.DeadlineExceeded
}

CancelError reports that a run was ended by the caller's context — either cancelled or its deadline elapsed. It carries no captured output (the run was abandoned). Matches errors.Is(err, ErrCancelled); Cause is the underlying context error, so errors.Is(err, context.Canceled) / context.DeadlineExceeded also work. (A run's *own* Cmd.WithTimeout deadline is captured in the Result instead — see Outcome.TimedOut.)

func (*CancelError) Error

func (e *CancelError) Error() string

Error renders the cancellation, distinguishing a cancelled context from an elapsed parent deadline (the Is/Unwrap match against ErrCancelled and the underlying context error is unaffected either way).

func (*CancelError) Is

func (e *CancelError) Is(target error) bool

Is matches the ErrCancelled sentinel.

func (*CancelError) Unwrap

func (e *CancelError) Unwrap() error

Unwrap exposes the underlying context error to errors.Is / errors.As.
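
How the three methods work together is easiest to see on a stand-in type. The sketch below mirrors the documented shape (Program, Cause, an Is method, and Unwrap) using lower-case hypothetical names; it is not the package's source, and the Error text is invented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errCancelled = errors.New("run cancelled") // stand-in sentinel

// cancelError is a stand-in for a data-carrying cancellation error.
type cancelError struct {
	Program string
	Cause   error // context.Canceled or context.DeadlineExceeded
}

func (e *cancelError) Error() string {
	return e.Program + ": cancelled: " + e.Cause.Error()
}

// Is lets errors.Is match the sentinel without the sentinel being
// in the Unwrap chain.
func (e *cancelError) Is(target error) bool { return target == errCancelled }

// Unwrap exposes the context error to errors.Is and errors.As.
func (e *cancelError) Unwrap() error { return e.Cause }

// classify reports which matches succeed, and the program if the rich
// type is present anywhere in the chain.
func classify(err error) (cancelled, deadline bool, program string) {
	var ce *cancelError
	if errors.As(err, &ce) {
		program = ce.Program
	}
	return errors.Is(err, errCancelled), errors.Is(err, context.DeadlineExceeded), program
}

func demo() (bool, bool, string) {
	err := fmt.Errorf("deploy: %w", &cancelError{Program: "git", Cause: context.DeadlineExceeded})
	return classify(err)
}

func main() {
	fmt.Println(demo()) // true true git
}
```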

type CliClient

type CliClient struct {
	// contains filtered or unexported fields
}

CliClient is a small, reusable core for building a typed wrapper around one external CLI tool (git, jj, gh, …). It holds the program name, a ProcessRunner, and per-client defaults (timeout, environment, working directory, ok-codes), and hands back pre-configured commands — so a wrapper reduces to a typed facade over its argument-building and output-parsing, with the process plumbing injected once.

Because the runner is injectable, a wrapper built on a CliClient is mockable by construction: give it a fake runner (see the processkittest package) and no real subprocess runs. The idiomatic wrapper holds a client in a field:

type Git struct{ client *processkit.CliClient }

func NewGit() *Git { return &Git{client: processkit.NewClient("git")} }
func (g *Git) Status(ctx context.Context) (string, error) {
	return g.client.Run(ctx, "status", "--porcelain")
}

CliClient is a value built by chainable WithX methods (each returning a new, independent *CliClient).

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()
	// A reusable core for a typed git wrapper: program + defaults injected once.
	// (Pass append(os.Environ(), "GIT_TERMINAL_PROMPT=0")... to WithEnv to add a
	// var without dropping the inherited environment.)
	git := processkit.NewClient("git").WithTimeout(10 * time.Second)

	branch, err := git.Run(ctx, "rev-parse", "--abbrev-ref", "HEAD")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(branch)
}

func NewClient

func NewClient(program string) *CliClient

NewClient starts a client for the given program, with a real JobRunner and no defaults. Layer defaults with the WithX methods.

func (*CliClient) AppendEnv

func (c *CliClient) AppendEnv(env ...string) *CliClient

AppendEnv returns a copy of the client whose commands add env to the inherited environment (rather than replacing it, as CliClient.WithEnv does). See Cmd.AppendEnv.

func (*CliClient) Command

func (c *CliClient) Command(args ...string) *Cmd

Command builds a command for a subcommand: the client's program and defaults with args appended. Chain more WithX on it to override a default for this one call, then finish with a verb — or use the CliClient.Run etc. shortcuts.

func (*CliClient) ExitCode

func (c *CliClient) ExitCode(ctx context.Context, args ...string) (int, error)

ExitCode runs the subcommand and returns its exit code.

func (*CliClient) Output

func (c *CliClient) Output(ctx context.Context, args ...string) (*Result, error)

Output runs the subcommand and returns the full Result (a non-zero exit is data, not an error). Shortcut for c.Command(args...).Output(ctx).

func (*CliClient) Probe

func (c *CliClient) Probe(ctx context.Context, args ...string) (bool, error)

Probe runs the subcommand as a yes/no predicate (exit 0 → true, 1 → false).

func (*CliClient) Run

func (c *CliClient) Run(ctx context.Context, args ...string) (string, error)

Run runs the subcommand and returns trimmed stdout; a non-zero exit errors. Shortcut for c.Command(args...).Run(ctx).

func (*CliClient) WithDir

func (c *CliClient) WithDir(dir string) *CliClient

WithDir returns a copy of the client whose commands default to the working directory dir (a per-command Cmd.WithDir overrides it).

func (*CliClient) WithEnv

func (c *CliClient) WithEnv(env ...string) *CliClient

WithEnv returns a copy of the client whose commands run with the full environment env (each entry "KEY=VALUE"), *replacing* the inherited one, so include everything the tool needs (HOME, PATH, …), not just an override. For the common "inherit, plus set a few" case, spread a slice into the variadic parameter with append(os.Environ(), "GIT_TERMINAL_PROMPT=0")..., or use CliClient.AppendEnv.

func (*CliClient) WithLogger

func (c *CliClient) WithLogger(logger *slog.Logger) *CliClient

WithLogger returns a copy of the client whose commands emit structured log/slog lifecycle events (see Cmd.WithLogger). The default is no logging; pass nil to disable. Arguments and environment are never logged.

func (*CliClient) WithOkCodes

func (c *CliClient) WithOkCodes(codes ...int) *CliClient

WithOkCodes returns a copy of the client whose commands default to treating the listed exit codes as success in addition to 0.

func (*CliClient) WithRunner

func (c *CliClient) WithRunner(r ProcessRunner) *CliClient

WithRunner returns a copy of the client whose commands all run through r — the dependency-injection and test seam. The default is a JobRunner.

func (*CliClient) WithTimeout

func (c *CliClient) WithTimeout(d time.Duration) *CliClient

WithTimeout returns a copy of the client whose commands default to the deadline d (a per-command Cmd.WithTimeout overrides it).

type Cmd

type Cmd struct {
	// contains filtered or unexported fields
}

Cmd describes a command to run: a program, its arguments, and run options. Build it with Command and the chainable WithX methods, then finish with a verb (Cmd.Output, Cmd.Run, Cmd.ExitCode, Cmd.Probe).

Each WithX method returns a new, independent *Cmd (copy-on-write), so a partly configured command is safe to reuse and branch:

base := processkit.Command("git").WithDir(repo)
status := base.WithArgs("status")  // base is unchanged
log := base.WithArgs("log")        // independent of status
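
Why branching is safe can be shown with a small copy-on-write builder. This is a sketch under assumptions: cmd and withArgs are hypothetical stand-ins with two fields, not the package's Cmd, and the real implementation may copy its state differently.

```go
package main

import "fmt"

// cmd is a hypothetical stand-in for Cmd, reduced to two fields.
type cmd struct {
	program string
	args    []string
}

// withArgs returns a copy with extra arguments appended. The args slice is
// re-allocated so that two branches of the same base never share a backing
// array (a plain append could let one branch overwrite the other's tail).
func (c *cmd) withArgs(args ...string) *cmd {
	next := *c // shallow copy of the struct
	next.args = make([]string, 0, len(c.args)+len(args))
	next.args = append(next.args, c.args...)
	next.args = append(next.args, args...)
	return &next
}

func main() {
	base := (&cmd{program: "git"}).withArgs("-C", "/repo")
	status := base.withArgs("status")
	history := base.withArgs("log")
	fmt.Println(base.args, status.args, history.args)
}
```

The explicit re-allocation is the design point: copying only the struct would still alias the slice, so a later append on one branch could leak into another.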

func Command

func Command(program string, args ...string) *Cmd

Command starts building a command that runs program with args. Finish with a verb (Output / Run / ExitCode / Probe).

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	// Run-and-capture; a non-zero exit is data, not an error.
	res, err := processkit.Command("git", "rev-parse", "HEAD").Output(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(res.Stdout(), res.Outcome())
}

func (*Cmd) AppendEnv

func (c *Cmd) AppendEnv(env ...string) *Cmd

AppendEnv returns a copy of the command with entries added to its environment. Unlike Cmd.WithEnv (which replaces the whole environment), AppendEnv builds on the existing one — the inherited process environment if WithEnv was never called — so it is the tool for the common "inherit, plus set a few" case (e.g. AppendEnv("GIT_TERMINAL_PROMPT=0")). A later entry overrides an earlier one for the same key, per exec's last-wins rule.
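
The last-wins rule can be illustrated with a pure function. effectiveEnv is a hypothetical helper written for this sketch; it treats keys as case-sensitive (a simplification: Windows environment keys are not) and says nothing about how the package itself stores the list.

```go
package main

import (
	"fmt"
	"strings"
)

// effectiveEnv resolves a "KEY=VALUE" list under a last-wins rule: for a
// repeated key the final entry is kept, at the position of the first.
func effectiveEnv(env []string) []string {
	index := make(map[string]int) // key -> position in out
	var out []string
	for _, kv := range env {
		key, _, ok := strings.Cut(kv, "=")
		if !ok {
			continue // not a KEY=VALUE entry; skipped in this sketch
		}
		if i, seen := index[key]; seen {
			out[i] = kv
			continue
		}
		index[key] = len(out)
		out = append(out, kv)
	}
	return out
}

func main() {
	inherited := []string{"PATH=/usr/bin", "GIT_TERMINAL_PROMPT=1"}
	// Appending builds on the inherited set; the later entry wins for its key.
	env := append(append([]string{}, inherited...), "GIT_TERMINAL_PROMPT=0")
	fmt.Println(effectiveEnv(env))
}
```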

func (*Cmd) ExitCode

func (c *Cmd) ExitCode(ctx context.Context) (int, error)

ExitCode runs the command and returns its exit code. A run with no exit code (a timeout or signal kill) is an error rather than a fabricated -1. Honours Cmd.WithRetry.

func (*Cmd) Output

func (c *Cmd) Output(ctx context.Context) (*Result, error)

Output runs the command and returns the full Result. A non-zero exit is data here, not an error; only a spawn failure, a cancelled context, or an elapsed context deadline is an error.

func (*Cmd) Probe

func (c *Cmd) Probe(ctx context.Context) (bool, error)

Probe runs the command as a yes/no predicate: exit 0 → true, exit 1 → false, anything else (another code, a timeout, a signal kill) → error. OkCodes does not apply to a probe. Honours Cmd.WithRetry.

func (*Cmd) Run

func (c *Cmd) Run(ctx context.Context) (string, error)

Run requires a successful exit and returns stdout as text with trailing whitespace trimmed. A non-zero exit, timeout, signal kill, or cancellation is an error. Honours Cmd.WithRetry.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	// Run requires success and returns trimmed stdout.
	version, err := processkit.Command("go", "version").Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(version)
}

func (*Cmd) WithArgs

func (c *Cmd) WithArgs(args ...string) *Cmd

WithArgs returns a copy of the command with additional arguments appended.

func (*Cmd) WithDir

func (c *Cmd) WithDir(dir string) *Cmd

WithDir returns a copy of the command with the given working directory.

func (*Cmd) WithEnv

func (c *Cmd) WithEnv(env ...string) *Cmd

WithEnv returns a copy of the command with the full environment set, replacing the inherited one. Each entry is "KEY=VALUE"; calling it with no entries runs with an *empty* environment (no PATH) — usually you want to pass through the vars the program needs, or use Cmd.AppendEnv to add to the inherited set.

func (*Cmd) WithLogger

func (c *Cmd) WithLogger(logger *slog.Logger) *Cmd

WithLogger returns a copy of the command that emits structured log/slog events over its lifetime: spawn, exit, timeout, cancellation, and retries. The default is no logging; pass nil to disable. The events carry the program name, pid, mechanism, outcome, and durations, but NEVER the command's arguments, environment, working directory, or output, since those routinely carry secrets. Lifecycle events come from the built-in JobRunner; with a custom runner (see Cmd.WithRunner) only the retry events are emitted (the runner logs its own runs).

Example
package main

import (
	"context"
	"fmt"
	"log"
	"log/slog"
	"os"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	// Structured slog events for the run's lifecycle (spawn, exit, timeout, …). The
	// program name, pid, and outcome are logged; arguments and environment never are.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))
	out, err := processkit.Command("git", "rev-parse", "HEAD").
		WithLogger(logger).
		Output(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out.Stdout())
}

func (*Cmd) WithOkCodes

func (c *Cmd) WithOkCodes(codes ...int) *Cmd

WithOkCodes returns a copy of the command whose listed exit codes count as success in addition to 0. Affects Result.Success and the success-requiring verbs, but not Cmd.Probe.

func (*Cmd) WithRetry

func (c *Cmd) WithRetry(maxAttempts int, backoff time.Duration, retryIf func(error) bool) *Cmd

WithRetry returns a copy of the command that replays a failed run up to maxAttempts times total (so maxAttempts <= 1 runs exactly once), sleeping backoff between tries, but only while retryIf classifies the failure as retryable. It stops on the first success, on the first non-retryable failure, or when the attempt budget is spent; on failure it returns the last error unchanged (there is no retries-exhausted error). A cancelled context is terminal: it is never retried, whatever retryIf says, and it aborts a backoff sleep promptly.

Retry applies to the success-requiring verbs (Cmd.Run, Cmd.ExitCode, Cmd.Probe) — the ones that turn a bad run into an error for retryIf to judge. It does NOT apply to Cmd.Output (a non-zero exit there is data, not an error), nor to a command used as a Pipe stage or under a Supervisor (those have their own control flow). There is no default classifier; pass one — for example errors.Is(err, ErrTimeout) to retry timeouts, or IsTransient for transient low-level spawn failures. A nil retryIf retries nothing (the command runs once).

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()
	// Retry a flaky fetch up to 5 times, 1s apart, but only when it times out.
	out, err := processkit.Command("curl", "-sf", "https://example.com").
		WithTimeout(10*time.Second).
		WithRetry(5, time.Second, func(err error) bool {
			return errors.Is(err, processkit.ErrTimeout)
		}).
		Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out)
}

func (*Cmd) WithRunner

func (c *Cmd) WithRunner(r ProcessRunner) *Cmd

WithRunner returns a copy of the command that executes through r — the dependency-injection seam for tests. The default is a JobRunner.

func (*Cmd) WithStdin

func (c *Cmd) WithStdin(r io.Reader) *Cmd

WithStdin returns a copy of the command that feeds r as the process's standard input for the capture verbs (Cmd.Output, Cmd.Run, Cmd.ExitCode, Cmd.Probe) — e.g. streaming a source into a tool. r is read ONCE as the run proceeds, so it is not safe to reuse across attempts: with Cmd.WithRetry or under a Supervisor (which re-run the command) a second attempt sees EOF — use Cmd.WithStdinBytes / Cmd.WithStdinString for a re-readable buffer instead. WithStdin does NOT apply to a command used as a Pipe stage (the chain wires stdin) or started in a Group (use the WithStdin start option there); record/replay cassettes reject a command with stdin, whose result isn't reproducible from the recorded key.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	// Pipe a buffer into a tool and capture its output (no shell).
	out, err := processkit.Command("tr", "a-z", "A-Z").
		WithStdin(strings.NewReader("hello")).
		Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out)
}

func (*Cmd) WithStdinBytes

func (c *Cmd) WithStdinBytes(b []byte) *Cmd

WithStdinBytes returns a copy of the command that feeds b as the process's standard input for the capture verbs. Unlike Cmd.WithStdin, the buffer is re-readable — each run reads it afresh — so it is safe to combine with Cmd.WithRetry and Supervisor (each attempt/restart gets the full input). The other limitations of Cmd.WithStdin (Pipe/Group/cassette) apply.

func (*Cmd) WithStdinString

func (c *Cmd) WithStdinString(s string) *Cmd

WithStdinString returns a copy of the command that feeds s as the process's standard input — the string form of Cmd.WithStdinBytes (re-readable).

func (*Cmd) WithTimeout

func (c *Cmd) WithTimeout(d time.Duration) *Cmd

WithTimeout returns a copy of the command bounded by d. At the deadline the process tree is killed and the Result reports Outcome.TimedOut — a timeout is captured in the result, not raised, until a success-requiring verb turns it into an error. (Cancelling the context you pass is different: that is an error.)

func (*Cmd) WithUncheckedInPipe

func (c *Cmd) WithUncheckedInPipe() *Cmd

WithUncheckedInPipe returns a copy of the command exempt from a Pipeline's pipefail attribution: as a pipeline stage, its failure is never blamed on the chain. A non-zero exit is always forgiven; for a non-final stage, so is a signal kill (including the SIGPIPE it gets when a downstream stage stops reading) or its own per-stage timeout. This is the tool for the `producer | head` pattern. A final stage is only forgiven its non-zero exit; a timeout or signal kill still surfaces. Outside a pipeline it has no effect.

type ExitError

type ExitError struct {
	Program   string
	Outcome   Outcome
	Stdout    string
	Stderr    string
	Mechanism Mechanism
}

ExitError reports a run that completed but was not a success — a non-zero exit code, a signal kill (Unix), or a timeout. It carries the captured output so the caller can diagnose the failure. Match it with errors.As; a timed-out ExitError additionally matches errors.Is(err, ErrTimeout).

func (*ExitError) Error

func (e *ExitError) Error() string

Error renders a safe, bounded summary. Captured streams are previewed (not dumped in full) and sanitized so child-controlled bytes can't inject terminal escapes or bidi overrides (Trojan-Source, CVE-2021-42574).
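
A bounded, sanitized preview of this kind might look like the sketch below. preview is a hypothetical helper, and the replacement character, the rune limit, and the "..." marker are assumptions; the package's actual rendering is not shown in this reference.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// preview returns at most max runes of s, with control characters (newlines
// included) and Unicode bidirectional controls replaced by '?', so the text
// cannot carry terminal escapes or reorder what the reader sees.
func preview(s string, max int) string {
	var b strings.Builder
	n := 0
	for _, r := range s {
		if n == max {
			b.WriteString("...") // mark the truncation
			break
		}
		if unicode.IsControl(r) || unicode.Is(unicode.Bidi_Control, r) {
			r = '?'
		}
		b.WriteRune(r)
		n++
	}
	return b.String()
}

func main() {
	// ESC starts an ANSI colour sequence; U+202E is a right-to-left override.
	stderr := "fatal: \x1b[31mbad\x1b[0m \u202egnp.exe\nmore"
	fmt.Println(preview(stderr, 24))
}
```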

func (*ExitError) Is

func (e *ExitError) Is(target error) bool

Is reports a match for ErrTimeout when this exit was a timeout, so errors.Is(err, ErrTimeout) works on a timed-out ExitError.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is an explicit, shared kill-on-drop container for a set of processes. Every process started into the group, and everything those processes spawn, lives in one OS container (a Windows Job Object, a Linux cgroup v2 subtree, or POSIX process groups), so Group.Close reaps the whole tree, grandchildren included. Always pair a Group with `defer group.Close()`.

Example
package main

import (
	"context"
	"log"
	"time"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	group, err := processkit.NewGroup()
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close() // reaps the whole tree, grandchildren included

	server, err := group.Start(ctx, processkit.Command("my-server"))
	if err != nil {
		log.Fatal(err)
	}
	_ = server

	// ... use the server, then end it gracefully (SIGTERM → grace → SIGKILL on
	// Unix; an atomic kill on Windows):
	_ = group.Shutdown(ctx, processkit.ShutdownGrace(5*time.Second))
}

Example (Streaming)
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	group, err := processkit.NewGroup()
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	// Stream a child's output line by line over a merged channel.
	proc, err := group.Start(ctx,
		processkit.Command("journalctl", "-f"),
		processkit.StreamLines())
	if err != nil {
		log.Fatal(err)
	}

	// Range until the channel closes (the process produced all its output);
	// cancelling ctx tears the tree down and closes the channel early.
	for line := range proc.Lines() {
		if line.Stream == processkit.StreamStderr {
			fmt.Fprintln(log.Writer(), line.Text)
			continue
		}
		fmt.Println(line.Text)
	}
}

func NewGroup

func NewGroup(opts ...GroupOption) (*Group, error)

NewGroup creates an empty process group. Pass WithMemoryMax, WithMaxProcesses, or WithCPUQuota to cap the whole tree's resources; those caps are applied to the OS container now (a Windows Job Object, or a Linux cgroup v2 subtree at the real cgroup-v2 root). If a cap is invalid, or the active mechanism can't enforce it (macOS/BSD, the Linux process-group fallback, or a Linux cgroup that isn't the real root), NewGroup returns a *ResourceLimitError (matching ErrResourceLimit) rather than handing back a silently-unbounded group.

Example (Limits)
package main

import (
	"context"
	"errors"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	// Cap the whole tree's resources. A Windows Job Object enforces all three, as
	// does a Linux cgroup v2 where the controllers are delegated; any other
	// mechanism returns a *ResourceLimitError rather than a silently-unbounded group.
	group, err := processkit.NewGroup(
		processkit.WithMemoryMax(512*1024*1024), // 512 MiB
		processkit.WithMaxProcesses(64),
		processkit.WithCPUQuota(1.5), // 1.5 cores' worth
	)
	if errors.Is(err, processkit.ErrResourceLimit) {
		log.Printf("limits unenforceable here: %v", err)
		return
	}
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
	_, _ = group.Start(ctx, processkit.Command("my-worker"))
}

Example (WithLogger)
package main

import (
	"context"
	"log"
	"log/slog"
	"os"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	// On a Group the logger is a GroupOption (the package-level WithLogger), not a
	// method — it logs each child's spawn/exit plus group teardown and shutdown.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))
	group, err := processkit.NewGroup(processkit.WithLogger(logger))
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close() // logs "terminating every process in the group"
	_, _ = group.Start(context.Background(), processkit.Command("my-worker"))
}

func (*Group) Adopt

func (g *Group) Adopt(p *os.Process) error

Adopt pulls an externally-started process — one you started yourself (e.g. via os/exec) and have NOT yet waited on — into the group, so it is torn down when the group is closed. Pass its os.Process (e.g. exec.Cmd.Process).

Containment is best-effort: on Windows the process joins the Job Object; in a POSIX process group it becomes a group leader when it can (capturing its future descendants too), or is tracked individually if it has already exec'd. A process that has already exited is a benign success. The adopted process is not listed by Group.Processes (which reports the processes you Started through the group).

An adopted process counts against WithMaxProcesses: on a group capped at its active-process limit, Adopt is refused (the process is not pulled in) and returns the assignment error.

func (*Group) Close

func (g *Group) Close() error

Close hard-kills every process in the group (grandchildren included) and releases the container. Idempotent.

func (*Group) Mechanism

func (g *Group) Mechanism() Mechanism

Mechanism reports the containment mechanism the group is using.

func (*Group) Processes

func (g *Group) Processes() []*RunningProcess

Processes returns a snapshot of the live process handles in the group. A process that has exited is omitted. Use it to inspect the group, to call WaitAll over its processes without having retained every Group.Start handle yourself, or to read their pids (range and call RunningProcess.Pid). Adopted processes are not included (they have no handle).

func (*Group) Resume

func (g *Group) Resume() error

Resume thaws a group frozen by Group.Suspend.

func (*Group) SampleStats

func (g *Group) SampleStats(ctx context.Context, interval time.Duration) <-chan GroupStats

SampleStats returns a channel of resource snapshots sampled every interval (clamped to a 1ms minimum), starting with an immediate sample. The channel is closed when ctx is cancelled or a sample fails. Drain it until it closes, or cancel ctx, so the sampler doesn't block on a slow reader. Cancelling ctx is the only way to stop the sampler — Group.Close does not (cancel ctx, then Close).

func (*Group) Shutdown

func (g *Group) Shutdown(ctx context.Context, opts ...ShutdownOption) error

Shutdown tears the group down gracefully: on Unix it sends SIGTERM to the whole tree, waits up to the grace period (default 5s; set with ShutdownGrace) for members to exit, then hard-kills the survivors and closes the container. On Windows there is no signal tier, so it is an immediate atomic kill (the grace is ignored). Idempotent via Group.Close.

func (*Group) Signal

func (g *Group) Signal(sig Signal) error

Signal sends sig to every process in the group (and everything they spawned). SignalKill works on every platform — it is the atomic whole-tree kill, like Group.Close; the other signals are delivered on Unix but return ErrUnsupported on Windows, whose Job Object has no signal tier. Signalling a group whose members have already exited is a no-op success.

Example
package main

import (
	"context"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()
	group, err := processkit.NewGroup()
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	server, err := group.Start(ctx, processkit.Command("my-server"))
	if err != nil {
		log.Fatal(err)
	}
	_ = server

	// Ask the whole tree to reload its config (SIGHUP). SignalKill works on every
	// platform; the other signals are Unix-only and return ErrUnsupported on Windows.
	if err := group.Signal(processkit.SignalHup); err != nil {
		log.Println("reload not delivered:", err) // e.g. ErrUnsupported on Windows
	}

	// Pause and resume the whole tree (Unix; ErrUnsupported on Windows).
	_ = group.Suspend()
	_ = group.Resume()
}

func (*Group) Start

func (g *Group) Start(ctx context.Context, cmd *Cmd, opts ...StartOption) (*RunningProcess, error)

Start runs cmd as a member of the group and returns a live handle. The process keeps running until it exits, is killed, or the group is closed.

Start uses cmd's program, arguments, working directory, and environment, but NOT its WithTimeout / WithOkCodes / WithRunner / WithStdin — those configure the capture verbs (Cmd.Output etc.), not a live start. Feed a started process's stdin with the WithStdin start option instead. Bound a started process with the ctx you pass and tear it down with RunningProcess.Kill or Group.Close.

By default a group-started process discards its stdout/stderr. Pass stream StartOptions to observe its I/O: StreamLines (then range RunningProcess.Lines), the OnStdoutLine / OnStderrLine callbacks, the WithStdout / WithStderr tees, and WithStdin for interactive input.

func (*Group) Stats

func (g *Group) Stats() (GroupStats, error)

Stats returns a whole-tree resource snapshot of the group. After Group.Close it reports a zero snapshot (no error): the tree has been torn down, so there is nothing left to count.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()
	group, err := processkit.NewGroup()
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
	_, _ = group.Start(ctx, processkit.Command("my-worker"))

	st, err := group.Stats()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d live processes\n", st.ActiveProcesses())
	if cpu, ok := st.CPUTime(); ok { // available on Windows; count-only on POSIX groups
		fmt.Printf("CPU: %s\n", cpu)
	}
}

func (*Group) Suspend

func (g *Group) Suspend() error

Suspend freezes every process in the group; Group.Resume thaws them. On Unix this is SIGSTOP / SIGCONT to the whole tree. On Windows it returns ErrUnsupported (a Job Object has no freeze). A suspended group is still killed by Group.Close; resume before Group.Shutdown, as a frozen tree can't act on SIGTERM.

type GroupOption

type GroupOption func(*groupConfig)

GroupOption configures a Group at creation: currently the whole-tree resource caps WithMemoryMax, WithMaxProcesses, and WithCPUQuota, plus the WithLogger option. Limits are applied to the OS container when the group is created; they cannot be changed afterwards.

func WithCPUQuota

func WithCPUQuota(cores float64) GroupOption

WithCPUQuota caps the tree's CPU at the equivalent of cores CPU cores (0.5 = half a core, 2.0 = two cores; must be finite and > 0). On Windows the hard cap is expressed against total system CPU and so is approximate; a quota at or above the core count saturates at 100%. See WithMemoryMax for the unsupported-mechanism behaviour.
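
How a cores value might translate to each backend can be worked through numerically. The sketch relies on two OS facts: cgroup v2 cpu.max holds "quota period" in microseconds, and a Job Object hard cap is a share of total system CPU in hundredths of a percent. Everything else is an assumption: cgroupCPUMax and jobCPURate are hypothetical helpers, and the 100ms period and the rounding are choices made for the illustration, not the package's documented behaviour.

```go
package main

import (
	"fmt"
	"math"
)

// cgroupCPUMax renders cores as a cgroup v2 cpu.max value: "<quota> <period>"
// in microseconds. The 100ms period is an assumption of this sketch.
func cgroupCPUMax(cores float64) string {
	const periodUs = 100_000
	return fmt.Sprintf("%d %d", int64(math.Round(cores*periodUs)), periodUs)
}

// jobCPURate converts cores to a Job Object hard-cap rate, a share of total
// system CPU in hundredths of a percent (10000 = 100%).
func jobCPURate(cores float64, numCPU int) uint32 {
	rate := math.Round(cores / float64(numCPU) * 10_000)
	return uint32(math.Min(math.Max(rate, 1), 10_000))
}

func main() {
	fmt.Println(cgroupCPUMax(1.5))  // 150000 100000
	fmt.Println(jobCPURate(1.5, 8)) // 1875, i.e. 18.75% of an 8-core machine
	fmt.Println(jobCPURate(16, 8))  // 10000: saturates at 100%
}
```

The second helper shows why the Windows cap is approximate: the same cores value maps to a different percentage on machines with different core counts.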

func WithLogger

func WithLogger(logger *slog.Logger) GroupOption

WithLogger configures a Group to emit structured log/slog events — child spawn and exit, group teardown, graceful shutdown, and adoption. The default is no logging; pass nil to disable. Events carry the program name, pid, mechanism, outcome, and durations, but NEVER arguments, environment, working directory, or output. (As a GroupOption it sits alongside WithMemoryMax etc.)

func WithMaxProcesses

func WithMaxProcesses(n uint32) GroupOption

WithMaxProcesses caps the number of live processes in the tree at n (> 0). On Windows the Job Object's active-process limit refuses the process that would exceed it, so a Group.Start (or Group.Adopt) past the cap fails — the process is rejected, never silently admitted. On a Linux cgroup the cap is the cgroup's pids.max: a fork that would push the tree past n fails (EAGAIN), so the tree can't grow beyond the cap. See WithMemoryMax for the unsupported-mechanism behaviour.

func WithMemoryMax

func WithMemoryMax(bytes uint64) GroupOption

WithMemoryMax caps the whole tree's memory at bytes (which must be > 0). Enforced by a Windows Job Object or, where the controllers are delegated, by a Linux cgroup v2; on a mechanism without a whole-tree limit primitive NewGroup returns a *ResourceLimitError rather than an unbounded group.

type GroupStats

type GroupStats struct {
	// contains filtered or unexported fields
}

GroupStats is a whole-tree resource snapshot from Group.Stats. CPU time and peak memory are reported only by the Job Object backend (Windows); the POSIX process-group backend reports the active count only (no kernel accumulator without a cgroup), so their accessors return false there.

func (GroupStats) ActiveProcesses

func (s GroupStats) ActiveProcesses() int

ActiveProcesses returns the number of live processes (or contained process groups, on the POSIX backend) in the group.

func (GroupStats) CPUTime

func (s GroupStats) CPUTime() (time.Duration, bool)

CPUTime returns the group's cumulative CPU time (user+system) and whether the backend could read it.

func (GroupStats) PeakMemoryBytes

func (s GroupStats) PeakMemoryBytes() (uint64, bool)

PeakMemoryBytes returns the group's peak memory and whether the backend could read it. The figure is not comparable across platforms (Windows reports peak committed memory) and is not the sum of per-process peaks.

type Invocation

type Invocation struct {
	Program string
	Args    []string
	Dir     string   // working directory; "" inherits the parent's
	Env     []string // full environment as "KEY=VALUE"; nil inherits the parent's
	OkCodes []int    // exit codes treated as success in addition to 0
	Timeout time.Duration
	Stdin   io.Reader // standard input for the run, or nil for none (capture verbs only)
}

Invocation is the immutable description of a single run, handed to a ProcessRunner. It is what a test double matches on.

Build it with keyed literals; new fields may be added in later versions. The slices (Args, Env, OkCodes) are borrowed — a runner must not retain or mutate them.

type JobRunner

type JobRunner struct {
	// contains filtered or unexported fields
}

JobRunner is the real runner. Each command runs inside its own private, kill-on-drop job (a Windows Job Object, a Linux cgroup v2 subtree, or a POSIX process group) so the whole tree, grandchildren included, dies with the run. The zero value is ready to use.

func (JobRunner) Output

func (r JobRunner) Output(ctx context.Context, inv Invocation) (*Result, error)

Output runs inv to completion inside a fresh job and captures stdout/stderr.

Timeout semantics: the run's own deadline (Cmd.WithTimeout → inv.Timeout) is *captured* — the Result reports Outcome.TimedOut and no error. The caller's context ending the run (cancelled, or its own deadline elapsed) is instead an error (*CancelError); it wins over the run's own timeout and over a natural exit. A process that exits exactly as its own deadline fires is reported as a timeout (the ambiguity is resolved in favour of the deadline).

type Line

type Line struct {
	Stream StreamID
	Text   string
}

Line is one line of streamed output, tagged with the stream it came from. The trailing newline (and a preceding carriage return) is stripped from Text.
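
The stripping rule matches what the standard library's line scanner does, which makes for a compact illustration. line and splitLines are hypothetical stand-ins (the stream tag is a plain string here), and the sketch inherits bufio.Scanner's default token size limit.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// line is a stand-in for Line; the stream tag is a plain string here.
type line struct {
	Stream string
	Text   string
}

// splitLines tags each line of data with its stream. The default split
// function drops the trailing "\n" and one "\r" before it.
func splitLines(stream, data string) []line {
	var out []line
	sc := bufio.NewScanner(strings.NewReader(data))
	for sc.Scan() {
		out = append(out, line{Stream: stream, Text: sc.Text()})
	}
	return out
}

func main() {
	for _, l := range splitLines("stdout", "first\r\nsecond\nlast without newline") {
		fmt.Printf("%s: %q\n", l.Stream, l.Text)
	}
}
```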

type Mechanism

type Mechanism int

Mechanism reports which operating-system primitive contains a process tree — the basis of processkit's whole-tree, no-orphan teardown guarantee. It is observable so callers can tell *how* containment is enforced, in particular when it is the weaker POSIX process group rather than a cgroup or Job Object.

const (
	// MechanismUnknown is the zero value: no containment has been determined yet.
	MechanismUnknown Mechanism = iota

	// MechanismJobObject is a Windows Job Object with
	// JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE: closing or terminating the job reaps
	// every process in the tree, grandchildren included.
	MechanismJobObject

	// MechanismProcessGroup is a POSIX process group, torn down via killpg: the
	// mechanism on macOS and the BSDs, and the Linux fallback where no cgroup is
	// available. Weaker than a Job Object: a child that calls setsid escapes it.
	MechanismProcessGroup

	// MechanismCgroupV2 is a Linux cgroup v2 subtree: children are placed atomically
	// at clone (clone3 CLONE_INTO_CGROUP, kernel ≥ 5.7) and the tree is torn down
	// with cgroup.kill. It is preferred on Linux wherever a cgroup can be made, and
	// additionally enforces [WithMemoryMax] / [WithMaxProcesses] / [WithCPUQuota]
	// where the controllers are delegated (the real cgroup-v2 root). Linux degrades
	// to MechanismProcessGroup where no cgroup is available.
	MechanismCgroupV2
)

func (Mechanism) String

func (m Mechanism) String() string

String returns the mechanism's name (e.g. "JobObject").

type NotFoundError

type NotFoundError struct {
	Program  string
	Searched []string
}

NotFoundError reports that a program could not be resolved. Searched holds the PATH directories that were checked, when known. Matches errors.Is(err, ErrNotFound).

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

Error renders the failure, naming how many PATH directories were searched (not their contents, to avoid leaking the environment).

func (*NotFoundError) Is

func (e *NotFoundError) Is(target error) bool

Is matches the ErrNotFound sentinel.

type NotReadyError

type NotReadyError struct {
	Program string        // the process that did not become ready
	Probe   string        // which probe: "line", "port", or "predicate"
	Timeout time.Duration // the probe deadline that elapsed
	Cause   error         // the last underlying failure (e.g. the last dial error), if any
}

NotReadyError reports that a readiness probe (RunningProcess.WaitForLine, RunningProcess.WaitForPort, RunningProcess.WaitFor) did not pass — the line never appeared, the port never accepted, the predicate never held, or the process exited before becoming ready. Matches errors.Is(err, ErrNotReady).

It is distinct from ErrTimeout: a probe deadline is the caller's own readiness budget, not the run's Cmd.WithTimeout, and a failed probe does NOT kill the process — the caller decides what happens next.

func (*NotReadyError) Error

func (e *NotReadyError) Error() string

Error renders the readiness failure.

func (*NotReadyError) Is

func (e *NotReadyError) Is(target error) bool

Is matches the ErrNotReady sentinel.

func (*NotReadyError) Unwrap

func (e *NotReadyError) Unwrap() error

Unwrap exposes the last underlying failure (if any) to errors.Is / errors.As.

type Outcome

type Outcome struct {
	// contains filtered or unexported fields
}

Outcome describes how a process ended: a normal exit with a code, a kill by a signal (Unix only), or a timeout. Inspect it via the accessors; a missing exit code is reported as (0, false), never a fabricated -1 sentinel.

Outcomes are normally produced by running a command; for a fake ProcessRunner or a custom runner, build one with Exited, Signalled, or TimedOut (and a whole Result with NewResult). The zero value is exited(0) — build outcomes with the constructors rather than relying on a bare Outcome{}.

func Exited

func Exited(code int) Outcome

Exited builds an Outcome for a normal termination with the given exit code. It is a construction seam for fake [ProcessRunner]s (see NewResult); real runs produce outcomes themselves.

func Signalled

func Signalled(signal int) Outcome

Signalled builds an Outcome for a Unix signal kill with the given signal number — for fakes modelling a signal kill. (A real Windows kill is reported as exited, never signalled.)

func TimedOut

func TimedOut() Outcome

TimedOut builds an Outcome for a run killed by its own deadline — for fakes.

func WaitAll

func WaitAll(ctx context.Context, procs ...*RunningProcess) ([]Outcome, error)

WaitAll waits for every process to exit, returning their [Outcome]s in input order. It returns early with ctx's error if the context is done first, or with the first process's wait error — in which case the returned slice is nil (a wait error is a rare reap failure, not a non-zero exit, which is carried in the Outcome).

func WaitAny

func WaitAny(ctx context.Context, procs ...*RunningProcess) (int, Outcome, error)

WaitAny waits for whichever of the given processes exits first, returning its index, its Outcome, and any wait error. It returns early with ctx's error if the context is done first. The processes are only observed — the losers stay usable afterwards.

func (Outcome) Code

func (o Outcome) Code() (int, bool)

Code returns the exit code and true for a normal exit; (0, false) for a signal kill or a timeout.

func (Outcome) Signal

func (o Outcome) Signal() (int, bool)

Signal returns the signal number and true when the process was killed by a known signal (Unix only); (0, false) otherwise.

func (Outcome) String

func (o Outcome) String() string

String renders the outcome, e.g. "exited(0)", "signalled(9)", "timedOut".

func (Outcome) TimedOut

func (o Outcome) TimedOut() bool

TimedOut reports whether the run was killed by its own timeout.
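The "(0, false), never a fabricated -1" convention can be illustrated with a small stand-in type. Everything below (`outcome`, `exited`, `signalled`, `timedOut`) is a local sketch that mirrors the documented accessors; the real Outcome keeps its fields unexported and may be implemented differently.

```go
package main

import "fmt"

type kind uint8

const (
	kindExited kind = iota // zero value, so outcome{} renders as exited(0)
	kindSignalled
	kindTimedOut
)

// outcome is a hypothetical stand-in for Outcome.
type outcome struct {
	kind kind
	n    int // exit code or signal number, depending on kind
}

func exited(code int) outcome   { return outcome{kindExited, code} }
func signalled(sig int) outcome { return outcome{kindSignalled, sig} }
func timedOut() outcome         { return outcome{kind: kindTimedOut} }

// code returns (code, true) only for a normal exit, otherwise (0, false).
func (o outcome) code() (int, bool) {
	if o.kind != kindExited {
		return 0, false
	}
	return o.n, true
}

// String renders the outcome in the documented style.
func (o outcome) String() string {
	switch o.kind {
	case kindSignalled:
		return fmt.Sprintf("signalled(%d)", o.n)
	case kindTimedOut:
		return "timedOut"
	default:
		return fmt.Sprintf("exited(%d)", o.n)
	}
}

func main() {
	for _, o := range []outcome{exited(0), exited(3), signalled(9), timedOut()} {
		if c, ok := o.code(); ok {
			fmt.Printf("%s: exit code %d\n", o, c)
		} else {
			fmt.Printf("%s: no exit code\n", o)
		}
	}
}
```

Callers branch on the boolean rather than comparing the code against a magic value.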

type OverflowPolicy

type OverflowPolicy uint8

OverflowPolicy decides what a streamed line channel does when it is full and the consumer is not keeping up. See OnOverflow.

const (
	// OverflowBlock applies backpressure: a full channel blocks the drain, which
	// eventually blocks the process on a full OS pipe. No line is lost. This is the
	// default. Cancelling the start context always releases a blocked drain.
	OverflowBlock OverflowPolicy = iota
	// OverflowDropNewest drops an incoming line when the channel is full rather
	// than blocking, so a slow consumer can never stall the process. The number of
	// dropped lines is reported by [RunningProcess.DroppedLines].
	OverflowDropNewest
)
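The difference between the two policies comes down to a blocking send versus a non-blocking one. The sketch below uses a plain buffered channel; `offer` and `fill` are hypothetical helpers for illustration, not the package's drain code.

```go
package main

import "fmt"

// offer sends line on ch. With dropNewest set it never blocks: when ch is
// full the incoming line is discarded and *dropped is incremented. Without
// it the send blocks until the consumer makes room (backpressure).
func offer(ch chan<- string, line string, dropNewest bool, dropped *int) {
	if !dropNewest {
		ch <- line
		return
	}
	select {
	case ch <- line:
	default:
		*dropped++
	}
}

// fill offers n lines to a channel of the given capacity while no consumer
// is running, then returns the lines that were kept and the drop count.
func fill(capacity, n int) (kept []string, dropped int) {
	ch := make(chan string, capacity)
	for i := 0; i < n; i++ {
		offer(ch, fmt.Sprintf("line %d", i), true, &dropped)
	}
	close(ch)
	for line := range ch {
		kept = append(kept, line)
	}
	return kept, dropped
}

func main() {
	kept, dropped := fill(2, 5)
	fmt.Println(kept, dropped)
}
```

Note that the newest lines are the ones lost: the channel keeps what it already buffered.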

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a shell-free chain of commands wired stdout→stdin, like a | b | c. Every stage runs inside one shared kill-on-drop container, so the whole chain lives and dies together. Build it with Pipe and finish with a verb (Pipeline.Output, Pipeline.Run, Pipeline.ExitCode, Pipeline.Probe).

The connection between stages is a real OS pipe (no shell, no temp files): each stage's standard output is the next stage's standard input. Only the LAST stage's stdout is captured; every stage's stderr is captured for failure attribution. The first stage's stdin can be fed with Pipeline.WithStdin.

Like Cmd, a Pipeline is a value built by chainable WithX methods (each returning a new, independent *Pipeline) and is safe to reuse and re-run.

func Pipe

func Pipe(stages ...*Cmd) *Pipeline

Pipe builds a pipeline from two or more stages, wiring each stage's stdout to the next stage's stdin. A pipeline needs at least two stages; the verbs return ErrTooFewStages for fewer. Each stage's program, arguments, Cmd.WithDir, Cmd.WithEnv, Cmd.WithTimeout (per-stage deadline), Cmd.WithOkCodes, and Cmd.WithUncheckedInPipe are honoured. Its Cmd.WithRunner, Cmd.WithRetry, and Cmd.WithStdin are not honoured: a pipeline always runs real processes and wires each stage's stdin from the previous stage. Feed the chain's head with Pipeline.WithStdin instead, and to retry, retry the whole chain.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	// Shell-free `grep error log.txt | sort | uniq -c` — one kill-on-drop chain.
	// The captured stdout is the last stage's; a failure is attributed to the
	// first failing stage.
	counts, err := processkit.Pipe(
		processkit.Command("grep", "error", "log.txt"),
		processkit.Command("sort"),
		processkit.Command("uniq", "-c"),
	).Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(counts)
}

func (*Pipeline) ExitCode

func (p *Pipeline) ExitCode(ctx context.Context) (int, error)

ExitCode runs the chain and returns the pipefail-attributed exit code. A chain with no exit code (a timeout or signal kill) is an error.

func (*Pipeline) Output

func (p *Pipeline) Output(ctx context.Context) (*Result, error)

Output runs the whole chain and returns a single Result folded by pipefail attribution: stdout is the last stage's, while the program / stderr / exit outcome are the first failing (non-exempt) stage's — see the package overview. A non-zero exit anywhere is data in the Result, not an error; only a spawn failure, a cancelled context, the caller's context deadline, or fewer than two stages (ErrTooFewStages) errors.

Because a chain has no single program, Result.Program and Result.Args on a pipeline Result reflect the *attributed* stage (the blamed one, or the last on success) — except a whole-chain timeout, whose Program is the joined "a | b | c" chain name. Stdout is always the last stage's.
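The attribution rule can be sketched as a small fold over per-stage results. The `stage` record and `attribute` function are hypothetical, and the sketch treats any non-zero code as a failure (it ignores OkCodes and timeouts), so it is a simplification of whatever the package does internally.

```go
package main

import "fmt"

// stage is a hypothetical per-stage record for illustration.
type stage struct {
	program string
	code    int  // exit code of the stage
	exempt  bool // stand-in for a stage marked with Cmd.WithUncheckedInPipe
}

// attribute returns the index of the stage a pipefail-style fold would blame:
// the first non-exempt stage with a non-zero code, or the last stage when
// every checked stage succeeded.
func attribute(stages []stage) int {
	for i, s := range stages {
		if s.code != 0 && !s.exempt {
			return i
		}
	}
	return len(stages) - 1
}

func main() {
	chain := []stage{
		{program: "grep", code: 1}, // grep exits 1 when nothing matched
		{program: "sort"},
		{program: "uniq"},
	}
	blamed := chain[attribute(chain)]
	fmt.Printf("attributed to %s (exit %d)\n", blamed.program, blamed.code)

	// Exempting grep moves the attribution to the last stage.
	chain[0].exempt = true
	blamed = chain[attribute(chain)]
	fmt.Printf("attributed to %s (exit %d)\n", blamed.program, blamed.code)
}
```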

func (*Pipeline) Probe

func (p *Pipeline) Probe(ctx context.Context) (bool, error)

Probe runs the chain as a yes/no predicate on the pipefail-attributed code: exit 0 → true, exit 1 → false, anything else → error. OkCodes does not apply.
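The yes/no mapping is small enough to show directly. `probe` and `errProbe` below are illustrative names, and the error the real verb returns for other codes is not specified here.

```go
package main

import (
	"errors"
	"fmt"
)

// errProbe is a hypothetical error for codes that are neither yes nor no.
var errProbe = errors.New("probe: unexpected exit code")

// probe maps an exit code to a predicate result: 0 is true, 1 is false,
// and anything else is an error rather than a guess.
func probe(code int) (bool, error) {
	switch code {
	case 0:
		return true, nil
	case 1:
		return false, nil
	default:
		return false, fmt.Errorf("%w %d", errProbe, code)
	}
}

func main() {
	for _, code := range []int{0, 1, 2} {
		ok, err := probe(code)
		fmt.Println(code, ok, err)
	}
}
```

This is the convention tools such as `grep -q` and `test` follow, which is why a third state is kept distinct from "false".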

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) (string, error)

Run requires the chain to succeed and returns the last stage's stdout as text with trailing whitespace trimmed. A pipefail-attributed failure is an error.

func (*Pipeline) WithLogger

func (p *Pipeline) WithLogger(logger *slog.Logger) *Pipeline

WithLogger returns a copy of the pipeline that emits structured log/slog events when the chain starts and finishes (the finish event carries the attributed outcome and elapsed time). The default is no logging; pass nil to disable. As always, the stages' arguments and environment are never logged.

func (*Pipeline) WithStdin

func (p *Pipeline) WithStdin(r io.Reader) *Pipeline

WithStdin returns a copy of the pipeline whose first stage reads its standard input from r. Inner stages always read the previous stage's output, so r feeds only the head of the chain. r is consumed as the chain runs; to re-run a pipeline, supply a re-readable source (e.g. a fresh strings.Reader each time).

func (*Pipeline) WithTimeout

func (p *Pipeline) WithTimeout(d time.Duration) *Pipeline

WithTimeout returns a copy of the pipeline bounded by a whole-chain deadline. At the deadline the entire chain is killed and the Result reports Outcome.TimedOut with no captured stdout (the chain was abandoned) and the joined chain name as the program. This is distinct from a stage's own Cmd.WithTimeout, which kills only that stage and is attributed to it.

type ProcessRunner

type ProcessRunner interface {
	Output(ctx context.Context, inv Invocation) (*Result, error)
}

ProcessRunner runs a command to completion and returns the captured Result. It is processkit's dependency-injection and test seam: a fake runner needs no real subprocess. A non-zero exit is reported in the Result, not as an error.
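To show why a single-method interface makes a useful test seam, here is a stdlib-only sketch. The `runner`, `invocation`, and `result` types are simplified local stand-ins (the real Invocation and Result differ), and `currentBranch` is a made-up function under test.

```go
package main

import (
	"context"
	"fmt"
)

// invocation and result are simplified stand-ins for Invocation and Result.
type invocation struct {
	Program string
	Args    []string
}

type result struct {
	Stdout string
	Code   int
}

// runner mirrors the shape of the ProcessRunner interface.
type runner interface {
	Output(ctx context.Context, inv invocation) (*result, error)
}

// fakeRunner returns canned results keyed by program and records each call.
type fakeRunner struct {
	canned map[string]*result
	calls  []invocation
}

func (f *fakeRunner) Output(ctx context.Context, inv invocation) (*result, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	f.calls = append(f.calls, inv)
	if r, ok := f.canned[inv.Program]; ok {
		return r, nil
	}
	// A non-zero exit is data in the result, not an error.
	return &result{Code: 127}, nil
}

// currentBranch depends only on the interface, so no subprocess is needed.
func currentBranch(ctx context.Context, r runner) (string, error) {
	res, err := r.Output(ctx, invocation{Program: "git", Args: []string{"branch", "--show-current"}})
	if err != nil {
		return "", err
	}
	if res.Code != 0 {
		return "", fmt.Errorf("git exited with code %d", res.Code)
	}
	return res.Stdout, nil
}

// demo runs currentBranch against the fake and reports what happened.
func demo() (branch string, calls int, err error) {
	fake := &fakeRunner{canned: map[string]*result{"git": &result{Stdout: "main"}}}
	branch, err = currentBranch(context.Background(), fake)
	return branch, len(fake.calls), err
}

func main() {
	fmt.Println(demo())
}
```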

type ResourceLimitError

type ResourceLimitError struct {
	Limit  string // which cap: "memory", "processes", "cpu", or "" for the whole request
	Reason string // why it could not be enforced (always set)
	Cause  error  // the underlying OS error, if any (nil for a rejected value)
}

ResourceLimitError reports that a whole-tree resource cap requested via NewGroup (WithMemoryMax, WithMaxProcesses, or WithCPUQuota) could not be enforced. Either the value was invalid, or the active mechanism has no whole-tree limit primitive: a Windows Job Object enforces all three, but every Unix backend here does not (a Linux cgroup-v2 backend is planned), so a limit requested there fails fast. An unenforced limit is no protection, so this is raised rather than handing back a silently-unbounded group. Matches errors.Is(err, ErrResourceLimit).

func (*ResourceLimitError) Error

func (e *ResourceLimitError) Error() string

Error renders the limit failure.

func (*ResourceLimitError) Is

func (e *ResourceLimitError) Is(target error) bool

Is matches the ErrResourceLimit sentinel.

func (*ResourceLimitError) Unwrap

func (e *ResourceLimitError) Unwrap() error

Unwrap exposes the underlying OS error (if any) to errors.Is / errors.As.

type RestartPolicy

type RestartPolicy uint8

RestartPolicy decides when a Supervisor re-runs its command.

const (
	// RestartOnCrash restarts only after a crash — a clean run ends supervision
	// ([StoppedPolicySatisfied]). This is the default (the zero value). A crash is
	// any run that is not a success ([Result.Success]): a rejected exit code, a
	// timeout, a signal kill, or a spawn failure. (Exit code 0 is always a success;
	// [Cmd.WithOkCodes] widens which other codes count.)
	RestartOnCrash RestartPolicy = iota
	// RestartAlways restarts after every run, clean or not. It loops until a
	// [Supervisor.StopWhen] predicate or [Supervisor.WithMaxRestarts] stops it.
	RestartAlways
	// RestartNever runs the command exactly once.
	RestartNever
)

func (RestartPolicy) String

func (p RestartPolicy) String() string

String renders the policy.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result is the captured result of a run. A non-zero exit is *data* here, not an error: it is reported in the Outcome and only turned into an error by the success-requiring verbs (Cmd.Run, Cmd.ExitCode, Cmd.Probe) or by Result.Err.

func NewResult

func NewResult(inv Invocation, outcome Outcome, stdout, stderr []byte) *Result

NewResult builds a Result for a fake ProcessRunner to return from its Output method — the construction seam the dependency-injection model needs (a real run produces a Result through the verbs; a double or a custom runner builds one here). The program, args, and ok-codes are taken from inv; stdout is stored as produced and stderr is normalized to \n. The mechanism is MechanismUnknown and the duration is zero (set neither for a fake). Build the outcome with Exited, Signalled, or TimedOut.

func (*Result) Args

func (r *Result) Args() []string

Args returns a copy of the arguments the program was run with.

func (*Result) Code

func (r *Result) Code() (int, bool)

Code returns the exit code and true for a normal exit; (0, false) otherwise.

func (*Result) Duration

func (r *Result) Duration() time.Duration

Duration returns how long the run took.

func (*Result) Err

func (r *Result) Err() error

Err returns nil when the run was a success, otherwise an *ExitError carrying the captured streams (as sanitized text). The success-requiring verbs report this.

func (*Result) Mechanism

func (r *Result) Mechanism() Mechanism

Mechanism returns the containment mechanism that was in effect for the run.

func (*Result) Outcome

func (r *Result) Outcome() Outcome

Outcome returns how the process ended.

func (*Result) Program

func (r *Result) Program() string

Program returns the program that was run.

func (*Result) Stderr

func (r *Result) Stderr() string

Stderr returns the captured stderr as text (normalized to \n).

func (*Result) Stdout

func (r *Result) Stdout() string

Stdout returns the captured stdout as text, with line endings normalized to \n. Use Result.StdoutBytes for exact bytes.

func (*Result) StdoutBytes

func (r *Result) StdoutBytes() []byte

StdoutBytes returns a copy of the captured stdout, exactly as produced (line endings preserved). Use this for binary output; use Result.Stdout for text.
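The text view versus the exact-bytes view can be illustrated as follows. `normalizeNewlines` is a hypothetical helper that rewrites only CRLF; whether the package also rewrites a lone CR is not stated, so this sketch leaves it alone.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeNewlines rewrites CRLF line endings to LF and leaves every other
// byte untouched (an assumption about what "normalized to \n" covers).
func normalizeNewlines(raw []byte) string {
	return strings.ReplaceAll(string(raw), "\r\n", "\n")
}

func main() {
	raw := []byte("one\r\ntwo\r\n")
	fmt.Printf("%q\n", raw)                    // exact bytes, line endings preserved
	fmt.Printf("%q\n", normalizeNewlines(raw)) // text view
}
```

For binary output any such rewrite would corrupt the data, which is the reason to reach for the bytes accessor.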

func (*Result) Success

func (r *Result) Success() bool

Success reports whether the run succeeded: a clean exit whose code is 0 or in the configured OkCodes set.

func (*Result) TimedOut

func (r *Result) TimedOut() bool

TimedOut reports whether the run was killed by its own timeout.

type RunProfile

type RunProfile struct {
	// contains filtered or unexported fields
}

RunProfile summarises one run's resource usage, sampled over its lifetime by RunningProcess.Profile.

func (RunProfile) AvgCPU

func (p RunProfile) AvgCPU() (float64, bool)

AvgCPU returns the average CPU usage in cores (CPU time / wall duration); ok is false when there was no CPU sample or the duration was zero. It can exceed 1.0 for a multi-threaded process.
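The average is simply CPU time divided by wall time, which a short sketch makes concrete. `avgCPU` is an illustrative free function, not the method's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// avgCPU returns CPU time divided by wall-clock duration, in cores. It
// reports false when no CPU sample exists or the wall duration is not positive.
func avgCPU(cpu time.Duration, sampled bool, wall time.Duration) (float64, bool) {
	if !sampled || wall <= 0 {
		return 0, false
	}
	return cpu.Seconds() / wall.Seconds(), true
}

func main() {
	// 3s of CPU over 2s of wall time: more than one core was busy on average.
	if cores, ok := avgCPU(3*time.Second, true, 2*time.Second); ok {
		fmt.Printf("%.2f cores\n", cores)
	}
	if _, ok := avgCPU(0, false, 2*time.Second); !ok {
		fmt.Println("no CPU sample")
	}
}
```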

func (RunProfile) CPUTime

func (p RunProfile) CPUTime() (time.Duration, bool)

CPUTime returns the cumulative CPU time at the last sample, and whether any CPU sample succeeded.

func (RunProfile) Duration

func (p RunProfile) Duration() time.Duration

Duration returns the wall-clock time from start to exit.

func (RunProfile) ExitCode

func (p RunProfile) ExitCode() (int, bool)

ExitCode returns the run's exit code and true, or (0, false) if it was killed by a timeout or signal.

func (RunProfile) PeakMemoryBytes

func (p RunProfile) PeakMemoryBytes() (uint64, bool)

PeakMemoryBytes returns the maximum peak-RSS observed across samples.

func (RunProfile) Samples

func (p RunProfile) Samples() int

Samples returns how many times the run was sampled.

type RunningProcess

type RunningProcess struct {
	// contains filtered or unexported fields
}

RunningProcess is a live handle to a process started in a Group. Wait for it to exit, read its pid, kill it, or, when Group.Start was given streaming options, read its output line by line via RunningProcess.Lines (or the OnStdoutLine / OnStderrLine callbacks and WithStdout / WithStderr tees).

func (*RunningProcess) CPUTime

func (p *RunningProcess) CPUTime() (time.Duration, bool)

CPUTime returns this process's cumulative CPU time (user+system) and whether it could be read — available on Linux and Windows, not on macOS / the BSDs. It is unavailable once the process has exited (its pid may have been recycled).

func (*RunningProcess) DroppedLines

func (p *RunningProcess) DroppedLines() int

DroppedLines reports how many lines the OverflowDropNewest policy discarded because the RunningProcess.Lines channel was full. It counts policy drops only and is always 0 under the default OverflowBlock (a line lost when cancellation or teardown releases a backpressured send is not counted). Read it after the channel has closed for the final count.

func (*RunningProcess) Elapsed

func (p *RunningProcess) Elapsed() time.Duration

Elapsed returns the wall-clock time since the process was started. Unlike RunProfile.Duration it is not frozen at exit — it keeps increasing after the process ends (it is "time since start", not "run duration").

func (*RunningProcess) Kill

func (p *RunningProcess) Kill() error

Kill terminates this process. Its descendants are reaped when the owning Group is closed; killing one process does not tear down the whole group.

func (*RunningProcess) Lines

func (p *RunningProcess) Lines() <-chan Line

Lines returns the merged stdout/stderr line channel for a process started with StreamLines; each Line is tagged with its StreamID. The channel closes once both streams reach EOF (the process has produced all its output). If the start did not enable streaming, Lines returns an already-closed channel, so ranging over it is always safe. Drain it until it closes, or cancel the start context, so a slow reader can't stall the process under OverflowBlock.

RunningProcess.WaitForLine consumes this same channel, so don't range Lines concurrently with a readiness probe — they would steal lines from each other.

func (*RunningProcess) PeakMemoryBytes

func (p *RunningProcess) PeakMemoryBytes() (uint64, bool)

PeakMemoryBytes returns this process's peak resident memory and whether it could be read — available on Linux and Windows, not on macOS / the BSDs. It is unavailable once the process has exited (its pid may have been recycled).

func (*RunningProcess) Pid

func (p *RunningProcess) Pid() int

Pid returns the process id, or 0 if it never started.

func (*RunningProcess) Profile

func (p *RunningProcess) Profile(ctx context.Context, interval time.Duration) (RunProfile, error)

Profile waits for the process to exit, sampling its CPU time and peak memory every interval (clamped to a 1ms minimum), and returns a RunProfile. Like RunningProcess.Wait it returns ctx's bare error if ctx is done first (the process keeps running). CPU and memory are unavailable on macOS / the BSDs, so a profile there has only the duration, exit code, and sample count.

func (*RunningProcess) Wait

func (p *RunningProcess) Wait(ctx context.Context) (Outcome, error)

Wait blocks until the process exits and returns its Outcome, or returns early with ctx's bare error (context.Canceled / context.DeadlineExceeded, not an ErrCancelled) if the context is done first — in which case the process keeps running; kill it via the owning Group or RunningProcess.Kill.

func (*RunningProcess) WaitFor

func (p *RunningProcess) WaitFor(ctx context.Context, check func(context.Context) bool, within time.Duration) error

WaitFor waits until check returns true, re-invoking it on a ~50ms cadence (and immediately first). check should be a cheap, non-blocking readiness test — an HTTP /health request, a file existing, a database accepting connections. It is passed ctx so it can honour cancellation. Like the other probes it does not kill the process: on the within deadline it returns a *NotReadyError; if the process exits first it returns one promptly; if ctx is cancelled it returns ctx's bare error.
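The polling contract (check immediately, then on a fixed cadence, bounded by a readiness budget and by ctx) can be sketched with a timer and a ticker. `pollUntil` and `errNotReady` are illustrative names; the sketch takes the cadence as a parameter and does not model the "process exited first" case.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotReady is a local stand-in for a readiness failure.
var errNotReady = errors.New("not ready")

// pollUntil calls check immediately and then once per cadence until it
// returns true, the within budget elapses, or ctx is done. cadence must be
// positive (time.NewTicker panics otherwise).
func pollUntil(ctx context.Context, check func(context.Context) bool, within, cadence time.Duration) error {
	deadline := time.NewTimer(within)
	defer deadline.Stop()
	tick := time.NewTicker(cadence)
	defer tick.Stop()
	for {
		if err := ctx.Err(); err != nil {
			return err // the bare context error
		}
		if check(ctx) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return errNotReady
		case <-tick.C:
			// next attempt
		}
	}
}

// demo runs one probe that becomes ready on the third check and one that
// never does.
func demo() (calls int, readyErr, lateErr error) {
	ctx := context.Background()
	readyErr = pollUntil(ctx, func(context.Context) bool {
		calls++
		return calls >= 3
	}, time.Second, 5*time.Millisecond)
	lateErr = pollUntil(ctx, func(context.Context) bool { return false },
		20*time.Millisecond, 5*time.Millisecond)
	return calls, readyErr, lateErr
}

func main() {
	fmt.Println(demo())
}
```

Because check runs inline with the loop, a check that blocks delays both the deadline and cancellation, which is why it should be cheap.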

func (*RunningProcess) WaitForLine

func (p *RunningProcess) WaitForLine(ctx context.Context, match func(string) bool, within time.Duration) (string, error)

WaitForLine waits until a line of the process's merged output (stdout or stderr) satisfies match, and returns that line. It consumes the RunningProcess.Lines channel up to and including the matched line, so probe first, then continue ranging Lines for the rest — don't read Lines concurrently with this.

The process must have been started with StreamLines. WaitForLine does not kill the process: on the within deadline it returns a *NotReadyError (the process keeps running); if the output stream ends before a match (the process exited) it returns a *NotReadyError promptly; if ctx is cancelled it returns ctx's bare error. A zero or negative within still checks the already-buffered lines once.

func (*RunningProcess) WaitForPort

func (p *RunningProcess) WaitForPort(ctx context.Context, addr string, within time.Duration) error

WaitForPort waits until a TCP connection to addr (a "host:port" address, IPv4 or bracketed IPv6) succeeds. It dials repeatedly on a ~50ms cadence, trying immediately first, and drops each successful connection at once. It tests the address, not this specific process — if the process dies and something else binds the port, the probe reports ready (use RunningProcess.WaitFor with an identity check if that matters). Like the other probes it does not kill the process: on the within deadline it returns a *NotReadyError carrying the last dial error; if the process exits first it returns a *NotReadyError promptly; if ctx is cancelled it returns ctx's bare error. The deadline may be overrun by up to one connect attempt (≤1s).

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()
	group, err := processkit.NewGroup()
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	server, err := group.Start(ctx, processkit.Command("my-server"))
	if err != nil {
		log.Fatal(err)
	}
	// Wait for the server to accept connections. A probe never kills the process —
	// if it isn't ready in time you get a *NotReadyError and decide what to do.
	if err := server.WaitForPort(ctx, "127.0.0.1:8080", 10*time.Second); err != nil {
		log.Fatal(err)
	}
	fmt.Println("server is accepting connections")
}

type ShutdownOption

type ShutdownOption func(*shutdownConfig)

ShutdownOption configures Group.Shutdown.

func ShutdownGrace

func ShutdownGrace(d time.Duration) ShutdownOption

ShutdownGrace sets how long graceful shutdown waits for members to exit before hard-killing the survivors (Unix only — Windows shutdown is an atomic kill).

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

Signal is a portable signal to deliver to a Group with Group.Signal. Use a curated value (SignalTerm, SignalKill, …) for portability, or RawSignal to pass a raw Unix signal number. Only SignalKill is honoured on every platform; the rest are Unix-only (Windows returns ErrUnsupported). The zero value is an unspecified Signal — Group.Signal rejects it rather than guessing.

func RawSignal

func RawSignal(n int) Signal

RawSignal is the escape hatch for a raw Unix signal number not in the curated set (e.g. SIGWINCH). It is always ErrUnsupported on Windows.

func (Signal) String

func (s Signal) String() string

String renders the signal, e.g. "SIGTERM" or "signal 28".

type StartError

type StartError struct {
	Program string
	Err     error
}

StartError reports a spawn failure that is not a not-found (e.g. a permission error, a bad working directory). It wraps the underlying cause.

func (*StartError) Error

func (e *StartError) Error() string

Error renders the spawn failure.

func (*StartError) Is

func (e *StartError) Is(target error) bool

Is matches the ErrStart sentinel (in addition to the wrapped cause via Unwrap).

func (*StartError) Unwrap

func (e *StartError) Unwrap() error

Unwrap exposes the underlying OS error to errors.Is / errors.As.

type StartOption

type StartOption func(*startConfig)

StartOption configures a single Group.Start — its output streaming, line callbacks, and interactive stdin. See WithStdout, OnStdoutLine, StreamLines, WithStdin, and the other With/On options. The zero set discards the process's output.

func BufferLines

func BufferLines(n int) StartOption

BufferLines sets the capacity of the RunningProcess.Lines channel (default 1024). A value <= 0 selects the default — the channel cannot be made unbuffered this way. It has no effect unless StreamLines is also given.

func OnOverflow

func OnOverflow(p OverflowPolicy) StartOption

OnOverflow sets what the RunningProcess.Lines channel does when it is full (default OverflowBlock). It governs the channel only — the OnStdoutLine / OnStderrLine callbacks always apply backpressure and never drop. It has no effect unless StreamLines is also given.

func OnStderrLine

func OnStderrLine(fn func(string)) StartOption

OnStderrLine registers a callback invoked for each line of standard error, in order, on a background goroutine. See OnStdoutLine for the blocking contract.

func OnStdoutLine

func OnStdoutLine(fn func(string)) StartOption

OnStdoutLine registers a callback invoked for each line of standard output, in order, on a background goroutine. The callback runs inline with the drain, so a slow callback applies backpressure to the process. It MUST return: unlike a blocked RunningProcess.Lines send (which cancelling the context or closing the group releases), a callback stuck forever cannot be interrupted and will pin the process's drain goroutine. For cancel-safe consumption, prefer StreamLines.

func StreamLines

func StreamLines() StartOption

StreamLines enables the merged line channel returned by RunningProcess.Lines, carrying both stdout and stderr lines tagged by StreamID in arrival order. The channel is closed once both streams reach EOF. You must drain it until it closes (or cancel the start context), otherwise — under the default OverflowBlock — a full channel will stall the process.

func WithDecoder

func WithDecoder(d func(io.Reader) io.Reader) StartOption

WithDecoder transforms each output stream before it is split into lines — the seam for non-UTF-8 console output. Pass a function that wraps a byte reader in a decoding reader (for example one from golang.org/x/text/encoding, which this package does not depend on). It applies to the line callbacks and the RunningProcess.Lines channel, not to the raw WithStdout / WithStderr tees.

func WithMaxLineBytes

func WithMaxLineBytes(n int) StartOption

WithMaxLineBytes bounds how many bytes a single line may accumulate before it is emitted as a fragment (default 1 MiB), so a stream with no newline cannot exhaust memory. It does not abort the stream.
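One way to picture the fragment behaviour is a splitter that flushes a partial line once it reaches the cap. `splitLines` below is a hypothetical, in-memory sketch; the package's streaming reader may split differently at the boundary.

```go
package main

import "fmt"

// splitLines splits data on '\n'. A line that would grow past max bytes is
// emitted as a fragment and accumulation restarts, so a stream with no
// newline cannot grow one line without bound. The stream is never aborted.
func splitLines(data []byte, max int) []string {
	if max <= 0 {
		max = 1 << 20 // fall back to a 1 MiB cap
	}
	var out []string
	var cur []byte
	for _, b := range data {
		if b == '\n' {
			out = append(out, string(cur))
			cur = cur[:0]
			continue
		}
		if len(cur) == max {
			out = append(out, string(cur)) // flush the full fragment first
			cur = cur[:0]
		}
		cur = append(cur, b)
	}
	if len(cur) > 0 {
		out = append(out, string(cur)) // trailing text without a newline
	}
	return out
}

func main() {
	for _, line := range splitLines([]byte("ab\nabcdefghij\nxy"), 4) {
		fmt.Printf("%q\n", line)
	}
}
```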

func WithStderr

func WithStderr(w io.Writer) StartOption

WithStderr mirrors the process's standard error, verbatim and undecoded, to w as it is produced. Configuring any stderr consumer also drains stderr in the background, so the process can't deadlock on a full stderr pipe while you read only stdout.

func WithStdin

func WithStdin(r io.Reader) StartOption

WithStdin feeds the process's standard input from r. For interactive input, pass the read end of an io.Pipe and write to its write end over time; close the write end to signal EOF. This is a Group.Start option; to feed the head of a chain, use the Pipeline.WithStdin method instead.

You must eventually close (or exhaust) r once the process no longer needs input. If r is not an *os.File, a background goroutine copies it to the process, and a process that exits while that copy is blocked on r will stall RunningProcess.Wait for up to a few seconds before the pipe is force-closed; an r that never reaches EOF leaks that copy goroutine past Group.Close, which cannot reach it.

func WithStdout

func WithStdout(w io.Writer) StartOption

WithStdout mirrors the process's standard output, verbatim and undecoded, to w as it is produced — for forwarding to os.Stdout, a log file, or a bytes.Buffer to capture the full output alongside streaming.

type StopReason

type StopReason uint8

StopReason explains why a Supervisor concluded.

const (
	// StoppedByPredicate means a [Supervisor.StopWhen] predicate matched a run.
	StoppedByPredicate StopReason = iota
	// StoppedPolicySatisfied means the [RestartPolicy] wanted no further run (a
	// clean run under RestartOnCrash, or any run under RestartNever).
	StoppedPolicySatisfied
	// StoppedRestartsExhausted means the restart budget ([Supervisor.WithMaxRestarts])
	// ran out while the policy still wanted to restart.
	StoppedRestartsExhausted
)

func (StopReason) String

func (r StopReason) String() string

String renders the stop reason.

type StreamID

type StreamID uint8

StreamID identifies which standard stream a Line came from.

const (
	// StreamStdout marks a line that came from the process's standard output.
	StreamStdout StreamID = iota
	// StreamStderr marks a line that came from the process's standard error.
	StreamStderr
)

func (StreamID) String

func (s StreamID) String() string

String renders the stream as "stdout" or "stderr".

type SupervisionOutcome

type SupervisionOutcome struct {
	// Final is the [Result] of the last (terminating) run.
	Final *Result
	// Restarts is the number of re-runs; the first run is not a restart, so
	// Restarts == 2 means three runs total.
	Restarts int
	// Stopped is why supervision ended.
	Stopped StopReason
	// StormPauses is how many failure-storm pauses were taken (0 unless
	// [Supervisor.WithStormPause] is set).
	StormPauses int
}

SupervisionOutcome is what a concluded Supervisor.Run reports. A non-error Run means supervision concluded — inspect Final for the last run's verdict (it may be a failure, e.g. when the restart budget ran out).

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor keeps a command alive: it runs the command, and on a crash re-runs it with capped-exponential backoff, optionally guarding against a crash storm, until a stop condition is met. It is a value built by chainable WithX methods (each returning a new, independent *Supervisor) and run with Supervisor.Run.

Supervision is sequential and single-flight: the next run never starts until the previous one has fully exited and been reaped, so a command's whole tree is always torn down before a restart — no overlap, no orphan.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ZelAnton/processkit-go"
)

func main() {
	ctx := context.Background()

	// Keep a worker alive: restart it on a crash with exponential backoff, and
	// give up after 10 restarts.
	outcome, err := processkit.Supervise(processkit.Command("my-worker")).
		WithMaxRestarts(10).
		WithBackoff(time.Second, 2).
		Run(ctx)
	if err != nil {
		log.Fatal(err) // the caller's context was cancelled, or a run never started
	}
	fmt.Printf("supervision ended (%s) after %d restarts\n", outcome.Stopped, outcome.Restarts)
}

func Supervise

func Supervise(cmd *Cmd) *Supervisor

Supervise starts building a supervisor for cmd with sensible defaults: RestartOnCrash, unlimited restarts, 200ms base backoff doubling to a 30s cap, jitter on, and the failure-storm guard off. The command runs once per incarnation; its WithTimeout / WithOkCodes / WithEnv etc. apply each run, but its WithRunner and WithRetry do not — set the supervisor's runner with Supervisor.WithRunner, and let the restart policy drive retries.

func (*Supervisor) Run

Run supervises the command until a stop condition is met, returning the SupervisionOutcome. A nil error means supervision *concluded* (not that the command succeeded — check Final). It returns an error only when the caller's context is cancelled or a run that could not even start is the terminating one (a spawn failure with no further restart allowed).

func (*Supervisor) StopWhen

func (s *Supervisor) StopWhen(predicate func(*Result) bool) *Supervisor

StopWhen sets a predicate evaluated on each completed run (clean or not), before the restart policy. The first run it matches ends supervision with StoppedByPredicate. It never sees a run that failed to start.

func (*Supervisor) WithBackoff

func (s *Supervisor) WithBackoff(base time.Duration, factor float64) *Supervisor

WithBackoff sets the base delay before the first restart and the multiplier applied per subsequent restart (delay n = base × factor^n, with n = 0 for the first restart, capped by Supervisor.WithMaxBackoff). A base of 0 restarts immediately; a factor below 1 (or non-finite) is treated as 1 (a constant delay).
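The capped-exponential schedule can be worked through in a few lines. `backoffDelay` is an illustrative function (jitter is left out), using the documented defaults of a 200ms base, a factor of 2, and a 30s cap as sample inputs.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns base * factor^n capped at limit, where n counts
// restarts from 0. A factor below 1 or non-finite is treated as 1, and a
// non-positive base means no delay.
func backoffDelay(base time.Duration, factor float64, n int, limit time.Duration) time.Duration {
	if base <= 0 {
		return 0
	}
	if factor < 1 || math.IsNaN(factor) || math.IsInf(factor, 0) {
		factor = 1
	}
	d := float64(base) * math.Pow(factor, float64(n))
	if d > float64(limit) {
		return limit // also covers overflow to +Inf
	}
	return time.Duration(d)
}

func main() {
	for n := 0; n <= 8; n++ {
		fmt.Println(n, backoffDelay(200*time.Millisecond, 2, n, 30*time.Second))
	}
}
```

Comparing in floating point before converting back avoids an integer overflow for large n.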

func (*Supervisor) WithFailureDecay

func (s *Supervisor) WithFailureDecay(decay time.Duration) *Supervisor

WithFailureDecay sets the half-life of the failure-storm score (default 30s): the score halves every decay of quiet, so failures spaced wider than this never build a storm. Only meaningful with Supervisor.WithStormPause.

func (*Supervisor) WithFailureThreshold

func (s *Supervisor) WithFailureThreshold(threshold float64) *Supervisor

WithFailureThreshold sets the score above which the storm guard pauses (default 5.0). Only meaningful with Supervisor.WithStormPause.
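A half-life score of this kind can be sketched as below. The `stormScore` type is hypothetical, and the choice that each failure adds exactly one point is an assumption made for illustration; only the halving-per-decay behaviour and the 5.0 / 30s defaults come from the text.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// stormScore is a hypothetical failure score with exponential decay.
// halfLife must be positive.
type stormScore struct {
	score    float64
	halfLife time.Duration
}

// decay halves the score for every halfLife of quiet time.
func (s *stormScore) decay(quiet time.Duration) {
	s.score *= math.Pow(0.5, quiet.Seconds()/s.halfLife.Seconds())
}

// fail decays the score over the quiet time since the previous failure and
// then adds one point for the new failure (the +1 weight is an assumption).
func (s *stormScore) fail(sinceLast time.Duration) float64 {
	s.decay(sinceLast)
	s.score++
	return s.score
}

func main() {
	const threshold = 5.0
	s := &stormScore{halfLife: 30 * time.Second}
	// Failures one second apart accumulate faster than they decay.
	for i := 1; i <= 8; i++ {
		score := s.fail(time.Second)
		fmt.Printf("failure %d: score %.2f, pause=%v\n", i, score, score > threshold)
	}
}
```

Under this model, failures spaced much wider than the half-life decay away almost entirely between events and stay well below the threshold.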

func (*Supervisor) WithJitter

func (s *Supervisor) WithJitter(enabled bool) *Supervisor

WithJitter enables or disables ±50% jitter on every backoff and storm pause (default on). Disable it for deterministic delays in tests.

func (*Supervisor) WithLogger

func (s *Supervisor) WithLogger(logger *slog.Logger) *Supervisor

WithLogger configures the supervisor to emit structured log/slog events — each restart (with its number and backoff delay) and each failure-storm pause. The default is no logging; pass nil to disable. Set a logger on the supervised Cmd (via Cmd.WithLogger) too if you also want per-run spawn/exit events. As always, arguments and environment are never logged.

func (*Supervisor) WithMaxBackoff

func (s *Supervisor) WithMaxBackoff(limit time.Duration) *Supervisor

WithMaxBackoff caps the per-restart backoff delay (default 30s).

func (*Supervisor) WithMaxRestarts

func (s *Supervisor) WithMaxRestarts(n int) *Supervisor

WithMaxRestarts caps the number of restarts over the supervisor's lifetime; after n restarts (n+1 runs) it stops with StoppedRestartsExhausted. n == 0 means a single run, and a negative n is treated as 0. The default (don't call this) is unlimited.

func (*Supervisor) WithRestart

func (s *Supervisor) WithRestart(policy RestartPolicy) *Supervisor

WithRestart sets the restart policy (default RestartOnCrash).
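How the restart policy and the restart budget interact can be sketched as a plain loop. The `supervise` function, the `policy` constants, and the string reasons are illustrative stand-ins; backoff, StopWhen, and the storm guard are deliberately left out.

```go
package main

import "fmt"

type policy uint8

const (
	onCrash policy = iota // restart only after a failed run
	always                // restart after every run
	never                 // run exactly once
)

// supervise calls run until the policy wants no further run or the restart
// budget is spent. run receives the number of restarts so far and reports
// whether that run succeeded. limited=false models an unlimited budget.
func supervise(run func(attempt int) bool, p policy, limited bool, maxRestarts int) (restarts int, reason string) {
	if maxRestarts < 0 {
		maxRestarts = 0 // a negative cap is treated as 0
	}
	for {
		ok := run(restarts)
		wantRestart := p == always || (p == onCrash && !ok)
		if !wantRestart {
			return restarts, "policy satisfied"
		}
		if limited && restarts >= maxRestarts {
			return restarts, "restarts exhausted"
		}
		restarts++
	}
}

func main() {
	// Crashes twice, then succeeds: three runs, two restarts.
	flaky := func(attempt int) bool { return attempt >= 2 }
	fmt.Println(supervise(flaky, onCrash, true, 10))

	// Never succeeds: the budget of 3 restarts allows 4 runs in total.
	broken := func(int) bool { return false }
	fmt.Println(supervise(broken, onCrash, true, 3))
}
```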

func (*Supervisor) WithRunner

func (s *Supervisor) WithRunner(r ProcessRunner) *Supervisor

WithRunner sets the ProcessRunner each incarnation runs through — the dependency-injection and test seam. The default is a JobRunner (a fresh kill-on-drop job per run).

func (*Supervisor) WithStormPause

func (s *Supervisor) WithStormPause(pause time.Duration) *Supervisor

WithStormPause turns on the failure-storm guard and sets the pause it takes when a crash storm is detected. The guard is off by default; pass 0 to keep it off. See Supervisor.WithFailureDecay and Supervisor.WithFailureThreshold.

Directories

Path Synopsis
internal/sys Package sys is the platform layer behind processkit's whole-tree, no-orphan containment.
processkittest Package processkittest provides hermetic test doubles for the processkit processkit.ProcessRunner seam.
