parallelizer

v1.0.0
Warning: This package is not in the latest version of its module.
Published: Mar 2, 2020 License: Apache-2.0 Imports: 6 Imported by: 0

README

============
Parallelizer
============

.. image:: https://img.shields.io/github/tag/klmitch/parallelizer.svg
    :target: https://github.com/klmitch/parallelizer/tags
.. image:: https://img.shields.io/hexpm/l/plug.svg
    :target: https://github.com/klmitch/parallelizer/blob/master/LICENSE
.. image:: https://travis-ci.org/klmitch/parallelizer.svg?branch=master
    :target: https://travis-ci.org/klmitch/parallelizer
.. image:: https://coveralls.io/repos/github/klmitch/parallelizer/badge.svg?branch=master
    :target: https://coveralls.io/github/klmitch/parallelizer?branch=master
.. image:: https://godoc.org/github.com/klmitch/parallelizer?status.svg
    :target: http://godoc.org/github.com/klmitch/parallelizer
.. image:: https://img.shields.io/github/issues/klmitch/parallelizer.svg
    :target: https://github.com/klmitch/parallelizer/issues
.. image:: https://img.shields.io/github/issues-pr/klmitch/parallelizer.svg
    :target: https://github.com/klmitch/parallelizer/pulls
.. image:: https://goreportcard.com/badge/github.com/klmitch/parallelizer
    :target: https://goreportcard.com/report/github.com/klmitch/parallelizer

This repository contains Parallelizer, a library that makes it simple
to add controlled parallelization to a program using a pool of worker
goroutines.  It is not intended as an external job queue to which
outside programs submit jobs, although it could easily be used to
implement such a tool.

Interfaces
==========

The workers in this package implement the ``Worker`` interface, which
consists of two methods: ``Call()`` submits data to be worked on, and
``Wait()`` stops data submission and waits for the final result.  Each
worker is initialized with a ``Runner`` provided by the caller.  A
``Runner`` must provide three methods: ``Run()``, which acts on the
data and returns a result; ``Integrate()``, which takes a result
produced by ``Run()`` and combines it with the results collected so
far in an application-dependent fashion; and ``Result()``, which
returns the final integrated result.

Each of these three methods may run in a different goroutine.
``Run()`` may be invoked many times in parallel, but calls to
``Integrate()`` are made one at a time, never concurrently, and
``Result()`` is called only once, after all calls to ``Run()`` and
``Integrate()`` have completed.  It is not safe to call the methods of
a ``Worker`` from any of that worker's ``Runner`` methods; however,
``Integrate()`` is passed an instance of ``Worker`` on which it is safe
to call ``Call()``.  This allows recursion: ``Run()`` returns lists of
additional data, which ``Integrate()`` then passes to ``Call()``.
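
The recursion pattern described above can be sketched as follows.  This
is a minimal, self-contained illustration under stated assumptions: it
does not import the package, so it declares local copies of ``Result``,
``Worker``, and ``Runner`` that mirror the documented definitions, and
the ``node`` tree, ``countRunner``, and single-goroutine ``queueWorker``
driver are all hypothetical stand-ins for what ``NewSynchronousWorker()``
or ``NewParallelWorker()`` would provide.  Note that ``Run()`` never
touches the worker: it returns the child nodes, and ``Integrate()``
resubmits them with ``Call()``.

```go
package main

import "fmt"

// Local mirrors of the documented parallelizer types (assumption: the
// real package is not imported so that this sketch is self-contained).
type Result struct {
	Result interface{} // The function result
	Panic  interface{} // The captured panic
}

type Worker interface {
	Call(data interface{}) error
	Wait() (interface{}, error)
}

type Runner interface {
	Run(data interface{}) interface{}
	Integrate(worker Worker, result *Result)
	Result() interface{}
}

// node is a hypothetical tree node used only for this illustration.
type node struct {
	name     string
	children []*node
}

// countRunner counts every node reachable from the submitted roots.
type countRunner struct {
	count int // only touched from Integrate and Result
}

// Run does the (potentially parallel) work; it never calls Worker.Call.
func (r *countRunner) Run(data interface{}) interface{} {
	return data.(*node).children
}

// Integrate tallies the node and resubmits its children for processing.
func (r *countRunner) Integrate(worker Worker, result *Result) {
	if result.Panic != nil {
		return // a real runner would record the failure
	}
	r.count++
	for _, child := range result.Result.([]*node) {
		worker.Call(child)
	}
}

func (r *countRunner) Result() interface{} {
	return r.count
}

// queueWorker is a hypothetical single-goroutine driver used only to
// exercise the runner; it is not the package's synchronous worker.
type queueWorker struct {
	runner  Runner
	pending []interface{}
}

func (w *queueWorker) Call(data interface{}) error {
	w.pending = append(w.pending, data)
	return nil
}

func (w *queueWorker) Wait() (interface{}, error) {
	for len(w.pending) > 0 {
		data := w.pending[0]
		w.pending = w.pending[1:]
		w.runner.Integrate(w, &Result{Result: w.runner.Run(data)})
	}
	return w.runner.Result(), nil
}

func main() {
	tree := &node{name: "root", children: []*node{
		{name: "a", children: []*node{{name: "a1"}}},
		{name: "b"},
	}}
	w := &queueWorker{runner: &countRunner{}}
	w.Call(tree)
	total, _ := w.Wait()
	fmt.Println(total) // prints 4
}
```

With the real package, ``queueWorker`` would be replaced by a worker
from one of the constructors; the runner itself should not need to
change, since ``count`` is only touched from ``Integrate()`` and
``Result()``.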

In addition to these two interfaces, which are aimed at
parallelization, there are three interfaces concerned with
serialization: letting many independent goroutines make calls that are
not thread-safe, without necessarily blocking those goroutines.  These
are the ``Doer`` interface, which is implemented by client code; the
``Serializer`` interface, through which those calls are made; and
``CallResult``, a "future"-style object that allows asynchronous
retrieval of the result of a call made via ``Serializer``.  A user
calls ``Serializer.Call()`` (or one of its sister methods,
``CallAsync()`` and ``CallOnly()``) to have ``Doer.Do()`` invoked on
the wrapped ``Doer`` in a synchronized manner; calling
``Serializer.Wait()`` then results in a call to ``Doer.Finish()``,
whose result it returns.

Available Implementations
-------------------------

The parallelizer package provides 3 implementations of the ``Worker``
interface.  The first is ``MockWorker``, which is a simple struct that
may be used to mock the parallelizer out for the purposes of unit
testing.

The second implementation of ``Worker`` is a trivial implementation of
a synchronous worker, where all activity happens in a single
goroutine: the one that is calling the ``Call()`` and ``Wait()``
methods.  This worker is not thread-safe, and should not be used
simultaneously from different goroutines.  An instance of a
synchronous worker may be created by passing the application's
``Runner`` to the ``NewSynchronousWorker()`` function.

The third implementation of ``Worker`` is a parallel worker; this
worker starts up a defined number of worker goroutines, plus a manager
goroutine.  This worker can be called into from almost any goroutine
(subject to the restriction noted above: ``Run()``, ``Integrate()``,
and ``Result()`` cannot invoke ``Call()`` on the worker itself, but
``Integrate()`` is passed a variant that is safe to ``Call()``).  To
create an instance of a parallel worker, pass the runner and the
desired number of worker goroutines to the ``NewParallelWorker()``
function.

The parallelizer package also provides two implementations of the
``Serializer`` interface.  The first is ``MockSerializer``, which is a
simple struct that may be used to mock the serializer out for the
purposes of unit testing.

The second implementation of ``Serializer`` utilizes a manager
goroutine that actually makes the calls to the ``Doer.Do()`` method.
An instance of this serializer may be created by passing the
application's ``Doer`` to the ``NewSerializer()`` function.
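
The manager-goroutine design can be sketched roughly as follows.  This
is an illustrative approximation, not the package's implementation:
``Doer`` is redeclared locally, and ``newSerializer``, ``request``,
``tally``, and ``errClosed`` are hypothetical names.  For brevity the
sketch's ``Call()`` returns the raw value rather than a ``*Result``
(panic capture is omitted), and ``CallAsync()``/``CallOnly()`` are left
out.  A mutex guards submission so that no request can be sent after
``Wait()`` has closed the request channel.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Doer mirrors the documented interface.
type Doer interface {
	Do(data interface{}) interface{}
	Finish() interface{}
}

// errClosed stands in for the package's ErrClosed.
var errClosed = errors.New("serializer closed by a call to Wait")

type request struct {
	data  interface{}
	reply chan interface{} // buffered, so the manager never blocks on it
}

// serializer is a hypothetical sketch: one manager goroutine owns the Doer.
type serializer struct {
	mu     sync.Mutex // guards closed and sends on reqs
	closed bool
	reqs   chan request
	done   chan interface{}
	result interface{}
}

func newSerializer(doer Doer) *serializer {
	s := &serializer{reqs: make(chan request), done: make(chan interface{}, 1)}
	go func() {
		// Every Do call, and the final Finish call, runs on this goroutine.
		for req := range s.reqs {
			req.reply <- doer.Do(req.data)
		}
		s.done <- doer.Finish()
	}()
	return s
}

// Call blocks until Do has processed data, then returns Do's result.
func (s *serializer) Call(data interface{}) (interface{}, error) {
	reply := make(chan interface{}, 1)
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil, errClosed
	}
	s.reqs <- request{data: data, reply: reply}
	s.mu.Unlock() // release before waiting so other callers can queue up
	return <-reply, nil
}

// Wait stops the manager and returns Finish's result, cached for later calls.
func (s *serializer) Wait() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.reqs)
		s.result = <-s.done
	}
	return s.result
}

// tally is a hypothetical Doer that updates a map without any locking.
type tally struct{ counts map[string]int }

func (t *tally) Do(data interface{}) interface{} {
	t.counts[data.(string)]++
	return t.counts[data.(string)]
}

func (t *tally) Finish() interface{} { return t.counts }

func main() {
	s := newSerializer(&tally{counts: map[string]int{}})
	var wg sync.WaitGroup
	for _, word := range []string{"a", "b", "a", "c", "a"} {
		wg.Add(1)
		go func(w string) {
			defer wg.Done()
			s.Call(w) // many goroutines, one Doer
		}(word)
	}
	wg.Wait()
	counts := s.Wait().(map[string]int)
	fmt.Println(counts["a"], counts["b"], counts["c"]) // prints 3 1 1
}
```

Because only the manager goroutine ever touches ``tally.counts``, the
``Doer`` itself needs no locking, which is the point of wrapping it in
a serializer.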

Additional Utilities
--------------------

The parallelizer package also provides a ``MockRunner``, a struct
which implements the ``Runner`` interface.  This may be useful for
other applications that utilize ``Runner``, or which need to pass
``Runner`` instances around internally.  Similarly, it provides the
``MockDoer``, another struct which implements the ``Doer`` interface,
and ``MockCallResult``, which implements the ``CallResult`` interface.
The latter may be useful if the application being tested uses
``Serializer.CallAsync()``.

Testing
=======

This repository is a standard Go repository, and so may be tested and
built in the standard Go ways.  However, the repository also contains
a ``Makefile`` to aid in repeatable testing and reformatting;
developers that wish to contribute to Parallelizer may find it useful
to utilize ``make`` to ensure that their code conforms to the
standards enforced by Travis CI.  The following is a run-down of the
available ``make`` targets.

``make format-test``
--------------------

This target is called by Travis to ensure that the formatting conforms
to that recommended by the standard Go tools ``goimports`` and
``gofmt``.  Most developers should prefer the ``make format`` target,
which is automatically run by ``make test`` or ``make cover``, and
will rewrite non-conforming files.  Note that ``goimports`` is a
third-party package; it may be installed using::

    % go get -u -v golang.org/x/tools/cmd/goimports

``make format``
---------------

This target may be called by developers to ensure that the source code
conforms to the recommended style.  It runs ``goimports`` and
``gofmt`` to this end.  Most developers will prefer to use ``make
test`` or ``make cover``, which automatically invoke ``make format``.
Note that ``goimports`` is a third-party package; it may be installed
using::

    % go get -u -v golang.org/x/tools/cmd/goimports

``make lint``
-------------

This target may be called to run a lint check.  This tests for such
things as the presence of documentation comments on exported functions
and types, etc.  To this end, this target runs ``golint`` in enforcing
mode.  Most developers will prefer to use ``make test`` or ``make
cover``, which automatically invoke ``make lint``.  Note that
``golint`` is a third-party package; it may be installed using::

    % go get -u -v golang.org/x/lint/golint

``make vet``
------------

This target may be called to run a "vet" check.  This vets the source
code in the manner of ``go vet``, reporting suspicious constructs that
compile but are likely to be mistakes.  Most developers will prefer to
use ``make test`` or ``make cover``, which automatically invoke
``make vet``.

``make test-only``
------------------

This target may be called to run only the unit tests.  A coverage
profile will be output to ``coverage.out``, but none of the other
tests, such as ``make vet``, will be invoked.  Most developers will
prefer to use ``make test`` or ``make cover``, which automatically
invoke ``make test-only``, among other targets.

``make test``
-------------

This target may be called to run all the tests.  It ensures that
``make format``, ``make lint``, ``make vet``, and ``make test-only``
are all called, in that order.

``make cover``
--------------

This target may be called to run ``make test``, but will additionally
generate an HTML file named ``coverage.html`` which will report on the
coverage of the source code by the test suite.

``make clean``
--------------

This target may be called to remove the temporary files
``coverage.out`` and ``coverage.html``, as well as any future
temporary files that are added in the testing process.

Contributing
============

Contributions are welcome!  Please ensure that all tests described
above pass prior to proposing pull requests; pull requests that do not
pass the test suite unfortunately cannot be merged.  Also, please
ensure adequate test coverage of additional code and branches of
existing code; the ideal target is 100% coverage, to ensure adequate
confidence in the function of Parallelizer.

Documentation

Overview

Package parallelizer is a library for enabling the addition of controlled parallelization utilizing a pool of worker goroutines in a simple manner. This is not intended as an external job queue, where outside programs may submit jobs, although it could easily be used to implement such a tool.

The parallelizer package provides a Runner interface, which client applications implement. Instances of Runner may then be passed to the constructor functions NewSynchronousWorker or NewParallelWorker, which construct objects conforming to the Worker interface. Data items are submitted to a Worker via the Worker.Call method; calling Worker.Wait then completes the processing and returns the final result.

A Runner implementation must provide a Runner.Run method, which actually processes the data in a goroutine and returns a result. That result is then passed to the Runner.Integrate method, which never runs concurrently with other Runner.Integrate calls and which can submit additional data items for processing. Once all data has been processed and the client code has called Worker.Wait, the Worker calls the Runner.Result method to obtain the final result. Runner.Result is called exactly once; its return value is cached in the Worker and returned by any later calls to Worker.Wait. Worker.Call may not be called again after Worker.Wait has been called.
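
The Wait contract described above (Runner.Result called exactly once, its value cached, and Call rejected afterwards) can be illustrated with a small synchronous worker. This sketch is hypothetical: it redeclares the documented interfaces locally, and syncWorker, inner, sumRunner, errClosed, and errWouldDeadlock are illustrative stand-ins rather than the package's own types or error values. The inner handle passed to Integrate keeps accepting Call even after Wait, as the Runner.Integrate documentation allows, and refuses Wait; treating that refusal as the case ErrWouldDeadlock reports is an assumption based on its message.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented types (the real package is not imported).
type Result struct {
	Result interface{}
	Panic  interface{}
}

type Worker interface {
	Call(data interface{}) error
	Wait() (interface{}, error)
}

type Runner interface {
	Run(data interface{}) interface{}
	Integrate(worker Worker, result *Result)
	Result() interface{}
}

// Hypothetical stand-ins for the package's ErrClosed and ErrWouldDeadlock.
var (
	errClosed        = errors.New("closed by a call to Wait")
	errWouldDeadlock = errors.New("called Wait from Integrate")
)

// syncWorker does all its work on the goroutine calling Call and Wait.
type syncWorker struct {
	runner  Runner
	pending []interface{}
	closed  bool
	result  interface{}
}

// drain processes queued items, including any that Integrate resubmits.
func (w *syncWorker) drain() {
	for len(w.pending) > 0 {
		data := w.pending[0]
		w.pending = w.pending[1:]
		w.runner.Integrate(inner{w}, &Result{Result: w.runner.Run(data)})
	}
}

func (w *syncWorker) Call(data interface{}) error {
	if w.closed {
		return errClosed
	}
	w.pending = append(w.pending, data)
	w.drain()
	return nil
}

// Wait calls Runner.Result exactly once and caches the value it returns.
func (w *syncWorker) Wait() (interface{}, error) {
	if !w.closed {
		w.closed = true
		w.drain()
		w.result = w.runner.Result()
	}
	return w.result, nil
}

// inner is the handle given to Integrate: Call only queues (even after
// Wait), and Wait is refused, mirroring the intent of ErrWouldDeadlock.
type inner struct{ w *syncWorker }

func (i inner) Call(data interface{}) error {
	i.w.pending = append(i.w.pending, data)
	return nil
}

func (i inner) Wait() (interface{}, error) { return nil, errWouldDeadlock }

// sumRunner sums squares and counts how often Result is called.
type sumRunner struct{ sum, resultCalls int }

func (r *sumRunner) Run(data interface{}) interface{} { return data.(int) * data.(int) }
func (r *sumRunner) Integrate(_ Worker, res *Result)  { r.sum += res.Result.(int) }
func (r *sumRunner) Result() interface{} {
	r.resultCalls++
	return r.sum
}

func main() {
	r := &sumRunner{}
	w := &syncWorker{runner: r}
	for i := 1; i <= 3; i++ {
		w.Call(i)
	}
	first, _ := w.Wait()
	second, _ := w.Wait()
	fmt.Println(first, second, r.resultCalls, w.Call(4) == errClosed) // prints 14 14 1 true
}
```

The second Wait returns the cached 14 without calling Result again, and the Call made after Wait is rejected.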

The parallelizer package also provides a Doer interface, which is for client applications to implement. Instances of the Doer interface may then be passed to the constructor function NewSerializer, which constructs objects conforming to the Serializer interface. Data items may then be passed to the Serializer objects to be executed via Doer.Do in a synchronous manner without necessarily blocking the calling goroutine.

A Doer implementation must provide a Doer.Do method, which actually processes the data in a separate goroutine. Calls are executed one at a time, so Doer.Do need not be thread-safe. When the code using the wrapping Serializer is done, it calls Serializer.Wait, which calls the Doer.Finish method and returns its result to the caller. Note that none of the Serializer call methods (Call, CallAsync, and CallOnly) may be called again after Serializer.Wait has been called.


Constants

This section is empty.

Variables

var (
	ErrClosed        = errors.New("Object has been closed by a call to Wait")
	ErrWouldDeadlock = errors.New("Called Wait from Integrate; would deadlock")
)

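Various errors that may be returned by Worker.Call and other methods of this package. As their messages indicate, ErrClosed reports that the object has already been closed by a call to Wait, and ErrWouldDeadlock reports that Wait was called from Integrate, which would deadlock.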

Functions

This section is empty.

Types

type CallResult added in v0.3.0

type CallResult interface {
	// Wait is used to retrieve the result of the call.  The
	// result is not cached in the CallResult object, so
	// subsequent calls to Wait will return nil.
	Wait() *Result

	// TryWait is a non-blocking variant of Wait.  It attempts to
	// retrieve the result, and returns the value and a boolean
	// value that indicates whether the result has already been
	// retrieved.
	TryWait() (*Result, bool)

	// Channel returns the channel that the CallResult object uses
	// to receive the results.  This allows the caller to directly
	// select on the channel.  Note that if the result has already
	// been received, the channel returned by this method will be
	// nil.  Using this method effectively closes the CallResult;
	// subsequent calls to Wait and TryWait will return nil
	// results.
	Channel() <-chan *Result
}

CallResult is an interface describing a "future" returned by Serializer.CallAsync. It allows the call to be made without blocking the goroutine calling Serializer.CallAsync, but the result of the call may still be waited upon at a later date.
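
A CallResult-style future can be built on a one-element buffered channel. The following is a hypothetical sketch, not the package's implementation: future and newFuture are illustrative names, the type is not safe for concurrent use by several goroutines, and the meaning given to TryWait's boolean (true when this call obtained the result) is one interpretation of the wording above, stated here as an assumption.

```go
package main

import "fmt"

// Result mirrors the documented struct.
type Result struct {
	Result interface{}
	Panic  interface{}
}

// future is a hypothetical CallResult-style object; it is not safe for
// concurrent use by several goroutines.
type future struct {
	ch chan *Result // set to nil once the result has been handed out
}

// newFuture returns the future and the send side the producer will use.
func newFuture() (*future, chan<- *Result) {
	ch := make(chan *Result, 1) // buffered: the producer never blocks
	return &future{ch: ch}, ch
}

// Wait blocks for the result; later calls return nil (no caching).
func (f *future) Wait() *Result {
	if f.ch == nil {
		return nil
	}
	res := <-f.ch
	f.ch = nil
	return res
}

// TryWait never blocks.  Assumption: the boolean is true when this call
// obtained the result.
func (f *future) TryWait() (*Result, bool) {
	if f.ch == nil {
		return nil, false
	}
	select {
	case res := <-f.ch:
		f.ch = nil
		return res, true
	default:
		return nil, false
	}
}

// Channel gives the caller the channel to select on and "closes" the
// future, so Wait and TryWait return nil afterwards.
func (f *future) Channel() <-chan *Result {
	ch := f.ch
	f.ch = nil
	return ch
}

func main() {
	f, deliver := newFuture()
	if _, ok := f.TryWait(); !ok {
		fmt.Println("not ready yet")
	}
	go func() { deliver <- &Result{Result: 42} }()
	fmt.Println(f.Wait().Result) // prints 42
	fmt.Println(f.Wait() == nil) // prints true: the result is not cached
}
```

Buffering the channel lets the manager deliver a result even if nobody ever waits for it, which is why the producer side never blocks.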

type Doer added in v0.3.0

type Doer interface {
	// Do does some operation.  It receives some data and returns
	// some result.  A serializer wraps a Doer to ensure the
	// operation is done in a single goroutine, synchronously.
	Do(data interface{}) interface{}

	// Finish is called when the manager goroutine of a Serializer
	// implementation has been signaled to exit.  It may return a
	// value, which becomes the return value from Serializer.Wait.
	Finish() interface{}
}

Doer is an interface describing an operation to be done in a synchronized fashion, such as building a data structure.

type MockCallResult added in v0.3.0

type MockCallResult struct {
	mock.Mock
}

MockCallResult is a mock for the CallResult interface. It is provided to facilitate testing code that utilizes the serializer.

func (*MockCallResult) Channel added in v0.3.0

func (m *MockCallResult) Channel() <-chan *Result

Channel returns the channel that the CallResult object uses to receive the results. This allows the caller to directly select on the channel. Note that if the result has already been received, the channel returned by this method will be nil. Using this method effectively closes the CallResult; subsequent calls to Wait and TryWait will return nil results.

func (*MockCallResult) TryWait added in v0.3.0

func (m *MockCallResult) TryWait() (*Result, bool)

TryWait is a non-blocking variant of Wait. It attempts to retrieve the result, and returns the value and a boolean value that indicates whether the result has already been retrieved.

func (*MockCallResult) Wait added in v0.3.0

func (m *MockCallResult) Wait() *Result

Wait is used to retrieve the result of the call. The result is not cached in the CallResult object, so subsequent calls to Wait will return nil.

type MockDoer added in v0.3.0

type MockDoer struct {
	mock.Mock
}

MockDoer is a mock for the Doer interface. It is provided to facilitate testing code that utilizes the serializer.

func (*MockDoer) Do added in v0.3.0

func (m *MockDoer) Do(data interface{}) interface{}

Do does some operation. It receives some data and returns some result. A serializer wraps a Doer to ensure the operation is done in a single goroutine, synchronously.

func (*MockDoer) Finish added in v0.3.0

func (m *MockDoer) Finish() interface{}

Finish is called when the manager goroutine of a Serializer implementation has been signaled to exit. It may return a value, which becomes the return value from Serializer.Wait.

type MockRunner

type MockRunner struct {
	mock.Mock
}

MockRunner is a mock for the Runner interface. It is provided to facilitate internal testing of the Runner implementations, but may be used by external users to test other code that utilizes a Runner.

func (*MockRunner) Integrate

func (m *MockRunner) Integrate(worker Worker, result *Result)

Integrate is used to combine all the data returned by Run method invocations. It is passed a Worker object, which it may use to make additional calls to Worker.Call, even if Worker.Wait has been called. All instances of Integrate operate synchronously in a single goroutine, and must not block; a side-effect is that the elements they interact with may be safely accessed without concern for parallel calls to Integrate. The idea of Integrate is to allow the results from the various Run method calls to be combined together into a single result, which may then be obtained through a call to Result. Note that if Run panics, the panic value will be stored in the Panic field of the Result passed to Integrate, and its Result field will be nil.

Note that Integrate is not running in the same goroutine as that which is making Worker.Call calls; in fact, those calls may be from multiple goroutines.

func (*MockRunner) Result

func (m *MockRunner) Result() interface{}

Result is called by the Worker.Wait method a single time, once all the worker goroutines have been terminated. It is intended to work in conjunction with Integrate to enable the final result of the work to be reported to the caller of Worker.Wait. It runs in the same goroutine as Worker.Wait, and need not worry about any other goroutine calling any other method from the Runner.

func (*MockRunner) Run

func (m *MockRunner) Run(data interface{}) interface{}

Run is the method that will be called to actually process the data. It will be passed the data that was passed to Worker.Call, and may return data that will be subsequently passed to the Integrate method. The Run method may be called from any number of goroutines (workers), so any resources it interacts with, including those embedded in the object, must be accessed in a thread-safe fashion.

It is not safe for Run to make any calls to Worker.Call; this may potentially lead to a deadlock scenario. Instead, return those items and handle the calls to Worker.Call from the Integrate method.

type MockSerializer added in v0.3.0

type MockSerializer struct {
	mock.Mock
}

MockSerializer is a mock for the Serializer interface. It is provided to facilitate testing code that utilizes Serializer implementations.

func (*MockSerializer) Call added in v0.3.0

func (m *MockSerializer) Call(data interface{}) (*Result, error)

Call is used to invoke the Doer.Do method of the wrapped Doer. It may return an error if the Serializer is closed. Call is synchronous, and will not return until the Doer.Do method has completed.

func (*MockSerializer) CallAsync added in v0.3.0

func (m *MockSerializer) CallAsync(data interface{}) (CallResult, error)

CallAsync is used to invoke the Doer.Do method, like Call, but it does not block; instead, it returns a CallResult object, which may be queried later for the result of the call.

func (*MockSerializer) CallOnly added in v0.3.0

func (m *MockSerializer) CallOnly(data interface{}) error

CallOnly is used to invoke the Doer.Do method, but it does not block; instead, the result of the call is discarded.

func (*MockSerializer) Wait added in v0.3.0

func (m *MockSerializer) Wait() interface{}

Wait signals the manager goroutine to exit, then waits for it to do so. The manager will call the Doer.Finish method and return its result to Wait, which will in turn return it to the caller. The result will be cached to satisfy future calls to Wait.

type MockWorker

type MockWorker struct {
	mock.Mock
}

MockWorker is a mock for the Worker interface. It is provided to facilitate testing code that utilizes Worker implementations.

func (*MockWorker) Call

func (m *MockWorker) Call(data interface{}) error

Call is the method used to submit data to be worked in a call to the Runner.Run method. It may return an error if the worker has been shut down through a call to Wait.

func (*MockWorker) Wait

func (m *MockWorker) Wait() (interface{}, error)

Wait is called to shut down the worker and return the final result; it will block the caller until all data has been processed and all worker goroutines have stopped. Note that the final result, generated by Runner.Result, is saved by Worker to satisfy later calls to Wait. If Wait is called before any calls to Call, the worker will go straight to a stopped state, and no further Call calls may be made; no error will be returned in that case.

type Result added in v0.3.0

type Result struct {
	Result interface{} // The function result
	Panic  interface{} // The captured panic
}

Result describes a result from calling a Run or Do function. These functions are called in such a way as to capture panics, and the Result structure will contain both the return value and the captured panic.
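
Capturing a panic in this way relies on Go's defer and recover. The sketch below shows one way to fill in a Result; safeCall is a hypothetical helper, not part of the package, and the Result type is redeclared locally to mirror the definition above.

```go
package main

import "fmt"

// Result mirrors the documented struct.
type Result struct {
	Result interface{} // The function result
	Panic  interface{} // The captured panic
}

// safeCall is a hypothetical helper: it runs fn(data) and records either the
// return value or the value passed to panic.  The named return lets the
// deferred function hand back the partially filled Result after a panic.
func safeCall(fn func(interface{}) interface{}, data interface{}) (res *Result) {
	res = &Result{}
	defer func() {
		if p := recover(); p != nil {
			res.Panic = p // res.Result stays nil
		}
	}()
	res.Result = fn(data)
	return res
}

func main() {
	half := func(data interface{}) interface{} {
		n := data.(int)
		if n%2 != 0 {
			panic(fmt.Sprintf("%d is odd", n))
		}
		return n / 2
	}
	fmt.Println(safeCall(half, 8).Result) // prints 4
	fmt.Println(safeCall(half, 3).Panic)  // prints 3 is odd
}
```

Because the panic is recovered inside the goroutine that ran the function, a misbehaving Run or Do cannot take down the whole process; the caller decides what to do with the Panic field.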

type Runner

type Runner interface {
	// Run is the method that will be called to actually process
	// the data.  It will be passed the data that was passed to
	// Worker.Call, and may return data that will be subsequently
	// passed to the Integrate method.  The Run method may be
	// called from any number of goroutines (workers), so any
	// resources it interacts with, including those embedded in
	// the object, must be accessed in a thread-safe fashion.
	//
	// It is not safe for Run to make any calls to Worker.Call;
	// this may potentially lead to a deadlock scenario.  Instead,
	// return those items and handle the calls to Worker.Call from
	// the Integrate method.
	Run(data interface{}) interface{}

	// Integrate is used to combine all the data returned by Run
	// method invocations.  It is passed a Worker object, which it
	// may use to make additional calls to Worker.Call, even if
	// Worker.Wait has been called.  All instances of Integrate
	// operate synchronously in a single goroutine, and must not
	// block; a side-effect is that the elements they interact
	// with may be safely accessed without concern for parallel
	// calls to Integrate.  The idea of Integrate is to allow the
	// results from the various Run method calls to be combined
	// together into a single result, which may then be obtained
	// through a call to Result.  Note that if Run panics, the
	// panic value will be stored in the Panic field of the
	// Result passed to Integrate, and its Result field will be
	// nil.
	//
	// Note that Integrate is not running in the same goroutine as
	// that which is making Worker.Call calls; in fact, those
	// calls may be from multiple goroutines.
	Integrate(worker Worker, result *Result)

	// Result is called by the Worker.Wait method a single time,
	// once all the worker goroutines have been terminated.  It is
	// intended to work in conjunction with Integrate to enable
	// the final result of the work to be reported to the caller
	// of Worker.Wait.  It runs in the same goroutine as
	// Worker.Wait, and need not worry about any other goroutine
	// calling any other method from the Runner.
	Result() interface{}
}

Runner is an interface describing the work to be done. A Worker is typically instantiated by passing it a Runner, which it will then use to process the submitted data.

type Serializer added in v0.3.0

type Serializer interface {
	// Call is used to invoke the Doer.Do method of the wrapped
	// Doer.  It may return an error if the Serializer is closed.
	// Call is synchronous, and will not return until the Doer.Do
	// method has completed.
	Call(data interface{}) (*Result, error)

	// CallAsync is used to invoke the Doer.Do method, like Call,
	// but it does not block; instead, it returns a CallResult
	// object, which may be queried later for the result of the
	// call.
	CallAsync(data interface{}) (CallResult, error)

	// CallOnly is used to invoke the Doer.Do method, but it does
	// not block; instead, the result of the call is discarded.
	CallOnly(data interface{}) error

	// Wait signals the manager goroutine to exit, then waits for
	// it to do so.  The manager will call the Doer.Finish method
	// and return its result to Wait, which will in turn return it
	// to the caller.  The result will be cached to satisfy future
	// calls to Wait.
	Wait() interface{}
}

Serializer is an interface for doing the opposite of parallelizing an operation. The use case for Serializer is when something must be done in a single goroutine, for synchronization, but the calls need to be able to come from a multitude of goroutines. In this case, the client code would implement the Doer interface, then wrap it using Serializer.

func NewSerializer added in v0.3.0

func NewSerializer(doer Doer) Serializer

NewSerializer constructs a serializer wrapping the specified Doer. All calls to Doer.Do will occur in a single manager goroutine, but the calls can be made from almost any other goroutine. Note that Doer.Do cannot call any of the Call* methods of Serializer due to the potential for deadlocks.

type Worker

type Worker interface {
	// Call is the method used to submit data to be worked in a
	// call to the Runner.Run method.  It may return an error if
	// the worker has been shut down through a call to Wait.
	Call(data interface{}) error

	// Wait is called to shut down the worker and return the final
	// result; it will block the caller until all data has been
	// processed and all worker goroutines have stopped.  Note
	// that the final result, generated by Runner.Result, is saved
	// by Worker to satisfy later calls to Wait.  If Wait is
	// called before any calls to Call, the worker will go
	// straight to a stopped state, and no further Call calls may
	// be made; no error will be returned in that case.
	Wait() (interface{}, error)
}

Worker is an interface describing implementations of the parallelizer. A Worker is typically initialized by passing a Runner instance to a constructor; data submitted with Worker.Call is then passed to the Runner.Run method, the results of which are integrated using Runner.Integrate; finally, the caller calls Worker.Wait to shut down the parallelizer, which in turn calls Runner.Result to obtain the final result of the processing.

func NewParallelWorker

func NewParallelWorker(runner Runner, workers int) Worker

NewParallelWorker constructs a worker utilizing a pool of worker goroutines. Parallel workers can receive Call and Wait invocations from almost any goroutine (with the exception of the goroutines running the Runner.Run and Runner.Integrate methods; the latter, however, receives an alternate implementation of Worker which is safe to use). A parallel worker is initialized with a desired number of worker goroutines; if that number is less than or equal to 0, the number of CPU cores detected by the go runtime will be used instead. (See runtime.NumCPU.)
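
The worker-count rule and the division of labour (Run on many goroutines, Integrate on one) can be sketched with a fan-out/fan-in pipeline. This is a simplified, hypothetical illustration rather than the package's code: runner is a trimmed-down local interface whose Integrate takes no Worker, so resubmitting work is not supported; panics are not captured; and poolSize, runAll, and squareSum are illustrative names. The package's parallel worker also uses a separate manager goroutine, which this sketch folds into the calling goroutine.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Result mirrors the documented struct.
type Result struct {
	Result interface{}
	Panic  interface{}
}

// runner is a trimmed-down, hypothetical Runner: Integrate takes no Worker
// because this sketch does not support resubmitting work.
type runner interface {
	Run(data interface{}) interface{}
	Integrate(result *Result)
	Result() interface{}
}

// poolSize applies the documented rule: a count <= 0 means runtime.NumCPU().
func poolSize(workers int) int {
	if workers <= 0 {
		return runtime.NumCPU()
	}
	return workers
}

// runAll fans items out to a pool of goroutines running Run, and fans the
// results back in to this goroutine, the only one that calls Integrate.
func runAll(r runner, workers int, items []interface{}) interface{} {
	jobs := make(chan interface{})
	results := make(chan *Result)

	var wg sync.WaitGroup
	for i := 0; i < poolSize(workers); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for data := range jobs {
				results <- &Result{Result: r.Run(data)}
			}
		}()
	}

	// Feed the jobs, then close results once every worker has exited.
	go func() {
		for _, item := range items {
			jobs <- item
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	for res := range results {
		r.Integrate(res) // never concurrent with another Integrate
	}
	return r.Result() // called once, after all Run and Integrate calls
}

// squareSum is a hypothetical runner that sums the squares of its inputs.
type squareSum struct{ total int }

func (s *squareSum) Run(data interface{}) interface{} { return data.(int) * data.(int) }
func (s *squareSum) Integrate(res *Result)            { s.total += res.Result.(int) }
func (s *squareSum) Result() interface{}              { return s.total }

func main() {
	fmt.Println(runAll(&squareSum{}, 0, []interface{}{1, 2, 3, 4})) // prints 30
}
```

Since squareSum.total is only modified from Integrate, it needs no lock even though Run executes on several goroutines at once.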

func NewSynchronousWorker

func NewSynchronousWorker(runner Runner) Worker

NewSynchronousWorker constructs a synchronous worker. Synchronous workers do not utilize parallelism at all; they are provided to allow for transition from a single-threaded algorithm to a multithreaded one, or to enable optional parallelization in cases where ordering may be important for certain invocations.
