Documentation
¶
Index ¶
- Variables
- func Split[J any, R any](ctx context.Context, jobs <-chan J, funcs WorkerFuncs[J, R], ...) (<-chan R, <-chan error, func())
- func SplitSlice[J any, R any](ctx context.Context, jobs []J, funcs WorkerFuncs[J, R]) ([]R, error)
- type Splitter
- type SplitterOption
- type WorkerFuncs
- func FromErrorFunction[J any, R any](f func(J) (R, error), workerCount int) WorkerFuncs[J, R]
- func FromErrorFunctions[J any, R any](funcs []func(J) (R, error)) WorkerFuncs[J, R]
- func FromFunction[J any, R any](f func(J) R, workerCount int) WorkerFuncs[J, R]
- func FromFunctions[J any, R any](funcs []func(J) R) WorkerFuncs[J, R]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrContextCancel = fmt.Errorf("context cancellation")
ErrContextCancel is returned on the error channel if the context passed to the constructor is Done before jobs finish processing. The splitter will stop doing work and close the results channel.
var ErrJobChannelFull = fmt.Errorf("job channel is full, retry in a hot second")
ErrJobChannelFull is returned on calls to Do where the internal job channel is full, the job is dropped and the user must try Do again if they want the given job processed.
var ErrUserCancel = fmt.Errorf("user cancellation")
ErrUserCancel is returned on the error channel if the user called Cancel before jobs finish processing. The splitter will stop doing work and close the results channel.
Functions ¶
func Split ¶
func Split[J any, R any](ctx context.Context, jobs <-chan J, funcs WorkerFuncs[J, R], opts ...SplitterOption) (<-chan R, <-chan error, func())
Split takes a job channel, WorkerFuncs, and options. It splits the work of processing jobs from the channel among the provided funcs. It returns a results channel, an error channel, and a cancel func. It will continue looking for jobs until the jobs channel is closed. Once it stops, it will close the results channel. Processing will also stop if the ctx is closed, if the close func is called, or if a workerFunc returns an error and stopOnError is true.
Example (Simple) ¶
ctx := context.Background() multFactory := func(m int) func(int) (int, error) { return func(x int) (int, error) { return x * m, nil } } jobs := make(chan int, 100) // passing 3 different functions, each gets a worker and will pull jobs and send results. funcs := []func(int) (int, error){multFactory(1), multFactory(2), multFactory(3)} // split the jobs among the funcs (exit if any fuction returns an error) results, errors, _ := Split[int, int](ctx, jobs, funcs, StopOnError()) for i := 0; i < 25; i++ { // add each job, can be done before or after passing to Split jobs <- i } // notify splitter routines that no more jobs are coming in close(jobs) // range over results works best because results is closed when all // jobs have been processed for x := range results { fmt.Println(x) } select { case err := <-errors: fmt.Printf("it failed %s\n", err) default: }
func SplitSlice ¶
SplitSlice is a wrapper around the split function that takes a slice of jobs and returns a slice of results where f(job[n]) == results[n].
Example (Simple) ¶
square := func(x int) int { return x * x } ctx := context.Background() jobs := []int{} for i := 0; i < 100; i++ { jobs = append(jobs, i) } results, _ := SplitSlice(ctx, jobs, FromFunction(square, 100)) for x := range results { fmt.Println(x) }
Types ¶
type Splitter ¶
Splitter splits the work of the jobs in a channel among a worker for each function it starts with. Results are put into a channel. Errors can also be read from a channel
Example (Simple) ¶
ctx := context.Background() square := func(x int) int { return x * x } // create a splitter, passing in a function and how many routines processing // jobs using this function you want sf := NewSplitter[int, int](ctx, FromFunction(square, 5)) for i := 0; i < 25; i++ { // add each job sf.Do(i) } // notify splitter that no more jobs are coming in sf.Done() // range over results works best because sf.Results() is closed when all // jobs have been processed for x := range sf.Results() { fmt.Println(x) }
func NewSplitter ¶
func NewSplitter[J any, R any](ctx context.Context, funcs WorkerFuncs[J, R], opts ...SplitterOption) *Splitter[J, R]
NewSplitter creates a Splitter that will read from the given job chan using workers that run the WorkerFuncs. The caller can add jobs to the channel before or after the splitter is created. Once the channel is closed, the splitter will exit after it finishes all the jobs inserted into the channel before the close.
func (*Splitter[J, R]) Cancel ¶
func (sf *Splitter[J, R]) Cancel()
Cancel forces the splitter to stop. The workers will exit (after they finish with the job they are currently processing).
func (*Splitter[J, R]) Do ¶
Do passes a job to the splitter's jobs channel, will return ErrJobChannelFull if the job channel is full. Jobs can be retried in this case.
func (*Splitter[J, R]) Done ¶
func (sf *Splitter[J, R]) Done()
Done closes the splitter's jobs channel. It signals that no more jobs are coming in and the workers will exit once they have completed all the pending jobs. The results channel will close after they exit
type SplitterOption ¶
type SplitterOption func(c *config)
func StopOnError ¶
func StopOnError() SplitterOption
func WithLogger ¶
func WithLogger(log logrus.FieldLogger) SplitterOption
type WorkerFuncs ¶
WorkerFuncs are needed by the splitter function. Each func gets its own worker and processes jobs independently of the other funcs. They can take any type and return any type, but they also need to return an error. This is because the splitter supports forwarding errors so all functions need to return them. There's a few convenience functions for converting common function signatures into type WorkerFunc.
func FromErrorFunction ¶
func FromErrorFunction[J any, R any](f func(J) (R, error), workerCount int) WorkerFuncs[J, R]
FromErrorFunction takes func f and creates WorkerFuncs where there's workerCount copies of a f
func FromErrorFunctions ¶
func FromErrorFunctions[J any, R any](funcs []func(J) (R, error)) WorkerFuncs[J, R]
FromErrorFunctions is just a cast, it doesn't do anything but it's here to complete the set. Also if I decide to make WorkerFuncs more than just a typedef, I'd need to add it anyway.
func FromFunction ¶
func FromFunction[J any, R any](f func(J) R, workerCount int) WorkerFuncs[J, R]
FromFunction takes func f and creates WorkerFuncs where there's workerCount copies of a function f' such that f'(J) -> (f(J), nil.(error))
func FromFunctions ¶
func FromFunctions[J any, R any](funcs []func(J) R) WorkerFuncs[J, R]
FromFunctions takes a slice of functions funcs and creates WorkerFuncs where for each func in funcs, there is a func f' where f'(J) -> (funcs[i](J), nil.(error))