parallel

package module
v1.0.1
Published: Sep 12, 2023 License: BSD-2-Clause Imports: 6 Imported by: 0

README

Substantially reduce latency for go command-line programs

Introduction

parallel coordinates and serialises output written to stdout and stderr by concurrent goroutines. The goal is to make it easy for go command-line tools to process all their arguments in parallel, thus reducing latency, while maintaining the illusion that each argument is processed serially.

parallel is designed for commands which process multiple arguments similar to:

    $ grep pattern file1 file2...
    $ sha256 filea fileb filec...
    $ gzip --verbose --best jan.tar feb.tar mar.tar...
    $ checkzone --verbose domain1 domain2 domain3...
    $ wget -O all.html https://google.com https://yahoo.com https://apple.com

Normally such commands are constrained from running a goroutine-per-argument because their output is randomly intermingled and thus rendered unintelligible. This is unfortunate as go commands are well suited to a goroutine-per-argument style of implementation.

parallel removes this constraint and enables a goroutine-per-argument approach by ensuring output is not intermingled and that all output appears in serial argument order with minimal changes to the command-line program.

For those familiar with GNU parallel, this package achieves similar functionality within commands written in go.

Project Status

parallel is known to compile and run on go versions 1.20 and beyond.

Background

A key feature of go is the ease with which programs can use goroutines to reduce latency as well as take advantage of modern multi-core CPUs. Unfortunately these advantages are rarely taken up by command-line programs since they need to present output to stdout and stderr in serial-processing order. The end result is that most go command-line programs revert to processing arguments serially and thus incur much greater latency than they otherwise could. This is particularly true of command-line programs which reach out across the network and incur significant network delays.

parallel removes this impediment by allowing a command-line program to run a goroutine-per-argument while still presenting their output in apparent serial-processing order.

Target Audience

parallel is designed for commands which process multiple independent arguments which take a noticeable amount of time to complete; whether that be due to CPU time, network latency or other external factors. The general idea is that a command uses parallel to start a separate goroutine for each command-line argument and these goroutines run in parallel to reduce latency for the total run-time of the command. For its part, parallel coordinates the output of these goroutines such that the illusion of serial processing is maintained.

This latency reduction is particularly apparent for network-centric commands. By using parallel the total latency is bounded by the slowest argument rather than by the sum of all arguments; roughly O(1) instead of O(n) in the number of arguments. Clearly as 'n' grows, parallel offers a greater latency reduction.

Idiomatic Code

Assuming your current code serially processes command-line arguments something like this:

for _, arg := range os.Args {
    handleArg(arg, os.Stdout, os.Stderr)        // Dispatch to handler
}

then to process all arguments in parallel while still generating identical output, your replacement code will look something like this:

group, _ := parallel.NewGroup()

for _, arg := range os.Args {
    argCopy := arg                                      // (pre go 1.22 loop variable semantics)
    group.Add("", "",
              func(stdout, stderr io.Writer) {          // Use a Closure function
                  handleArg(argCopy, stdout, stderr)    // Dispatch to handler
              })
}

group.Run()
group.Wait()

Assuming handleArg is self-contained (which is to say that it does not modify global data) and consistently refers to the provided io.Writers for stdout and stderr, no other changes are required.

This example demonstrates pretty much all of the parallel functionality. In other words, parallel is not a complicated package. Nonetheless, for those interested in more detail, complete package documentation is available in the Documentation section below.

Installation

When imported by your program, github.com/markdingo/parallel should automatically install with go mod tidy or go build.

If not, try running:

go get github.com/markdingo/parallel

Once installed, you can run the package tests with:

 go test -v github.com/markdingo/parallel

as well as display the package documentation with:

 go doc github.com/markdingo/parallel

Community

If you have any problems using parallel or suggestions on how it can do a better job, don't hesitate to create an issue on the project home page. This package can only improve with your feedback.

parallel is Copyright © 2023 Mark Delany. This software is licensed under the BSD 2-Clause "Simplified" License.

Documentation

Overview

Package parallel coordinates and serialises output written to stdout and stderr by concurrent goroutines. The goal is to make it easy for go command-line tools to process all arguments in parallel, thus reducing latency, while maintaining the illusion that each argument is processed serially.

This package is designed for commands which process multiple arguments similar to:

$ grep pattern file1 file2...
$ sha256 filea fileb filec...
$ gzip --verbose --best jan.tar feb.tar mar.tar...
$ checkzone --verbose domain1 domain2 domain3...
$ wget -O all.html https://google.com https://yahoo.com https://apple.com

Normally such commands are constrained from running a goroutine per argument because their output is randomly intermingled and thus rendered unintelligible. This is unfortunate as go commands are well suited to this style of implementation.

The parallel package removes this constraint and enables a goroutine per argument by ensuring output is not intermingled and that all output appears in serial argument order.

For those familiar with GNU parallel, this package achieves similar functionality within commands written in go.

Comparison to x/sync/errgroup

Superficially “parallel” appears similar to golang.org/x/sync/errgroup, however they perform quite different functions: “errgroup” is designed to manage goroutines working towards a common goal where a single failure causes a collective failure, whereas “parallel” is designed to manage independent goroutines within a command-line program. Most importantly, “parallel” is largely about coordinating output to stdout and stderr, something “errgroup” plays no part in.

Caveat

When adapting existing commands to use “parallel”, programmers need to be aware of newly created concurrent interactions between goroutines which may not have existed in the original implementation. In this situation it is suggested that such commands initially be built and tested with the “-race” option.
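
For example, an initial test cycle might look something like:

 go build -race ./...
 go test -race ./...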

How To Use

Idiomatic use is to populate a Group with a RunFunc for each command-line argument. Once populated, Group.Run starts a goroutine for each RunFunc in the Group. Following that, a call to Group.Wait is made to wait for completion of all RunFuncs.

If your current code serially processes command-line arguments something like this:

for _, arg := range os.Args {
    handleArg(arg, os.Stdout, os.Stderr)	// Dispatch to handler
}

then to process all arguments in parallel while still generating serially identical output, your replacement code will look something like this:

group, _ := parallel.NewGroup()

for _, arg := range os.Args {
    argCopy := arg					// (pre go 1.22 loop variable semantics)
    group.Add("", "",
	      func(stdout, stderr io.Writer) {		// Closure function
		  handleArg(argCopy, stdout, stderr)	// Dispatch to handler
	      })
}

group.Run()
group.Wait()

which in this case uses a closure to satisfy the RunFunc signature. An alternative is to use a struct function to satisfy the signature, as described in RunFunc.

The main change you have to make is to ensure that your RunFunc *always* uses the io.Writers designated as stdout and stderr as *all* output must be written to these io.Writers, never to os.Stdout or os.Stderr directly.
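
As an illustrative sketch (handleArg here is a hypothetical handler, not part of this package), a conforming handler writes solely to the supplied io.Writers:

// handleArg is a hypothetical per-argument handler. Results go to the supplied
// stdout writer and diagnostics go to the supplied stderr writer - never
// directly to os.Stdout or os.Stderr.
func handleArg(arg string, stdout, stderr io.Writer) {
    data, err := os.ReadFile(arg)
    if err != nil {
        fmt.Fprintf(stderr, "%s: %v\n", arg, err)
        return
    }
    fmt.Fprintf(stdout, "%s: %d bytes\n", arg, len(data))
}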

Further examples of how to use “parallel” can be found in the _examples sub-directory.

Capturing references to os.Stdout and os.Stderr

If your code-base is large or complicated it may not be easy to find every relevant reference to os.Stdout and os.Stderr, however since these variables can be modified it's relatively easy to at least identify which output is still being written directly. One way to do this is to replace os.Stdout and os.Stderr with your own os.File *after* calling NewGroup. E.g.:

grp, _ := parallel.NewGroup(...)
os.Stdout, _ = os.OpenFile("out.missed", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
os.Stderr = os.Stdout

then examine "out.missed" after running your test suite.

(More sophisticated blocking and capturing at the time of occurrence is possible with os.NewFile and Unix named pipes.)

Timeouts and error returns

Unlike GNU parallel this package does not support detecting RunFunc timeouts or errors, nor does it offer retry attempts or job resumption. Firstly, because this adds a lot of opinionated complexity to the API; secondly, because such features can readily be added via a closure or a struct function, designed to best suit the individual application.

As one example, if an application wants their RunFunc to stop the whole Group on error somewhat like x/sync/errgroup, one approach is to create a “terminate” channel which is populated on error. Each RunFunc monitors this channel and terminates immediately if it is written to.

group := parallel.NewGroup()
terminate := make(chan any, len(os.Args))

for _, arg := range os.Args {
    argCopy := arg
    group.Add("", "",
         func(stdout, stderr io.Writer) {
            err := handleArg(terminate, argCopy, stdout, stderr)
            if err != nil {
                terminate <- err
            }
         })
}

group.Run()
group.Wait()
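
A matching sketch of handleArg (hypothetical, not part of this package) checks the channel before doing any work and returns early once another RunFunc has signalled an error:

func handleArg(terminate chan any, arg string, stdout, stderr io.Writer) error {
    select {
    case v := <-terminate:
        terminate <- v // Put the token back so other RunFuncs also see it
        fmt.Fprintf(stderr, "%s: skipped due to an earlier error\n", arg)
        return nil
    default:
    }

    // ... normal processing of arg, writing only to the supplied io.Writers ...
    return nil
}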

Concurrency

Serial processing command-line programs typically do not have to worry about concurrency controls, but when adopting this package, they will now have to do so. Such programs should be particularly aware of modifying shared data such as global counters, progress meters and similar. All such modifications need to be concurrency protected. Naturally access to read-only data structures such as option settings does not require any protection.
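
As a minimal sketch, a shared progress counter (a hypothetical addition, not part of this package) can be made safe with sync/atomic:

var processed atomic.Int64 // Shared by all RunFuncs, so updates must be concurrency-safe

group, _ := parallel.NewGroup()
for _, arg := range os.Args[1:] {
    argCopy := arg
    group.Add("", "", func(stdout, stderr io.Writer) {
        handleArg(argCopy, stdout, stderr) // handleArg is illustrative only
        processed.Add(1)                   // Atomic update of the shared counter
    })
}
group.Run()
group.Wait()
fmt.Printf("Processed %d arguments\n", processed.Load())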

Design

The parallel package achieves most of its functionality via a “Pipeline” assigned to each RunFunc. Output is steered through writers in the pipeline based on the Group config options. Specific features are handled by different writers such as “head” and “tagger”. The theory is that new writers implementing future functionality can easily slot into the pipeline.

There are currently two types of Pipelines: Queue and Passthru.

Queue Pipeline

The initial pipeline for each RunFunc is normally a Queue Pipeline which starts in “background” mode because, much like a background command in a shell, it continues to run, except that all output is buffered until the pipeline is switched to “foreground” mode. This diagram illustrates the “writers” in a Queue Pipeline.

    RunFunc
(stdout,   stderr)
   v         v
   |         |
  head      head        Adapts io.Writer to parallel.writer
   |_       _|
     |     |
      queue             Buffers arrival order of outputs
    _|     |_
   |         |
 tagger    tagger       Prefix each line with 'tag' if set
   |         |
  tail      tail        Serialises Group output access
   |         |          Adapts parallel.writer to io.Writer
 Group     Group
 stdout    stderr
   |         |
   v         v

The “queue” writer buffers and tracks the arrival order of stdout and stderr outputs. Arrival order has to be maintained to ensure correct transfer to the Group io.Writers. This is necessary because the Group io.Writers may well be one and the same, e.g. if a command is invoked with stderr re-directed to stdout or if both stdout and stderr are a terminal.

When a RunFunc becomes a candidate for foreground output (because it has percolated to the front of the queue with OrderRunners(true)), the “queue” buffered output is written to the Group io.Writers and the Queue Pipeline is switched to "foreground" mode.

Passthru Pipeline

Passthru is a skeletal pipeline intended as a diagnostic tool which bypasses most of the “parallel” functionality. It is created when the Group is constructed with Passthru(true).

    RunFunc
(stdout,   stderr)
   v         v
   |         |
  head      head        Adapts io.Writer to parallel.writer
   |         |
  tail      tail        Serialises Group output access
   |         |          Adapts parallel.writer to io.Writer
   |         |
 Group     Group
 stdout    stderr
   |         |
   v         v

The main reason for using Passthru is for situations where you have suspicions about the parallel package and want a relatively unfiltered view of what your RunFuncs are generating.
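
As a sketch, a Group might be constructed in this diagnostic mode as follows (OrderRunners is set false explicitly since it cannot be true alongside Passthru):

group, err := parallel.NewGroup(parallel.Passthru(true), parallel.OrderRunners(false))
if err != nil {
    log.Fatal(err) // An invalid Option combination is reported here
}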

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group manages a set of RunFuncs which are ultimately run in parallel by Group.Run and waited on by Group.Wait. A Group must be constructed with NewGroup otherwise a panic will occur when used.

Group has a strict calling sequence: Multiple Group.Add calls followed by Group.Run followed by Group.Wait after which no calls to the Group are valid. Any deviation from this call sequence results in a panic.

A Group cannot be reused, however multiple Groups can be created and used independently of each other.

A Group is not concurrency-safe and must only be accessed by a single goroutine at a time. This does not imply anything about the concurrency of RunFuncs which normally are run concurrently and which are supplied with concurrency-safe io.Writers.

func NewGroup

func NewGroup(opts ...Option) (*Group, error)

NewGroup constructs a Group ready for use. A Group must be constructed with this function as a "zero" struct causes panics. Multiple Options can be supplied to NewGroup to modify its behaviour e.g.:

group, _ := parallel.NewGroup(WithStdoutSeparator("----\n"), OrderStderr(true))

Unless otherwise set the Group is returned with OrderRunners(true) and OrderStderr(false) which results in RunFuncs optimally writing to the Group output io.Writers (which in turn are normally os.Stdout and os.Stderr).

NewGroup copies os.Stdout and os.Stderr to the Group io.Writers for the purpose of writing the serialised output, thus any subsequent changes to os.Stdout and os.Stderr have no impact on Group output.

If an option is invalid or would result in a nonsensical Group an error is returned. Most errors are caused by Options which create the possibility that a RunFunc could stall indefinitely. See Option for more details.
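
As a minimal sketch, checking the error at construction time might look like:

group, err := parallel.NewGroup(parallel.OrderStderr(true)) // Write each RunFunc's stderr after its stdout
if err != nil {
    log.Fatal(err) // Invalid or conflicting Options are reported here
}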

func (*Group) Add

func (grp *Group) Add(outTag, errTag string, rFunc RunFunc)

Add appends the supplied RunFunc to the Group in anticipation of Group.Run. Typically a RunFunc is implemented as either a closure or a struct function so as to pass additional parameters to the underlying function. See RunFunc for details.

Order of adding is important when the OrderRunners Option is set true: the current “front” RunFunc pipeline is arranged such that output typically goes directly to os.Stdout and os.Stderr, which helps give the appearance of “liveliness”.

The outTag and errTag strings are prepended to all output written by the RunFunc to stdout and stderr respectively and help mimic the “--tag” option in GNU parallel.
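
For example, a sketch which uses each argument as its own tag so that every output line identifies the argument which produced it (checkHost is illustrative only):

group, _ := parallel.NewGroup()
for _, host := range os.Args[1:] {
    hostCopy := host
    group.Add(hostCopy+": ", hostCopy+": ", // Tag both stdout and stderr lines
        func(stdout, stderr io.Writer) {
            checkHost(hostCopy, stdout, stderr)
        })
}
group.Run()
group.Wait()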

func (*Group) Run

func (grp *Group) Run()

Run starts each previously added RunFunc in a separate goroutine and transitions the Group to being ready for a Group.Wait call.

When a RunFunc pipeline percolates to the front of all “active” RunFuncs, it is set to foreground mode so that the output is potentially written directly to os.Stdout and os.Stderr thus affording “liveliness” output to the user.

Run constrains the number of “active” RunFuncs to the LimitActiveRunners Option setting. If no limit is set, all RunFuncs are started immediately. A RunFunc is considered “active” only up until it returns, not when the output is sent to the Group io.Writers.

Regardless of any LimitActiveRunners constraint, Run returns to the caller immediately; any outstanding RunFuncs are started in the background as LimitActiveRunners allows.

func (*Group) Wait

func (grp *Group) Wait()

Wait waits for all RunFuncs started by Group.Run to complete before returning. If any RunFunc fails to complete, Wait will never return.

While Group.Run starts all RunFuncs, it is Wait which progresses RunFuncs and transitions them from background mode to foreground mode to completion, so the caller must not presume that RunFuncs will complete prior to calling Wait, because they won't. If callers want to perform other activities between calls to Group.Run and Wait, they may want to consider running Wait in a separate goroutine which notifies them when it returns.
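
A sketch of that approach (updateProgressMeter is illustrative only):

group.Run()

done := make(chan struct{})
go func() {
    group.Wait() // Wait is what progresses the RunFuncs to completion
    close(done)
}()

for waiting := true; waiting; {
    select {
    case <-done:
        waiting = false
    case <-time.After(time.Second):
        updateProgressMeter() // Other activities while the RunFuncs progress
    }
}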

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option functions configure a Group when created with NewGroup. Each Option is documented separately. Some options cannot be mixed with others, primarily because that configuration could cause a RunFunc to stall forever. These limitations are described in each Option.

func LimitActiveRunners

func LimitActiveRunners(maxActive uint) Option

LimitActiveRunners limits the number of “active” (or concurrent) RunFuncs running in a separate goroutine within a Group to the “maxActive” value. It can be used in conjunction with LimitMemoryPerRunner to limit total buffer memory used by the Group, or set independently when there is a risk that too many RunFuncs could be started concurrently. If set to zero, all RunFuncs run concurrently.

A RunFunc is considered “active” until it returns, not when the output is sent to the Group io.Writers.

While tens of thousands of goroutines can run concurrently on most systems, if they all contend for limited resources such as CPU, file descriptors, sockets or disk bandwidth, then constraining concurrency with this option is likely to improve aggregate system throughput or prevent a program from failing due to depleted system resources.

It's sensible to set LimitActiveRunners such that in the general case a command-line program processes all arguments in parallel, but in the extreme case, the program still progresses towards completion without contending too much for limited resources. For example, if the RunFuncs rely on external network connections, perhaps a limit of 10-50 might be a good setting. Conversely, if the RunFuncs are CPU-intensive, setting LimitActiveRunners to some proportion of runtime.NumCPU is likely a good strategy.

LimitActiveRunners is intimately connected to LimitMemoryPerRunner in that the two combine to define the maximum memory resources any Group consumes while buffering output.
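
As a sketch, a CPU-intensive command might constrain concurrency to the available CPUs:

group, err := parallel.NewGroup(parallel.LimitActiveRunners(uint(runtime.NumCPU())))
if err != nil {
    log.Fatal(err)
}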

func LimitMemoryPerRunner

func LimitMemoryPerRunner(limit uint64) Option

LimitMemoryPerRunner limits the number of output bytes buffered for each RunFunc before being stalled on their Write() call. This setting is mostly of use when RunFuncs may generate multiple MBytes of output, otherwise the benefits are likely to be minimal.

If LimitMemoryPerRunner is set, so too must LimitActiveRunners, otherwise the total amount of memory consumed is unbounded.

If you know your RunFunc only ever generates minimal output then leaving this value at the default of zero is a reasonable choice. However, if your RunFunc potentially generates very large amounts of output which may exhaust available memory, then setting LimitMemoryPerRunner and LimitActiveRunners to non-zero values is a good strategy.

This limit only affects background RunFuncs as the foreground RunFunc writes directly to the Group output io.Writers. Ultimately all background RunFuncs are switched to foreground mode, so reaching this limit only ever temporarily stalls a RunFunc.

LimitMemoryPerRunner cannot be set with OrderStderr == true or OrderRunners == false as that could cause a RunFunc to stall indefinitely.
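
A sketch combining the two limits so that the Group never buffers more than roughly 20 x 16MB of output:

group, err := parallel.NewGroup(
    parallel.LimitActiveRunners(20),             // At most 20 RunFuncs active at once
    parallel.LimitMemoryPerRunner(16*1024*1024)) // Each may buffer up to 16MB before stalling
if err != nil {
    log.Fatal(err)
}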

func OrderRunners

func OrderRunners(setting bool) Option

OrderRunners causes output to be written in strict order of RunFunc addition to the Group. If set false, output is written in order of runner completion. This option exists to mimic the GNU parallel “--keep-order” option. The default is true (which differs from the default for “--keep-order”).

When OrderRunners is set false, LimitMemoryPerRunner cannot be set non-zero as it creates a situation where all RunFuncs could stall indefinitely.
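
For example, to emit output in completion order, much like GNU parallel without “--keep-order”:

group, _ := parallel.NewGroup(parallel.OrderRunners(false)) // Output in RunFunc completion order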

func OrderStderr

func OrderStderr(setting bool) Option

OrderStderr causes all stderr output to be written *after* all stdout output for each RunFunc. This can result in an output stream which differs from one written by a RunFunc run serially and writing directly to os.Stdout and os.Stderr. This option exists to mimic the GNU parallel “--group” option.

When OrderStderr is set true, LimitMemoryPerRunner cannot be set non-zero as that creates a situation where all runners could stall indefinitely.

func Passthru

func Passthru(setting bool) Option

Passthru is a debug setting. When set true all output is transferred more or less directly to the Group io.Writers. In effect, the Group pipeline plays a very limited part in managing the output stream.

If this option is set true the following options cannot be set true: LimitMemoryPerRunner, OrderStderr and OrderRunners.

func WithStderr

func WithStderr(wtr io.Writer) Option

WithStderr sets the Group stderr destination to the supplied io.Writer replacing the default of os.Stderr.

func WithStderrSeparator

func WithStderrSeparator(sep string) Option

WithStderrSeparator sets the separator string printed to the Group stderr io.Writer between the output of each RunFunc. If WithStdoutSeparator is also set, that string is printed first. If WithStderrSeparator is set to a non-empty string it should normally include a trailing newline. The default is an empty string.

func WithStdout

func WithStdout(wtr io.Writer) Option

WithStdout sets the Group stdout destination to the supplied io.Writer replacing the default of os.Stdout.

func WithStdoutSeparator

func WithStdoutSeparator(sep string) Option

WithStdoutSeparator sets the separator string printed to the Group stdout io.Writer between the output of each RunFunc. If WithStdoutSeparator is set to a non-empty string it should normally include a trailing newline. The default is an empty string.
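
As a sketch, separating each RunFunc's output on both streams with a dashed line:

group, _ := parallel.NewGroup(
    parallel.WithStdoutSeparator("----\n"), // Printed to Group stdout between each RunFunc's output
    parallel.WithStderrSeparator("----\n")) // Printed to Group stderr between each RunFunc's output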

type RunFunc

type RunFunc func(stdout, stderr io.Writer)

RunFunc is the caller supplied function added to a Group with Group.Add. Each RunFunc is run in a separate goroutine when Group.Run is called.

A RunFunc *must* only write to the supplied io.Writers for stdout and stderr. It should never write to os.Stdout or os.Stderr directly unless that output is purposely intended to bypass this package (debug output being a possible example).

Naturally each RunFunc instance should avoid interacting with other RunFunc instances unless suitable concurrency controls are used. This advice is mentioned because cli programs adopting this package may not have had to worry about concurrency previously but may have to do so now. For example, unprotected modification of global variables is not uncommon in serial cli programs.

A RunFunc is free to create internal goroutines and to write concurrently to the provided io.Writers, but all such goroutines must finish writing before the RunFunc returns. More importantly, writing concurrently to the assigned stdout/stderr io.Writers is likely to produce intermingled output, so it is not recommended.
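
One sketch of this pattern has internal goroutines deliver results over a channel while the RunFunc itself does all the writing (parts and fetchPart are illustrative only):

group.Add("", "", func(stdout, stderr io.Writer) {
    var wg sync.WaitGroup
    results := make(chan string)
    for _, part := range parts {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            results <- fetchPart(p)
        }(part)
    }
    go func() { wg.Wait(); close(results) }()

    // Only this goroutine writes to stdout, so nothing is intermingled and all
    // internal goroutines have finished their work before the RunFunc returns.
    for r := range results {
        fmt.Fprintln(stdout, r)
    }
})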

Typically a RunFunc is constructed as a closure to allow additional parameters to be passed to the desired function. This example shows a closure passing in a context and the individual command-line argument to each RunFunc.

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
group, _ := parallel.NewGroup()
for _, arg := range os.Args {
    argCopy := arg					   // (pre go 1.22 loop variable semantics)
    group.Add("", "",
	      func(stdout, stderr io.Writer) {		  // Closure function
		  handleArg(ctx, argCopy, stdout, stderr) // Dispatch to handler
	     })
}
group.Run()
group.Wait()

An alternative strategy is to use a struct function which satisfies the RunFunc signature as shown here:

type file string

grp, _ := parallel.NewGroup()
for _, f := range os.Args {
    runner := file(f)
    grp.Add(f+"\t", "", runner.run)
}
grp.Run()
grp.Wait()

func (f *file) run(stdout, stderr io.Writer) {
    fmt.Fprintln(stdout, "Running")
}

Directories

Path Synopsis
