pooling

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

The pooling package allows one to dispatch an infinite number of functions to be executed in parallel while still limiting the number of routines.

For that, pooling package takes advantage of ants pool library. A pooling Pooler can have multiple pools (with builder SetSizes) to dispatch sub functions into different pools of routines.

When sending a function into the pooler (with the appropriate channel), this function can itself send other functions into the pooler. It allows one to "split" functions executions (like iterating over a slice and each element handled in parallel).

func main() {
	log := logrus.WithContext(context.Background())

	pooler, err := pooling.NewPoolerBuilder().
		SetSizes(10, 500, ...). // each size will initialize a pool with given size
		SetOptions(ants.WithLogger(log)).
		Build()
	if err != nil {
		panic(err)
	}
	defer pooler.Close()

	input := ReadFrom()

	// Read function is blocking until input is closed
	// and all running routines have ended
	pooler.Read(input)
}

func ReadFrom() <-chan pooling.PoolerFunc {
	input := make(chan pooling.PoolerFunc)

	go func() {
		// close input to stop blocking function Read once all elements are sent to input
		defer close(input)

		// do something populating input channel
		for i := range 100 {
			input <- HandleInt(i)
		}
	}()

	return input
}

func HandleInt(i int) pooling.PoolerFunc {
	return func(funcs chan<- pooling.PoolerFunc) {
		// you may handle the integer whichever you want
		// funcs channel is present to dispatch again some elements into a channel handled by the pooler
	}
}

Index

Constants

This section is empty.

Variables

View Source
var ErrMinimalSizes = errors.New("pooler pools size must be at least 1")

ErrMinimalSizes is the error returned by Build in case the PoolerBuilder doesn't have any size of pool to create.

Functions

This section is empty.

Types

type Pooler

type Pooler struct {
	// contains filtered or unexported fields
}

Pooler represents a slice of pools alongside a waitgroup to handle functions (like a queue).

A Pooler contains a slice of pools to allow each functions (given as channel in Read) to send functions (recursively and indefinitely) into the next pool.

func (*Pooler) Close

func (p *Pooler) Close()

Close waits for all Pooler funcs to be ended and then closes all Pooler pools.

func (*Pooler) Read

func (p *Pooler) Read(funcs <-chan PoolerFunc)

Read reads indefinitely (until closed) the input channel. It will wait at the end (when closed) for all functions executions to be ended before giving back the hand.

type PoolerBuilder

type PoolerBuilder struct {
	// contains filtered or unexported fields
}

PoolerBuilder is the builder for Pooler. It takes input options to tune Pooler behavior.

func NewPoolerBuilder

func NewPoolerBuilder() *PoolerBuilder

NewPoolerBuilder creates a new PoolerBuilder.

func (*PoolerBuilder) Build

func (p *PoolerBuilder) Build() (*Pooler, error)

Build builds the Pooler associated to PoolerBuilder and returns an error in case the ants pools creation fails.

func (*PoolerBuilder) SetOptions

func (p *PoolerBuilder) SetOptions(options ...ants.Option) *PoolerBuilder

SetOptions takes a slice of ants options for Pooler pools.

func (*PoolerBuilder) SetSizes

func (p *PoolerBuilder) SetSizes(sizes ...int16) *PoolerBuilder

SetSizes takes a slice of integers where each one will represent the size of an ants pool.

type PoolerFunc

type PoolerFunc func(funcs chan<- PoolerFunc)

PoolerFunc represents a function to be given into Pooler channel consumption.

Input channel closing is handled by Pooler so don't close it yourself.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL