Documentation ¶
Index ¶
- Variables
- func ScanLFTerminatedLines(data []byte, atEOF bool) (advance int, token []byte, err error)
- type AppendVars
- type ContextValueFunc
- type ContextValuesFunc
- type Env
- type EnvVar
- type ErrorFilter
- type ErrorMatcher
- type Event
- type LimitableStage
- type LinewiseStageFunc
- type NewPipeFn
- type NewScannerFunc
- type Option
- func WithDir(dir string) Option
- func WithEnvVar(key, value string) Option
- func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option
- func WithEnvVars(b []EnvVar) Option
- func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option
- func WithEventHandler(handler func(e *Event)) Option
- func WithStdin(stdin io.Reader) Option
- func WithStdout(stdout io.Writer) Option
- func WithStdoutCloser(stdout io.WriteCloser) Option
- type Pipeline
- func (p *Pipeline) Add(stages ...Stage)
- func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
- func (p *Pipeline) Output(ctx context.Context) ([]byte, error)
- func (p *Pipeline) Run(ctx context.Context) error
- func (p *Pipeline) Start(ctx context.Context) error
- func (p *Pipeline) Wait() error
- type Scanner
- type Stage
- func Command(command string, args ...string) Stage
- func CommandStage(name string, cmd *exec.Cmd) Stage
- func FilterError(s Stage, filter ErrorFilter) Stage
- func Function(name string, f StageFunc) Stage
- func IgnoreError(s Stage, em ErrorMatcher) Stage
- func LinewiseFunction(name string, f LinewiseStageFunc) Stage
- func MemoryLimit(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Stage
- func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage
- func Print(a ...interface{}) Stage
- func Printf(format string, a ...interface{}) Stage
- func Println(a ...interface{}) Stage
- func ScannerFunction(name string, newScanner NewScannerFunc, f LinewiseStageFunc) Stage
- type StageFunc
Constants ¶
This section is empty.
Variables ¶
var (
	// IsSIGPIPE is an `ErrorMatcher` that matches `*exec.ExitError`s
	// that were caused by SIGPIPE. The match for `*exec.ExitError`s
	// uses `errors.As()`. Use like
	//
	//	p.Add(IgnoreError(someStage, IsSIGPIPE))
	IsSIGPIPE = IsSignal(syscall.SIGPIPE)

	// IsEPIPE is an `ErrorMatcher` that matches `syscall.EPIPE` using
	// `errors.Is()`. Use like
	//
	//	p.Add(IgnoreError(someStage, IsEPIPE))
	IsEPIPE = IsError(syscall.EPIPE)

	// IsErrClosedPipe is an `ErrorMatcher` that matches
	// `io.ErrClosedPipe` using `errors.Is()`. (`io.ErrClosedPipe` is
	// the error that results from writing to a closed
	// `*io.PipeWriter`.) Use like
	//
	//	p.Add(IgnoreError(someStage, IsErrClosedPipe))
	IsErrClosedPipe = IsError(io.ErrClosedPipe)

	// IsPipeError is an `ErrorMatcher` that matches a few different
	// errors that typically result if a stage writes to a subsequent
	// stage that has stopped reading from its stdin. Use like
	//
	//	p.Add(IgnoreError(someStage, IsPipeError))
	IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe)
)
var ErrMemoryLimitExceeded = errors.New("memory limit exceeded")
ErrMemoryLimitExceeded is the error that will be used to kill a process, if necessary, from MemoryLimit.
var FinishEarly = errors.New("finish stage early")
FinishEarly is an error that can be returned by a `Stage` to request that the iteration be ended early (possibly without reading all of its input). This "error" is considered a successful return, and is not reported to the caller.
Functions ¶
Types ¶
type ContextValuesFunc ¶
type Env ¶
type Env struct {
	// The directory in which external commands should be executed by
	// default.
	Dir string

	// Vars are extra environment variables. These will override any
	// environment variables that would be inherited from the current
	// process.
	Vars []AppendVars
}
Env represents the environment that a pipeline stage should run in. It is passed to `Stage.Start()`.
type EnvVar ¶
type EnvVar struct {
	// The name of the environment variable.
	Key string

	// The value.
	Value string
}
EnvVar represents an environment variable that will be provided to any child process spawned in this pipeline.
type ErrorFilter ¶
ErrorFilter is a function that can filter errors from `Stage.Wait()`. The original error (possibly nil) is passed in as an argument, and whatever the function returns is the error (possibly nil) that is actually emitted.
type ErrorMatcher ¶
ErrorMatcher decides whether its argument matches some class of errors (e.g., errors that we want to ignore). The function will only be invoked for non-nil errors.
func AnyError ¶
func AnyError(ems ...ErrorMatcher) ErrorMatcher
AnyError returns an `ErrorMatcher` that returns true for an error that matches any of the `ems`.
func IsError ¶
func IsError(target error) ErrorMatcher
IsError returns an `ErrorMatcher` for the specified target error, matched using `errors.Is()`. Use like
p.Add(pipe.IgnoreError(someStage, IsError(io.EOF)))
func IsSignal ¶
func IsSignal(signal syscall.Signal) ErrorMatcher
IsSignal returns an `ErrorMatcher` that matches `*exec.ExitError`s that were caused by the specified signal. The match for `*exec.ExitError`s uses `errors.As()`. Note that under Windows this always returns false, because on that platform `WaitStatus.Signaled()` isn't implemented (it is hardcoded to return `false`).
type LimitableStage ¶
LimitableStage is the superset of Stage that must be implemented by stages passed to MemoryLimit and MemoryObserver.
type LinewiseStageFunc ¶
type LinewiseStageFunc func(
	ctx context.Context, env Env, line []byte, stdout *bufio.Writer,
) error
LinewiseStageFunc is a function that can be embedded in a `goStage`. It is called once per line in the input (where "line" can be defined via any `bufio.Scanner`). It should process the line and may write whatever it likes to `stdout`, which is a buffered writer whose contents are forwarded to the input of the next stage of the pipeline. The function needn't write one line of output per line of input.
The function mustn't retain copies of `line`, since it may be overwritten every time the function is called.
The function needn't flush or close `stdout` (this will be done automatically when all of the input has been processed).
If there is an error parsing the input into lines, or if this function returns an error, then the whole pipeline will be aborted with that error. However, if the function returns the special error `pipe.FinishEarly`, the stage will stop processing immediately with a `nil` error value.
The function will be called in a separate goroutine, so it must be careful to synchronize any data access aside from writing to `stdout`.
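The rules above (don't retain `line`, no need to flush `stdout`) can be illustrated with a standard-library sketch. The `env` type and the `runLinewise` driver are hypothetical stand-ins for what the package does internally; only the parameter order of `observe` is taken from the signature shown above.

```go
package main

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"strings"
)

// env is a hypothetical stand-in for the package's Env type.
type env struct{ Dir string }

// longest remembers the longest line seen so far.
type longest struct{ line []byte }

// observe has the same parameter shape as a LinewiseStageFunc. It
// echoes each line in upper case and records the longest one. The
// bytes are copied because the scanner may reuse the memory behind
// `line` on the next call.
func (l *longest) observe(_ context.Context, _ env, line []byte, stdout *bufio.Writer) error {
	if len(line) > len(l.line) {
		l.line = append(l.line[:0], line...) // copy, don't retain
	}
	_, err := stdout.Write(append(bytes.ToUpper(line), '\n'))
	return err
}

// runLinewise is a simplified driver: it scans the input, calls f once
// per line, and flushes the buffered writer when the input is exhausted.
func runLinewise(input string, f func(context.Context, env, []byte, *bufio.Writer) error) (string, error) {
	var out bytes.Buffer
	w := bufio.NewWriter(&out)
	sc := bufio.NewScanner(strings.NewReader(input))
	for sc.Scan() {
		if err := f(context.Background(), env{}, sc.Bytes(), w); err != nil {
			return "", err
		}
	}
	if err := sc.Err(); err != nil {
		return "", err
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	var l longest
	out, err := runLinewise("ab\nabcd\nabc\n", l.observe)
	fmt.Printf("%q %q %v\n", out, l.line, err)
}
```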
type NewScannerFunc ¶
NewScannerFunc is used to create a `Scanner` for scanning input that is coming from `r`.
type Option ¶
type Option func(*Pipeline)
Option is the function type used for Pipeline functional options.
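For readers unfamiliar with the functional-options pattern, the following sketch shows how options of the shape `func(*Pipeline)` are typically defined and applied. Everything here (`pipeline`, `option`, `withDir`, `withEnvVar`, `newPipeline`) is a hypothetical miniature, not this package's code.

```go
package main

import "fmt"

// pipeline is a hypothetical stand-in with two configurable fields.
type pipeline struct {
	dir  string
	vars []string
}

// option mirrors the shape `func(*Pipeline)` shown above.
type option func(*pipeline)

// withDir returns an option that sets the working directory.
func withDir(dir string) option {
	return func(p *pipeline) { p.dir = dir }
}

// withEnvVar returns an option that appends one KEY=VALUE entry.
func withEnvVar(key, value string) option {
	return func(p *pipeline) { p.vars = append(p.vars, key+"="+value) }
}

// newPipeline is a hypothetical constructor that applies the options
// in the order given, so later options can build on earlier ones.
func newPipeline(opts ...option) *pipeline {
	p := &pipeline{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPipeline(withDir("/tmp"), withEnvVar("LANG", "C"))
	fmt.Println(p.dir, p.vars)
}
```

The appeal of the pattern is that new settings can be added later without changing the constructor's signature.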
func WithEnvVar ¶
WithEnvVar appends an environment variable for the pipeline.
func WithEnvVarFunc ¶
func WithEnvVarFunc(key string, valueFunc ContextValueFunc) Option
WithEnvVarFunc appends a context-based environment variable for the pipeline.
func WithEnvVars ¶
WithEnvVars appends several environment variables for the pipeline.
func WithEnvVarsFunc ¶
func WithEnvVarsFunc(valuesFunc ContextValuesFunc) Option
WithEnvVarsFunc appends several context-based environment variables for the pipeline.
func WithEventHandler ¶
WithEventHandler sets an event handler for the pipeline. Setting one will emit an event for each process.
func WithStdout ¶
WithStdout assigns stdout to the last command in the pipeline.
func WithStdoutCloser ¶
func WithStdoutCloser(stdout io.WriteCloser) Option
WithStdoutCloser assigns stdout to the last command in the pipeline, and closes stdout when it's done.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a Unix-like pipe that can include multiple stages, including both external processes and stages written in Go.
func (*Pipeline) AddWithIgnoredError ¶
func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage)
AddWithIgnoredError appends one or more stages to the pipeline, each of which ignores any error matched by `em`.
type Scanner ¶
Scanner defines the interface (which is implemented by `bufio.Scanner`) that is needed by `ScannerFunction()`. See `bufio.Scanner` for how these methods should behave.
type Stage ¶
type Stage interface {
	// Name returns the name of the stage.
	Name() string

	// Start starts the stage in the background, in the environment
	// described by `env`, and using `stdin` as input. (`stdin` should
	// be set to `nil` if the stage is to receive no input, which
	// might be the case for the first stage in a pipeline.) It
	// returns an `io.ReadCloser` from which the stage's output can be
	// read (or `nil` if it generates no output, which should only be
	// the case for the last stage in a pipeline). It is the stage's
	// responsibility to close `stdin` (if it is not nil) when it has
	// read all of the input that it needs, and to close the write end
	// of its output reader when it is done, as that is generally how
	// the subsequent stage knows that it has received all of its
	// input and can finish its work, too.
	//
	// If `Start()` returns without an error, `Wait()` must also be
	// called, to allow all resources to be freed.
	Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error)

	// Wait waits for the stage to be done, either because it has
	// finished or because it has been killed due to the expiration of
	// the context passed to `Start()`.
	Wait() error
}
Stage is an element of a `Pipeline`.
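The closing responsibilities in the `Start()`/`Wait()` contract can be sketched with `io.Pipe`. This is a simplified illustration under stated assumptions: `upperStage` is hypothetical, its `Start` omits the `env Env` parameter so the example compiles without this package, and it ignores context cancellation.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// upperStage is a hypothetical in-process stage that upper-cases its input.
type upperStage struct {
	done chan error
}

func (s *upperStage) Name() string { return "upper" }

// Start launches the work in a goroutine and returns the read end of
// a pipe. The goroutine closes stdin once it has read everything and
// closes the write end of the pipe when it is done, which is how the
// reader learns that the output is complete.
func (s *upperStage) Start(_ context.Context, stdin io.ReadCloser) (io.ReadCloser, error) {
	pr, pw := io.Pipe()
	s.done = make(chan error, 1)
	go func() {
		data, err := io.ReadAll(stdin)
		stdin.Close()
		if err == nil {
			_, err = pw.Write(bytes.ToUpper(data))
		}
		pw.CloseWithError(err) // a nil err makes the reader see io.EOF
		s.done <- err
	}()
	return pr, nil
}

// Wait blocks until the goroutine started by Start has finished.
func (s *upperStage) Wait() error { return <-s.done }

// run drives the stage the way a pipeline would: start, drain, wait.
func run(input string) (string, error) {
	s := &upperStage{}
	out, err := s.Start(context.Background(), io.NopCloser(strings.NewReader(input)))
	if err != nil {
		return "", err
	}
	data, readErr := io.ReadAll(out)
	if err := s.Wait(); err != nil {
		return "", err
	}
	return string(data), readErr
}

func main() {
	out, err := run("hello\n")
	fmt.Printf("%q %v\n", out, err)
}
```

Note that the output is drained before `Wait()` is called; with an unbuffered pipe, waiting first would deadlock because the stage blocks on its write.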
func Command ¶
Command returns a pipeline `Stage` based on the specified external `command`, run with the given command-line `args`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func CommandStage ¶
CommandStage returns a pipeline `Stage` with the name `name`, based on the specified `cmd`. Its stdin and stdout are handled as usual, and its stderr is collected and included in any `*exec.ExitError` that the command might emit.
func FilterError ¶
func FilterError(s Stage, filter ErrorFilter) Stage
func Function ¶
Function returns a pipeline `Stage` that will run a `StageFunc` in a separate goroutine to process the data. See `StageFunc` for more information.
func IgnoreError ¶
func IgnoreError(s Stage, em ErrorMatcher) Stage
IgnoreError creates a stage that acts like `s` except that it ignores any errors that are matched by `em`. Use like
p.Add(pipe.IgnoreError(
	someStage,
	func(err error) bool {
		var myError *MyErrorType
		return errors.As(err, &myError) && myError.foo == 42
	},
))
The second argument can also be one of the `ErrorMatcher`s that are provided by this package (e.g., `IsError(target)`, `IsSignal(signal)`, `IsSIGPIPE`, `IsEPIPE`, `IsPipeError`), or one of the functions from the standard library that has the same signature (e.g., `os.IsTimeout`), or some combination of these (e.g., `AnyError(IsSIGPIPE, os.IsTimeout)`).
func LinewiseFunction ¶
func LinewiseFunction(name string, f LinewiseStageFunc) Stage
LinewiseFunction returns a function-based `Stage`. The input will be split into LF-terminated lines and passed to the function one line at a time (without the LF). The function may emit output to its `stdout` argument. See the definition of `LinewiseStageFunc` for more information.
Note that the stage will emit an error if any line (including its end-of-line terminator) exceeds 64 KiB in length. If this is too short, use `ScannerFunction()` directly with your own `NewScannerFunc` as argument, or use `Function()` directly with your own `StageFunc`.
func MemoryLimit ¶
MemoryLimit watches the memory usage of the stage and stops it if it exceeds the given limit.
func MemoryObserver ¶
MemoryObserver watches memory use of the stage and logs the maximum value when the stage exits.
func ScannerFunction ¶
func ScannerFunction(
	name string, newScanner NewScannerFunc, f LinewiseStageFunc,
) Stage
ScannerFunction creates a function-based `Stage`. The function will be passed input, one line at a time, and may emit output. See the definition of `LinewiseStageFunc` for more information.
type StageFunc ¶
StageFunc is a function that can be used to power a `goStage`. It should read its input from `stdin` and write its output to `stdout`. `stdin` and `stdout` will be closed automatically (if necessary) once the function returns.
Neither `stdin` nor `stdout` is necessarily buffered. If the `StageFunc` requires buffering, it needs to arrange that itself.
A `StageFunc` is run in a separate goroutine, so it must be careful to synchronize any data access aside from reading and writing.