parallelisation

package
v1.117.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: Apache-2.0 Imports: 11 Imported by: 6

Documentation

Overview

Package parallelisation defines a module for `Concurrency`

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BreakOnError added in v1.117.0

func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error

BreakOnError executes each functions in the group until an error is found or the context gets cancelled.

func CloseAll added in v1.106.0

func CloseAll(cs ...io.Closer) error

CloseAll calls concurrently Close on all io.Closer implementations passed as arguments and returns the first error encountered

func CloseAllAndCollateErrors added in v1.114.0

func CloseAllAndCollateErrors(cs ...io.Closer) error

CloseAllAndCollateErrors calls concurrently Close on all io.Closer implementations passed as arguments and returns the errors encountered

func CloseAllFunc added in v1.106.0

func CloseAllFunc(cs ...CloseFunc) error

CloseAllFunc calls concurrently all Close functions passed as arguments and returns the first error encountered

func CloseAllFuncAndCollateErrors added in v1.114.0

func CloseAllFuncAndCollateErrors(cs ...CloseFunc) error

CloseAllFuncAndCollateErrors calls concurrently all Close functions passed as arguments and returns the errors encountered

func CloseAllWithContext added in v1.106.0

func CloseAllWithContext(ctx context.Context, cs ...io.Closer) error

CloseAllWithContext is similar to CloseAll but can be controlled using a context.

func CloseAllWithContextAndCollateErrors added in v1.114.0

func CloseAllWithContextAndCollateErrors(ctx context.Context, cs ...io.Closer) error

CloseAllWithContextAndCollateErrors is similar to CloseAllAndCollateErrors but can be controlled using a context.

func DetermineContextError

func DetermineContextError(ctx context.Context) error

DetermineContextError determines what the context error is if any.

func Filter added in v1.110.0

func Filter[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) (result []T, err error)

Filter is similar to collection.Filter but uses parallelisation.

func ForEach added in v1.117.0

func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error

ForEach executes all the contextual functions according to the store options and returns an error if one occurred.

func Map added in v1.110.0

func Map[T1 any, T2 any](ctx context.Context, numWorkers int, s []T1, f func(T1) T2) (result []T2, err error)

Map is similar to collection.Map but uses parallelisation.

func Parallelise

func Parallelise(argList any, action func(arg any) (any, error), resultType reflect.Type) (results any, err error)

Parallelise parallelises an action over as many goroutines as specified by the argList and retrieves all the results when all the goroutines are done. To control the number of goroutines spawned, prefer WorkerPool

func Reject added in v1.110.0

func Reject[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) ([]T, error)

Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.

func RunActionWithParallelCheck

func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) error

RunActionWithParallelCheck runs an action with a check in parallel The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled.

func RunActionWithTimeout

func RunActionWithTimeout(blockingAction func(stop chan bool) error, timeout time.Duration) (err error)

RunActionWithTimeout runs an action with timeout

func RunActionWithTimeoutAndCancelStore

func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Duration, store *CancelFunctionStore, blockingAction func(context.Context) error) error

RunActionWithTimeoutAndCancelStore runs an action with timeout The cancel ExecutionGroup is used just to register the cancel function so that it can be called on Cancel.

func RunActionWithTimeoutAndContext

func RunActionWithTimeoutAndContext(ctx context.Context, timeout time.Duration, blockingAction func(context.Context) error) error

RunActionWithTimeoutAndContext runs an action with timeout blockingAction's context will be cancelled on exit.

func SafeSchedule added in v1.40.0

func SafeSchedule(ctx context.Context, period time.Duration, offset time.Duration, f func(context.Context, time.Time))

SafeSchedule calls function `f` regularly with a `period` and an `offset`, similarly to Schedule but with context control.

func SafeScheduleAfter added in v1.40.0

func SafeScheduleAfter(ctx context.Context, offset time.Duration, f func(context.Context, time.Time))

SafeScheduleAfter calls once function `f` after `offset` similarly to ScheduleAfter but stops the function is controlled by the context

func Schedule

func Schedule(ctx context.Context, period time.Duration, offset time.Duration, f func(time.Time))

Schedule calls function `f` regularly with a `period` and an `offset`.

func ScheduleAfter added in v1.1.0

func ScheduleAfter(ctx context.Context, offset time.Duration, f func(time.Time))

ScheduleAfter calls once function `f` after `offset`

func SleepWithContext

func SleepWithContext(ctx context.Context, delay time.Duration)

SleepWithContext performs an interruptable sleep Similar to time.Sleep() but also responding to context cancellation instead of blocking for the whole length of time.

func SleepWithInterruption

func SleepWithInterruption(stop chan bool, delay time.Duration)

SleepWithInterruption performs an interruptable sleep Similar to time.Sleep() but also interrupting when requested instead of blocking for the whole length of time.

func WaitUntil added in v1.100.0

func WaitUntil(ctx context.Context, evalCondition func(ctx2 context.Context) (bool, error), pauseBetweenEvaluations time.Duration) error

WaitUntil waits for a condition evaluated by evalCondition to be verified

func WaitWithContext added in v1.101.0

func WaitWithContext(ctx context.Context, wg IWaiter) (err error)

func WaitWithContextAndError added in v1.101.0

func WaitWithContextAndError(ctx context.Context, wg IErrorWaiter) (err error)

func WorkerPool added in v1.108.0

func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int, jobs []InputType, f func(context.Context, InputType) (ResultType, bool, error)) (results []ResultType, err error)

WorkerPool parallelises an action using a worker pool of the size provided by numWorkers and retrieves all the results when all the actions have completed. It is similar to Parallelise but it uses generics instead of reflection and allows you to control the pool size

func WrapCloseToCancelFunc added in v1.117.0

func WrapCloseToCancelFunc(f CloseFunc) context.CancelFunc

func WrapContextualToCancelFunc added in v1.117.0

func WrapContextualToCancelFunc(f ContextualFunc) context.CancelFunc

Types

type CancelFunctionStore

type CancelFunctionStore struct {
	ExecutionGroup[context.CancelFunc]
}

func NewCancelFunctionsStore

func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore

NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store

func (*CancelFunctionStore) Cancel

func (s *CancelFunctionStore) Cancel()

Cancel will execute the cancel functions in the store. Any errors will be ignored and Execute() is recommended if you need to know if a cancellation failed

func (*CancelFunctionStore) Len

func (s *CancelFunctionStore) Len() int

func (*CancelFunctionStore) RegisterCancelFunction

func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc)

func (*CancelFunctionStore) RegisterCancelStore added in v1.117.0

func (s *CancelFunctionStore) RegisterCancelStore(store *CancelFunctionStore)

type CloseFunc added in v1.106.0

type CloseFunc func() error

func WrapCancelToCloseFunc added in v1.117.0

func WrapCancelToCloseFunc(f context.CancelFunc) CloseFunc

func WrapCloserIntoCloseFunc added in v1.117.0

func WrapCloserIntoCloseFunc(closer io.Closer) CloseFunc

func WrapContextualToCloseFunc added in v1.117.0

func WrapContextualToCloseFunc(f ContextualFunc) CloseFunc

func (CloseFunc) Close added in v1.117.0

func (c CloseFunc) Close() error

type CloseFunctionStore added in v1.106.0

type CloseFunctionStore struct {
	ExecutionGroup[CloseFunc]
}

func NewCloseFunctionStore added in v1.113.0

func NewCloseFunctionStore(options ...StoreOption) *CloseFunctionStore

NewCloseFunctionStore returns a store closing functions which will all be called on Close(). The first error received if any will be returned.

func NewCloseFunctionStoreStore added in v1.106.0

func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore

NewCloseFunctionStoreStore is exactly the same as NewConcurrentCloseFunctionStore but without a typo in the name.

func NewCloseOnceGroup added in v1.117.0

func NewCloseOnceGroup(options ...StoreOption) *CloseFunctionStore

NewCloseOnceGroup is the same as NewCloseFunctionStore but ensures any closing functions are only executed once.

func NewConcurrentCloseFunctionStore added in v1.113.0

func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore

NewConcurrentCloseFunctionStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned. Prefer using NewCloseFunctionStore where possible

func (*CloseFunctionStore) Close added in v1.106.0

func (s *CloseFunctionStore) Close() error

func (*CloseFunctionStore) Len added in v1.106.0

func (s *CloseFunctionStore) Len() int

func (*CloseFunctionStore) RegisterCancelFunction added in v1.112.0

func (s *CloseFunctionStore) RegisterCancelFunction(cancelFunc ...context.CancelFunc)

func (*CloseFunctionStore) RegisterCancelStore added in v1.112.0

func (s *CloseFunctionStore) RegisterCancelStore(cancelStore *CancelFunctionStore)

func (*CloseFunctionStore) RegisterCloseFunction added in v1.106.0

func (s *CloseFunctionStore) RegisterCloseFunction(closerObj ...CloseFunc)

type CloserStore added in v1.106.0

type CloserStore struct {
	ExecutionGroup[io.Closer]
}

func NewCloserStore added in v1.106.0

func NewCloserStore(stopOnFirstError bool) *CloserStore

NewCloserStore returns a store of io.Closer object which will all be closed concurrently on Close(). The first error received will be returned

func NewCloserStoreWithOptions added in v1.113.0

func NewCloserStoreWithOptions(opts ...StoreOption) *CloserStore

NewCloserStoreWithOptions returns a store of io.Closer object which will all be closed on Close(). The first error received if any will be returned

func (*CloserStore) Close added in v1.106.0

func (s *CloserStore) Close() error

func (*CloserStore) Len added in v1.106.0

func (s *CloserStore) Len() int

func (*CloserStore) RegisterCloser added in v1.106.0

func (s *CloserStore) RegisterCloser(closerObj ...io.Closer)

type CompoundExecutionGroup added in v1.117.0

type CompoundExecutionGroup struct {
	ContextualFunctionGroup
}

func NewCompoundExecutionGroup added in v1.117.0

func NewCompoundExecutionGroup(options ...StoreOption) *CompoundExecutionGroup

NewCompoundExecutionGroup returns an execution group made of executors

func (*CompoundExecutionGroup) RegisterExecutor added in v1.117.0

func (g *CompoundExecutionGroup) RegisterExecutor(group ...IExecutor)

RegisterExecutor registers executors

type ContextualFunc added in v1.117.0

type ContextualFunc func(ctx context.Context) error

func WrapCancelToContextualFunc added in v1.117.0

func WrapCancelToContextualFunc(f context.CancelFunc) ContextualFunc

func WrapCloseToContextualFunc added in v1.117.0

func WrapCloseToContextualFunc(f CloseFunc) ContextualFunc

type ContextualFunctionGroup added in v1.117.0

type ContextualFunctionGroup struct {
	ExecutionGroup[ContextualFunc]
}

func NewContextualGroup added in v1.117.0

func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup

NewContextualGroup returns a group executing contextual functions.

type ExecuteFunc added in v1.117.0

type ExecuteFunc[T any] func(ctx context.Context, element T) error

type ExecutionGroup added in v1.117.0

type ExecutionGroup[T any] struct {
	// contains filtered or unexported fields
}

func NewExecutionGroup added in v1.117.0

func NewExecutionGroup[T any](executeFunc ExecuteFunc[T], options ...StoreOption) *ExecutionGroup[T]

NewExecutionGroup returns an execution group which executes functions according to store options.

func (*ExecutionGroup[T]) Execute added in v1.117.0

func (s *ExecutionGroup[T]) Execute(ctx context.Context) (err error)

Execute executes all the function in the group according to store options.

func (*ExecutionGroup[T]) Len added in v1.117.0

func (s *ExecutionGroup[T]) Len() int

func (*ExecutionGroup[T]) RegisterFunction added in v1.117.0

func (s *ExecutionGroup[T]) RegisterFunction(function ...T)

RegisterFunction registers functions to the group.

type ICompoundExecutionGroup added in v1.117.0

type ICompoundExecutionGroup[T any] interface {
	IExecutionGroup[T]
	// RegisterExecutor registers executors of any kind to the group: they could be functions or sub-groups.
	RegisterExecutor(executor ...IExecutor)
}

type IErrorWaiter added in v1.101.0

type IErrorWaiter interface {
	Wait() error
}

IErrorWaiter can be used to wait on errgroups and similar types where Wait() returns an error This is used to support use in the WaitWithContextAndError function to wait but listen to contexts

type IExecutionGroup added in v1.117.0

type IExecutionGroup[T any] interface {
	IExecutor
	RegisterFunction(function ...T)
	Len() int
}

type IExecutor added in v1.117.0

type IExecutor interface {
	// Execute executes all the functions in the group.
	Execute(ctx context.Context) error
}

type IWaiter added in v1.101.0

type IWaiter interface {
	Wait()
}

IWaiter can be used to wait on sync WaitGroups and similar types where Wait() does not return an error This is used to support use in the WaitWithContext function to wait but listen to contexts

type StoreOption added in v1.113.0

type StoreOption func(*StoreOptions) *StoreOptions
var AnyTimes StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.onlyOnce = false
	return o
}

AnyTimes will allow the functions to be executed as often that they might be.

var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.clearOnExecution = true
	return o
}

ClearAfterExecution clears the ExecutionGroup after execution.

var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.stopOnFirstError = false
	return o
}

ExecuteAll executes all functions in the ExecutionGroup even if an error is raised. the first error raised is then returned.

var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.stopOnFirstError = false
	o.joinErrors = true
	return o
}

JoinErrors will collate any errors which happened when executing functions in ExecutionGroup. This option should not be used in combination to StopOnFirstError.

var OnlyOnce StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.onlyOnce = true
	return o
}

OnlyOnce will ensure the function are executed only once if they do.

var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.sequential = false
	return o
}

Parallel ensures every function registered in the ExecutionGroup is executed concurrently in the order they were registered.

var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.clearOnExecution = false
	return o
}

RetainAfterExecution keep the ExecutionGroup intact after execution (no reset).

var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.sequential = true
	return o
}

Sequential ensures every function registered in the ExecutionGroup is executed sequentially in the order they were registered.

var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.sequential = true
	o.reverse = true
	return o
}

SequentialInReverse ensures every function registered in the ExecutionGroup is executed sequentially but in the reverse order they were registered.

var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
	if o == nil {
		o = DefaultOptions()
	}
	o.stopOnFirstError = true
	o.joinErrors = false
	return o
}

StopOnFirstError stops ExecutionGroup execution on first error.

func Workers added in v1.117.0

func Workers(workers int) StoreOption

Workers defines a limit number of workers for executing the function registered in the ExecutionGroup.

type StoreOptions added in v1.113.0

type StoreOptions struct {
	// contains filtered or unexported fields
}

func DefaultOptions added in v1.117.0

func DefaultOptions() *StoreOptions

DefaultOptions returns the default store configuration

func WithOptions added in v1.117.0

func WithOptions(option ...StoreOption) (opts *StoreOptions)

WithOptions defines a store configuration.

func (*StoreOptions) Default added in v1.117.0

func (o *StoreOptions) Default() *StoreOptions

func (*StoreOptions) Merge added in v1.117.0

func (o *StoreOptions) Merge(opts *StoreOptions) *StoreOptions

func (*StoreOptions) MergeWithOptions added in v1.117.0

func (o *StoreOptions) MergeWithOptions(opt ...StoreOption) *StoreOptions

func (*StoreOptions) Options added in v1.117.0

func (o *StoreOptions) Options() []StoreOption

func (*StoreOptions) Overwrite added in v1.117.0

func (o *StoreOptions) Overwrite(opts *StoreOptions) *StoreOptions

func (*StoreOptions) WithOptions added in v1.117.0

func (o *StoreOptions) WithOptions(opts ...StoreOption) *StoreOptions

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL