package module
Version: v0.0.0-...-591a77e

Published: Apr 12, 2021 License: MIT Imports: 6 Imported by: 0





A Go library for running functions in parallel. It is intended for aggregating business logic and for refactoring without changing existing function declarations.


go get



Given three functions, testJobA, testJobB, and testJobC, run them in parallel:

import (
	// parallel package import path goes here
)

func testJobA() string {
	return "job"
}

func testJobB(x, y int) int {
	return x + y
}

func testJobC(x int) int {
	return -x
}

func main() {
	var s string
	var x, y int

	p := parallel.NewParallel()

	p.Register(testJobA).SetReceivers(&s)
	p.Register(testJobB, 1, 2).SetReceivers(&x)
	p.Register(testJobC, 3).SetReceivers(&y)
	// block here
	p.Run()

	if s != "job" || x != 3 || y != -3 {
		panic("unexpected result")
	}
}
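
Registering a function of any signature without touching its declaration suggests reflection. The sketch below is not this package's implementation; `callInto` is a hypothetical helper, built only on the standard `reflect` package, that shows one way a call's return values could be copied into pointer receivers. Its error messages are also made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// callInto is a hypothetical helper, not part of the parallel package.
// It calls f with args via reflection and stores each return value in the
// matching pointer in receivers. Variadic functions are not supported, and
// argument types are not checked up front (reflect panics on a mismatch).
func callInto(f interface{}, args []interface{}, receivers ...interface{}) error {
	fv := reflect.ValueOf(f)
	if fv.Kind() != reflect.Func {
		return errors.New("f is not a function")
	}
	ft := fv.Type()
	if ft.IsVariadic() || ft.NumIn() != len(args) {
		return errors.New("wrong number of arguments")
	}
	if ft.NumOut() != len(receivers) {
		return errors.New("wrong number of receivers")
	}
	// Validate receivers before calling f, so a bad receiver has no side effects.
	targets := make([]reflect.Value, len(receivers))
	for i, r := range receivers {
		rv := reflect.ValueOf(r)
		if rv.Kind() != reflect.Ptr || rv.IsNil() {
			return errors.New("receiver is not a non-nil pointer")
		}
		if !ft.Out(i).AssignableTo(rv.Type().Elem()) {
			return errors.New("receiver type does not match return type")
		}
		targets[i] = rv.Elem()
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := fv.Call(in)
	for i, t := range targets {
		t.Set(out[i])
	}
	return nil
}

func add(x, y int) int { return x + y }

func main() {
	var sum int
	if err := callInto(add, []interface{}{1, 2}, &sum); err != nil {
		panic(err)
	}
	fmt.Println(sum) // 3
}
```

The function under test (`add`) keeps its ordinary signature; only the call site changes.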


A slightly more complex case: there are three parallel jobs, jobA, jobB, and jobC, and a final job that aggregates their results. The final job depends on jobA and on middle, which in turn depends on jobB and jobC.

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     final

Refer to the demo below:

import (
	// parallel package import path goes here
)

type middle struct {
	B int
	C int
}

type testResult struct {
	A string
	M middle
}

func testJobA() string {
	return "job"
}

func testJobB(x, y int) int {
	return x + y
}

func testJobC(x int) int {
	return -x
}

func testFinal(s *string, m *middle) testResult {
	return testResult{
		*s, *m,
	}
}

func main() {
	var m middle
	var s string
	var res testResult

	p := parallel.NewParallel()

	// Create child 1, which runs jobA
	child1 := p.NewChild()
	child1.Register(testJobA).SetReceivers(&s)

	// Create child 2, which runs jobB and jobC
	child2 := p.NewChild()
	child2.Register(testJobB, 1, 2).SetReceivers(&m.B)
	child2.Register(testJobC, 2).SetReceivers(&m.C)

	p.Register(testFinal, &s, &m).SetReceivers(&res)
	// block here
	p.Run()

	expect := testResult{
		"job",
		middle{
			3, -2,
		},
	}
	if res != expect {
		panic("unexpected result")
	}
}
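
The dependency graph implies an ordering: child jobs must finish before the job that consumes their results starts. The following is a minimal sketch of that ordering using only `sync.WaitGroup`. It assumes nothing about how the package schedules children internally; `runAll` and `aggregate` are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical helper: it runs every job in its own goroutine
// and returns once all of them have finished.
func runAll(jobs ...func()) {
	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		go func(j func()) {
			defer wg.Done()
			j()
		}(job)
	}
	wg.Wait()
}

type middle struct{ B, C int }

// aggregate mirrors the demo's shape: jobA and the middle stage run
// concurrently, and the results are read only after both have completed.
func aggregate() (string, middle) {
	var s string
	var m middle
	runAll(
		func() { s = "job" }, // jobA
		func() { // middle: jobB and jobC in parallel
			runAll(
				func() { m.B = 1 + 2 },
				func() { m.C = -2 },
			)
		},
	)
	return s, m // safe to read: every writer has finished
}

func main() {
	s, m := aggregate()
	fmt.Println(s, m.B, m.C) // job 3 -2
}
```

Each goroutine writes to a distinct variable or field, and `wg.Wait` orders those writes before the final read, so the sketch is free of data races.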


By default, Parallel ignores panics raised by jobs. It also supports a custom exception handler for dealing with unexpected panics, for example to send an alert or write a log entry.

// handle the panic
func exceptionHandler(topic string, e interface{}) {
	fmt.Println(topic, e)
}

// will panic
func exceptionJob() {
	var a map[string]int
	// assignment to entry in nil map
	a["123"] = 1
}

func main() {
	p := parallel.NewParallel()
	p.Register(exceptionJob)
	// the last argument (e) is omitted on purpose
	p.Except(exceptionHandler, "topic1")
	p.Run()
}
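
Handling a panic from a job that runs in its own goroutine generally relies on `defer` and `recover` inside that goroutine. The sketch below shows the pattern with the standard library only. `safeGo` and `runAndCapture` are hypothetical helpers and are not claimed to match how the package invokes its exception handler.

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo is a hypothetical helper: it runs job in a goroutine and, if the
// job panics, passes the recovered value to onPanic instead of crashing.
func safeGo(wg *sync.WaitGroup, job func(), onPanic func(e interface{})) {
	wg.Add(1)
	go func() {
		defer wg.Done() // runs last, after the recover below
		defer func() {
			if e := recover(); e != nil && onPanic != nil {
				onPanic(e)
			}
		}()
		job()
	}()
}

// runAndCapture runs job and returns the value it panicked with,
// or nil if it returned normally.
func runAndCapture(job func()) interface{} {
	var wg sync.WaitGroup
	var got interface{}
	safeGo(&wg, job, func(e interface{}) { got = e })
	wg.Wait()
	return got
}

func main() {
	e := runAndCapture(func() {
		var a map[string]int
		a["123"] = 1 // panics: the map is nil
	})
	fmt.Println("topic1", e)
}
```

As in the handler above, the recovered value has type `interface{}`, so a real handler needs a type switch or type assertion before inspecting it.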





var (
	ErrArgNotFunction    = errors.New("argument type not function")            // argument type not function
	ErrInArgLenNotMatch  = errors.New("input arguments length not match")      // input arguments length not match
	ErrOutArgLenNotMatch = errors.New("output arguments length not match")     // output arguments length not match
	ErrRecvArgTypeNotPtr = errors.New("receiver argument type is not pointer") // receiver argument type is not pointer
	ErrRecvArgNil        = errors.New("receiver argument must not be nil")     // receiver argument must not be nil
)




type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler instance

func NewHandler

func NewHandler(f interface{}, args ...interface{}) *Handler

NewHandler creates a new Handler that wraps a single function call.

func (*Handler) Do

func (h *Handler) Do()

Do calls the function and passes its return values, if any, to the receivers.

func (*Handler) OnExcept

func (h *Handler) OnExcept(e interface{})

OnExcept is executed by parallel when the application panics. Note that the type of e is unknown.

func (*Handler) SetReceivers

func (h *Handler) SetReceivers(receivers ...interface{}) *Handler

SetReceivers sets the receivers for the function's return values.

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel instance, which executes pipelines in parallel.

func NewParallel

func NewParallel() *Parallel

NewParallel creates a new Parallel instance

func (*Parallel) Add

func (p *Parallel) Add(pipes ...*Pipeline) *Parallel

Add adds new pipelines to parallel.

func (*Parallel) AddChildren

func (p *Parallel) AddChildren(children ...*Parallel) *Parallel

AddChildren adds children to parallel to handle dependencies.

func (*Parallel) Except

func (p *Parallel) Except(f interface{}, args ...interface{}) *Handler

Except sets the exception-handling routine, which is executed when an unexpected panic occurs.

func (*Parallel) NewChild

func (p *Parallel) NewChild() *Parallel

NewChild creates a new child of p.

func (*Parallel) NewPipeline

func (p *Parallel) NewPipeline() *Pipeline

NewPipeline creates a new pipeline in parallel.

func (*Parallel) Register

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler

Register adds a new pipeline with a single handler to parallel.

func (*Parallel) Run

func (p *Parallel) Run()

Run starts all the jobs.

func (*Parallel) RunWithTimeOut

func (p *Parallel) RunWithTimeOut(d time.Duration)

RunWithTimeOut starts all the jobs and times out after duration d.
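
A common way to bound how long a caller waits for concurrent work is to `select` between a completion channel and a timer. The sketch below illustrates that general pattern with the standard library. `runWithTimeout` is a hypothetical helper, and whether RunWithTimeOut works this way is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// runWithTimeout is a hypothetical helper: it runs job in a goroutine and
// reports whether it finished within d. On timeout the goroutine is not
// cancelled; it keeps running in the background.
func runWithTimeout(job func(), d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		job()
	}()
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// demo runs one job that finishes well within its limit and one that does not.
func demo() (fast, slow bool) {
	fast = runWithTimeout(func() {}, time.Second)
	slow = runWithTimeout(func() { time.Sleep(200 * time.Millisecond) }, 10*time.Millisecond)
	return fast, slow
}

func main() {
	fmt.Println(demo()) // true false
}
```

With this pattern a timeout only stops the waiting; a job that must actually be interrupted needs its own cancellation signal, such as a `context.Context`.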

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline instance, which executes jobs serially.

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline creates a new Pipeline instance

func (*Pipeline) Add

func (p *Pipeline) Add(hs ...*Handler) *Pipeline

Add adds new handlers to the pipeline.

func (*Pipeline) Do

func (p *Pipeline) Do()

Do calls all handlers in the order they were added to the pipeline.
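
The two levels described here, pipelines running in parallel while the handlers inside each pipeline run in order, can be pictured with a small standard-library sketch. The `pipeline` type and `runPipelines` below are hypothetical stand-ins for illustration, not the package's types.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline is a hypothetical stand-in: an ordered list of steps.
type pipeline []func()

// runPipelines runs each pipeline in its own goroutine. Inside a pipeline
// the steps run one after another, in the order they were listed.
func runPipelines(pipes ...pipeline) {
	var wg sync.WaitGroup
	for _, p := range pipes {
		wg.Add(1)
		go func(steps pipeline) {
			defer wg.Done()
			for _, step := range steps {
				step()
			}
		}(p)
	}
	wg.Wait()
}

// demo uses steps whose result depends on their order within a pipeline.
func demo() (int, int) {
	a, b := 1, 10
	runPipelines(
		pipeline{func() { a += 2 }, func() { a *= 5 }}, // (1+2)*5 = 15
		pipeline{func() { b -= 4 }, func() { b *= b }}, // (10-4)*(10-4) = 36
	)
	return a, b
}

func main() {
	fmt.Println(demo()) // 15 36
}
```

Because each pipeline touches only its own variable, the two pipelines can run concurrently without synchronizing with each other.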

func (*Pipeline) Register

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler

Register adds a new function to the pipeline.

