pipe

package
v2.0.0 Latest
Warning

This package is not in the latest version of its module.
Published: Jun 20, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// IsSIGPIPE is an `ErrorMatcher` that matches `*exec.ExitError`s
	// that were caused by SIGPIPE. The match for `*exec.ExitError`s
	// uses `errors.As()`. Use like
	//
	//     p.Add(IgnoreError(someStage, IsSIGPIPE))
	IsSIGPIPE = IsSignal(syscall.SIGPIPE)

	// IsEPIPE is an `ErrorMatcher` that matches `syscall.EPIPE` using
	// `errors.Is()`. Use like
	//
	//     p.Add(IgnoreError(someStage, IsEPIPE))
	IsEPIPE = IsError(syscall.EPIPE)

	// IsErrClosedPipe is an `ErrorMatcher` that matches
	// `io.ErrClosedPipe` using `errors.Is()`. (`io.ErrClosedPipe` is
	// the error that results from writing to a closed
	// `*io.PipeWriter`.) Use like
	//
	//     p.Add(IgnoreError(someStage, IsErrClosedPipe))
	IsErrClosedPipe = IsError(io.ErrClosedPipe)

	// IsPipeError is an `ErrorMatcher` that matches a few different
	// errors that typically result if a stage writes to a subsequent
	// stage that has stopped reading from its stdin. This is commonly
	// useful with `IgnoreError` for stateless producer stages whose only
	// job is writing output. Stateful producers should continue any
	// producer-owned state updates needed for consistency before
	// returning the pipe error for `IgnoreError` to suppress. Use like
	//
	//     p.Add(IgnoreError(someStage, IsPipeError))
	IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe)
)
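The sketch below uses only the standard library to show where two of these errors come from. `isPipeLike` and `closedPipeWriteErr` are hypothetical helpers written for this illustration, not part of the package, and the sketch does not cover the SIGPIPE case.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"syscall"
)

// isPipeLike is a hypothetical helper, not part of the package. It reports
// whether err wraps syscall.EPIPE or io.ErrClosedPipe.
func isPipeLike(err error) bool {
	return errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe)
}

// closedPipeWriteErr writes to an io.Pipe whose read end is already closed
// and returns the resulting error.
func closedPipeWriteErr() error {
	pr, pw := io.Pipe()
	pr.Close() // the "next stage" stops reading
	_, err := pw.Write([]byte("data\n"))
	return err
}

func main() {
	fmt.Println(isPipeLike(closedPipeWriteErr()))                   // true
	fmt.Println(isPipeLike(fmt.Errorf("write: %w", syscall.EPIPE))) // true
	fmt.Println(isPipeLike(io.EOF))                                 // false
}
```

Because the checks use `errors.Is()`, they also match errors that wrap the pipe error with additional context.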
var FinishEarly = errors.New("finish stage early")

FinishEarly is an error that can be returned by a `Stage` to request that the iteration be ended early (possibly without reading all of its input). This "error" is considered a successful return, and is not reported to the caller.

Functions

func ScanLFTerminatedLines

func ScanLFTerminatedLines(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanLFTerminatedLines is a `bufio.SplitFunc` that splits its input into lines at LF characters (not treating CR specially).
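As an illustration of what such a split function can look like, here is a minimal sketch. `scanLFLines` and `splitLF` are hypothetical names; this is not the package's source, and the handling of a final line without a trailing LF is an assumption.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// scanLFLines is an illustrative bufio.SplitFunc. It ends a token at each
// '\n' and leaves any preceding '\r' in the token.
func scanLFLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	if i := bytes.IndexByte(data, '\n'); i >= 0 {
		return i + 1, data[:i], nil
	}
	if atEOF {
		// Assumption: a final line without a trailing LF is still a token.
		return len(data), data, nil
	}
	// Not enough data yet; ask the scanner to read more.
	return 0, nil, nil
}

// splitLF collects all tokens that scanLFLines produces for s.
func splitLF(s string) []string {
	sc := bufio.NewScanner(strings.NewReader(s))
	sc.Split(scanLFLines)
	var lines []string
	for sc.Scan() {
		lines = append(lines, sc.Text())
	}
	return lines
}

func main() {
	fmt.Printf("%q\n", splitLF("a\r\nb\nc")) // ["a\r" "b" "c"]
}
```

Unlike `bufio.ScanLines`, this keeps the `\r` in `"a\r"`, which matters for input where CR is data rather than part of a line ending.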

Types

type AppendVars

type AppendVars func(context.Context, []EnvVar) []EnvVar

type ContextValueFunc

type ContextValueFunc func(context.Context) (string, bool)

type ContextValuesFunc

type ContextValuesFunc func(context.Context) []EnvVar

type Env

type Env struct {
	// The directory in which external commands should be executed by
	// default.
	Dir string

	// Vars are extra environment variables. These will override any
	// environment variables that would be inherited from the current
	// process.
	Vars []AppendVars
}

Env represents the environment that a pipeline stage should run in. It is passed to `Stage.Start()`.
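The following sketch shows how a list of `AppendVars` functions can be folded into a final variable list. The types are local copies of the documented shapes so that the example compiles on its own; `staticVar`, `requestIDVar`, `buildVars`, `envStrings`, and the context key are hypothetical, and the in-order evaluation is an assumption about how a pipeline might apply them.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented shapes.
type EnvVar struct{ Key, Value string }
type AppendVars func(context.Context, []EnvVar) []EnvVar

type requestIDKey struct{} // hypothetical context key

// staticVar returns an AppendVars that always appends key=value.
func staticVar(key, value string) AppendVars {
	return func(_ context.Context, vars []EnvVar) []EnvVar {
		return append(vars, EnvVar{Key: key, Value: value})
	}
}

// requestIDVar appends REQUEST_ID only if the context carries one.
func requestIDVar(ctx context.Context, vars []EnvVar) []EnvVar {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok {
		vars = append(vars, EnvVar{Key: "REQUEST_ID", Value: id})
	}
	return vars
}

// buildVars applies each AppendVars in order (assumed evaluation order).
func buildVars(ctx context.Context, fns []AppendVars) []EnvVar {
	var vars []EnvVar
	for _, fn := range fns {
		vars = fn(ctx, vars)
	}
	return vars
}

// envStrings renders the result as KEY=VALUE strings for a given request ID.
func envStrings(id string) []string {
	ctx := context.Background()
	if id != "" {
		ctx = context.WithValue(ctx, requestIDKey{}, id)
	}
	var out []string
	for _, v := range buildVars(ctx, []AppendVars{staticVar("LANG", "C"), requestIDVar}) {
		out = append(out, v.Key+"="+v.Value)
	}
	return out
}

func main() {
	fmt.Println(envStrings("42")) // [LANG=C REQUEST_ID=42]
	fmt.Println(envStrings(""))   // [LANG=C]
}
```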

type EnvVar

type EnvVar struct {
	// The name of the environment variable.
	Key string
	// The value.
	Value string
}

EnvVar represents an environment variable that will be provided to any child process spawned in this pipeline.

type ErrorFilter

type ErrorFilter func(err error) error

ErrorFilter is a function that can filter errors from `Stage.Wait()`. The original error (possibly nil) is passed in as an argument, and whatever the function returns is the error (possibly nil) that is actually emitted.

type ErrorMatcher

type ErrorMatcher func(err error) bool

ErrorMatcher decides whether its argument matches some class of errors (e.g., errors that we want to ignore). The function will only be invoked for non-nil errors.

func AnyError

func AnyError(ems ...ErrorMatcher) ErrorMatcher

AnyError returns an `ErrorMatcher` that returns true for an error that matches any of the `ems`.

func IsError

func IsError(target error) ErrorMatcher

IsError returns an `ErrorMatcher` for the specified target error, matched using `errors.Is()`. Use like

p.Add(pipe.IgnoreError(someStage, pipe.IsError(io.EOF)))
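A minimal sketch of how matchers with the shapes of `IsError` and `AnyError` can be built on `errors.Is()`. The lower-case functions and the sentinel errors are local illustrations, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorMatcher is a local copy of the documented type.
type ErrorMatcher func(err error) bool

// isError matches any error that wraps target.
func isError(target error) ErrorMatcher {
	return func(err error) bool { return errors.Is(err, target) }
}

// anyError matches if at least one of ems matches.
func anyError(ems ...ErrorMatcher) ErrorMatcher {
	return func(err error) bool {
		for _, em := range ems {
			if em(err) {
				return true
			}
		}
		return false
	}
}

// Hypothetical sentinel errors used only by this example.
var (
	errNotFound = errors.New("not found")
	errBusy     = errors.New("busy")
	errOther    = errors.New("other")
)

// wrap adds context the way a stage might when reporting a failure.
func wrap(err error) error { return fmt.Errorf("stage failed: %w", err) }

func main() {
	m := anyError(isError(errNotFound), isError(errBusy))
	fmt.Println(m(wrap(errBusy)))  // true
	fmt.Println(m(wrap(errOther))) // false
}
```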

func IsSignal

func IsSignal(signal syscall.Signal) ErrorMatcher

IsSignal returns an `ErrorMatcher` that matches `*exec.ExitError`s that were caused by the specified signal. The match for `*exec.ExitError`s uses `errors.As()`. Note that under Windows this always returns false, because on that platform `WaitStatus.Signaled()` isn't implemented (it is hardcoded to return `false`).
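The description above can be sketched with the standard library as follows. `isSignal` and `matchesSIGPIPE` are illustrative re-implementations under the assumption that the exit status is inspected via `syscall.WaitStatus`; they are not the package's source.

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
	"syscall"
)

// ErrorMatcher is a local copy of the documented type.
type ErrorMatcher func(err error) bool

// isSignal matches *exec.ExitError values whose process was terminated by sig.
func isSignal(sig syscall.Signal) ErrorMatcher {
	return func(err error) bool {
		var exitErr *exec.ExitError
		if !errors.As(err, &exitErr) || exitErr.ProcessState == nil {
			return false
		}
		ws, ok := exitErr.Sys().(syscall.WaitStatus)
		return ok && ws.Signaled() && ws.Signal() == sig
	}
}

// errPlain is an ordinary error that is not an *exec.ExitError.
var errPlain = errors.New("not an exit error")

// matchesSIGPIPE is a convenience wrapper for the example.
func matchesSIGPIPE(err error) bool { return isSignal(syscall.SIGPIPE)(err) }

func main() {
	fmt.Println(matchesSIGPIPE(errPlain)) // false
}
```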

type Event

type Event struct {
	Command string
	Msg     string
	Err     error
	Context map[string]interface{}
}

Event represents anything that could happen during the pipeline execution.

type FunctionOption

type FunctionOption func(*goStage)

FunctionOption configures a Function stage.

func ForbidStdin

func ForbidStdin() FunctionOption

ForbidStdin returns a FunctionOption declaring that the stage must not be connected to stdin.

func ForbidStdout

func ForbidStdout() FunctionOption

ForbidStdout returns a FunctionOption declaring that the stage must not be connected to stdout.

func WithStdinRequirement

func WithStdinRequirement(requirement StreamRequirement) FunctionOption

WithStdinRequirement returns a FunctionOption declaring the stage's stdin requirement.

func WithStdoutRequirement

func WithStdoutRequirement(requirement StreamRequirement) FunctionOption

WithStdoutRequirement returns a FunctionOption declaring the stage's stdout requirement.

type InputStream

type InputStream struct {
	// contains filtered or unexported fields
}

InputStream represents `stdin` for a stage, which might or might not need to be closed when the stage is done with it. It usually holds an `io.Reader`, which can be retrieved using `Reader()`. Its `Close()` method closes the reader if necessary (i.e., if the `InputStream` was constructed using `ClosingInput()`). The `Close()` method is idempotent.

A nil `*InputStream` is a valid value. Its `Reader()` method returns `nil` and `Close()` does nothing successfully.

It might seem like `InputStream` should implement `io.Reader` itself. But we want to avoid hiding the dynamic type of the `io.Reader` that is being used as the stdin of a pipeline. That object might be of a type that is subject to optimizations that aren't available for a generic `io.Reader`. For example, it might be an `*os.File` (which can be passed directly to subcommands or to `splice(2)`), or it might implement `io.WriterTo`.
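To make the design concrete, here is a small sketch of a stream wrapper with the documented properties: nil-safe methods, an idempotent `Close()`, and a `Reader()` that exposes the dynamic type. `inputStream` and its fields are a local illustration; the real type's unexported fields may differ, and this version is not safe for concurrent `Close()` calls.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// inputStream is a local illustration of the documented behavior.
type inputStream struct {
	r      io.Reader
	closer io.Closer // nil for a non-closing stream
}

func input(r io.Reader) *inputStream { return &inputStream{r: r} }

func closingInput(rc io.ReadCloser) *inputStream {
	return &inputStream{r: rc, closer: rc}
}

// Reader returns the wrapped reader without hiding its dynamic type.
func (s *inputStream) Reader() io.Reader {
	if s == nil {
		return nil
	}
	return s.r
}

// Close closes the reader at most once; nil and non-closing streams are no-ops.
func (s *inputStream) Close() error {
	if s == nil || s.closer == nil {
		return nil
	}
	c := s.closer
	s.closer = nil // later calls do nothing
	return c.Close()
}

// countingCloser records how many times Close was called.
type countingCloser struct {
	io.Reader
	closes int
}

func (c *countingCloser) Close() error { c.closes++; return nil }

// closeTwice closes a closing stream twice and reports the underlying count.
func closeTwice() int {
	cc := &countingCloser{Reader: strings.NewReader("x")}
	s := closingInput(cc)
	s.Close()
	s.Close()
	return cc.closes
}

func main() {
	var none *inputStream
	fmt.Println(none.Reader() == nil, none.Close()) // true <nil>
	fmt.Println(closeTwice())                       // 1
	// The dynamic type stays visible, so optimizations remain available.
	_, ok := input(strings.NewReader("x")).Reader().(io.WriterTo)
	fmt.Println(ok) // true
}
```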

func ClosingInput

func ClosingInput(r io.ReadCloser) *InputStream

The stage is responsible for closing r.

func Input

func Input(r io.Reader) *InputStream

The stage may read from r but must not close it.

func (*InputStream) Close

func (s *InputStream) Close() error

Close closes the underlying reader if necessary. If `s` was constructed using `ClosingInput()`, then close the `io.ReadCloser` that was passed to that function. If `s` is `nil` or was constructed using `Input()`, then do nothing successfully.

func (*InputStream) Reader

func (s *InputStream) Reader() io.Reader

type LinewiseStageFunc

type LinewiseStageFunc func(
	ctx context.Context, env Env, line []byte, stdout *bufio.Writer,
) error

LinewiseStageFunc is a function that can be embedded in a `goStage`. It is called once per line in the input (where "line" can be defined via any `bufio.Scanner`). It should process the line and may write whatever it likes to `stdout`, which is a buffered writer whose contents are forwarded to the input of the next stage of the pipeline. The function needn't write one line of output per line of input.

The function mustn't retain copies of `line`, since it may be overwritten every time the function is called.

The function needn't flush or close `stdout` (this will be done automatically when all of the input has been processed).

If there is an error parsing the input into lines, or if this function returns an error, then the whole pipeline will be aborted with that error. However, if the function returns the special error `pipe.FinishEarly`, the stage will stop processing immediately with a `nil` error value.

The function will be called in a separate goroutine, so it must be careful to synchronize any data access aside from writing to `stdout`.
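A self-contained sketch of the calling convention described above. `runLinewise` is a simplified, hypothetical driver, `lineFunc` omits the `ctx` and `env` parameters, and `errFinishEarly` stands in for `pipe.FinishEarly`; none of this is the package's own code.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// errFinishEarly stands in for pipe.FinishEarly in this sketch.
var errFinishEarly = errors.New("finish stage early")

// lineFunc mirrors LinewiseStageFunc without the ctx and env parameters.
type lineFunc func(line []byte, stdout *bufio.Writer) error

// runLinewise calls f once per input line, treats errFinishEarly as success,
// and flushes the buffered output when the loop ends.
func runLinewise(input string, f lineFunc) (string, error) {
	var out strings.Builder
	w := bufio.NewWriter(&out)
	sc := bufio.NewScanner(strings.NewReader(input))
	for sc.Scan() {
		if err := f(sc.Bytes(), w); err != nil {
			if errors.Is(err, errFinishEarly) {
				break
			}
			return "", err
		}
	}
	if err := sc.Err(); err != nil {
		return "", err
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return out.String(), nil
}

// firstN forwards the first n lines and then asks to finish early.
func firstN(n int) lineFunc {
	seen := 0
	return func(line []byte, stdout *bufio.Writer) error {
		if seen >= n {
			return errFinishEarly
		}
		seen++
		// line is written immediately rather than retained, because the
		// scanner may overwrite it on the next call.
		stdout.Write(line)
		return stdout.WriteByte('\n')
	}
}

// head returns the first n lines of input.
func head(input string, n int) (string, error) {
	return runLinewise(input, firstN(n))
}

func main() {
	out, err := head("a\nb\nc\n", 2)
	fmt.Printf("%q %v\n", out, err) // "a\nb\n" <nil>
}
```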

type NewPipeFn

type NewPipeFn func(opts ...Option) *Pipeline

type NewScannerFunc

type NewScannerFunc func(r io.Reader) (Scanner, error)

NewScannerFunc is used to create a `Scanner` for scanning input that is coming from `r`.

type Option

type Option func(*Pipeline)

Option is a functional option that configures a `Pipeline`.

func WithDir

func WithDir(dir string) Option

WithDir sets the default directory for running external commands.

func WithEnvVar

func WithEnvVar(key, value string) Option

WithEnvVar appends an environment variable for the pipeline.

func WithEnvVarFunc

func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option

WithEnvVarFunc appends a context-based environment variable for the pipeline.

func WithEnvVars

func WithEnvVars(b []EnvVar) Option

WithEnvVars appends several environment variables for the pipeline.

func WithEnvVarsFunc

func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option

WithEnvVarsFunc appends several context-based environment variables for the pipeline.

func WithEventHandler

func WithEventHandler(handler func(e *Event)) Option

WithEventHandler sets an event handler for the pipeline. Setting one will emit an event for each process.

func WithStagePanicHandler

func WithStagePanicHandler(ph StagePanicHandler) Option

WithStagePanicHandler sets a panic handler for the stages within a pipeline. When a pipeline stage panics, the provided handler will be invoked, allowing the client to handle the panic in whatever way they see fit.

Note:

  • The client is responsible for deciding whether to recover from the panic or to panic again.
  • If a panic handler is not set, the panic will be propagated normally.
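The general technique of converting a panic in a goroutine into an error can be sketched as follows. `runStage`, `panicToError`, and `demo` are hypothetical; how the pipeline itself invokes the handler is not shown in this documentation and is assumed here.

```go
package main

import "fmt"

// panicHandler has the same shape as StagePanicHandler.
type panicHandler func(p any) error

// runStage runs f in its own goroutine. If ph is non-nil, a panic in f is
// recovered and converted into an error by ph; otherwise it propagates.
func runStage(f func() error, ph panicHandler) error {
	done := make(chan error, 1)
	go func() {
		var err error
		// Deferred calls run last-in, first-out: recover first, then report.
		defer func() { done <- err }()
		if ph != nil {
			defer func() {
				if p := recover(); p != nil {
					err = ph(p)
				}
			}()
		}
		err = f()
	}()
	return <-done
}

// panicToError is an example handler that recovers by wrapping the value.
func panicToError(p any) error { return fmt.Errorf("stage panicked: %v", p) }

// demo runs a panicking stage and returns the resulting error text.
func demo() string {
	err := runStage(func() error { panic("boom") }, panicToError)
	return err.Error()
}

func main() {
	fmt.Println(demo()) // stage panicked: boom
}
```

A handler that prefers to crash can simply call `panic(p)` again instead of returning an error.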

func WithStdin

func WithStdin(stdin io.Reader) Option

WithStdin assigns stdin to the first command in the pipeline. The caller retains ownership of stdin; the pipeline will not close it, even if `Start()` returns an error.

If the first stage is a `Command` and stdin is not an `*os.File`, `exec.Cmd` has to copy stdin through an internal goroutine, and `Cmd.Wait()` waits for that copy to finish. This is fine for bounded readers such as `strings.Reader` and `bytes.Reader`, and for `*os.File` values, which are passed to the command directly. But a borrowed, non-file reader that can block forever can also block the pipeline forever if the command exits without consuming all of its stdin. See `TestPipelineIOPipeStdinThatIsNeverClosed` for the known limitation.

func WithStdout

func WithStdout(stdout io.Writer) Option

WithStdout assigns stdout to the last command in the pipeline. The caller retains ownership of stdout; the pipeline will not close it, even if `Start()` returns an error.

func WithStdoutCloser

func WithStdoutCloser(stdout io.WriteCloser) Option

WithStdoutCloser assigns stdout to the last command in the pipeline, and closes stdout when the pipeline is done with it. The pipeline is responsible for closing stdout even if `Start()` returns an error.

type OutputStream

type OutputStream struct {
	// contains filtered or unexported fields
}

OutputStream represents `stdout` for a stage, which might or might not need to be closed when the stage is done with it. It usually holds an `io.Writer`, which can be retrieved using `Writer()`. Its `Close()` method closes the writer if necessary (i.e., if the `OutputStream` was constructed using `ClosingOutput()`). The `Close()` method is idempotent.

A nil `*OutputStream` is a valid value. Its `Writer()` method returns `nil` and `Close()` does nothing successfully.

It might seem like `OutputStream` should implement `io.Writer` itself. But we want to avoid hiding the dynamic type of the `io.Writer` that is being used as the stdout of a pipeline. That object might be of a type that is subject to optimizations that aren't available for a generic `io.Writer`. For example, it might be an `*os.File` (which can be passed directly to subcommands or to `splice(2)`), or it might implement `io.ReaderFrom`.

func ClosingOutput

func ClosingOutput(w io.WriteCloser) *OutputStream

The stage is responsible for closing w.

func Output

func Output(w io.Writer) *OutputStream

The stage may write to w but must not close it.

func (*OutputStream) Close

func (s *OutputStream) Close() error

Close closes the underlying writer if necessary. If `s` was constructed using `ClosingOutput()`, then close the `io.WriteCloser` that was passed to that function. If `s` is `nil` or was constructed using `Output()`, then do nothing successfully.

func (*OutputStream) Writer

func (s *OutputStream) Writer() io.Writer

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a Unix-like pipe that can include multiple stages, including external processes as well as stages written in Go.

func New

func New(options ...Option) *Pipeline

New returns a `Pipeline` with all of the `options` applied.

func (*Pipeline) Add

func (p *Pipeline) Add(stages ...Stage)

Add appends one or more stages to the pipeline.

func (*Pipeline) AddWithIgnoredError

func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)

AddWithIgnoredError appends one or more stages to the pipeline, ignoring any errors from those stages that are matched by `em`.

func (*Pipeline) Output

func (p *Pipeline) Output(ctx context.Context) ([]byte, error)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) error

Run starts and waits for the commands in the pipeline. If startup fails, it returns the `Start()` error after `Start()` has performed its failure cleanup.

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context) error

Start starts the commands in the pipeline. If `Start()` exits without an error, `Wait()` must also be called, to allow all resources to be freed.

If `Start()` returns an error, `Wait()` must not be called. Before returning an error, `Start()` cancels and waits for any stages that were started, closes any inter-stage pipes that the pipeline owns, and closes stdout if it was supplied with `WithStdoutCloser()`. Streams supplied with `WithStdin()` or `WithStdout()` remain owned by the caller and are not closed by the pipeline.
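The `Start()`/`Wait()` contract can be captured in a small helper. The sketch below uses a local interface and a fake so that it runs without the package; `startWaiter`, `run`, `fakePipeline`, and `waitsAfter` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// startWaiter captures the two documented Pipeline methods used here.
type startWaiter interface {
	Start(ctx context.Context) error
	Wait() error
}

// run follows the documented contract: call Wait only after a successful Start.
func run(ctx context.Context, p startWaiter) error {
	if err := p.Start(ctx); err != nil {
		// Start has already cleaned up; Wait must not be called.
		return err
	}
	return p.Wait()
}

// fakePipeline is a test double that counts calls to Wait.
type fakePipeline struct {
	startErr error
	waits    int
}

func (f *fakePipeline) Start(context.Context) error { return f.startErr }
func (f *fakePipeline) Wait() error                 { f.waits++; return nil }

// waitsAfter reports how often Wait is called for a failing or working Start.
func waitsAfter(startFails bool) int {
	f := &fakePipeline{}
	if startFails {
		f.startErr = errors.New("start failed")
	}
	_ = run(context.Background(), f)
	return f.waits
}

func main() {
	fmt.Println(waitsAfter(true), waitsAfter(false)) // 0 1
}
```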

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait waits for each stage in the pipeline to exit.

type Scanner

type Scanner interface {
	Scan() bool
	Bytes() []byte
	Err() error
}

Scanner defines the interface (which is implemented by `bufio.Scanner`) that is needed by `ScannerFunction()`. See `bufio.Scanner` for how these methods should behave.

type Stage

type Stage interface {
	// Name returns the name of the stage.
	Name() string

	// Requirements returns this stage's requirements regarding how its
	// stdin and stdout pipes should be created.
	Requirements() StageRequirements

	// Start starts the stage in the background, in the environment
	// described by `opts.Env`, using `stdin` to provide its input and
	// `stdout` to collect its output. (`stdin.Reader()` or
	// `stdout.Writer()` might be `nil` if the stage is to receive no
	// input or produce no output, which might be the case for the
	// first/last stage in a pipeline.) The stage is responsible for
	// calling `stdin.Close()` and `stdout.Close()`, even if `Start()`
	// returns an error. See the `Stage` type comment for more
	// information about responsibility for closing stdin and stdout.
	//
	// If `Start()` returns without an error, `Wait()` must also be
	// called, to allow all resources to be freed. If `Start()` returns
	// an error, `Wait()` must not be called.
	Start(
		ctx context.Context, opts StageOptions,
		stdin *InputStream, stdout *OutputStream,
	) error

	// Wait waits for the stage to be done, either because it has
	// finished or because it has been killed due to the expiration of
	// the context passed to `Start()`.
	Wait() error
}

Stage is an element of a `Pipeline`. It reads from standard input and writes to standard output.

Who closes stdin and stdout?

A `Stage` is responsible for calling `Close()` on the `InputStream`/`OutputStream` that represent its stdin and stdout as soon as it doesn't need them anymore. That responsibility begins as soon as the stage's `Start()` method is called, and applies regardless of whether `Start()` returns an error. It must close the streams before its `Wait()` method returns. The caller must not close the streams after calling `Start()`.

Closing stdin/stdout tells the previous/next stage that this stage is done reading/writing data, which can affect their behavior. Therefore, it is important for a stage to close each one as soon as it is done with it.

From the point of view of the pipeline as a whole, if stdin is provided by the user (`WithStdin()`), then we don't want the first stage to close it at all. This is arranged by passing a non-closing `InputStream` when it starts that stage. For stdout, the behavior depends on whether the user supplied it using `WithStdout()` or `WithStdoutCloser()`; in the former case, the pipeline provides the last stage with a non-closing `OutputStream`. Calling `Close()` on a non-closing stream (or even on a nil stream) is a no-op, so the `Stage` can always call `Close()` and doesn't have to worry about whether a stdin/stdout stream is non-closing.
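Why prompt closing matters can be seen with a plain `io.Pipe`, used here as a stand-in for an inter-stage pipe. In this sketch (the function name `relay` is hypothetical), the consumer's `io.ReadAll` only returns once the producer closes its write end.

```go
package main

import (
	"fmt"
	"io"
)

// relay connects a producer goroutine to a consumer through an io.Pipe.
func relay(lines []string) (string, error) {
	pr, pw := io.Pipe()
	go func() {
		// Closing the write end as soon as the producer is done is what
		// lets the consumer see end-of-input.
		defer pw.Close()
		for _, l := range lines {
			if _, err := io.WriteString(pw, l+"\n"); err != nil {
				return // the consumer stopped reading
			}
		}
	}()
	// Closing the read end tells the producer that nobody is reading anymore.
	defer pr.Close()
	data, err := io.ReadAll(pr)
	return string(data), err
}

func main() {
	out, err := relay([]string{"a", "b"})
	fmt.Printf("%q %v\n", out, err) // "a\nb\n" <nil>
}
```

If the producer forgot to close `pw`, the `io.ReadAll` call would block forever, which is the inter-stage equivalent of a stage that holds on to its stdout.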

func Command

func Command(command string, args ...string) Stage

Command returns a pipeline `Stage` based on the specified external `command`, run with the given command-line `args`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.

func CommandStage

func CommandStage(name string, cmd *exec.Cmd) Stage

CommandStage returns a pipeline `Stage` with the name `name`, based on the specified `cmd`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.

func FilterError

func FilterError(s Stage, filter ErrorFilter) Stage

func Function

func Function(name string, f StageFunc, opts ...FunctionOption) Stage

Function returns a pipeline `Stage` that will run a `StageFunc` in a separate goroutine to process the data. See `StageFunc` for more information.

func IgnoreError

func IgnoreError(s Stage, em ErrorMatcher) Stage

IgnoreError creates a stage that acts like `s` except that it ignores any errors that are matched by `em`. Use like

p.Add(pipe.IgnoreError(
    someStage,
    func(err error) bool {
        var myError *MyErrorType
        return errors.As(err, &myError) && myError.foo == 42
    },
))

The second argument can also be one of the `ErrorMatcher`s that are provided by this package (e.g., `IsError(target)`, `IsSignal(signal)`, `IsSIGPIPE`, `IsEPIPE`, `IsPipeError`), or one of the functions from the standard library that has the same signature (e.g., `os.IsTimeout`), or some combination of these (e.g., `AnyError(IsSIGPIPE, os.IsTimeout)`).

`IgnoreError` only suppresses the error returned by the wrapped stage. If a producer ignores pipe errors because a later stage can stop reading early, the producer is still responsible for keeping any producer-owned state, metrics, cursors, or other side effects consistent before returning the ignored error.

func LinewiseFunction

func LinewiseFunction(name string, f LinewiseStageFunc) Stage

LinewiseFunction returns a function-based `Stage`. The input will be split into LF-terminated lines and passed to the function one line at a time (without the LF). The function may emit output to its `stdout` argument. See the definition of `LinewiseStageFunc` for more information.

Note that the stage will emit an error if any line (including its end-of-line terminator) exceeds 64 KiB in length. If this limit is too low, use `ScannerFunction()` directly with your own `NewScannerFunc` as argument, or use `Function()` directly with your own `StageFunc`.
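A scanner constructor with the shape of `NewScannerFunc` that raises the line-length limit might look like the sketch below. `newBigLineScanner` and `scanLine` are hypothetical, and the 1 MiB limit is an arbitrary choice for the example.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Scanner is a local copy of the documented interface.
type Scanner interface {
	Scan() bool
	Bytes() []byte
	Err() error
}

// newBigLineScanner has the shape of a NewScannerFunc. It accepts lines of
// up to 1 MiB instead of bufio.Scanner's default 64 KiB.
func newBigLineScanner(r io.Reader) (Scanner, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	return sc, nil
}

// scanLine scans a single line of n bytes with either the default scanner
// or the enlarged one, and returns the length of the token that was read.
func scanLine(n int, big bool) (int, error) {
	r := strings.NewReader(strings.Repeat("x", n) + "\n")
	var sc Scanner
	if big {
		sc, _ = newBigLineScanner(r)
	} else {
		sc = bufio.NewScanner(r)
	}
	if sc.Scan() {
		return len(sc.Bytes()), nil
	}
	return 0, sc.Err()
}

func main() {
	fmt.Println(scanLine(100*1024, false)) // fails with bufio.ErrTooLong
	fmt.Println(scanLine(100*1024, true))  // 102400 <nil>
}
```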

func Print

func Print(a ...interface{}) Stage

func Printf

func Printf(format string, a ...interface{}) Stage

func Println

func Println(a ...interface{}) Stage

func ScannerFunction

func ScannerFunction(
	name string, newScanner NewScannerFunc, f LinewiseStageFunc,
) Stage

ScannerFunction creates a function-based `Stage`. The function will be passed input, one line at a time, and may emit output. See the definition of `LinewiseStageFunc` for more information.

func WithExtraEnv

func WithExtraEnv(inner Stage, env []EnvVar) Stage

WithExtraEnv returns a Stage that adds env to the environment seen by inner.

type StageFunc

type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) error

StageFunc is a function that can be used to power a `goStage`. It should read its input from `stdin` and write its output to `stdout`. The Function stage closes `stdin` and `stdout` after the function returns only when the pipeline gave the stage ownership of those streams; StageFunc implementations should not close them directly.

Neither `stdin` nor `stdout` is necessarily buffered. If the `StageFunc` requires buffering, it needs to arrange that itself.

A later stage can stop reading before this function has written all of its output. In that case, writes to `stdout` can fail with an error matched by `IsPipeError`. If the function only writes output and is otherwise stateless, callers can usually wrap the stage with `IgnoreError(stage, IsPipeError)`. If the function also updates producer-owned state, metrics, cursors, or other side effects that depend on how much output was produced, it should bring those side effects to a consistent point before returning the write error.

A `StageFunc` is run in a separate goroutine, so it must be careful to synchronize any data access aside from reading and writing.
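For illustration, here is a function with the documented `StageFunc` shape that arranges its own buffering and leaves closing to the caller. `Env` is a trimmed local stand-in for the documented type, and `upperCase` and `runUpper` are hypothetical names; in real use the function would be passed to `Function()`.

```go
package main

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// Env is a trimmed local stand-in for the documented type.
type Env struct{ Dir string }

// upperCase has the documented StageFunc shape. It adds its own buffering
// and does not close stdin or stdout.
func upperCase(ctx context.Context, _ Env, stdin io.Reader, stdout io.Writer) error {
	in := bufio.NewScanner(stdin)
	out := bufio.NewWriter(stdout)
	for in.Scan() {
		if err := ctx.Err(); err != nil {
			return err
		}
		out.Write(bytes.ToUpper(in.Bytes()))
		out.WriteByte('\n')
	}
	if err := in.Err(); err != nil {
		return err
	}
	// bufio.Writer remembers the first write error, so it surfaces here.
	return out.Flush()
}

// runUpper drives upperCase with in-memory streams.
func runUpper(input string) (string, error) {
	var buf bytes.Buffer
	err := upperCase(context.Background(), Env{}, strings.NewReader(input), &buf)
	return buf.String(), err
}

func main() {
	out, err := runUpper("ab\ncd\n")
	fmt.Printf("%q %v\n", out, err) // "AB\nCD\n" <nil>
}
```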

type StageOptions

type StageOptions struct {
	// Env is the environment (working directory and extra environment
	// variables) that the stage should run in.
	Env

	// PanicHandler, if non-nil, is invoked to recover a panic that escapes
	// user code that a stage runs in a library-spawned goroutine,
	// converting it into an error. Stage types that don't run user code in
	// a library-spawned goroutine ignore it.
	PanicHandler StagePanicHandler
}

StageOptions carries everything (other than `ctx`, `stdin`, and `stdout`) that a pipeline passes to `Stage.Start`.

type StagePanicHandler

type StagePanicHandler func(p any) error

StagePanicHandler is a function that handles panics in a pipeline's stages.

type StageRequirements

type StageRequirements struct {
	Stdin  StreamRequirement
	Stdout StreamRequirement
}

StageRequirements describes what a Stage needs from the streams connected to its stdin and stdout. The zero value is correct for stages that are happy with arbitrary io.Reader/io.Writer streams, such as Function stages.

type StreamRequirement

type StreamRequirement int

StreamRequirement describes a `Stage`'s requirement for its stdin or stdout, namely whether it can be anything, whether it should preferably be an `*os.File`, or whether it must be `nil`. The zero value `StreamAcceptAny` is a valid value that indicates that the stage has no particular requirements or preferences for its stdin/stdout, such as a typical `Function` stage.

const (
	// StreamAcceptAny indicates that the stage hasn't declared what
	// kind of stream it requires, maybe even `nil`.
	StreamAcceptAny StreamRequirement = iota

	// StreamPreferFile indicates that the stage prefers the
	// corresponding stream to be backed by an `*os.File` (a real file
	// descriptor), but it can work with any io.Reader/io.Writer.
	StreamPreferFile

	// StreamForbidden indicates that the stage requires the
	// corresponding stream to be nil. It won't read/write the stream
	// or close it.
	StreamForbidden
)

func (StreamRequirement) Validate

func (requirement StreamRequirement) Validate() error

Validate checks that `requirement` has a valid value and returns an error otherwise.
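A range check of this kind is typically a switch over the declared constants. The sketch below re-declares the type locally; `validate` and its error text are assumptions, not the package's implementation.

```go
package main

import "fmt"

// StreamRequirement and its constants are local copies of the documented ones.
type StreamRequirement int

const (
	StreamAcceptAny StreamRequirement = iota
	StreamPreferFile
	StreamForbidden
)

// validate is an illustrative check that accepts only the declared values.
func validate(r StreamRequirement) error {
	switch r {
	case StreamAcceptAny, StreamPreferFile, StreamForbidden:
		return nil
	default:
		return fmt.Errorf("invalid stream requirement: %d", int(r))
	}
}

func main() {
	var zero StreamRequirement
	fmt.Println(zero == StreamAcceptAny, validate(zero)) // true <nil>
	fmt.Println(validate(StreamRequirement(3)) != nil)   // true
}
```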
