parallel

Version: v0.0.0-...-591a77e

Published: Apr 12, 2021 License: MIT Imports: 6 Imported by: 0

README

Parallel


A Go library for running functions in parallel. It is intended for aggregating business logic and for refactoring existing code without changing function declarations.

Install

go get github.com/buptmiao/parallel

Usage

eg.1

There are three functions, testJobA, testJobB and testJobC. Execute them in parallel:

package main

import (
	"github.com/buptmiao/parallel"
)

func testJobA() string {
	return "job"
}

func testJobB(x, y int) int {
	return x + y
}

func testJobC(x int) int {
	return -x
}

func main() {
	var s string
	var x, y int

	p := parallel.NewParallel()

	p.Register(testJobA).SetReceivers(&s)
	p.Register(testJobB, 1, 2).SetReceivers(&x)
	p.Register(testJobC, 3).SetReceivers(&y)
	// block here
	p.Run()

	if s != "job" || x != 3 || y != -3 {
		panic("unexpected result")
	}
}
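
For comparison, here is a minimal sketch of the same three jobs written by hand with the standard library's sync.WaitGroup. It is not how parallel is implemented; it only illustrates the kind of boilerplate that Register and SetReceivers stand in for. The helper name runJobs is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

func testJobA() string { return "job" }

func testJobB(x, y int) int { return x + y }

func testJobC(x int) int { return -x }

// runJobs (hypothetical helper) starts one goroutine per job and
// waits for all of them before returning the collected results.
func runJobs() (s string, x, y int) {
	var wg sync.WaitGroup
	wg.Add(3)
	go func() { defer wg.Done(); s = testJobA() }()
	go func() { defer wg.Done(); x = testJobB(1, 2) }()
	go func() { defer wg.Done(); y = testJobC(3) }()
	// Each goroutine writes a different variable, and Wait orders
	// those writes before the return below.
	wg.Wait()
	return s, x, y
}

func main() {
	s, x, y := runJobs()
	fmt.Println(s, x, y)
}
```

Every job needs its own closure and its own result variable here, which is the part the library hides behind a uniform call.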

eg.2

Now a slightly more complex case: three parallel jobs (jobA, jobB, jobC) and a final job that aggregates their results. final depends on jobA and on middle, which in turn depends on jobB and jobC.

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     final

Refer to the demo below:

package main

import (
	"github.com/buptmiao/parallel"
)

type middle struct {
	B int
	C int
}

type testResult struct {
	A string
	M middle
}

func testJobA() string {
	return "job"
}

func testJobB(x, y int) int {
	return x + y
}

func testJobC(x int) int {
	return -x
}

func testFinal(s *string, m *middle) testResult {
	return testResult{
		*s, *m,
	}
}

func main() {
	var m middle
	var s string
	var res testResult

	p := parallel.NewParallel()

	// Create a child 1
	child1 := p.NewChild()
	child1.Register(testJobA).SetReceivers(&s)

	// Create another child 2
	child2 := p.NewChild()
	child2.Register(testJobB, 1, 2).SetReceivers(&m.B)
	child2.Register(testJobC, 2).SetReceivers(&m.C)

	p.Register(testFinal, &s, &m).SetReceivers(&res)
	// block here
	p.Run()

	expect := testResult{
		"job",
		middle{
			3, -2,
		},
	}
	if res != expect {
		panic("unexpected result")
	}
}
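
The dependency in this example amounts to two stages: the children finish first, then the parent's own job runs. A minimal standard-library sketch of that ordering follows. It is an illustration under that assumption, not the library's implementation, and runAll and aggregate are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type middle struct {
	B int
	C int
}

type testResult struct {
	A string
	M middle
}

// runAll (hypothetical helper) runs every job in its own goroutine
// and returns once all of them have finished.
func runAll(jobs ...func()) {
	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		go func(j func()) {
			defer wg.Done()
			j()
		}(job)
	}
	wg.Wait()
}

// aggregate runs the three independent jobs first, then builds the
// final result from their outputs.
func aggregate() testResult {
	var s string
	var m middle

	// Stage 1: jobA, jobB and jobC run concurrently and write to
	// distinct variables or struct fields.
	runAll(
		func() { s = "job" },
		func() { m.B = 1 + 2 },
		func() { m.C = -2 },
	)

	// Stage 2: the final job only starts after stage 1 has completed.
	return testResult{A: s, M: m}
}

func main() {
	fmt.Println(aggregate())
}
```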

eg.3

By default, Parallel ignores panics raised by jobs. It also supports a custom exception handler for dealing with unexpected panics, for example to raise an alert or write a log entry.

package main

import (
	"fmt"

	"github.com/buptmiao/parallel"
)

// handle the panic
func exceptionHandler(topic string, e interface{}) {
	fmt.Println(topic, e)
}

// will panic
func exceptionJob() {
	var a map[string]int
	// panics: assignment to entry in nil map
	a["123"] = 1
}

func main() {
	p := parallel.NewParallel()
	p.Register(exceptionJob)
	// the last argument (e) is omitted on purpose; the panic value is passed in its place
	p.Except(exceptionHandler, "topic1")
	p.Run()
}
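
The usual Go mechanism behind this kind of handler is a deferred recover inside the goroutine that runs the job. The sketch below shows that pattern with the standard library only. It is an assumption about the general technique, not the library's code, and safeGo and runGuarded are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo (hypothetical helper) runs job in a goroutine. If job panics,
// the panic is recovered and its value is handed to onPanic, if set.
func safeGo(wg *sync.WaitGroup, job func(), onPanic func(e interface{})) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Deferred calls run last-in, first-out, so this recover runs
		// before wg.Done and the handler finishes before Wait returns.
		defer func() {
			if e := recover(); e != nil && onPanic != nil {
				onPanic(e)
			}
		}()
		job()
	}()
}

// runGuarded runs a single job and returns the recovered panic value,
// or nil if the job completed normally.
func runGuarded(job func()) interface{} {
	var wg sync.WaitGroup
	var caught interface{}
	safeGo(&wg, job, func(e interface{}) { caught = e })
	wg.Wait()
	return caught
}

func main() {
	e := runGuarded(func() {
		var a map[string]int
		a["123"] = 1 // panics: assignment to entry in nil map
	})
	fmt.Println("topic1", e)
}
```

As the OnExcept documentation notes, the recovered value has no fixed type, so a handler written this way has to accept interface{}.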

Documentation

Constants

This section is empty.

Variables

var (
	ErrArgNotFunction    = errors.New("argument type not function")            //argument type not function
	ErrInArgLenNotMatch  = errors.New("input arguments length not match")      //input arguments length not match
	ErrOutArgLenNotMatch = errors.New("output arguments length not match")     //output arguments length not match
	ErrRecvArgTypeNotPtr = errors.New("receiver argument type is not pointer") //receiver argument type is not pointer
	ErrRecvArgNil        = errors.New("receiver argument must not be nil")     //receiver argument must not be nil
)
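
These error values suggest that functions, arguments and receivers are validated through reflection. The sketch below shows one way such checks could be written. The checkCall function and the lower-case error variables are hypothetical and only mirror the messages above. When and how the library actually performs these checks is not stated here, and treating an empty receiver list as "discard the results" is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Local stand-ins that mirror the messages of the package's errors.
var (
	errNotFunction = errors.New("argument type not function")
	errInLen       = errors.New("input arguments length not match")
	errOutLen      = errors.New("output arguments length not match")
	errRecvNotPtr  = errors.New("receiver argument type is not pointer")
	errRecvNil     = errors.New("receiver argument must not be nil")
)

// checkCall (hypothetical) validates f, its arguments and its receivers.
// Variadic functions are not handled in this sketch.
func checkCall(f interface{}, args, receivers []interface{}) error {
	if f == nil {
		return errNotFunction
	}
	t := reflect.TypeOf(f)
	if t.Kind() != reflect.Func {
		return errNotFunction
	}
	if t.NumIn() != len(args) {
		return errInLen
	}
	// Assumption: no receivers means the return values are discarded.
	if len(receivers) != 0 && t.NumOut() != len(receivers) {
		return errOutLen
	}
	for _, r := range receivers {
		if r == nil {
			return errRecvNil
		}
		v := reflect.ValueOf(r)
		if v.Kind() != reflect.Ptr {
			return errRecvNotPtr
		}
		if v.IsNil() {
			return errRecvNil
		}
	}
	return nil
}

func main() {
	var n int
	fmt.Println(checkCall(42, nil, nil))
	fmt.Println(checkCall(func() int { return 1 }, nil, []interface{}{n}))
	fmt.Println(checkCall(func() int { return 1 }, nil, []interface{}{&n}))
}
```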

Functions

This section is empty.

Types

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler wraps a single function call together with its arguments and receivers.

func NewHandler

func NewHandler(f interface{}, args ...interface{}) *Handler

NewHandler creates a new Handler which contains a single function call.

func (*Handler) Do

func (h *Handler) Do()

Do calls the function and passes its return values, if any, to the receivers.

func (*Handler) OnExcept

func (h *Handler) OnExcept(e interface{})

OnExcept is executed by parallel when a job panics. Note that the type of e is unknown.

func (*Handler) SetReceivers

func (h *Handler) SetReceivers(receivers ...interface{}) *Handler

SetReceivers sets the receivers of return values
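
To make the idea of receivers concrete, the sketch below calls an arbitrary function through the reflect package and stores each return value through a pointer. It illustrates the general technique only. callInto is a hypothetical name, and validation of the inputs is left out for brevity.

```go
package main

import (
	"fmt"
	"reflect"
)

// callInto (hypothetical) calls f with args via reflection and writes
// each return value into the variable the matching receiver points to.
// It panics if f, args or receivers do not fit together.
func callInto(f interface{}, args []interface{}, receivers ...interface{}) {
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := reflect.ValueOf(f).Call(in)
	for i := 0; i < len(receivers) && i < len(out); i++ {
		// Elem dereferences the pointer so that Set writes to the
		// caller's variable.
		reflect.ValueOf(receivers[i]).Elem().Set(out[i])
	}
}

func main() {
	var s string
	var x int
	callInto(func() string { return "job" }, nil, &s)
	callInto(func(a, b int) int { return a + b }, []interface{}{1, 2}, &x)
	fmt.Println(s, x)
}
```

Receivers have to be pointers because a value obtained from a non-pointer cannot be set through reflection.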

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel executes its pipelines in parallel.

func NewParallel

func NewParallel() *Parallel

NewParallel creates a new Parallel instance

func (*Parallel) Add

func (p *Parallel) Add(pipes ...*Pipeline) *Parallel

Add adds new pipelines to the Parallel.

func (*Parallel) AddChildren

func (p *Parallel) AddChildren(children ...*Parallel) *Parallel

AddChildren adds children to the Parallel to handle dependencies.

func (*Parallel) Except

func (p *Parallel) Except(f interface{}, args ...interface{}) *Handler

Except sets the exception-handling routine, which is executed when an unexpected panic occurs.

func (*Parallel) NewChild

func (p *Parallel) NewChild() *Parallel

NewChild creates a new child of p.

func (*Parallel) NewPipeline

func (p *Parallel) NewPipeline() *Pipeline

NewPipeline creates a new pipeline in the Parallel.

func (*Parallel) Register

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler

Register adds a new pipeline with a single handler to the Parallel.

func (*Parallel) Run

func (p *Parallel) Run()

Run starts all the jobs and blocks until they finish.

func (*Parallel) RunWithTimeOut

func (p *Parallel) RunWithTimeOut(d time.Duration)

RunWithTimeOut starts all the jobs and times out after duration d.
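
A common way to wait for a group of goroutines with a deadline is to select between a done channel and time.After. The sketch below shows that pattern. It is not taken from the library, and unlike RunWithTimeOut the hypothetical runWithTimeout reports whether the jobs finished in time.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithTimeout (hypothetical) starts all jobs and reports whether
// they finished within d. Jobs still running at the deadline are not
// cancelled; they continue in the background.
func runWithTimeout(d time.Duration, jobs ...func()) bool {
	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		go func(j func()) {
			defer wg.Done()
			j()
		}(job)
	}

	// Turn wg.Wait into a channel so it can take part in a select.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demoTimeout runs one quick job and one job that outlives its deadline.
func demoTimeout() (fast, slow bool) {
	fast = runWithTimeout(time.Second, func() {})
	slow = runWithTimeout(10*time.Millisecond, func() {
		time.Sleep(200 * time.Millisecond)
	})
	return fast, slow
}

func main() {
	fast, slow := demoTimeout()
	fmt.Println(fast, slow)
}
```

With this pattern a timeout only stops the waiting, so a job that must really stop early needs its own cancellation signal, for example a context.Context.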

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline executes its jobs serially.

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline creates a new Pipeline instance

func (*Pipeline) Add

func (p *Pipeline) Add(hs ...*Handler) *Pipeline

Add adds new handlers to the pipeline.

func (*Pipeline) Do

func (p *Pipeline) Do()

Do calls all handlers in the order they were added to the pipeline.
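
The relationship between the two types (handlers run in order inside a pipeline, pipelines run side by side) can be sketched with the standard library as follows. The pipeline type and the runPipelines and demoPipelines functions are hypothetical stand-ins, not the library's definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline is a hypothetical stand-in for an ordered list of handlers.
type pipeline []func()

// do runs the handlers one after another, in the order they were added.
func (p pipeline) do() {
	for _, h := range p {
		h()
	}
}

// runPipelines runs each pipeline in its own goroutine and waits for
// all of them to finish.
func runPipelines(ps ...pipeline) {
	var wg sync.WaitGroup
	for _, p := range ps {
		wg.Add(1)
		go func(p pipeline) {
			defer wg.Done()
			p.do()
		}(p)
	}
	wg.Wait()
}

// demoPipelines builds two pipelines that each append to their own
// slice, so the order within a pipeline can be observed.
func demoPipelines() (left, right []string) {
	runPipelines(
		pipeline{
			func() { left = append(left, "a1") },
			func() { left = append(left, "a2") },
		},
		pipeline{
			func() { right = append(right, "b1") },
			func() { right = append(right, "b2") },
		},
	)
	return left, right
}

func main() {
	left, right := demoPipelines()
	fmt.Println(left, right)
}
```

The order within each slice is fixed, while the relative timing of the two pipelines is not.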

func (*Pipeline) Register

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler

Register adds a new function to the pipeline.
