Documentation
¶
Overview ¶
Package gopp restricts the maximum number of processes that run in parallel.
Example
// Create a cancellable context that is passed to gopp.New.
ctx, cancel := context.WithCancel(context.Background())
// Alternatively, set a timeout for the entire process with context.WithTimeout:
//ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
// Create a new instance.
// The result of each process is an int (and an error).
p := gopp.New[int](
// context
ctx,
// the maximum number of parallel processes is 4
gopp.Procs(4),
// the timeout for each process is 3s
gopp.RunnerTimeout(3*time.Second))
// Build a Runner from a function and add it 100 times.
r := gopp.NewRunner[int](func(ctx context.Context) (int, error) {
// heavy process (simulated with a one-second sleep)
time.Sleep(time.Second)
return 123, nil
})
for i := 0; i < 100; i++ {
p.Add(r)
}
// Wait for all processing to finish and receive the results.
ress := p.Wait()
for _, res := range ress {
if res.Err != nil {
fmt.Printf("Error: %v\n", res.Err)
continue
}
fmt.Println(res.Value)
}
// Do not reuse p; it will not work properly.
// If you need to run more work, create a new instance instead.
p = gopp.New[int](context.Background(), gopp.Procs(4))
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/shiolier/gopp"
)
// main adds ten SampleRunners to a gopp instance limited to four parallel
// processes and prints each result as it arrives.
func main() {
ctx, cancel := context.WithCancel(context.Background())
// Alternatively, set a timeout for the entire process with context.WithTimeout:
//ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
// Create a new instance.
// The result of each process is an int (and an error).
p := gopp.New[int](
// context
ctx,
// the maximum number of parallel processes is 4
gopp.Procs(4),
// the timeout for each process is 3s
gopp.RunnerTimeout(3*time.Second))
// Build the Runners and add them all at once.
rs := make([]gopp.Runner[int], 10)
for i := 0; i < 10; i++ {
rs[i] = &SampleRunner{i}
// Runners can also be added one at a time:
//p.Add(&SampleRunner{i})
}
p.Add(rs...)
// Receive the results.
loop: // label so the select below can break out of the for loop
for {
select {
case res := <-p.Result(): // a result has arrived
// Check for an error first.
if res.Err != nil {
fmt.Printf("Error: %v\n", res.Err)
// Call cancel to stop the remaining processing.
cancel()
// Use continue rather than break so that every result is still received
// (including context.Canceled errors).
// Otherwise the sender will block and cause a goroutine leak.
continue
}
fmt.Println(res.Value)
// More Runners can also be added here:
//p.Add(&SampleRunner{10})
case <-p.Done(): // all processing is done
// Leave the for loop.
break loop
}
}
// Alternatively, collect every result at once:
// ress := p.Wait()
// for _, res := range ress {
// // omitted
// }
// Do not reuse p; it will not work properly.
// If you need to run more work, create a new instance with gopp.New.
//p = gopp.New[int](ctx, gopp.ProcsNumCPU())
}
// SampleRunner is the example Runner used by main; its result is the square
// of n.
type SampleRunner struct {
	n int
}

// Run simulates a heavy one-second process and returns n*n.
//
// If ctx is done before the second elapses, Run stops waiting and returns
// 0 together with ctx.Err(), so cancelling the context (as main does after an
// error) takes effect immediately instead of after the full sleep.
func (s *SampleRunner) Run(ctx context.Context) (int, error) {
	// heavy process (simulated): wait one second unless ctx finishes first
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-timer.C:
		return s.n * s.n, nil
	}
}
Output: 0 1 4 9 16 25 36 49 64 81
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ProcsDefault = 1
ProcsDefault is the default maximum number of processes that run in parallel.
Functions ¶
This section is empty.
Types ¶
type ErrContextDone ¶
// ErrContextDone is an error indicating that the context is done.
// It carries the cause of the context's completion in Err.
type ErrContextDone struct {
// Err is the value returned by context.Cause(ctx).
Err error
}
ErrContextDone is an error indicating that the context is done.
func (*ErrContextDone) Error ¶
func (e *ErrContextDone) Error() string
func (*ErrContextDone) Unwrap ¶
func (e *ErrContextDone) Unwrap() error
Unwrap returns e.Err, which is the value of context.Cause(ctx).
type Parallel ¶
type Parallel[T any] interface { // Add adds Runner(s) Add(rs ...Runner[T]) error // Result returns channel for receiving Result Result() <-chan *Result[T] // Done returns channel that will be closed when all processing is done. Done() <-chan struct{} // Wait blocks until all processing to done, and returns all results. Wait() []*Result[T] }
Click to show internal directories.
Click to hide internal directories.