splitter

package module
v1.0.2
Published: May 13, 2022 License: MIT Imports: 4 Imported by: 0

README

split-for


A little package for handling all the boilerplate involved in splitting jobs among a pool of workers. There are a few ways to use it, depending on how you want to interact with the workers.

As a Function

Split[J any, R any]

The actual logic backing everything else. Internally this function spins up a goroutine for each worker func it is given and has them process jobs from a passed-in channel until that channel closes, putting results into a results channel. It also handles premature stopping due to context cancellation, user cancellation, or an error returned from one of the worker funcs. As long as the user closes the jobs channel (or one of the stopping events happens), all goroutines will be cleaned up.

ctx := context.Background()

multFactory := func(m int) func(int) (int, error) {
    return func(x int) (int, error) { return x * m, nil }
}

jobs := make(chan int, 100)
// passing 3 different functions, each gets a worker and will pull jobs and send results.
funcs := []func(int) (int, error){multFactory(1), multFactory(2), multFactory(3)}
// split the jobs among the funcs (exit if any function returns an error)
results, errors, _ := Split[int, int](ctx, jobs, funcs, StopOnError())
for i := 0; i < 25; i++ {
    // add each job, can be done before or after passing to Split
    jobs <- i
}
// notify splitter routines that no more jobs are coming in
close(jobs)

// range over results works best because results is closed when all
// jobs have been processed
for x := range results {
    fmt.Println(x)
}

select {
case err := <-errors:
    fmt.Printf("it failed %s\n", err)
default:
}
SplitSlice[J any, R any]

Send a slice of jobs and get back a slice of results in the same order. This turns Split into a synchronous function, but simplifies the interface.

square := func(x int) int {
    return x * x
}

ctx := context.Background()
jobs := []int{}
for i := 0; i < 100; i++ {
    jobs = append(jobs, i)
}

results, _ := SplitSlice(ctx, jobs, FromFunction(square, 100))

for _, x := range results {
    fmt.Println(x)
}

As a Struct

Splitter[J any, R any]

A struct wrapped around a call to Split[J,R]. It replaces the jobs channel with Do and Done methods and adds a Cancel() method.

ctx := context.Background()

square := func(x int) int {
    return x * x
}

// create a splitter, passing in a function and how many routines processing
// jobs using this function you want
sf := NewSplitter[int, int](ctx, FromFunction(square, 5))
for i := 0; i < 25; i++ {
    // add each job
    sf.Do(i)
}
// notify splitter that no more jobs are coming in
sf.Done()

// range over results works best because sf.Results() is closed when all
// jobs have been processed
for x := range sf.Results() {
    fmt.Println(x)
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

var ErrContextCancel = fmt.Errorf("context cancellation")

ErrContextCancel is returned on the error channel if the context passed to the constructor is Done before jobs finish processing. The splitter will stop doing work and close the results channel.

var ErrJobChannelFull = fmt.Errorf("job channel is full, retry in a hot second")

ErrJobChannelFull is returned by Do when the internal job channel is full; the job is dropped, and the caller must call Do again if they want the job processed.

var ErrUserCancel = fmt.Errorf("user cancellation")

ErrUserCancel is returned on the error channel if the user called Cancel before jobs finish processing. The splitter will stop doing work and close the results channel.

Functions

func Split

func Split[J any, R any](ctx context.Context, jobs <-chan J, funcs WorkerFuncs[J, R], opts ...SplitterOption) (<-chan R, <-chan error, func())

Split takes a job channel, WorkerFuncs, and options. It splits the work of processing jobs from the channel among the provided funcs. It returns a results channel, an error channel, and a cancel func. It will continue looking for jobs until the jobs channel is closed. Once it stops, it will close the results channel. Processing will also stop if the ctx is done, if the returned cancel func is called, or if a worker func returns an error and the StopOnError option is set.

Example (Simple)
ctx := context.Background()

multFactory := func(m int) func(int) (int, error) {
	return func(x int) (int, error) { return x * m, nil }
}

jobs := make(chan int, 100)
// passing 3 different functions, each gets a worker and will pull jobs and send results.
funcs := []func(int) (int, error){multFactory(1), multFactory(2), multFactory(3)}
// split the jobs among the funcs (exit if any function returns an error)
results, errors, _ := Split[int, int](ctx, jobs, funcs, StopOnError())
for i := 0; i < 25; i++ {
	// add each job, can be done before or after passing to Split
	jobs <- i
}
// notify splitter routines that no more jobs are coming in
close(jobs)

// range over results works best because results is closed when all
// jobs have been processed
for x := range results {
	fmt.Println(x)
}

select {
case err := <-errors:
	fmt.Printf("it failed %s\n", err)
default:
}

func SplitSlice

func SplitSlice[J any, R any](ctx context.Context, jobs []J, funcs WorkerFuncs[J, R]) ([]R, error)

SplitSlice is a wrapper around Split that takes a slice of jobs and returns a slice of results where results[n] == f(jobs[n]).

Example (Simple)
square := func(x int) int {
	return x * x
}

ctx := context.Background()
jobs := []int{}
for i := 0; i < 100; i++ {
	jobs = append(jobs, i)
}

results, _ := SplitSlice(ctx, jobs, FromFunction(square, 100))

for _, x := range results {
	fmt.Println(x)
}

Types

type Splitter

type Splitter[J any, R any] struct {
	// contains filtered or unexported fields
}

Splitter splits the work of the jobs in a channel among a worker for each function it starts with. Results are put into a channel; errors can also be read from a channel.

Example (Simple)
ctx := context.Background()

square := func(x int) int {
	return x * x
}

// create a splitter, passing in a function and how many routines processing
// jobs using this function you want
sf := NewSplitter[int, int](ctx, FromFunction(square, 5))
for i := 0; i < 25; i++ {
	// add each job
	sf.Do(i)
}
// notify splitter that no more jobs are coming in
sf.Done()

// range over results works best because sf.Results() is closed when all
// jobs have been processed
for x := range sf.Results() {
	fmt.Println(x)
}

func NewSplitter

func NewSplitter[J any, R any](ctx context.Context, funcs WorkerFuncs[J, R], opts ...SplitterOption) *Splitter[J, R]

NewSplitter creates a Splitter whose workers run the WorkerFuncs. The caller adds jobs via Do at any point after the splitter is created. Once Done is called, the splitter will exit after it finishes all the jobs added before the close.

func (*Splitter[J, R]) Cancel

func (sf *Splitter[J, R]) Cancel()

Cancel forces the splitter to stop. The workers will exit (after they finish with the job they are currently processing).

func (*Splitter[J, R]) Do

func (sf *Splitter[J, R]) Do(job J) error

Do passes a job to the splitter's jobs channel; it returns ErrJobChannelFull if the channel is full, in which case the job can be retried.

func (*Splitter[J, R]) Done

func (sf *Splitter[J, R]) Done()

Done closes the splitter's jobs channel. It signals that no more jobs are coming in, and the workers will exit once they have completed all the pending jobs. The results channel will close after they exit.

func (*Splitter[J, R]) Errors

func (sf *Splitter[J, R]) Errors() <-chan error

Errors gives the Splitter's error channel. Any error from the workers along with context or user cancel will be passed to the error channel.

func (*Splitter[J, R]) Results

func (sf *Splitter[J, R]) Results() <-chan R

Results gives the Splitter's results channel. All results from the workers are sent to this channel. Closed by the Splitter after workers exit.

type SplitterOption

type SplitterOption func(c *config)

func StopOnError

func StopOnError() SplitterOption

func WithLogger

func WithLogger(log logrus.FieldLogger) SplitterOption

type WorkerFuncs

type WorkerFuncs[J any, R any] []func(J) (R, error)

WorkerFuncs are needed by the splitter function. Each func gets its own worker and processes jobs independently of the other funcs. They can take any type and return any type, but they also need to return an error, because the splitter supports forwarding errors and so all functions need to return them. There are a few convenience functions for converting common function signatures into WorkerFuncs.

func FromErrorFunction

func FromErrorFunction[J any, R any](f func(J) (R, error), workerCount int) WorkerFuncs[J, R]

FromErrorFunction takes func f and creates WorkerFuncs containing workerCount copies of f.

func FromErrorFunctions

func FromErrorFunctions[J any, R any](funcs []func(J) (R, error)) WorkerFuncs[J, R]

FromErrorFunctions is just a cast; it doesn't do anything, but it's here to complete the set. Also, if I decide to make WorkerFuncs more than just a typedef, I'd need to add it anyway.

func FromFunction

func FromFunction[J any, R any](f func(J) R, workerCount int) WorkerFuncs[J, R]

FromFunction takes func f and creates WorkerFuncs containing workerCount copies of a function f' such that f'(J) -> (f(J), nil).

func FromFunctions

func FromFunctions[J any, R any](funcs []func(J) R) WorkerFuncs[J, R]

FromFunctions takes a slice of functions funcs and creates WorkerFuncs where for each func in funcs there is a func f' such that f'(J) -> (funcs[i](J), nil).
