gopp

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2023 License: MIT Imports: 6 Imported by: 0

README

GOPP - Go parallel processing

Go Reference

GOPP is a Go library for limiting the maximum number of tasks that are processed in parallel.

Import

import "github.com/shiolier/gopp"

Example


import (
	"context"
	"fmt"
	"time"

	"github.com/shiolier/gopp"
)

// Example shows the typical gopp workflow: create a Parallel, add Runners,
// and receive results until all processing is done.
func Example() {
	ctx, cancel := context.WithCancel(context.Background())
	// A timeout for the entire run can be set with context.WithTimeout instead:
	//ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
	defer cancel()

	// New
	// Each Runner's result is an int (and an error).
	p := gopp.New[int](
		// context
		ctx,
		// the maximum number of Runners processed in parallel is 4
		gopp.Procs(4),
		// the timeout for each Runner is 3s
		gopp.RunnerTimeout(3*time.Second))

	// Add Runners
	rs := make([]gopp.Runner[int], 10)
	for i := 0; i < 10; i++ {
		rs[i] = &SampleRunner{i}
		// Runners can also be added one at a time:
		//p.Add(&SampleRunner{i})
	}
	// Add returns an error, so check it instead of silently discarding it.
	// The deferred cancel stops anything that was already started.
	if err := p.Add(rs...); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Receive the results
loop: // label so the select can break out of the for loop
	for {
		select {
		case res := <-p.Result(): // receive one result
			// error check
			if res.Err != nil {
				fmt.Printf("Error: %v\n", res.Err)
				// If you want to stop other processing, call cancel.
				cancel()
				// Use continue rather than break so that all remaining results
				// (including context.Canceled errors) are still received.
				// Otherwise the sender will block and cause a goroutine leak.
				continue
			}
			fmt.Println(res.Value)

			// Runners can also be added here:
			//p.Add(&SampleRunner{10})
		case <-p.Done(): // all processing is done
			// leave the for loop
			break loop
		}
	}
	// or
	// ress := p.Wait()
	// for _, res := range ress {
	// 	// omit
	// }

	// p must not be reused, because it will not work properly.
	// To run again, create a new Parallel with New.
	//p = gopp.New[int](ctx, gopp.ProcsNumCPU())

	// Unordered output:
	// 0
	// 1
	// 4
	// 9
	// 16
	// 25
	// 36
	// 49
	// 64
	// 81
}

// SampleRunner is a sample Runner whose result is the square of n.
type SampleRunner struct {
	n int
}

// Run simulates a heavy one-second task and returns n squared.
//
// If ctx is done before the task finishes, Run stops waiting and returns the
// zero value together with ctx.Err(), so cancellation and timeouts take effect
// right away instead of only after the full second has elapsed.
func (s *SampleRunner) Run(ctx context.Context) (int, error) {
	// heavy process
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		return s.n * s.n, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

License

MIT License

Documentation

Overview

Package gopp restricts the maximum number of tasks that are processed in parallel.

Example

// Context
ctx, cancel := context.WithCancel(context.Background())
// A timeout for the entire run can be set with context.WithTimeout instead:
//ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()

// New
// Each Runner's result is an int (and an error).
p := gopp.New[int](
	// context
	ctx,
	// the maximum number of Runners processed in parallel is 4
	gopp.Procs(4),
	// the timeout for each Runner is 3s
	gopp.RunnerTimeout(3*time.Second))

// Add Runner
r := gopp.NewRunner[int](func(ctx context.Context) (int, error) {
	// heavy process; stop early if ctx is done so that cancellation and
	// timeouts are not delayed by the full second
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		return 123, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
})
for i := 0; i < 100; i++ {
	// Add returns an error; stop adding on the first failure instead of
	// silently discarding it.
	if err := p.Add(r); err != nil {
		fmt.Printf("Error: %v\n", err)
		break
	}
}

// Wait and receive results
ress := p.Wait()
for _, res := range ress {
	if res.Err != nil {
		fmt.Printf("Error: %v\n", res.Err)
		continue
	}
	fmt.Println(res.Value)
}

// Don't reuse p, because it will not work properly.
// If you need to run again, create a new Parallel with New.
p = gopp.New[int](context.Background(), gopp.Procs(4))
Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/shiolier/gopp"
)

// main shows the typical gopp workflow: create a Parallel, add Runners, and
// receive results until all processing is done.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// A timeout for the entire run can be set with context.WithTimeout instead:
	//ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
	defer cancel()

	// New
	// Each Runner's result is an int (and an error).
	p := gopp.New[int](
		// context
		ctx,
		// the maximum number of Runners processed in parallel is 4
		gopp.Procs(4),
		// the timeout for each Runner is 3s
		gopp.RunnerTimeout(3*time.Second))

	// Add Runners
	rs := make([]gopp.Runner[int], 10)
	for i := 0; i < 10; i++ {
		rs[i] = &SampleRunner{i}
		// Runners can also be added one at a time:
		//p.Add(&SampleRunner{i})
	}
	// Add returns an error, so check it instead of silently discarding it.
	// The deferred cancel stops anything that was already started.
	if err := p.Add(rs...); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Receive the results
loop: // label so the select can break out of the for loop
	for {
		select {
		case res := <-p.Result(): // receive one result
			// error check
			if res.Err != nil {
				fmt.Printf("Error: %v\n", res.Err)
				// If you want to stop other processing, call cancel.
				cancel()
				// Use continue rather than break so that all remaining results
				// (including context.Canceled errors) are still received.
				// Otherwise the sender will block and cause a goroutine leak.
				continue
			}
			fmt.Println(res.Value)

			// Runners can also be added here:
			//p.Add(&SampleRunner{10})
		case <-p.Done(): // all processing is done
			// leave the for loop
			break loop
		}
	}
	// or
	// ress := p.Wait()
	// for _, res := range ress {
	// 	// omit
	// }

	// p must not be reused, because it will not work properly.
	// To run again, create a new Parallel with New.
	//p = gopp.New[int](ctx, gopp.ProcsNumCPU())
}

// SampleRunner is a sample Runner whose result is the square of n.
type SampleRunner struct {
	n int
}

// Run simulates a heavy one-second task and returns n squared.
//
// If ctx is done before the task finishes, Run stops waiting and returns the
// zero value together with ctx.Err(), so cancellation and timeouts take effect
// right away instead of only after the full second has elapsed.
func (s *SampleRunner) Run(ctx context.Context) (int, error) {
	// heavy process
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		return s.n * s.n, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}
Output:

0
1
4
9
16
25
36
49
64
81

Index

Examples

Constants

This section is empty.

Variables

View Source
// ProcsDefault is the default maximum number of parallel processes.
var ProcsDefault = 1

ProcsDefault is the default maximum number of parallel processes.

Functions

This section is empty.

Types

type ErrContextDone

// ErrContextDone is an error for a context that is done (see context.Done).
// It wraps the cause of the context in Err.
type ErrContextDone struct {
	// Err is the return value of context.Cause(ctx)
	Err error
}

ErrContextDone is an error indicating that the context is done.

func (*ErrContextDone) Error

func (e *ErrContextDone) Error() string

func (*ErrContextDone) Unwrap

func (e *ErrContextDone) Unwrap() error

Unwrap returns e.Err, which is the value of context.Cause(ctx).

type Option

// Option is an option for New. It is a function that modifies the
// package-internal option value it is given.
type Option func(*option)

Option is an option for New.

func Procs

func Procs(n int) Option

Procs specifies the maximum number of parallel processes.

func ProcsNumCPU

func ProcsNumCPU() Option

ProcsNumCPU sets Procs to runtime.NumCPU().

func RunnerTimeout

func RunnerTimeout(d time.Duration) Option

RunnerTimeout specifies the timeout for each Runner.

If 0 or less is specified, no timeout is set.

type Parallel

// Parallel processes Runners whose results are of type T while restricting
// the maximum number that are processed in parallel. Values are obtained
// from New.
type Parallel[T any] interface {
	// Add adds one or more Runners.
	// NOTE(review): the conditions under which Add returns a non-nil error
	// are not visible here — confirm against the implementation.
	Add(rs ...Runner[T]) error
	// Result returns the channel for receiving each Result.
	Result() <-chan *Result[T]
	// Done returns a channel that will be closed when all processing is done.
	Done() <-chan struct{}
	// Wait blocks until all processing is done, and returns all results.
	Wait() []*Result[T]
}

func New

func New[T any](ctx context.Context, opts ...Option) Parallel[T]

New returns Parallel.

type Result

// Result is the result of a Runner.
type Result[T any] struct {
	// Value is the result value; presumably the T returned by Runner.Run — TODO confirm.
	Value T
	// Err is the error of the Runner; the examples treat a non-nil Err as a failure.
	Err   error
}

Result is the result of a Runner.

type Runner

// Runner is a single unit of work to be processed in parallel.
type Runner[T any] interface {
	// Run performs the work and returns its value of type T and an error.
	// NOTE(review): the context argument is presumably derived from the one
	// passed to New (with RunnerTimeout applied) — confirm against the implementation.
	Run(context.Context) (T, error)
}

Runner is a unit of work to be processed in parallel.

func NewRunner

func NewRunner[T any](fn func(context.Context) (T, error)) Runner[T]

NewRunner converts a func(context.Context) (T, error) into a Runner.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL