Documentation
¶
Overview ¶
Package parallelisation defines a module for `Concurrency`
Index ¶
- func BreakOnError(ctx context.Context, executionOptions *StoreOptions, ...) error
- func CloseAll(cs ...io.Closer) error
- func CloseAllAndCollateErrors(cs ...io.Closer) error
- func CloseAllFunc(cs ...CloseFunc) error
- func CloseAllFuncAndCollateErrors(cs ...CloseFunc) error
- func CloseAllWithContext(ctx context.Context, cs ...io.Closer) error
- func CloseAllWithContextAndCollateErrors(ctx context.Context, cs ...io.Closer) error
- func DetermineContextError(ctx context.Context) error
- func Filter[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) (result []T, err error)
- func ForEach(ctx context.Context, executionOptions *StoreOptions, ...) error
- func Map[T1 any, T2 any](ctx context.Context, numWorkers int, s []T1, f func(T1) T2) (result []T2, err error)
- func Parallelise(argList any, action func(arg any) (any, error), resultType reflect.Type) (results any, err error)
- func Reject[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) ([]T, error)
- func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, ...) error
- func RunActionWithTimeout(blockingAction func(stop chan bool) error, timeout time.Duration) (err error)
- func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Duration, store *CancelFunctionStore, ...) error
- func RunActionWithTimeoutAndContext(ctx context.Context, timeout time.Duration, ...) error
- func SafeSchedule(ctx context.Context, period time.Duration, offset time.Duration, ...)
- func SafeScheduleAfter(ctx context.Context, offset time.Duration, f func(context.Context, time.Time))
- func Schedule(ctx context.Context, period time.Duration, offset time.Duration, ...)
- func ScheduleAfter(ctx context.Context, offset time.Duration, f func(time.Time))
- func SleepWithContext(ctx context.Context, delay time.Duration)
- func SleepWithInterruption(stop chan bool, delay time.Duration)
- func WaitUntil(ctx context.Context, evalCondition func(ctx2 context.Context) (bool, error), ...) error
- func WaitWithContext(ctx context.Context, wg IWaiter) (err error)
- func WaitWithContextAndError(ctx context.Context, wg IErrorWaiter) (err error)
- func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int, jobs []InputType, ...) (results []ResultType, err error)
- func WrapCloseToCancelFunc(f CloseFunc) context.CancelFunc
- func WrapContextualToCancelFunc(f ContextualFunc) context.CancelFunc
- type CancelFunctionStore
- type CloseFunc
- type CloseFunctionStore
- func (s *CloseFunctionStore) Close() error
- func (s *CloseFunctionStore) Len() int
- func (s *CloseFunctionStore) RegisterCancelFunction(cancelFunc ...context.CancelFunc)
- func (s *CloseFunctionStore) RegisterCancelStore(cancelStore *CancelFunctionStore)
- func (s *CloseFunctionStore) RegisterCloseFunction(closerObj ...CloseFunc)
- type CloserStore
- type CompoundExecutionGroup
- type ContextualFunc
- type ContextualFunctionGroup
- type ExecuteFunc
- type ExecutionGroup
- type ICompoundExecutionGroup
- type IErrorWaiter
- type IExecutionGroup
- type IExecutor
- type IWaiter
- type StoreOption
- type StoreOptions
- func (o *StoreOptions) Default() *StoreOptions
- func (o *StoreOptions) Merge(opts *StoreOptions) *StoreOptions
- func (o *StoreOptions) MergeWithOptions(opt ...StoreOption) *StoreOptions
- func (o *StoreOptions) Options() []StoreOption
- func (o *StoreOptions) Overwrite(opts *StoreOptions) *StoreOptions
- func (o *StoreOptions) WithOptions(opts ...StoreOption) *StoreOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BreakOnError ¶ added in v1.117.0
func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error
BreakOnError executes each function in the group until an error is found or the context gets cancelled.
func CloseAll ¶ added in v1.106.0
CloseAll calls concurrently Close on all io.Closer implementations passed as arguments and returns the first error encountered
func CloseAllAndCollateErrors ¶ added in v1.114.0
CloseAllAndCollateErrors calls concurrently Close on all io.Closer implementations passed as arguments and returns the errors encountered
func CloseAllFunc ¶ added in v1.106.0
CloseAllFunc calls concurrently all Close functions passed as arguments and returns the first error encountered
func CloseAllFuncAndCollateErrors ¶ added in v1.114.0
CloseAllFuncAndCollateErrors calls concurrently all Close functions passed as arguments and returns the errors encountered
func CloseAllWithContext ¶ added in v1.106.0
CloseAllWithContext is similar to CloseAll but can be controlled using a context.
func CloseAllWithContextAndCollateErrors ¶ added in v1.114.0
CloseAllWithContextAndCollateErrors is similar to CloseAllAndCollateErrors but can be controlled using a context.
func DetermineContextError ¶
DetermineContextError determines what the context error is if any.
func Filter ¶ added in v1.110.0
func Filter[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) (result []T, err error)
Filter is similar to collection.Filter but uses parallelisation.
func ForEach ¶ added in v1.117.0
func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error
ForEach executes all the contextual functions according to the store options and returns an error if one occurred.
func Map ¶ added in v1.110.0
func Map[T1 any, T2 any](ctx context.Context, numWorkers int, s []T1, f func(T1) T2) (result []T2, err error)
Map is similar to collection.Map but uses parallelisation.
func Parallelise ¶
func Parallelise(argList any, action func(arg any) (any, error), resultType reflect.Type) (results any, err error)
Parallelise parallelises an action over as many goroutines as specified by the argList and retrieves all the results when all the goroutines are done. To control the number of goroutines spawned, prefer WorkerPool.
func Reject ¶ added in v1.110.0
Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.
func RunActionWithParallelCheck ¶
func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) error
RunActionWithParallelCheck runs an action with a check in parallel. The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result, the whole function would be cancelled.
func RunActionWithTimeout ¶
func RunActionWithTimeout(blockingAction func(stop chan bool) error, timeout time.Duration) (err error)
RunActionWithTimeout runs an action with timeout
func RunActionWithTimeoutAndCancelStore ¶
func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Duration, store *CancelFunctionStore, blockingAction func(context.Context) error) error
RunActionWithTimeoutAndCancelStore runs an action with timeout. The cancel ExecutionGroup is used just to register the cancel function so that it can be called on Cancel.
func RunActionWithTimeoutAndContext ¶
func RunActionWithTimeoutAndContext(ctx context.Context, timeout time.Duration, blockingAction func(context.Context) error) error
RunActionWithTimeoutAndContext runs an action with timeout. blockingAction's context will be cancelled on exit.
func SafeSchedule ¶ added in v1.40.0
func SafeSchedule(ctx context.Context, period time.Duration, offset time.Duration, f func(context.Context, time.Time))
SafeSchedule calls function `f` regularly with a `period` and an `offset`, similarly to Schedule but with context control.
func SafeScheduleAfter ¶ added in v1.40.0
func SafeScheduleAfter(ctx context.Context, offset time.Duration, f func(context.Context, time.Time))
SafeScheduleAfter calls function `f` once after `offset`, similarly to ScheduleAfter, but stopping the function is controlled by the context.
func ScheduleAfter ¶ added in v1.1.0
ScheduleAfter calls once function `f` after `offset`
func SleepWithContext ¶
SleepWithContext performs an interruptible sleep. Similar to time.Sleep() but also responding to context cancellation instead of blocking for the whole length of time.
func SleepWithInterruption ¶
SleepWithInterruption performs an interruptible sleep. Similar to time.Sleep() but also interrupting when requested instead of blocking for the whole length of time.
func WaitUntil ¶ added in v1.100.0
func WaitUntil(ctx context.Context, evalCondition func(ctx2 context.Context) (bool, error), pauseBetweenEvaluations time.Duration) error
WaitUntil waits for a condition evaluated by evalCondition to be verified
func WaitWithContext ¶ added in v1.101.0
func WaitWithContextAndError ¶ added in v1.101.0
func WaitWithContextAndError(ctx context.Context, wg IErrorWaiter) (err error)
func WorkerPool ¶ added in v1.108.0
func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int, jobs []InputType, f func(context.Context, InputType) (ResultType, bool, error)) (results []ResultType, err error)
WorkerPool parallelises an action using a worker pool of the size provided by numWorkers and retrieves all the results when all the actions have completed. It is similar to Parallelise but it uses generics instead of reflection and allows you to control the pool size.
func WrapCloseToCancelFunc ¶ added in v1.117.0
func WrapCloseToCancelFunc(f CloseFunc) context.CancelFunc
func WrapContextualToCancelFunc ¶ added in v1.117.0
func WrapContextualToCancelFunc(f ContextualFunc) context.CancelFunc
Types ¶
type CancelFunctionStore ¶
type CancelFunctionStore struct { ExecutionGroup[context.CancelFunc] }
func NewCancelFunctionsStore ¶
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore
NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store.
func (*CancelFunctionStore) Cancel ¶
func (s *CancelFunctionStore) Cancel()
Cancel will execute the cancel functions in the store. Any errors will be ignored; Execute() is recommended if you need to know whether a cancellation failed.
func (*CancelFunctionStore) Len ¶
func (s *CancelFunctionStore) Len() int
func (*CancelFunctionStore) RegisterCancelFunction ¶
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc)
func (*CancelFunctionStore) RegisterCancelStore ¶ added in v1.117.0
func (s *CancelFunctionStore) RegisterCancelStore(store *CancelFunctionStore)
type CloseFunc ¶ added in v1.106.0
type CloseFunc func() error
func WrapCancelToCloseFunc ¶ added in v1.117.0
func WrapCancelToCloseFunc(f context.CancelFunc) CloseFunc
func WrapCloserIntoCloseFunc ¶ added in v1.117.0
func WrapContextualToCloseFunc ¶ added in v1.117.0
func WrapContextualToCloseFunc(f ContextualFunc) CloseFunc
type CloseFunctionStore ¶ added in v1.106.0
type CloseFunctionStore struct { ExecutionGroup[CloseFunc] }
func NewCloseFunctionStore ¶ added in v1.113.0
func NewCloseFunctionStore(options ...StoreOption) *CloseFunctionStore
NewCloseFunctionStore returns a store of closing functions which will all be called on Close(). The first error received, if any, will be returned.
func NewCloseFunctionStoreStore ¶ added in v1.106.0
func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore
NewCloseFunctionStoreStore is exactly the same as NewConcurrentCloseFunctionStore but with a typo in the name (the duplicated `Store`).
func NewCloseOnceGroup ¶ added in v1.117.0
func NewCloseOnceGroup(options ...StoreOption) *CloseFunctionStore
NewCloseOnceGroup is the same as NewCloseFunctionStore but ensures any closing functions are only executed once.
func NewConcurrentCloseFunctionStore ¶ added in v1.113.0
func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore
NewConcurrentCloseFunctionStore returns a store of closing functions which will all be called concurrently on Close(). The first error received will be returned. Prefer using NewCloseFunctionStore where possible.
func (*CloseFunctionStore) Close ¶ added in v1.106.0
func (s *CloseFunctionStore) Close() error
func (*CloseFunctionStore) Len ¶ added in v1.106.0
func (s *CloseFunctionStore) Len() int
func (*CloseFunctionStore) RegisterCancelFunction ¶ added in v1.112.0
func (s *CloseFunctionStore) RegisterCancelFunction(cancelFunc ...context.CancelFunc)
func (*CloseFunctionStore) RegisterCancelStore ¶ added in v1.112.0
func (s *CloseFunctionStore) RegisterCancelStore(cancelStore *CancelFunctionStore)
func (*CloseFunctionStore) RegisterCloseFunction ¶ added in v1.106.0
func (s *CloseFunctionStore) RegisterCloseFunction(closerObj ...CloseFunc)
type CloserStore ¶ added in v1.106.0
type CloserStore struct { ExecutionGroup[io.Closer] }
func NewCloserStore ¶ added in v1.106.0
func NewCloserStore(stopOnFirstError bool) *CloserStore
NewCloserStore returns a store of io.Closer objects which will all be closed concurrently on Close(). The first error received will be returned.
func NewCloserStoreWithOptions ¶ added in v1.113.0
func NewCloserStoreWithOptions(opts ...StoreOption) *CloserStore
NewCloserStoreWithOptions returns a store of io.Closer objects which will all be closed on Close(). The first error received, if any, will be returned.
func (*CloserStore) Close ¶ added in v1.106.0
func (s *CloserStore) Close() error
func (*CloserStore) Len ¶ added in v1.106.0
func (s *CloserStore) Len() int
func (*CloserStore) RegisterCloser ¶ added in v1.106.0
func (s *CloserStore) RegisterCloser(closerObj ...io.Closer)
type CompoundExecutionGroup ¶ added in v1.117.0
type CompoundExecutionGroup struct {
ContextualFunctionGroup
}
func NewCompoundExecutionGroup ¶ added in v1.117.0
func NewCompoundExecutionGroup(options ...StoreOption) *CompoundExecutionGroup
NewCompoundExecutionGroup returns an execution group made of executors
func (*CompoundExecutionGroup) RegisterExecutor ¶ added in v1.117.0
func (g *CompoundExecutionGroup) RegisterExecutor(group ...IExecutor)
RegisterExecutor registers executors
type ContextualFunc ¶ added in v1.117.0
func WrapCancelToContextualFunc ¶ added in v1.117.0
func WrapCancelToContextualFunc(f context.CancelFunc) ContextualFunc
func WrapCloseToContextualFunc ¶ added in v1.117.0
func WrapCloseToContextualFunc(f CloseFunc) ContextualFunc
type ContextualFunctionGroup ¶ added in v1.117.0
type ContextualFunctionGroup struct { ExecutionGroup[ContextualFunc] }
func NewContextualGroup ¶ added in v1.117.0
func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup
NewContextualGroup returns a group executing contextual functions.
type ExecuteFunc ¶ added in v1.117.0
type ExecutionGroup ¶ added in v1.117.0
type ExecutionGroup[T any] struct { // contains filtered or unexported fields }
func NewExecutionGroup ¶ added in v1.117.0
func NewExecutionGroup[T any](executeFunc ExecuteFunc[T], options ...StoreOption) *ExecutionGroup[T]
NewExecutionGroup returns an execution group which executes functions according to store options.
func (*ExecutionGroup[T]) Execute ¶ added in v1.117.0
func (s *ExecutionGroup[T]) Execute(ctx context.Context) (err error)
Execute executes all the functions in the group according to store options.
func (*ExecutionGroup[T]) Len ¶ added in v1.117.0
func (s *ExecutionGroup[T]) Len() int
func (*ExecutionGroup[T]) RegisterFunction ¶ added in v1.117.0
func (s *ExecutionGroup[T]) RegisterFunction(function ...T)
RegisterFunction registers functions to the group.
type ICompoundExecutionGroup ¶ added in v1.117.0
type ICompoundExecutionGroup[T any] interface { IExecutionGroup[T] // RegisterExecutor registers executors of any kind to the group: they could be functions or sub-groups. RegisterExecutor(executor ...IExecutor) }
type IErrorWaiter ¶ added in v1.101.0
type IErrorWaiter interface {
Wait() error
}
IErrorWaiter can be used to wait on errgroups and similar types where Wait() returns an error. This is used to support use in the WaitWithContextAndError function to wait but listen to contexts.
type IExecutionGroup ¶ added in v1.117.0
type IWaiter ¶ added in v1.101.0
type IWaiter interface {
Wait()
}
IWaiter can be used to wait on sync WaitGroups and similar types where Wait() does not return an error. This is used to support use in the WaitWithContext function to wait but listen to contexts.
type StoreOption ¶ added in v1.113.0
type StoreOption func(*StoreOptions) *StoreOptions
var AnyTimes StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.onlyOnce = false return o }
AnyTimes will allow the functions to be executed as often as they might be.
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.clearOnExecution = true return o }
ClearAfterExecution clears the ExecutionGroup after execution.
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.stopOnFirstError = false return o }
ExecuteAll executes all functions in the ExecutionGroup even if an error is raised. The first error raised is then returned.
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.stopOnFirstError = false o.joinErrors = true return o }
JoinErrors will collate any errors which happened when executing functions in ExecutionGroup. This option should not be used in combination with StopOnFirstError.
var OnlyOnce StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.onlyOnce = true return o }
OnlyOnce will ensure the functions are executed only once, if they are executed at all.
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.sequential = false return o }
Parallel ensures every function registered in the ExecutionGroup is executed concurrently in the order they were registered.
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.clearOnExecution = false return o }
RetainAfterExecution keeps the ExecutionGroup intact after execution (no reset).
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.sequential = true return o }
Sequential ensures every function registered in the ExecutionGroup is executed sequentially in the order they were registered.
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.sequential = true o.reverse = true return o }
SequentialInReverse ensures every function registered in the ExecutionGroup is executed sequentially but in the reverse order they were registered.
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions { if o == nil { o = DefaultOptions() } o.stopOnFirstError = true o.joinErrors = false return o }
StopOnFirstError stops ExecutionGroup execution on first error.
func Workers ¶ added in v1.117.0
func Workers(workers int) StoreOption
Workers defines a limit on the number of workers for executing the functions registered in the ExecutionGroup.
type StoreOptions ¶ added in v1.113.0
type StoreOptions struct {
// contains filtered or unexported fields
}
func DefaultOptions ¶ added in v1.117.0
func DefaultOptions() *StoreOptions
DefaultOptions returns the default store configuration
func WithOptions ¶ added in v1.117.0
func WithOptions(option ...StoreOption) (opts *StoreOptions)
WithOptions defines a store configuration.
func (*StoreOptions) Default ¶ added in v1.117.0
func (o *StoreOptions) Default() *StoreOptions
func (*StoreOptions) Merge ¶ added in v1.117.0
func (o *StoreOptions) Merge(opts *StoreOptions) *StoreOptions
func (*StoreOptions) MergeWithOptions ¶ added in v1.117.0
func (o *StoreOptions) MergeWithOptions(opt ...StoreOption) *StoreOptions
func (*StoreOptions) Options ¶ added in v1.117.0
func (o *StoreOptions) Options() []StoreOption
func (*StoreOptions) Overwrite ¶ added in v1.117.0
func (o *StoreOptions) Overwrite(opts *StoreOptions) *StoreOptions
func (*StoreOptions) WithOptions ¶ added in v1.117.0
func (o *StoreOptions) WithOptions(opts ...StoreOption) *StoreOptions