Documentation
Index ¶
- Variables
- func ScanLFTerminatedLines(data []byte, atEOF bool) (advance int, token []byte, err error)
- type AppendVars
- type ContextValueFunc
- type ContextValuesFunc
- type Env
- type EnvVar
- type ErrorFilter
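- type ErrorMatcher
- func AnyError(ems ...ErrorMatcher) ErrorMatcher
- func IsError(target error) ErrorMatcher
- func IsSignal(signal syscall.Signal) ErrorMatcher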
- type Event
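- type FunctionOption
- func ForbidStdin() FunctionOption
- func ForbidStdout() FunctionOption
- func WithStdinRequirement(requirement StreamRequirement) FunctionOption
- func WithStdoutRequirement(requirement StreamRequirement) FunctionOption
- type InputStream
- func ClosingInput(r io.ReadCloser) *InputStream
- func (s *InputStream) Close() error
- func (s *InputStream) Reader() io.Reader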
- type LinewiseStageFunc
- type NewPipeFn
- type NewScannerFunc
- type Option
- func WithDir(dir string) Option
- func WithEnvVar(key, value string) Option
- func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option
- func WithEnvVars(b []EnvVar) Option
- func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option
- func WithEventHandler(handler func(e *Event)) Option
- func WithStagePanicHandler(ph StagePanicHandler) Option
- func WithStdin(stdin io.Reader) Option
- func WithStdout(stdout io.Writer) Option
- func WithStdoutCloser(stdout io.WriteCloser) Option
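- type OutputStream
- func ClosingOutput(w io.WriteCloser) *OutputStream
- func Output(w io.Writer) *OutputStream
- func (s *OutputStream) Close() error
- func (s *OutputStream) Writer() io.Writer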
- type Pipeline
- func (p *Pipeline) Add(stages ...Stage)
- func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
- func (p *Pipeline) Output(ctx context.Context) ([]byte, error)
- func (p *Pipeline) Run(ctx context.Context) error
- func (p *Pipeline) Start(ctx context.Context) error
- func (p *Pipeline) Wait() error
- type Scanner
- type Stage
- func Command(command string, args ...string) Stage
- func CommandStage(name string, cmd *exec.Cmd) Stage
- func FilterError(s Stage, filter ErrorFilter) Stage
- func Function(name string, f StageFunc, opts ...FunctionOption) Stage
- func IgnoreError(s Stage, em ErrorMatcher) Stage
- func LinewiseFunction(name string, f LinewiseStageFunc) Stage
- func Print(a ...interface{}) Stage
- func Printf(format string, a ...interface{}) Stage
- func Println(a ...interface{}) Stage
- func ScannerFunction(name string, newScanner NewScannerFunc, f LinewiseStageFunc) Stage
- func WithExtraEnv(inner Stage, env []EnvVar) Stage
- type StageFunc
- type StageOptions
- type StagePanicHandler
- type StageRequirements
- type StreamRequirement
- func (requirement StreamRequirement) Validate() error
Constants ¶
This section is empty.
Variables ¶
var (
// IsSIGPIPE is an `ErrorMatcher` that matches `*exec.ExitError`s
// that were caused by SIGPIPE. The match for `*exec.ExitError`s
// uses `errors.As()`. Use like
//
//   p.Add(IgnoreError(someStage, IsSIGPIPE))
IsSIGPIPE = IsSignal(syscall.SIGPIPE)

// IsEPIPE is an `ErrorMatcher` that matches `syscall.EPIPE` using
// `errors.Is()`. Use like
//
//   p.Add(IgnoreError(someStage, IsEPIPE))
IsEPIPE = IsError(syscall.EPIPE)

// IsErrClosedPipe is an `ErrorMatcher` that matches
// `io.ErrClosedPipe` using `errors.Is()`. (`io.ErrClosedPipe` is,
// for example, the error that results from writing to an
// `*io.PipeWriter` whose reading end has been closed.) Use like
//
//   p.Add(IgnoreError(someStage, IsErrClosedPipe))
IsErrClosedPipe = IsError(io.ErrClosedPipe)

// IsPipeError is an `ErrorMatcher` that matches a few different
// errors that typically result if a stage writes to a subsequent
// stage that has stopped reading from its stdin. This is commonly
// useful with `IgnoreError` for stateless producer stages whose only
// job is writing output. Stateful producers should complete any
// producer-owned state updates needed for consistency before
// returning the pipe error for `IgnoreError` to suppress. Use like
//
//   p.Add(IgnoreError(someStage, IsPipeError))
IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe)
)
var FinishEarly = errors.New("finish stage early")
FinishEarly is an error that can be returned by a `Stage` to request that the iteration be ended early (possibly without reading all of its input). This "error" is considered a successful return, and is not reported to the caller.
Functions ¶
Types ¶
type ContextValuesFunc ¶
type Env ¶
type Env struct {
// The directory in which external commands should be executed by
// default.
Dir string
// Vars are extra environment variables. These will override any
// environment variables that would be inherited from the current
// process.
Vars []AppendVars
}
Env represents the environment that a pipeline stage should run in. It is passed to `Stage.Start()`.
type EnvVar ¶
type EnvVar struct {
// The name of the environment variable.
Key string
// The value.
Value string
}
EnvVar represents an environment variable that will be provided to any child process spawned in this pipeline.
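The override rule described for `Env.Vars` can be sketched with the `KEY=value` strings used by `os.Environ()` and `exec.Cmd.Env`. `mergeEnv` is a hypothetical helper and the local `EnvVar` only mirrors the documented struct; this is an illustration of "extra variables win over inherited ones", not how the package itself assembles a child's environment.

```go
package main

import "strings"

// EnvVar mirrors the documented struct.
type EnvVar struct {
	Key   string
	Value string
}

// mergeEnv returns base (in "KEY=value" form, as from os.Environ) with
// every variable in extra applied on top. An inherited entry whose key
// appears in extra is dropped; extra keys are appended in first-seen
// order, and if a key repeats in extra, its last value wins.
func mergeEnv(base []string, extra []EnvVar) []string {
	overrides := make(map[string]string, len(extra))
	for _, v := range extra {
		overrides[v.Key] = v.Value
	}
	out := make([]string, 0, len(base)+len(extra))
	for _, kv := range base {
		key, _, _ := strings.Cut(kv, "=")
		if _, overridden := overrides[key]; overridden {
			continue // the override is appended below
		}
		out = append(out, kv)
	}
	seen := make(map[string]bool, len(extra))
	for _, v := range extra {
		if seen[v.Key] {
			continue
		}
		seen[v.Key] = true
		out = append(out, v.Key+"="+overrides[v.Key])
	}
	return out
}
```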
type ErrorFilter ¶
ErrorFilter is a function that can filter errors from `Stage.Wait()`. The original error (possibly nil) is passed in as an argument, and whatever the function returns is the error (possibly nil) that is actually emitted.
type ErrorMatcher ¶
ErrorMatcher decides whether its argument matches some class of errors (e.g., errors that we want to ignore). The function will only be invoked for non-nil errors.
func AnyError ¶
func AnyError(ems ...ErrorMatcher) ErrorMatcher
AnyError returns an `ErrorMatcher` that returns true for an error that matches any of the `ems`.
func IsError ¶
func IsError(target error) ErrorMatcher
IsError returns an `ErrorMatcher` for the specified target error, matched using `errors.Is()`. Use like
p.Add(pipe.IgnoreError(someStage, pipe.IsError(io.EOF)))
func IsSignal ¶
func IsSignal(signal syscall.Signal) ErrorMatcher
IsSignal returns an `ErrorMatcher` that matches `*exec.ExitError`s that were caused by the specified signal. The match for `*exec.ExitError`s uses `errors.As()`. Note that under Windows the returned matcher always returns false, because on that platform `WaitStatus.Signaled()` isn't implemented (it is hardcoded to return `false`).
type FunctionOption ¶
type FunctionOption func(*goStage)
FunctionOption configures a Function stage.
func ForbidStdin ¶
func ForbidStdin() FunctionOption
ForbidStdin returns a FunctionOption declaring that the stage must not be connected to stdin.
func ForbidStdout ¶
func ForbidStdout() FunctionOption
ForbidStdout returns a FunctionOption declaring that the stage must not be connected to stdout.
func WithStdinRequirement ¶
func WithStdinRequirement(requirement StreamRequirement) FunctionOption
WithStdinRequirement returns a FunctionOption declaring the stage's stdin requirement.
func WithStdoutRequirement ¶
func WithStdoutRequirement(requirement StreamRequirement) FunctionOption
WithStdoutRequirement returns a FunctionOption declaring the stage's stdout requirement.
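The functional-option pattern behind these constructors can be sketched as follows. `stageConfig` is a hypothetical stand-in for the unexported `goStage`, and the lower-case option functions only mirror the documented ones.

```go
package main

// StreamRequirement mirrors the documented enum.
type StreamRequirement int

const (
	StreamAcceptAny StreamRequirement = iota
	StreamPreferFile
	StreamForbidden
)

// stageConfig is a hypothetical stand-in for goStage.
type stageConfig struct {
	name   string
	stdin  StreamRequirement
	stdout StreamRequirement
}

// FunctionOption mutates the stage being configured.
type FunctionOption func(*stageConfig)

func withStdinRequirement(r StreamRequirement) FunctionOption {
	return func(c *stageConfig) { c.stdin = r }
}

func withStdoutRequirement(r StreamRequirement) FunctionOption {
	return func(c *stageConfig) { c.stdout = r }
}

// forbidStdin and forbidStdout are shorthands for StreamForbidden.
func forbidStdin() FunctionOption  { return withStdinRequirement(StreamForbidden) }
func forbidStdout() FunctionOption { return withStdoutRequirement(StreamForbidden) }

// newStage starts from the zero value (StreamAcceptAny for both
// streams) and applies opts in order, so later options win.
func newStage(name string, opts ...FunctionOption) *stageConfig {
	c := &stageConfig{name: name}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
```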
type InputStream ¶
type InputStream struct {
// contains filtered or unexported fields
}
InputStream represents `stdin` for a stage, which might or might not need to be closed when the stage is done with it. It usually holds an `io.Reader`, which can be retrieved using `Reader()`. Its `Close()` method closes the reader if necessary (i.e., if the `InputStream` was constructed using `ClosingInput()`). The `Close()` method is idempotent.
A nil `*InputStream` is a valid value. Its `Reader()` method returns `nil` and `Close()` does nothing successfully.
It might seem like `InputStream` should implement `io.Reader` itself. But we want to avoid hiding the dynamic type of the `io.Reader` that is being used as the stdin of a pipeline. That object might be of a type that is subject to optimizations that aren't available for a generic `io.Reader`. For example, it might be an `*os.File` (which can be passed directly to subcommands or to `splice(2)`), or it might implement `io.WriterTo`.
func ClosingInput ¶
func ClosingInput(r io.ReadCloser) *InputStream
The stage is responsible for closing r.
func (*InputStream) Close ¶
func (s *InputStream) Close() error
Close closes the underlying reader if necessary. If `s` was constructed using `ClosingInput()`, then close the `io.ReadCloser` that was passed to that function. If `s` is `nil` or was constructed using `Input()`, then do nothing successfully.
func (*InputStream) Reader ¶
func (s *InputStream) Reader() io.Reader
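The ownership rules for `InputStream` can be sketched with a small wrapper that keeps the reader's dynamic type visible and closes only what it owns. `inputStream`, `input`, `closingInput`, and `countingCloser` are hypothetical; using `sync.Once` to make `Close()` idempotent is an assumption, not necessarily how the package does it.

```go
package main

import (
	"io"
	"strings"
	"sync"
)

// inputStream exposes its reader unchanged and closes it only if it
// owns it.
type inputStream struct {
	r      io.Reader
	closer io.Closer // nil for non-closing streams
	once   sync.Once
	err    error
}

// input wraps r without taking ownership (like Input()).
func input(r io.Reader) *inputStream { return &inputStream{r: r} }

// closingInput takes ownership of r (like ClosingInput()).
func closingInput(r io.ReadCloser) *inputStream { return &inputStream{r: r, closer: r} }

// Reader returns the wrapped reader, or nil for a nil stream.
func (s *inputStream) Reader() io.Reader {
	if s == nil {
		return nil
	}
	return s.r
}

// Close closes an owned reader exactly once and returns the same result
// on every call; for nil or non-closing streams it returns nil.
func (s *inputStream) Close() error {
	if s == nil || s.closer == nil {
		return nil
	}
	s.once.Do(func() { s.err = s.closer.Close() })
	return s.err
}

// countingCloser records how many times Close is called.
type countingCloser struct {
	io.Reader
	closes int
}

func (c *countingCloser) Close() error { c.closes++; return nil }

func newCountingCloser(text string) *countingCloser {
	return &countingCloser{Reader: strings.NewReader(text)}
}
```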
type LinewiseStageFunc ¶
type LinewiseStageFunc func(
ctx context.Context, env Env,
line []byte, stdout *bufio.Writer,
) error
LinewiseStageFunc is a function that can be embedded in a `goStage`. It is called once per line in the input (where "line" can be defined via any `bufio.Scanner`). It should process the line and may write whatever it likes to `stdout`, which is a buffered writer whose contents are forwarded to the input of the next stage of the pipeline. The function needn't write one line of output per line of input.
The function mustn't retain `line` (or any subslice of it) after it returns, since its contents may be overwritten every time the function is called. Copy it if it must be kept.
The function needn't flush or close `stdout` (this will be done automatically when all of the input has been processed).
If there is an error parsing the input into lines, or if this function returns an error, then the whole pipeline will be aborted with that error. However, if the function returns the special error `pipe.FinishEarly`, the stage will stop processing immediately with a `nil` error value.
The function will be called in a separate goroutine, so it must be careful to synchronize any data access aside from writing to `stdout`.
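A simplified driver illustrates the contract described above: one call per line, a buffered `stdout` flushed at the end, and `FinishEarly` treated as success. `linewiseFunc`, `runLinewise`, and `errFinishEarly` are hypothetical stand-ins (the real `LinewiseStageFunc` also receives `ctx` and `env`). The sketch splits input with `bufio.ScanLines`, which also drops a trailing `\r`.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"io"
	"strings"
)

// errFinishEarly is a hypothetical stand-in for pipe.FinishEarly.
var errFinishEarly = errors.New("finish stage early")

// linewiseFunc is a reduced form of LinewiseStageFunc.
type linewiseFunc func(line []byte, stdout *bufio.Writer) error

// runLinewise calls f once per line of r, stops cleanly on
// errFinishEarly, and flushes the buffered output at the end.
func runLinewise(r io.Reader, w io.Writer, f linewiseFunc) error {
	scanner := bufio.NewScanner(r)
	out := bufio.NewWriter(w)
	for scanner.Scan() {
		err := f(scanner.Bytes(), out)
		if errors.Is(err, errFinishEarly) {
			break
		}
		if err != nil {
			return err
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}
	return out.Flush()
}

// upperUntilStop upper-cases lines until it sees "stop".
func upperUntilStop(text string) (string, error) {
	var sb strings.Builder
	err := runLinewise(strings.NewReader(text), &sb, func(line []byte, stdout *bufio.Writer) error {
		// line is only valid during this call; copy it
		// (append([]byte(nil), line...)) if it must be kept.
		if string(line) == "stop" {
			return errFinishEarly
		}
		if _, err := stdout.Write(bytes.ToUpper(line)); err != nil {
			return err
		}
		return stdout.WriteByte('\n')
	})
	return sb.String(), err
}
```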
type NewScannerFunc ¶
NewScannerFunc is used to create a `Scanner` for scanning input that is coming from `r`.
type Option ¶
type Option func(*Pipeline)
Option is a functional option that configures a `Pipeline`.
func WithEnvVar ¶
func WithEnvVar(key, value string) Option
WithEnvVar appends an environment variable for the pipeline.
func WithEnvVarFunc ¶
func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option
WithEnvVarFunc appends a context-based environment variable for the pipeline.
func WithEnvVars ¶
func WithEnvVars(b []EnvVar) Option
WithEnvVars appends several environment variables for the pipeline.
func WithEnvVarsFunc ¶
func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option
WithEnvVarsFunc appends several context-based environment variables for the pipeline.
func WithEventHandler ¶
func WithEventHandler(handler func(e *Event)) Option
WithEventHandler sets an event handler for the pipeline. If a handler is set, an event is emitted for each process.
func WithStagePanicHandler ¶
func WithStagePanicHandler(ph StagePanicHandler) Option
WithStagePanicHandler sets a panic handler for the stages within a pipeline. When a pipeline stage panics, the provided handler will be invoked, allowing the client to handle the panic in whatever way they see fit.
Note:
- The client is responsible for deciding whether to recover from the panic or to panic again.
- If a panic handler is not set, the panic will be propagated normally.
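The recover-and-delegate pattern for a stage body running in its own goroutine can be sketched as follows. `panicHandler` is a hypothetical signature (`func(any) error`), since the real `StagePanicHandler` signature is not shown here. The `toError` policy converts the panic into an error, but a handler could equally call `panic(p)` to re-raise it; with a nil handler, nothing is recovered.

```go
package main

import "fmt"

// panicHandler is a hypothetical handler shape.
type panicHandler func(p any) error

// callWithHandler runs f. If f panics and ph is non-nil, the panic is
// recovered and ph decides the returned error. With a nil ph, the panic
// propagates normally.
func callWithHandler(f func() error, ph panicHandler) (err error) {
	if ph != nil {
		defer func() {
			if p := recover(); p != nil {
				err = ph(p)
			}
		}()
	}
	return f()
}

// runInGoroutine runs f in its own goroutine, as a Go stage would, and
// delivers the result on the returned channel.
func runInGoroutine(f func() error, ph panicHandler) <-chan error {
	done := make(chan error, 1)
	go func() { done <- callWithHandler(f, ph) }()
	return done
}

// toError is one possible policy: turn the panic into an error.
func toError(p any) error { return fmt.Errorf("stage panicked: %v", p) }
```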
func WithStdin ¶
func WithStdin(stdin io.Reader) Option
WithStdin assigns stdin to the first command in the pipeline. The caller retains ownership of stdin; the pipeline will not close it, even if `Start()` returns an error.
If the first stage is a `Command` and stdin is not an `*os.File`, `exec.Cmd` has to copy stdin through an internal goroutine, and `Cmd.Wait()` waits for that copy to finish. This is fine for bounded readers such as `strings.Reader` and `bytes.Reader`, and for `*os.File` values, which are passed to the command directly. But a borrowed, non-file reader that can block forever can also block the pipeline forever if the command exits without consuming all of its stdin. See `TestPipelineIOPipeStdinThatIsNeverClosed` for the known limitation.
func WithStdout ¶
func WithStdout(stdout io.Writer) Option
WithStdout assigns stdout to the last command in the pipeline. The caller retains ownership of stdout; the pipeline will not close it, even if `Start()` returns an error.
func WithStdoutCloser ¶
func WithStdoutCloser(stdout io.WriteCloser) Option
WithStdoutCloser assigns stdout to the last command in the pipeline, and closes stdout when the pipeline is done with it. The pipeline is responsible for closing stdout even if `Start()` returns an error.
type OutputStream ¶
type OutputStream struct {
// contains filtered or unexported fields
}
OutputStream represents `stdout` for a stage, which might or might not need to be closed when the stage is done with it. It usually holds an `io.Writer`, which can be retrieved using `Writer()`. Its `Close()` method closes the writer if necessary (i.e., if the `OutputStream` was constructed using `ClosingOutput()`). The `Close()` method is idempotent.
A nil `*OutputStream` is a valid value. Its `Writer()` method returns `nil` and `Close()` does nothing successfully.
It might seem like `OutputStream` should implement `io.Writer` itself. But we want to avoid hiding the dynamic type of the `io.Writer` that is being used as the stdout of a pipeline. That object might be of a type that is subject to optimizations that aren't available for a generic `io.Writer`. For example, it might be an `*os.File` (which can be passed directly to subcommands or to `splice(2)`), or it might implement `io.ReaderFrom`.
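The cost of wrapping can be shown directly: `io.Copy` uses the destination's `io.ReaderFrom` implementation when one is available, and a wrapper that exposes only `Write` hides it. `opaqueWriter`, `hasReadFrom`, and `compare` are hypothetical names used only for this illustration.

```go
package main

import (
	"bytes"
	"io"
)

// opaqueWriter hides the dynamic type of the writer it wraps: callers
// see only a Write method.
type opaqueWriter struct{ w io.Writer }

func (o opaqueWriter) Write(p []byte) (int, error) { return o.w.Write(p) }

// hasReadFrom reports whether w offers the io.ReaderFrom fast path.
func hasReadFrom(w io.Writer) bool {
	_, ok := w.(io.ReaderFrom)
	return ok
}

// compare checks the same destination with and without the wrapper.
func compare() (direct, wrapped bool) {
	var buf bytes.Buffer
	return hasReadFrom(&buf), hasReadFrom(opaqueWriter{&buf})
}
```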
func ClosingOutput ¶
func ClosingOutput(w io.WriteCloser) *OutputStream
The stage is responsible for closing w.
func Output ¶
func Output(w io.Writer) *OutputStream
The stage may write to w but must not close it.
func (*OutputStream) Close ¶
func (s *OutputStream) Close() error
Close closes the underlying writer if necessary. If `s` was constructed using `ClosingOutput()`, then close the `io.WriteCloser` that was passed to that function. If `s` is `nil` or was constructed using `Output()`, then do nothing successfully.
func (*OutputStream) Writer ¶
func (s *OutputStream) Writer() io.Writer
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a Unix-like pipe that can include multiple stages, including both external processes and stages written in Go.
func (*Pipeline) AddWithIgnoredError ¶
func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
AddWithIgnoredError appends one or more stages to the pipeline, ignoring any errors from those stages that are matched by `em`.
func (*Pipeline) Run ¶
func (p *Pipeline) Run(ctx context.Context) error
Run starts and waits for the commands in the pipeline. If startup fails, it returns the `Start()` error after `Start()` has performed its failure cleanup.
func (*Pipeline) Start ¶
func (p *Pipeline) Start(ctx context.Context) error
Start starts the commands in the pipeline. If `Start()` returns without an error, `Wait()` must also be called, to allow all resources to be freed.
If `Start()` returns an error, `Wait()` must not be called. Before returning an error, `Start()` cancels and waits for any stages that were started, closes any inter-stage pipes that the pipeline owns, and closes stdout if it was supplied with `WithStdoutCloser()`. Streams supplied with `WithStdin()` or `WithStdout()` remain owned by the caller and are not closed by the pipeline.
type Scanner ¶
Scanner defines the interface (which is implemented by `bufio.Scanner`) that is needed by `ScannerFunction()`. See `bufio.Scanner` for how these methods should behave.
type Stage ¶
type Stage interface {
// Name returns the name of the stage.
Name() string
// Requirements returns this stage's requirements regarding how its
// stdin and stdout pipes should be created.
Requirements() StageRequirements
// Start starts the stage in the background, in the environment
// described by `opts.Env`, using `stdin` to provide its input and
// `stdout` to collect its output. (`stdin.Reader()` or
// `stdout.Writer()` might be `nil` if the stage is to receive no
// input or produce no output, which might be the case for the
// first/last stage in a pipeline.) The stage is responsible for
// calling `stdin.Close()` and `stdout.Close()`, even if `Start()`
// returns an error. See the `Stage` type comment for more
// information about responsibility for closing stdin and stdout.
//
// If `Start()` returns without an error, `Wait()` must also be
// called, to allow all resources to be freed. If `Start()` returns
// an error, `Wait()` must not be called.
Start(
ctx context.Context, opts StageOptions,
stdin *InputStream, stdout *OutputStream,
) error
// Wait waits for the stage to be done, either because it has
// finished or because it has been killed due to the expiration of
// the context passed to `Start()`.
Wait() error
}
Stage is an element of a `Pipeline`. It reads from standard input and writes to standard output.
Who closes stdin and stdout? ¶
A `Stage` is responsible for calling `Close()` on the `InputStream`/`OutputStream` that represent its stdin and stdout as soon as it doesn't need them anymore. That responsibility begins as soon as the stage's `Start()` method is called, and applies regardless of whether `Start()` returns an error. It must close the streams before its `Wait()` method returns. The caller must not close the streams after calling `Start()`.
Closing stdin/stdout tells the previous/next stage that this stage is done reading/writing data, which can affect their behavior. Therefore, it is important for a stage to close each one as soon as it is done with it.
From the point of view of the pipeline as a whole, if stdin is provided by the user (`WithStdin()`), then we don't want the first stage to close it at all. The pipeline arranges this by passing a non-closing `InputStream` when it starts that stage. For stdout, it depends on whether the user supplied it using `WithStdout()` or `WithStdoutCloser()`; in the former case, the pipeline provides the last stage with a non-closing `OutputStream`. Calling `Close()` on a non-closing stream (or even on a nil stream) is a no-op, so the `Stage` can always call `Close()` and doesn't have to worry about whether a stdin/stdout stream is non-closing.
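Why closing matters can be seen with `io.Pipe`: the downstream reader only sees `io.EOF` once the upstream writer is closed. `produce` and `relay` are hypothetical helpers; without the `CloseWithError` call, `io.ReadAll` in `relay` would block forever.

```go
package main

import "io"

// produce writes msg to w and then closes w. CloseWithError(nil)
// behaves like Close: the reader sees io.EOF after the data.
func produce(w *io.PipeWriter, msg string) {
	_, err := io.WriteString(w, msg)
	w.CloseWithError(err)
}

// relay connects produce to a reader. io.ReadAll returns only once the
// writer side has been closed.
func relay(msg string) (string, error) {
	r, w := io.Pipe()
	go produce(w, msg)
	data, err := io.ReadAll(r)
	r.Close()
	return string(data), err
}
```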
func Command ¶
func Command(command string, args ...string) Stage
Command returns a pipeline `Stage` based on the specified external `command`, run with the given command-line `args`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func CommandStage ¶
func CommandStage(name string, cmd *exec.Cmd) Stage
CommandStage returns a pipeline `Stage` with the name `name`, based on the specified `cmd`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func FilterError ¶
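func FilterError(s Stage, filter ErrorFilter) Stage
FilterError returns a stage that acts like `s`, except that the error returned by its `Wait()` method (possibly nil) is passed through `filter`, and whatever `filter` returns is the error that is actually emitted.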
func Function ¶
func Function(name string, f StageFunc, opts ...FunctionOption) Stage
Function returns a pipeline `Stage` that will run a `StageFunc` in a separate goroutine to process the data. See `StageFunc` for more information.
func IgnoreError ¶
func IgnoreError(s Stage, em ErrorMatcher) Stage
IgnoreError creates a stage that acts like `s` except that it ignores any errors that are matched by `em`. Use like
p.Add(pipe.IgnoreError(
someStage,
func(err error) bool {
var myError *MyErrorType
return errors.As(err, &myError) && myError.foo == 42
},
))
The second argument can also be one of the `ErrorMatcher`s that are provided by this package (e.g., `IsError(target)`, `IsSignal(signal)`, `IsSIGPIPE`, `IsEPIPE`, `IsPipeError`), or one of the functions from the standard library that has the same signature (e.g., `os.IsTimeout`), or some combination of these (e.g., `AnyError(IsSIGPIPE, os.IsTimeout)`).
`IgnoreError` only suppresses the error returned by the wrapped stage. If a producer ignores pipe errors because a later stage can stop reading early, the producer is still responsible for keeping any producer-owned state, metrics, cursors, or other side effects consistent before returning the ignored error.
func LinewiseFunction ¶
func LinewiseFunction(name string, f LinewiseStageFunc) Stage
LinewiseFunction returns a function-based `Stage`. The input will be split into LF-terminated lines and passed to the function one line at a time (without the LF). The function may emit output to its `stdout` argument. See the definition of `LinewiseStageFunc` for more information.
Note that the stage will emit an error if any line (including its end-of-line terminator) exceeds 64 KiB in length. If this is too short, use `ScannerFunction()` directly with your own `NewScannerFunc` as argument, or use `Function()` directly with your own `StageFunc`.
func ScannerFunction ¶
func ScannerFunction(
name string, newScanner NewScannerFunc, f LinewiseStageFunc,
) Stage
ScannerFunction creates a function-based `Stage`. The function will be passed input, one line at a time, and may emit output. See the definition of `LinewiseStageFunc` for more information.
func WithExtraEnv ¶
func WithExtraEnv(inner Stage, env []EnvVar) Stage
WithExtraEnv returns a Stage that adds env to the environment seen by inner.
type StageFunc ¶
StageFunc is a function that can be used to power a `goStage`. It should read its input from `stdin` and write its output to `stdout`. The Function stage closes `stdin` and `stdout` after the function returns only when the pipeline gave the stage ownership of those streams; StageFunc implementations should not close them directly.
Neither `stdin` nor `stdout` is necessarily buffered. If the `StageFunc` requires buffering, it needs to arrange that itself.
A later stage can stop reading before this function has written all of its output. In that case, writes to `stdout` can fail with an error matched by `IsPipeError`. If the function only writes output and is otherwise stateless, callers can usually wrap the stage with `IgnoreError(stage, IsPipeError)`. If the function also updates producer-owned state, metrics, cursors, or other side effects that depend on how much output was produced, it should bring those side effects to a consistent point before returning the write error.
A `StageFunc` is run in a separate goroutine, so it must be careful to synchronize any data access aside from reading and writing.
type StageOptions ¶
type StageOptions struct {
// Env is the environment (working directory and extra environment
// variables) that the stage should run in.
Env
// PanicHandler, if non-nil, is invoked to recover a panic that escapes
// user code that a stage runs in a library-spawned goroutine,
// converting it into an error. Stage types that don't run user code in
// a library-spawned goroutine ignore it.
PanicHandler StagePanicHandler
}
StageOptions carries everything (other than `ctx`, `stdin`, and `stdout`) that a pipeline passes to `Stage.Start`.
type StagePanicHandler ¶
StagePanicHandler is a function that handles panics in a pipeline's stages.
type StageRequirements ¶
type StageRequirements struct {
Stdin StreamRequirement
Stdout StreamRequirement
}
StageRequirements describes what a Stage needs from the streams connected to its stdin and stdout. The zero value is correct for stages that are happy with arbitrary io.Reader/io.Writer streams, such as Function stages.
type StreamRequirement ¶
type StreamRequirement int
StreamRequirement describes a `Stage`'s requirement for its stdin or stdout, namely whether it can be anything, whether it should preferably be an `*os.File`, or whether it must be `nil`. The zero value `StreamAcceptAny` is a valid value that indicates that the stage has no particular requirements or preferences for its stdin/stdout, such as a typical `Function` stage.
const (
// StreamAcceptAny indicates that the stage hasn't declared what
// kind of stream it requires, maybe even `nil`.
StreamAcceptAny StreamRequirement = iota

// StreamPreferFile indicates that the stage prefers the
// corresponding stream to be backed by an `*os.File` (a real file
// descriptor), but it can work with any io.Reader/io.Writer.
StreamPreferFile

// StreamForbidden indicates that the stage requires the
// corresponding stream to be nil. It won't read/write the stream
// or close it.
StreamForbidden
)
func (StreamRequirement) Validate ¶
func (requirement StreamRequirement) Validate() error
Validate checks that `requirement` has a valid value and returns an error otherwise.
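A sketch of what such a check can look like for the three constants above. The type, constants, and method are local re-declarations for illustration; the package's actual error message is not shown here and may differ.

```go
package main

import "fmt"

type StreamRequirement int

const (
	StreamAcceptAny StreamRequirement = iota
	StreamPreferFile
	StreamForbidden
)

// Validate returns nil for the three defined values and an error for
// anything else, such as a value produced by an unchecked conversion.
func (requirement StreamRequirement) Validate() error {
	switch requirement {
	case StreamAcceptAny, StreamPreferFile, StreamForbidden:
		return nil
	default:
		return fmt.Errorf("invalid StreamRequirement: %d", int(requirement))
	}
}
```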