Documentation
¶
Overview ¶
Package fun is a zero-dependency collection of tools and idoms that takes advantage of generics. Iterators, error handling, a native-feeling Set type, and a simple pub-sub framework for distributing messages in fan-out patterns.
Index ¶
- Constants
- Variables
- func HandlePassthrough[T any, O any](hf Handler[O]) func(T, O) T
- type ChanOp
- func (op ChanOp[T]) Blocking() ChanOp[T]
- func (op ChanOp[T]) Cap() int
- func (op ChanOp[T]) Channel() chan T
- func (op ChanOp[T]) Close()
- func (op ChanOp[T]) Iterator() *Iterator[T]
- func (op ChanOp[T]) Len() int
- func (op ChanOp[T]) NonBlocking() ChanOp[T]
- func (op ChanOp[T]) Pipe() (Processor[T], Producer[T])
- func (op ChanOp[T]) Processor() Processor[T]
- func (op ChanOp[T]) Producer() Producer[T]
- func (op ChanOp[T]) Receive() ChanReceive[T]
- func (op ChanOp[T]) Send() ChanSend[T]
- func (op ChanOp[T]) Seq(ctx context.Context) iter.Seq[T]
- func (op ChanOp[T]) Seq2(ctx context.Context) iter.Seq2[int, T]
- type ChanReceive
- func (ro ChanReceive[T]) Check(ctx context.Context) (T, bool)
- func (ro ChanReceive[T]) Consume(op Processor[T]) Worker
- func (ro ChanReceive[T]) Drop(ctx context.Context) bool
- func (ro ChanReceive[T]) Filter(ctx context.Context, eh Handler[error], filter func(T) bool) ChanReceive[T]
- func (ro ChanReceive[T]) Force(ctx context.Context) (out T)
- func (ro ChanReceive[T]) Ignore(ctx context.Context)
- func (ro ChanReceive[T]) Iterator() *Iterator[T]
- func (ro ChanReceive[T]) Ok() bool
- func (ro ChanReceive[T]) Producer() Producer[T]
- func (ro ChanReceive[T]) Read(ctx context.Context) (T, error)
- func (ro ChanReceive[T]) Seq(ctx context.Context) iter.Seq[T]
- func (ro ChanReceive[T]) Seq2(ctx context.Context) iter.Seq2[int, T]
- type ChanSend
- func (sm ChanSend[T]) Check(ctx context.Context, it T) bool
- func (sm ChanSend[T]) Consume(iter *Iterator[T]) Worker
- func (sm ChanSend[T]) Ignore(ctx context.Context, it T)
- func (sm ChanSend[T]) Processor() Processor[T]
- func (sm ChanSend[T]) Signal(ctx context.Context)
- func (sm ChanSend[T]) Write(ctx context.Context, it T) (err error)
- func (sm ChanSend[T]) Zero(ctx context.Context) error
- type Future
- func (f Future[T]) If(cond bool) Future[T]
- func (f Future[T]) Ignore() func()
- func (f Future[T]) Join(merge func(T, T) T, ops ...Future[T]) Future[T]
- func (f Future[T]) Limit(in int) Future[T]
- func (f Future[T]) Lock() Future[T]
- func (f Future[T]) Not(cond bool) Future[T]
- func (f Future[T]) Once() Future[T]
- func (f Future[T]) PostHook(fn func()) Future[T]
- func (f Future[T]) PreHook(fn func()) Future[T]
- func (f Future[T]) Producer() Producer[T]
- func (f Future[T]) Reduce(merge func(T, T) T, next Future[T]) Future[T]
- func (f Future[T]) Resolve() T
- func (f Future[T]) Slice() func() []T
- func (f Future[T]) TTL(dur time.Duration) Future[T]
- func (f Future[T]) When(c func() bool) Future[T]
- func (f Future[T]) WithLock(m sync.Locker) Future[T]
- type Handler
- func (of Handler[T]) All(in ...T)
- func (of Handler[T]) Capture(in T) func()
- func (of Handler[T]) Chain(chain ...Handler[T]) Handler[T]
- func (of Handler[T]) Filter(filter func(T) T) Handler[T]
- func (of Handler[T]) Handle(in T)
- func (of Handler[T]) If(cond bool) Handler[T]
- func (of Handler[T]) Iterator(iter *Iterator[T]) Worker
- func (of Handler[T]) Join(next Handler[T]) Handler[T]
- func (of Handler[T]) Lock() Handler[T]
- func (of Handler[T]) Once() Handler[T]
- func (of Handler[T]) Operation(in T) Operation
- func (of Handler[T]) PreHook(prev Handler[T]) Handler[T]
- func (of Handler[T]) Processor() Processor[T]
- func (of Handler[T]) RecoverPanic(in T) error
- func (of Handler[T]) Skip(hook func(T) bool) Handler[T]
- func (of Handler[T]) When(cond func() bool) Handler[T]
- func (of Handler[T]) WithLock(mtx sync.Locker) Handler[T]
- func (of Handler[T]) WithRecover(oe Handler[error]) Handler[T]
- func (of Handler[T]) Worker(in T) Worker
- type Handlers
- func (Handlers) Atoi() Transform[string, int]
- func (Handlers) Counter(maxVal int) *Iterator[int]
- func (Handlers) ErrorCollector() (Handler[error], Future[error])
- func (Handlers) ErrorHandler(of Handler[error]) Handler[error]
- func (Handlers) ErrorHandlerSingle() (Handler[error], Future[error])
- func (Handlers) ErrorHandlerWithAbort(cancel context.CancelFunc) Handler[error]
- func (Handlers) ErrorHandlerWithoutEOF(of Handler[error]) Handler[error]
- func (Handlers) ErrorHandlerWithoutTerminating(of Handler[error]) Handler[error]
- func (Handlers) ErrorProcessor(pf Processor[error]) Processor[error]
- func (Handlers) ErrorStackHandler() (*ers.Stack, Handler[error])
- func (Handlers) ErrorStackIterator(s *ers.Stack) *Iterator[error]
- func (Handlers) ErrorStackProducer(s *ers.Stack) Producer[error]
- func (Handlers) ErrorUnwindTransformer(filter ers.Filter) Transform[error, []error]
- func (Handlers) Itoa() Transform[int, string]
- func (Handlers) Lines(reader io.Reader) *Iterator[string]
- func (Handlers) LinesWithSpaceTrimed(reader io.Reader) *Iterator[string]
- func (Handlers) OperationPool(iter *Iterator[Operation]) Operation
- func (Handlers) ProcessOperation() Processor[Operation]
- func (Handlers) ProcessWorker() Processor[Worker]
- func (Handlers) Recover(ob Handler[error])
- func (Handlers) Sprint(args ...any) Future[string]
- func (Handlers) Sprintf(tmpl string, args ...any) Future[string]
- func (Handlers) Sprintln(args ...any) Future[string]
- func (Handlers) Str(args []any) Future[string]
- func (Handlers) StrConcatinate(strs ...string) Future[string]
- func (Handlers) StrJoin(strs []string, sep string) Future[string]
- func (Handlers) StrSliceConcatinate(input []string) Future[string]
- func (Handlers) Strf(tmpl string, args []any) Future[string]
- func (Handlers) Stringer(op fmt.Stringer) Future[string]
- func (Handlers) Strln(args []any) Future[string]
- func (Handlers) WorkerPool(iter *Iterator[Worker]) Worker
- type Iterator
- func ChannelIterator[T any](ch <-chan T) *Iterator[T]
- func ConvertIterator[T, O any](iter *Iterator[T], op Transform[T, O]) *Iterator[O]
- func Generator[T any](op Producer[T]) *Iterator[T]
- func Map[T any, O any](input *Iterator[T], mpf Transform[T, O], ...) *Iterator[O]
- func MergeIterators[T any](iters ...*Iterator[T]) *Iterator[T]
- func SliceIterator[T any](in []T) *Iterator[T]
- func VariadicIterator[T any](in ...T) *Iterator[T]
- func (i *Iterator[T]) AddError(e error)
- func (i *Iterator[T]) Any() *Iterator[any]
- func (i *Iterator[T]) Buffer(n int) *Iterator[T]
- func (i *Iterator[T]) BufferedChannel(ctx context.Context, size int) <-chan T
- func (i *Iterator[T]) Channel(ctx context.Context) <-chan T
- func (i *Iterator[T]) Close() error
- func (i *Iterator[T]) Count(ctx context.Context) int
- func (i *Iterator[T]) ErrorHandler() Handler[error]
- func (i *Iterator[T]) Filter(check func(T) bool) *Iterator[T]
- func (i *Iterator[T]) Join(iters ...*Iterator[T]) *Iterator[T]
- func (i *Iterator[T]) MarshalJSON() ([]byte, error)
- func (i *Iterator[T]) Next(ctx context.Context) bool
- func (i *Iterator[T]) Observe(fn Handler[T]) Worker
- func (i *Iterator[T]) ParallelBuffer(n int) *Iterator[T]
- func (i *Iterator[T]) Process(fn Processor[T]) Worker
- func (i *Iterator[T]) ProcessParallel(fn Processor[T], optp ...OptionProvider[*WorkerGroupConf]) Worker
- func (i *Iterator[T]) Producer() Producer[T]
- func (i *Iterator[T]) ReadOne(ctx context.Context) (out T, err error)
- func (i *Iterator[T]) Reduce(reducer func(T, T) (T, error)) Producer[T]
- func (i *Iterator[T]) Seq(ctx context.Context) iter.Seq[T]
- func (i *Iterator[T]) Slice(ctx context.Context) (out []T, _ error)
- func (i *Iterator[T]) Split(num int) []*Iterator[T]
- func (i *Iterator[T]) Transform(op Transform[T, T]) *Iterator[T]
- func (i *Iterator[T]) UnmarshalJSON(in []byte) error
- func (i *Iterator[T]) Value() T
- type Operation
- func (wf Operation) Add(ctx context.Context, wg *WaitGroup)
- func (wf Operation) After(ts time.Time) Operation
- func (wf Operation) Background(ctx context.Context)
- func (wf Operation) Block()deprecated
- func (wf Operation) Delay(dur time.Duration) Operation
- func (wf Operation) Go() Operation
- func (wf Operation) If(cond bool) Operation
- func (wf Operation) Interval(dur time.Duration) Operation
- func (wf Operation) Jitter(dur func() time.Duration) Operation
- func (wf Operation) Join(ops ...Operation) Operation
- func (wf Operation) Launch(ctx context.Context) Operation
- func (wf Operation) Limit(in int) Operation
- func (wf Operation) Lock() Operation
- func (wf Operation) Once() Operation
- func (wf Operation) PostHook(hook func()) Operation
- func (wf Operation) PreHook(hook Operation) Operation
- func (wf Operation) Run(ctx context.Context)
- func (wf Operation) Signal(ctx context.Context) <-chan struct{}
- func (wf Operation) StartGroup(ctx context.Context, wg *WaitGroup, n int)
- func (wf Operation) TTL(dur time.Duration) Operation
- func (wf Operation) Wait()
- func (wf Operation) When(cond func() bool) Operation
- func (wf Operation) While() Operation
- func (wf Operation) WithCancel() (Operation, context.CancelFunc)
- func (wf Operation) WithLock(mtx *sync.Mutex) Operation
- func (wf Operation) WithRecover() Worker
- func (wf Operation) Worker() Worker
- type OptionProvider
- func JoinOptionProviders[T any](op ...OptionProvider[T]) OptionProvider[T]
- func WorkerGroupConfAddExcludeErrors(errs ...error) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfContinueOnError() OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfContinueOnPanic() OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfErrorCollectorPair(ob Handler[error], resolver Future[error]) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfErrorHandler(observer Handler[error]) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfErrorResolver(resolver func() error) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfIncludeContextErrors() OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfNumWorkers(num int) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfSet(opt *WorkerGroupConf) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfWithErrorCollector(ec interface{ ... }) OptionProvider[*WorkerGroupConf]
- func WorkerGroupConfWorkerPerCPU() OptionProvider[*WorkerGroupConf]
- type Processor
- func MakeHandlerProcessor[T any](fn func(T)) Processor[T]
- func MakeProcessor[T any](fn func(T) error) Processor[T]
- func NewProcessor[T any](fn func(context.Context, T) error) Processor[T]
- func Processify[T any](fn func(context.Context, T) error) Processor[T]deprecated
- func ProcessifyHandler[T any](fn Handler[T]) Processor[T]deprecated
- func ProcessorGroup[T any](pfs ...Processor[T]) Processor[T]
- func (pf Processor[T]) Add(ctx context.Context, wg *WaitGroup, eh Handler[error], op T)
- func (pf Processor[T]) After(ts time.Time) Processor[T]
- func (pf Processor[T]) Background(ctx context.Context, op T) Worker
- func (pf Processor[T]) Block(in T) errordeprecated
- func (pf Processor[T]) Capture() Handler[T]
- func (pf Processor[T]) Check(ctx context.Context, in T) bool
- func (pf Processor[T]) Delay(dur time.Duration) Processor[T]
- func (pf Processor[T]) Filter(fl func(T) bool) Processor[T]
- func (pf Processor[T]) Force(in T)
- func (pf Processor[T]) Handler(ctx context.Context, oe Handler[error]) Handler[T]
- func (pf Processor[T]) If(c bool) Processor[T]
- func (pf Processor[T]) Ignore(ctx context.Context, in T)
- func (pf Processor[T]) Iterator(iter *Iterator[T]) Worker
- func (pf Processor[T]) Jitter(jf func() time.Duration) Processor[T]
- func (pf Processor[T]) Join(pfs ...Processor[T]) Processor[T]
- func (pf Processor[T]) Limit(n int) Processor[T]
- func (pf Processor[T]) Lock() Processor[T]
- func (pf Processor[T]) Once() Processor[T]
- func (pf Processor[T]) Operation(of Handler[error], in T) Operation
- func (pf Processor[T]) Parallel(ops ...T) Worker
- func (pf Processor[T]) PostHook(op func()) Processor[T]
- func (pf Processor[T]) PreHook(op Operation) Processor[T]
- func (pf Processor[T]) ReadAll(prod Producer[T]) Worker
- func (pf Processor[T]) ReadOne(prod Producer[T]) Worker
- func (pf Processor[T]) Retry(n int, in T) Worker
- func (pf Processor[T]) Run(ctx context.Context, in T) error
- func (pf Processor[T]) TTL(dur time.Duration) Processor[T]
- func (pf Processor[T]) Wait(in T) error
- func (pf Processor[T]) When(c func() bool) Processor[T]
- func (pf Processor[T]) WithCancel() (Processor[T], context.CancelFunc)
- func (pf Processor[T]) WithErrorCheck(ef Future[error]) Processor[T]
- func (pf Processor[T]) WithErrorFilter(ef ers.Filter) Processor[T]
- func (pf Processor[T]) WithLock(mtx sync.Locker) Processor[T]
- func (pf Processor[T]) WithRecover() Processor[T]
- func (pf Processor[T]) WithoutErrors(errs ...error) Processor[T]
- func (pf Processor[T]) Worker(in T) Worker
- type Producer
- func CheckProducer[T any](op func() (T, bool)) Producer[T]
- func ConsistentProducer[T any](fn func() T) Producer[T]
- func MakeProducer[T any](fn func() (T, error)) Producer[T]
- func NewProducer[T any](fn func(ctx context.Context) (T, error)) Producer[T]
- func StaticProducer[T any](val T, err error) Producer[T]
- func ValueProducer[T any](val T) Producer[T]
- func (pf Producer[T]) After(ts time.Time) Producer[T]
- func (pf Producer[T]) Background(ctx context.Context, of Handler[T]) Worker
- func (pf Producer[T]) Block() (T, error)deprecated
- func (pf Producer[T]) Check(ctx context.Context) (T, bool)
- func (pf Producer[T]) CheckForce() (T, bool)
- func (pf Producer[T]) Delay(d time.Duration) Producer[T]
- func (pf Producer[T]) Filter(fl func(T) bool) Producer[T]
- func (pf Producer[T]) Force() Future[T]
- func (pf Producer[T]) Future(ctx context.Context, ob Handler[error]) Future[T]
- func (pf Producer[T]) GenerateParallel(optp ...OptionProvider[*WorkerGroupConf]) *Iterator[T]
- func (pf Producer[T]) If(cond bool) Producer[T]
- func (pf Producer[T]) Ignore(ctx context.Context) Future[T]
- func (pf Producer[T]) Iterator() *Iterator[T]
- func (pf Producer[T]) IteratorWithErrorCollector(ec Handler[error], er Future[error]) *Iterator[T]
- func (pf Producer[T]) IteratorWithHook(hook func(*Iterator[T])) *Iterator[T]
- func (pf Producer[T]) Jitter(jf func() time.Duration) Producer[T]
- func (pf Producer[T]) Join(next Producer[T]) Producer[T]
- func (pf Producer[T]) Launch(ctx context.Context) Producer[T]
- func (pf Producer[T]) Limit(in int) Producer[T]
- func (pf Producer[T]) Lock() Producer[T]
- func (pf Producer[T]) Must(ctx context.Context) Future[T]
- func (pf Producer[T]) Once() Producer[T]
- func (pf Producer[T]) Operation(of Handler[T], eo Handler[error]) Operation
- func (pf Producer[T]) PostHook(op func()) Producer[T]
- func (pf Producer[T]) PreHook(op Operation) Producer[T]
- func (pf Producer[T]) Resolve(ctx context.Context) (T, error)
- func (pf Producer[T]) Retry(n int) Producer[T]
- func (pf Producer[T]) Run(ctx context.Context) (T, error)deprecated
- func (pf Producer[T]) SendAll(proc Processor[T]) Worker
- func (pf Producer[T]) SendOne(proc Processor[T]) Worker
- func (pf Producer[T]) TTL(dur time.Duration) Producer[T]
- func (pf Producer[T]) Wait() (T, error)
- func (pf Producer[T]) When(cond func() bool) Producer[T]
- func (pf Producer[T]) WithCancel() (Producer[T], context.CancelFunc)
- func (pf Producer[T]) WithErrorCheck(ef Future[error]) Producer[T]
- func (pf Producer[T]) WithErrorFilter(ef ers.Filter) Producer[T]
- func (pf Producer[T]) WithLock(mtx sync.Locker) Producer[T]
- func (pf Producer[T]) WithRecover() Producer[T]
- func (pf Producer[T]) WithoutErrors(errs ...error) Producer[T]
- func (pf Producer[T]) Worker(of Handler[T]) Worker
- type RuntimeInvariant
- type Transform
- func (mpf Transform[T, O]) CheckWait() func(T) (O, bool)
- func (mpf Transform[T, O]) Convert(in T) Producer[O]
- func (mpf Transform[T, O]) ConvertFuture(fn Future[T]) Producer[O]
- func (mpf Transform[T, O]) ConvertProducer(fn Producer[T]) Producer[O]
- func (mpf Transform[T, O]) Lock() Transform[T, O]
- func (mpf Transform[T, O]) Pipe() (in Processor[T], out Producer[O])
- func (mpf Transform[T, O]) Process(iter *Iterator[T]) *Iterator[O]
- func (mpf Transform[T, O]) ProcessParallel(iter *Iterator[T], optp ...OptionProvider[*WorkerGroupConf]) *Iterator[O]
- func (mpf Transform[T, O]) ProcessPipe(in Producer[T], out Processor[O]) Worker
- func (mpf Transform[T, O]) Producer(prod Producer[T]) Producer[O]
- func (mpf Transform[T, O]) Run(ctx context.Context, in T) (O, error)
- func (mpf Transform[T, O]) Wait() func(T) (O, error)
- func (mpf Transform[T, O]) WithLock(mu sync.Locker) Transform[T, O]
- func (mpf Transform[T, O]) WithRecover() Transform[T, O]
- func (mpf Transform[T, O]) Worker(in T, hf Handler[O]) Worker
- func (mpf Transform[T, O]) WorkerFuture(fn Future[T], hf Handler[O]) Worker
- type WaitGroup
- func (wg *WaitGroup) Add(num int)
- func (wg *WaitGroup) DoTimes(ctx context.Context, n int, op Operation)
- func (wg *WaitGroup) Done()
- func (wg *WaitGroup) Inc()
- func (wg *WaitGroup) IsDone() bool
- func (wg *WaitGroup) Launch(ctx context.Context, op Operation)
- func (wg *WaitGroup) Num() int
- func (wg *WaitGroup) Operation() Operation
- func (wg *WaitGroup) Wait(ctx context.Context)
- func (wg *WaitGroup) Worker() Worker
- type Worker
- func (wf Worker) After(ts time.Time) Worker
- func (wf Worker) Background(ctx context.Context, ob Handler[error]) Operation
- func (wf Worker) Block() errordeprecated
- func (wf Worker) Check(ctx context.Context) bool
- func (wf Worker) Delay(dur time.Duration) Worker
- func (wf Worker) Group(n int) Worker
- func (wf Worker) If(cond bool) Worker
- func (wf Worker) Ignore() Operation
- func (wf Worker) Interval(dur time.Duration) Worker
- func (wf Worker) Jitter(jf func() time.Duration) Worker
- func (wf Worker) Join(wfs ...Worker) Worker
- func (wf Worker) Launch(ctx context.Context) Worker
- func (wf Worker) Limit(n int) Worker
- func (wf Worker) Lock() Worker
- func (wf Worker) Must() Operation
- func (wf Worker) Observe(ctx context.Context, ob Handler[error])
- func (wf Worker) Once() Worker
- func (wf Worker) Operation(ob Handler[error]) Operation
- func (wf Worker) PostHook(op func()) Worker
- func (wf Worker) PreHook(op Operation) Worker
- func (wf Worker) Retry(n int) Worker
- func (wf Worker) Run(ctx context.Context) error
- func (wf Worker) Signal(ctx context.Context) <-chan error
- func (wf Worker) StartGroup(ctx context.Context, n int) Worker
- func (wf Worker) TTL(dur time.Duration) Worker
- func (wf Worker) Wait() error
- func (wf Worker) When(cond func() bool) Worker
- func (wf Worker) While() Worker
- func (wf Worker) WithCancel() (Worker, context.CancelFunc)
- func (wf Worker) WithErrorCheck(ef Future[error]) Worker
- func (wf Worker) WithErrorFilter(ef ers.Filter) Worker
- func (wf Worker) WithLock(mtx sync.Locker) Worker
- func (wf Worker) WithRecover() Worker
- func (wf Worker) WithoutErrors(errs ...error) Worker
- type WorkerGroupConf
Constants ¶
const ErrInvariantViolation ers.Error = ers.ErrInvariantViolation
ErrInvariantViolation is the root error of the error object that is the content of all panics produced by the Invariant helper.
const ErrIteratorSkip ers.Error = ers.ErrCurrentOpSkip
ErrIteratorSkip instructs consumers of Iterators and related processors that run groups. Equivalent to the "continue" keyword in other contexts.
const ErrNonBlockingChannelOperationSkipped ers.Error = ers.ErrCurrentOpSkip
ErrNonBlockingChannelOperationSkipped is returned when sending into a channel, in a non-blocking context, when the channel was full and the send or receive was therefore skipped.
const ErrRecoveredPanic ers.Error = ers.ErrRecoveredPanic
ErrRecoveredPanic is at the root of any error returned by a function in the fun package that recovers from a panic.
Variables ¶
var HF = Handlers{}
HF provides namespaced access to the Handlers/constructors provided by the handler's type.
var Invariant = RuntimeInvariant{}
Invariant provides a namespace for making runtime invariant assertions. These all raise panics, passing error objects from panic, which can be more easily handled. These helpers are syntactic sugar around Invariant.OK, and the invariant is violated the ErrInvariantViolation.
Functions ¶
func HandlePassthrough ¶ added in v0.10.3
HandlePassthrough creates a bit of syntactic sugar to handle the _second_ return value of a function with a provided handler function while returning the first. This is often useful for processing the error value of a function with a handler while returning the first (potentially zero) value.
Types ¶
type ChanOp ¶ added in v0.10.0
type ChanOp[T any] struct { // contains filtered or unexported fields }
ChanOp is a wrapper around a channel, to make it easier to write clear code that uses and handles basic operations with single channels. From a high level an operation might look like:
ch := make(chan string) err := fun.Blocking().Send("hello world")
Methods on ChanOp and related structures are not pointer receivers, ensure that the output values are recorded as needed. Typically it's reasonable to avoid creating ChanOp objects in a loop as well.
func Blocking ¶ added in v0.8.5
Blocking produces a blocking Send instance. All Send/Check/Ignore operations will block until the context is canceled, the channel is canceled, or the send succeeds.
func Chan ¶ added in v0.10.4
Chan constructs a channel op, like "make(chan T)", with the optionally specified length. Operations (like read from and write to a channel) on the channel are blocking by default, but the
func DefaultChan ¶ added in v0.10.4
DefaultChan takes a channel value and if it is non-nil, returns it; otherwise it constructs a new ChanOp of the specified type with the optionally provided length and returns it.
func NonBlocking ¶ added in v0.8.5
NonBlocking produces a send instance that performs a non-blocking send.
The Send() method, for non-blocking sends, will return ErrSkipedNonBlockingSend if the channel was full and the object was not sent.
func (ChanOp[T]) Blocking ¶ added in v0.10.4
Blocking returns a version of the ChanOp in blocking mode. This is not an atomic operation.
func (ChanOp[T]) Channel ¶ added in v0.10.0
func (op ChanOp[T]) Channel() chan T
Channel returns the underlying channel.
func (ChanOp[T]) Close ¶ added in v0.10.0
func (op ChanOp[T]) Close()
Close closes the underlying channel.
This swallows any panic encountered when calling close() on the underlying channel, which makes it safe to call on nil or already-closed channels: the result in all cases (that the channel is closed when Close() returns, is the same in all cases.)
func (ChanOp[T]) Iterator ¶ added in v0.10.0
Iterator returns the "receive" aspect of the channel as an iterator. This is equivalent to fun.ChannelIterator(), but may be more accessible in some contexts.
func (ChanOp[T]) NonBlocking ¶ added in v0.10.4
NonBlocking returns a version of the ChanOp in non-blocking mode. This is not an atomic operation.
func (ChanOp[T]) Pipe ¶ added in v0.10.0
Pipe creates a linked pair of functions for transmitting data via these interfaces.
func (ChanOp[T]) Processor ¶ added in v0.10.0
Processor exposes the "send" aspect of the channel as a Processor function.
func (ChanOp[T]) Producer ¶ added in v0.10.0
Producer expoess the "receive" aspect of the channel as a Producer function.
func (ChanOp[T]) Receive ¶ added in v0.10.0
func (op ChanOp[T]) Receive() ChanReceive[T]
Receive returns a ChanReceive object that acts on the same underlying sender.
type ChanReceive ¶ added in v0.10.0
type ChanReceive[T any] struct { // contains filtered or unexported fields }
ChanReceive, wraps a channel fore <-chan T operations. It is the type returned by the ChanReceive() method on ChannelOp. The primary method is Read(), with other methods provided as "self-documenting" helpers.
func BlockingReceive ¶ added in v0.10.0
func BlockingReceive[T any](ch <-chan T) ChanReceive[T]
BlockingReceive is the equivalent of Blocking(ch).Receive(), except that it accepts a receive-only channel.
func NonBlockingReceive ¶ added in v0.10.0
func NonBlockingReceive[T any](ch <-chan T) ChanReceive[T]
NonBlockingReceive is the equivalent of NonBlocking(ch).Receive(), except that it accepts a receive-only channel.
func (ChanReceive[T]) Check ¶ added in v0.10.0
func (ro ChanReceive[T]) Check(ctx context.Context) (T, bool)
Check performs the read operation and converts the error into an "ok" value, returning true if receive was successful and false otherwise.
func (ChanReceive[T]) Consume ¶ added in v0.10.0
func (ro ChanReceive[T]) Consume(op Processor[T]) Worker
Consume returns a Worker function that processes the output of data from the channel with the Processor function. If the processor function returns ErrIteratorSkip, the processing will continue. All other Processor errors (and problems reading from the channel,) abort iterator. io.EOF errors are not propagated to the caller.
func (ChanReceive[T]) Drop ¶ added in v0.10.0
func (ro ChanReceive[T]) Drop(ctx context.Context) bool
Drop performs a read operation and drops the response. If an item was dropped (e.g. Read would return an error), Drop() returns false, and true when the Drop was successful.
func (ChanReceive[T]) Filter ¶ added in v0.10.5
func (ro ChanReceive[T]) Filter(ctx context.Context, eh Handler[error], filter func(T) bool) ChanReceive[T]
Filter returns a channel that consumes the output of a channel and returns a NEW channel that only contains elements that have elements that the filter function returns true for.
func (ChanReceive[T]) Force ¶ added in v0.10.0
func (ro ChanReceive[T]) Force(ctx context.Context) (out T)
Force ignores the error returning only the value from Read. This is either the value sent through the channel, or the zero value for T. Because zero values can be sent through channels, Force does not provide a way to distinguish between "channel-closed" and "received a zero value".
func (ChanReceive[T]) Ignore ¶ added in v0.10.0
func (ro ChanReceive[T]) Ignore(ctx context.Context)
Ignore reads one item from the channel and discards it.
func (ChanReceive[T]) Iterator ¶ added in v0.10.0
func (ro ChanReceive[T]) Iterator() *Iterator[T]
Iterator provides access to the contents of the channel as a fun-style iterator. For ChanRecieve objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.
func (ChanReceive[T]) Ok ¶ added in v0.10.0
func (ro ChanReceive[T]) Ok() bool
Ok attempts to read from a channel returns true either when the channel is blocked or an item is read from the channel and false when the channel has been closed.
func (ChanReceive[T]) Producer ¶ added in v0.10.0
func (ro ChanReceive[T]) Producer() Producer[T]
Producer returns the Read method as a producer for integration into existing tools.
func (ChanReceive[T]) Read ¶ added in v0.10.0
func (ro ChanReceive[T]) Read(ctx context.Context) (T, error)
Read performs the read operation according to the blocking/non-blocking semantics of the receive operation.
In general errors are either: io.EOF if channel is closed; a context cancellation error if the context passed to Read() is canceled, or ErrSkippedNonBlockingChannelOperation in the non-blocking case if the channel was empty.
In all cases when Read() returns an error, the return value is the zero value for T.
func (ChanReceive[T]) Seq ¶ added in v0.11.0
func (ro ChanReceive[T]) Seq(ctx context.Context) iter.Seq[T]
Iterator provides access to the contents of the channel as a new-style standard library iterator. For ChanRecieve objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.
func (ChanReceive[T]) Seq2 ¶ added in v0.11.0
Iterator provides access to the contents of the channel as a new-style standard library iterator. For ChanRecieve objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.
type ChanSend ¶ added in v0.10.0
type ChanSend[T any] struct { // contains filtered or unexported fields }
ChanSend provides access to channel send operations, and is contstructed by the ChanSend() method on the channel operation. The primary method is Write(), with other methods provided for clarity.
func BlockingSend ¶ added in v0.10.0
BlockingSend is equivalent to Blocking(ch).Send() except that it accepts a send-only channel.
func NonBlockingSend ¶ added in v0.10.0
NonBlockingSend is equivalent to NonBlocking(ch).Send() except that it accepts a send-only channel.
func (ChanSend[T]) Check ¶ added in v0.10.0
Check performs a send and returns true when the send was successful and false otherwise.
func (ChanSend[T]) Consume ¶ added in v0.10.0
Consume returns a worker that, when executed, pushes the content from the iterator into the channel.
func (ChanSend[T]) Processor ¶ added in v0.10.0
Processor returns the Write method as a processor for integration into existing tools
func (ChanSend[T]) Signal ¶ added in v0.10.0
Signal attempts to sends the Zero value of T through the channel and returns when: the send succeeds, the channel is full and this is a non-blocking send, the context is canceled, or the channel is closed.
func (ChanSend[T]) Write ¶ added in v0.10.0
Write sends the item into the channel captured by Blocking/NonBlocking returning the appropriate error.
The returned error is nil if the send was successful, and an io.EOF if the channel is closed (or nil) rather than a panic (as with the equivalent direct operation.) The error value is a context cancelation error when the context is canceled, and for non-blocking sends, if the channel did not accept the write, ErrSkippedNonBlockingChannelOperation is returned.
type Future ¶ added in v0.10.0
type Future[T any] func() T
Future is a basic function for providing a fun-style function type for a function object that will produce an object of the specified type.
func AsFuture ¶ added in v0.10.0
AsFuture wraps a value and returns a future object that, when called, will return the provided value.
func Futurize ¶ added in v0.10.0
Futureize is a simple wrapper to convert a function object to a Future[T] object.
func (Future[T]) If ¶ added in v0.10.0
If produces a future that only runs when the condition value is true. If the condition is false, the future will return the zero value of T.
func (Future[T]) Ignore ¶ added in v0.10.0
func (f Future[T]) Ignore() func()
Ignore produces a function that will call the Future but discards the output.
func (Future[T]) Lock ¶ added in v0.10.0
Locked returns a wrapped future that ensure that all calls to the future are protected by a mutex.
func (Future[T]) Not ¶ added in v0.10.0
Not produces that only runs when the condition value is false. If the condition is true, the future will return the zero value of T.
func (Future[T]) Once ¶ added in v0.10.0
Once returns a future that will only run the underlying future exactly once.
func (Future[T]) PostHook ¶ added in v0.10.0
PostHook unconditionally runs the provided function after running the future. The hook runs in a defer statement.
func (Future[T]) PreHook ¶ added in v0.10.0
PreHook unconditionally runs the provided function before running and returning the function.
func (Future[T]) Producer ¶ added in v0.10.0
Producer returns a producer function that wraps the future.
func (Future[T]) Reduce ¶ added in v0.10.0
Reduce takes the input future, the next future, and merges the results using the merge function.
func (Future[T]) Resolve ¶ added in v0.10.4
func (f Future[T]) Resolve() T
Resolve executes the future and returns its value.
func (Future[T]) Slice ¶ added in v0.10.0
func (f Future[T]) Slice() func() []T
Slice returns a future-like function that wraps the output of the future as the first element in a slice.
type Handler ¶ added in v0.10.2
type Handler[T any] func(T)
Handler describes a function that operates on a single object, but returns no output, and is used primarily for side effects, particularly around handling errors or collecting metrics. The Handler implementation here makes it possible to provide panic-safety for these kinds of functions or easily convert to other related types.
func (Handler[T]) All ¶ added in v0.10.3
func (of Handler[T]) All(in ...T)
All processes all inputs with the specified handler.
func (Handler[T]) Capture ¶ added in v0.10.2
func (of Handler[T]) Capture(in T) func()
Capture returns a function that handles the specified value, but only when executed later.
func (Handler[T]) Chain ¶ added in v0.10.3
Chain calls the base handler, and then calls every handler in the chain.
func (Handler[T]) Filter ¶ added in v0.10.2
Filter creates an handler that only executes the root handler Use this to process or transform the input before it is passed to the underlying handler. Use in combination with the Skip function to filter out non-actionable inputs.
func (Handler[T]) Handle ¶ added in v0.10.4
func (of Handler[T]) Handle(in T)
Handler provides a more expository operation to call a handler function.
func (Handler[T]) If ¶ added in v0.10.2
If returns an handler that only executes the root handler if the condition is true.
func (Handler[T]) Iterator ¶ added in v0.10.2
Iterator produces a worker that processes every item in the iterator with the handler function function.
func (Handler[T]) Join ¶ added in v0.10.2
Join creates an handler function that runs both the root handler and the "next" handler.
func (Handler[T]) Lock ¶ added in v0.10.2
Lock returns an handler that is protected by a mutex. All execution's of the handler are isolated.
func (Handler[T]) Once ¶ added in v0.10.2
Once produces an handler function that runs exactly once, and successive executions of the handler are noops.
func (Handler[T]) Operation ¶ added in v0.10.2
Operation captures a variable and converts an Handler into a wait function that observes the value when the Operation runs.
func (Handler[T]) PreHook ¶ added in v0.10.5
PreHook provides the inverse operation to Join, running "prev" handler before the base handler.
func (Handler[T]) Processor ¶ added in v0.10.2
Processor converts the handler to an handler function. The Processor will always return nil, and the context is ignored.
func (Handler[T]) RecoverPanic ¶ added in v0.10.5
RecoverPanic runs the handler function with a panic handler and converts a possible panic to an error.
func (Handler[T]) Skip ¶ added in v0.10.2
Skip runs a check before passing the object to the obsever, when the condition function--which can inspect the input object--returns true, the underlying Handler is called, otherwise, the observation is a noop.
func (Handler[T]) When ¶ added in v0.10.2
When returns an handler function that only executes the handler function if the condition function returns true. The condition function is run every time the handler function runs.
func (Handler[T]) WithLock ¶ added in v0.10.2
WithLock protects the action of the handler with the provied mutex.
func (Handler[T]) WithRecover ¶ added in v0.10.5
WithRecover handles any panic encountered during the handler's execution and converts it to an error.
type Handlers ¶ added in v0.10.0
type Handlers struct{}
The Handlers type serves to namespace constructors of common operations and specializations of generic functions provided by this package.
func (Handlers) Atoi ¶ added in v0.10.1
Atoi produces a Transform function that converts strings into integers.
func (Handlers) Counter ¶ added in v0.10.1
Counter produces an iterator that, starting at 1, yields monotonically increasing integers until the maximum is reached.
func (Handlers) ErrorCollector ¶ added in v0.10.0
ErrorCollector provides a basic error aggregation facility around ers.Stack (as with ErrorStackHandler, though this is an implementation detail.) ErrorCollector does use a mutex to guard these objects.
func (Handlers) ErrorHandler ¶ added in v0.10.2
ErrorHandler constructs an error observer that only calls the wrapped observer when the error passed is non-nil.
func (Handlers) ErrorHandlerSingle ¶ added in v0.10.3
ErrorHandlerSingle creates an Handler/Future pair for errors that that, with a lightweight concurrency control, captures the first non-nil error it encounters.
func (Handlers) ErrorHandlerWithAbort ¶ added in v0.10.3
func (Handlers) ErrorHandlerWithAbort(cancel context.CancelFunc) Handler[error]
ErrorHandlerWithAbort creates a new error handler that--ignoring nil and context expiration errors--will call the provided context cancellation function when it receives an error.
Use the Chain and Join methods of handlers to further process the error.
func (Handlers) ErrorHandlerWithoutEOF ¶ added in v0.10.2
ErrorHandlerWithoutEOF wraps an error observer and propagates all non-error and non-io.EOF errors to the underlying observer.
func (Handlers) ErrorHandlerWithoutTerminating ¶ added in v0.10.2
ErrorHandlerWithoutTerminating wraps an error observer and only calls the underlying observer if the input error is non-nil and is not one of the "terminating" errors used by this package (e.g. io.EOF and the context cancellation errors).
func (Handlers) ErrorProcessor ¶ added in v0.10.0
ErrorProcessor produces an error Processor function for errors that only calls the input Processor if the error is non-nil.
func (Handlers) ErrorStackHandler ¶ added in v0.10.5
ErrorStackHandler returns an ers.ErrorStack, and a fun.Handler[error] function that will add an error to the stack. This collector is not safe for concurrent use.
func (Handlers) ErrorStackIterator ¶ added in v0.11.0
ErrorStackIterator creates an iterator object to view the contents of a ers.Stack object (error collection).
func (Handlers) ErrorStackProducer ¶ added in v0.11.0
ErrorStackProducer creates an producer function to yield the contents of a ers.Stack object (error collection).
func (Handlers) ErrorUnwindTransformer ¶ added in v0.10.0
ErrorUnwindTransformer provides the ers.Unwind operation as a transform method, which consumes an error and produces a slice of its component errors. All errors are processed by the provided filter, and the transformer's context is not used. The error value of the Transform function is always nil.
func (Handlers) Itoa ¶ added in v0.10.1
Itoa produces a Transform function that converts integers into strings.
func (Handlers) Lines ¶ added in v0.10.1
Lines provides a fun.Iterator access over the contexts of a (presumably plaintext) io.Reader, using the bufio.Scanner.
func (Handlers) LinesWithSpaceTrimed ¶ added in v0.10.9
LinesWithSpaceTrimed provides an iterator with access to the line-separated content of an io.Reader, line Lines(), but with the leading and trailing space trimmed from each line.
func (Handlers) OperationPool ¶ added in v0.10.0
RunOperations returns a Operation that, when called, processes the incoming iterator of Operations, starts a go routine for running each element in the iterator, (without any throttling or rate limiting) and then blocks until all operations have returned, or the context passed to the output function has been canceled.
For more configuraable options, use the itertool.Worker() function which provides more configurability and supports both Operation and Worker functions.
func (Handlers) ProcessOperation ¶ added in v0.10.0
ProcessOperation constructs a Processor function for running Worker functions. Use in combination with Process and ProcessParallel, and to build worker pools.
The Handlers type serves to namespace these constructors, for interface clarity purposes. Use the HF variable to access this method as in:
fun.HF.ProcessOperation()
func (Handlers) ProcessWorker ¶ added in v0.10.0
ProcessWorker constructs a Processor function for running Worker functions. Use in combination with Process and ProcessParallel, and to build worker pools.
The Handlers type serves to namespace these constructors, for interface clarity purposes. Use the HF variable to access this method as in:
fun.HF.ProcessWorker()
func (Handlers) Recover ¶ added in v0.10.4
Recovery catches a panic, turns it into an error and passes it to the provided observer function.
func (Handlers) Sprint ¶ added in v0.10.0
Sprint constructs a future that calls fmt.Sprint over the given variadic arguments.
func (Handlers) Sprintf ¶ added in v0.10.0
Sprintf produces a future that calls and returns fmt.Sprintf for the provided arguments when the future is called.
func (Handlers) Sprintln ¶ added in v0.10.0
Sprintln constructs a future that calls fmt.Sprintln over the given variadic arguments.
func (Handlers) Str ¶ added in v0.10.0
Str provides a future that calls fmt.Sprint over a slice of any objects. Use fun.HF.Sprint for a variadic alternative.
func (Handlers) StrConcatinate ¶ added in v0.10.0
StrConcatinate produces a future that joins a variadic sequence of strings into a single string.
func (Handlers) StrJoin ¶ added in v0.10.0
StrJoin produces a future that combines a slice of strings into a single string, joined with the separator.
func (Handlers) StrSliceConcatinate ¶ added in v0.10.3
StrJoinWith produces a future for strings.Join(), concatenating the elements in the input slice with the provided separator.
func (Handlers) Strf ¶ added in v0.10.0
Strf produces a future that calls fmt.Sprintf for the given template string and arguments.
func (Handlers) Stringer ¶ added in v0.10.0
Stringer converts a fmt.Stringer object/method call into a string-formatter.
func (Handlers) Strln ¶ added in v0.10.0
Strln constructs a future that calls fmt.Sprintln for the given arguments.
func (Handlers) WorkerPool ¶ added in v0.10.8
WorkerPool, creates a work that processes an iterator of worker functions, for simple and short total-duration operations. Every worker in the pool runs in it's own go routine, and there are no limits or throttling on the number of go routines. All errors are aggregated and in a single collector (ers.Stack) which is returned by the worker when the operation ends (if many Worker's error this may create memory pressure) and there's no special handling of panics.
For more configuraable options, use the itertool.Worker() function which provides more configurability and supports both Operation and Worker functions.
type Iterator ¶
type Iterator[T any] struct { // contains filtered or unexported fields }
Iterator provides a safe, context-respecting iteration/sequence paradigm, and entire tool kit for consumer functions, converters, and generation options.
As the basis and heart of a programming model, iterators make it possible to think about groups or sequences of objects or work, that can be constructed and populated lazily, and provide a collection of interfaces for processing and manipulating data.
Beyond the iterator interactive tools provided in this package, the itertool package provdes some additional helpers and tools, while the adt and dt packages provide simple iterations and tooling around iterators.
The canonical way to use an iterator is with the core Next() Value() and Close() methods: Next takes a context and advances the iterator. Next, which is typically called in single-clause for loop (e.g. as in while loop) returns false when the iterator has no items, after which the iterator should be closed and cannot be re-started. When Next() returns true, the iterator is advanced, and the output of Value() will provide the value at the current position in the iterator. Next() will block if the iterator has not been closed, and the operation with Produces or Generates new items for the iterator blocks, (or continues iterating) until the iterator is exhausted, or closed.
However, additional methods, such as ReadOne, the Producer() function (which is a wrapper around ReadOne) provide a different iteraction paradigm: they combine the Next() and value operations into a single function call. When the iterator is exhausted these methods return the `io.EOF` error.
In all cases, checking the Close() value of the iterator makes it possible to see any errors encountered during the operation of the iterator.
Using Next/Value cannot be used concurrently, as there is no way to synchronize the Next/Value calls with respect to eachother: it's possible in this mode to both miss and/or get duplicate values from the iterator in this case. If the generator/producer function in the iterator is safe for concurrent use, then ReadOne can be used safely. As a rule, all tooling in the fun package uses ReadOne except in a few cases where a caller has exclusive access to the iterator.
func ChannelIterator ¶ added in v0.10.0
ChannelIterator exposes access to an existing "receive" channel as an iterator.
func ConvertIterator ¶ added in v0.10.0
ConvertIterator processes the input iterator of type T into an output iterator of type O. It's implementation uses the Generator, will continue producing values as long as the input iterator produces values, the context isn't canceled, or exhausted.
func Generator ¶ added in v0.8.5
Generator creates an iterator that produces new values, using the generator function provided. This implementation does not create any background go routines, and the iterator will produce values until the function returns an error or the Close() method is called. Any non-nil error returned by the generator function is propagated to the close method, as long as it is not a context cancellation error or an io.EOF error.
func Map ¶ added in v0.9.3
func Map[T any, O any]( input *Iterator[T], mpf Transform[T, O], optp ...OptionProvider[*WorkerGroupConf], ) *Iterator[O]
Map provides an orthodox functional map implementation based around fun.Iterator. Operates in asynchronous/streaming manner, so that the output Iterator must be consumed. The zero values of Options provide reasonable defaults for abort-on-error and single-threaded map operation.
If the mapper function errors, the result isn't included, but the errors would be aggregated and propagated to the `Close()` method of the resulting iterator. The mapping operation respects the fun.ErrIterationSkip error, If there are more than one error (as is the case with a panic or with ContinueOnError semantics,) the error can be unwrapped or converted to a slice with the fun.Unwind function. Panics in the map function are converted to errors and always collected but may abort the operation if ContinueOnPanic is not set.
func MergeIterators ¶ added in v0.10.0
MergeIterators takes a collection of iterators of the same type of objects and provides a single iterator over these items.
There are a collection of background threads which will iterate over the inputs and will provide the items to the output iterator. These threads start on the first iteration and will exit if this context is canceled.
The iterator will continue to produce items until all input iterators have been consumed, the initial context is canceled, or the Close method is called, or all of the input iterators have returned an error.
func SliceIterator ¶ added in v0.10.0
SliceIterator provides Iterator access to the elements in a slice.
func VariadicIterator ¶ added in v0.10.0
VariadicIterator produces an iterator from an arbitrary collection of objects, passed into the constructor.
func (*Iterator[T]) AddError ¶ added in v0.10.0
AddError can be used by calling code to add errors to the iterator, which are merged.
AddError is not safe for concurrent use (with regards to other AddError calls or Close).
func (*Iterator[T]) Any ¶ added in v0.10.0
Any, as a special case of Transform converts an iterator of any type and converts it to an iterator of any (e.g. interface{}) values.
func (*Iterator[T]) Buffer ¶ added in v0.10.0
Buffer adds a buffer in the queue using a channel as buffer to smooth out iteration performance, if the iteration (producer) and the consumer both take time, even a small buffer will improve the throughput of the system and prevent both components of the system from blocking on eachother.
The ordering of elements in the output iterator is the same as the order of elements in the input iterator.
func (*Iterator[T]) BufferedChannel ¶ added in v0.10.0
BufferedChannel provides access to the content of the iterator with a buffered channel that is closed when the iterator is exhausted.
func (*Iterator[T]) Channel ¶ added in v0.10.0
Channel proides access to the contents of the iterator as a channel. The channel is closed when the iterator is exhausted.
func (*Iterator[T]) Close ¶
Close terminates the iterator and returns any errors collected during iteration. If the iterator allocates resources, this will typically release them, but close may not block until all resources are released.
func (*Iterator[T]) Count ¶ added in v0.10.0
Count returns the number of items observed by the iterator. Callers should still manually call Close on the iterator.
func (*Iterator[T]) ErrorHandler ¶ added in v0.10.2
ErrorHandler provides access to the AddError method as an error observer.
func (*Iterator[T]) Filter ¶ added in v0.10.0
Filter passes every item in the iterator and, if the check function returns true propagates it to the output iterator. There is no buffering, and check functions should return quickly. For more advanced use, consider using itertool.Map()
func (*Iterator[T]) Join ¶ added in v0.10.0
Join merges multiple iterators processing and producing their results sequentially, and without starting any go routines. Otherwise similar to MergeIterators (which processes each iterator in parallel).
func (*Iterator[T]) MarshalJSON ¶ added in v0.10.0
MarshalJSON is useful for implementing json.Marshaler methods from iterator-supporting types. Wrapping the standard library's json encoding tools.
The contents of the iterator are marshaled as elements in an JSON array.
func (*Iterator[T]) Next ¶
Next advances the iterator (using ReadOne) and caches the current value for access with the Value() method. When Next is true, the Value() will return the next item. When false, either the iterator has been exhausted (e.g. the Producer function has returned io.EOF) or the context passed to Next has been canceled.
Using Next/Value cannot be done safely if iterator is accessed from multiple go routines concurrently. In these cases use ReadOne directly, or use Split to create an iterator that safely draws items from the parent iterator.
func (*Iterator[T]) Observe ¶ added in v0.10.0
Observe processes an iterator calling the handler function for every element in the iterator and retruning when the iterator is exhausted. Take care to ensure that the handler function does not block.
The error returned captures any panics encountered as an error, as well as the output of the Close() operation. Observe will not add a context cancelation error to its error, though the observed iterator may return one in its close method.
func (*Iterator[T]) ParallelBuffer ¶ added in v0.10.0
ParallelBuffer, like buffer, process the input queue and stores those items in a channel; however, unlike Buffer, multiple workers consume the input iterator: as a result the order of the elements in the output iterator is not the same as the input order.
Otherwise, the two Buffer methods are equivalent and serve the same purpose: process the items from an iterator without blocking the consumer of the iterator.
func (*Iterator[T]) Process ¶ added in v0.10.0
Process provides a function consumes all items in the iterator with the provided processor function.
All panics are converted to errors and propagated in the response of the worker, and abort the processing. If the processor function returns ErrIteratorSkip, processing continues. All other errors abort processing and are returned by the worker.
func (*Iterator[T]) ProcessParallel ¶ added in v0.10.0
func (i *Iterator[T]) ProcessParallel( fn Processor[T], optp ...OptionProvider[*WorkerGroupConf], ) Worker
ProcessParallel produces a worker that, when executed, will iteratively processes the contents of the iterator. The options control the error handling and parallelism semantics of the operation.
This is the work-house operation of the package, and can be used as the basis of worker pools, even processing, or message dispatching for pubsub queues and related systems.
func (*Iterator[T]) Producer ¶ added in v0.10.0
Producer provides access to the contents of the iterator as a Producer function.
func (*Iterator[T]) ReadOne ¶ added in v0.10.0
ReadOne advances the iterator and returns the value as a single option. This operation IS safe for concurrent use.
ReadOne returns the io.EOF error when the iterator has been exhausted, a context expiration error or the underlying error produced by the iterator. All errors produced by ReadOne are terminal and indicate that no further iteration is possible.
func (*Iterator[T]) Reduce ¶ added in v0.10.0
Reduce processes an iterator with a reducer function. The output function is a Producer operation which runs synchronously, and no processing happens before producer is called. If the reducer function returns, ErrIteratorskip, the output value is ignored, and the reducer operation continues. io.EOR errors are not propagated to the caller, and in all situations, the last value produced by the reducer is returned with an error.
The "previous" value for the first reduce option is the zero value for the type T.
func (*Iterator[T]) Slice ¶ added in v0.10.0
Slice converts an iterator to the slice of it's values, and closes the iterator at the when the iterator has been exhausted..
In the case of an error in the underlying iterator the output slice will have the values encountered before the error.
func (*Iterator[T]) Split ¶ added in v0.10.0
Split produces an arbitrary number of iterators which divide the input. The division is lazy and depends on the rate of consumption of output iterators, but every item from the input iterator is sent to exactly one output iterator, each of which can be safely used from a different go routine.
The input iterator is not closed after the output iterators are exhausted. There is one background go routine that reads items off of the input iterator, which starts when the first output iterator is advanced: be aware that canceling this context will effectively cancel all iterators.
func (*Iterator[T]) Transform ¶ added in v0.10.1
Transform processes an iterator passing each element through a transform function. The type of the iterator is the same for the output. Use Convert iterator to change the type of the value.
func (*Iterator[T]) UnmarshalJSON ¶ added in v0.10.0
UnmarshalJSON reads a byte-array of input data that contains a JSON array and then processes and returns that data iteratively.
To handle streaming data from an io.Reader that contains a stream of line-separated json documents, use itertool.JSON.
type Operation ¶ added in v0.10.0
Operation is a type of function object that will block until an operation returns or the context is canceled.
func MakeOperation ¶ added in v0.10.4
func MakeOperation(in func()) Operation
MakeOperation converts a function that takes no arguments into an Operation.
func WaitChannel ¶ added in v0.6.3
WaitChannel converts a channel (typically, a `chan struct{}`) to a Operation. The Operation blocks till it's context is canceled or the channel is either closed or returns one item.
func WaitContext ¶ added in v0.6.3
WaitContext wait's for the context to be canceled before returning. The Operation that's return also respects it's own context. Use this Operation and it's own context to wait for a context to be cacneled with a timeout, for instance.
func (Operation) Add ¶ added in v0.10.0
Add starts a the operation in a goroutine incrementing and decrementing the WaitGroup as appropriate.
func (Operation) After ¶ added in v0.10.0
After provides an operation that will only run if called after the specified clock time. When called after this time, the operation blocks until that time passes (or the context is canceled.)
func (Operation) Background ¶ added in v0.10.0
Background launches the operation in a go routine. There is no panic-safety provided.
func (Operation) Delay ¶ added in v0.10.0
Delay wraps a Operation in a function that will always wait for the specified duration before running.
If the value is negative, then there is always zero delay.
func (Operation) Go ¶ added in v0.10.0
Go provides access to the Go method (e.g. starting this operation in a go routine.) as a method that can be used as an operation itself.
func (Operation) If ¶ added in v0.10.0
If provides a static version of the When that only runs if the condition is true, and is otherwise a noop.
func (Operation) Interval ¶ added in v0.10.3
Interval runs the operation with a timer that resets to the provided duration. The operation runs immediately, and then the time is reset to the specified interval after the base operation is completed. Which is to say that the runtime of the operation itself is effectively added to the interval.
func (Operation) Jitter ¶ added in v0.10.0
Jitter wraps a Operation that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Operation operation.
If the function produces a negative duration, there is no delay.
func (Operation) Join ¶ added in v0.10.0
Join combines a sequence of operations, calling the Operations in order as long as the context does not expire. If the context expires, the combined operation aborts early.
func (Operation) Launch ¶ added in v0.10.0
Launch starts the operation in a background go routine and returns an operation which blocks until it's context is canceled or the underlying operation returns.
func (Operation) Limit ¶ added in v0.10.0
Limit returns an operation that will only run the specified number of times. The resulting operation is safe for concurrent use, but operations can run concurrently.
func (Operation) Lock ¶ added in v0.10.0
Lock constructs a mutex that ensure that the underlying operation (when called through the output operation,) only runs within the scope of the lock.
func (Operation) Once ¶ added in v0.10.0
Once produces an operation that will only execute the root operation once, no matter how many times it's called.
func (Operation) PostHook ¶ added in v0.10.0
PostHook unconditionally runs the post-hook operation after the operation returns. Use the hook to run cleanup operations.
func (Operation) PreHook ¶ added in v0.10.0
PreHook unconditionally runs the hook operation before the underlying operation. Use Operaiton.Once() operations for the hook to initialize resources for use by the operation, or without Once to reset.
func (Operation) Signal ¶ added in v0.10.0
Sginal starts the operation in a go routine, and provides a signal channel which will be closed when the operation is complete.
func (Operation) StartGroup ¶ added in v0.10.0
StartGroup runs n operations, incrementing the WaitGroup to account for the job. Callers must wait on the WaitGroup independently.
func (Operation) TTL ¶ added in v0.10.0
TTL runs an operation, and if the operation is called before the specified duration, the operation is a noop.
func (Operation) Wait ¶ added in v0.10.3
func (wf Operation) Wait()
Wait runs the operation with a background context.
func (Operation) While ¶ added in v0.10.3
While runs the operation in a tight loop, until the context expires.
func (Operation) WithCancel ¶ added in v0.10.0
func (wf Operation) WithCancel() (Operation, context.CancelFunc)
WithCancel creates a Operation and a cancel function which will terminate the context that the root Operation is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Operation is canceled.)
func (Operation) WithLock ¶ added in v0.10.0
WithLock ensures that the underlying operation, when called through the output operation, will holed the mutex while running.
func (Operation) WithRecover ¶ added in v0.10.4
WithRecover converts the Operation into a Worker function that catchers panics and returns them as errors using fun.Check.
type OptionProvider ¶ added in v0.10.0
OptionProvider is a function type for building functional arguments, and is used for the parallel iterator processing (map, transform, for-each, etc.) in the fun and itertool packages, and available with tooling for use in other contexts.
The type T should always be mutable (e.g. a map, or a pointer).
func JoinOptionProviders ¶ added in v0.10.0
func JoinOptionProviders[T any](op ...OptionProvider[T]) OptionProvider[T]
JoinOptionProviders takes a zero or more option providers and produces a single combined option provider. With zero or nil arguments, the operation becomes a noop.
func WorkerGroupConfAddExcludeErrors ¶ added in v0.10.0
func WorkerGroupConfAddExcludeErrors(errs ...error) OptionProvider[*WorkerGroupConf]
WorkerGroupConfAddExcludeErrors appends the provided errors to the ExcludedErrors value. The provider will return an error if any of the input iterators is ErrRecoveredPanic.
func WorkerGroupConfContinueOnError ¶ added in v0.10.0
func WorkerGroupConfContinueOnError() OptionProvider[*WorkerGroupConf]
WorkerGroupConfContinueOnError toggles the option that allows the operation to continue when the operation encounters an error. Otherwise, any option will lead to an abort.
func WorkerGroupConfContinueOnPanic ¶ added in v0.10.0
func WorkerGroupConfContinueOnPanic() OptionProvider[*WorkerGroupConf]
WorkerGroupConfContinueOnPanic toggles the option that allows the operation to continue when encountering a panic.
func WorkerGroupConfErrorCollectorPair ¶ added in v0.10.0
func WorkerGroupConfErrorCollectorPair(ob Handler[error], resolver Future[error]) OptionProvider[*WorkerGroupConf]
WorkerGroupConfErrorCollectorPair uses an Handler/Producer pair to collect errors. A basic implementation, accessible via HF.ErrorCollector() is suitable for this purpose.
func WorkerGroupConfErrorHandler ¶ added in v0.10.2
func WorkerGroupConfErrorHandler(observer Handler[error]) OptionProvider[*WorkerGroupConf]
WorkerGroupConfWithErrorCollector saves an error observer to the configuration. Typically implementations will provide some default error collection tool, and will only call the observer for non-nil errors. ErrorHandlers should be safe for concurrent use.
func WorkerGroupConfErrorResolver ¶ added in v0.10.0
func WorkerGroupConfErrorResolver(resolver func() error) OptionProvider[*WorkerGroupConf]
WorkerGroupConfErrorResolver reports the errors collected by the observer. If the ErrorHandler is not set the resolver may be overridden. ErrorHandlers should be safe for concurrent use.
func WorkerGroupConfIncludeContextErrors ¶ added in v0.10.0
func WorkerGroupConfIncludeContextErrors() OptionProvider[*WorkerGroupConf]
WorkerGroupConfIncludeContextErrors toggles the option that forces the operation to include context errors in the output. By default they are not included.
func WorkerGroupConfNumWorkers ¶ added in v0.10.0
func WorkerGroupConfNumWorkers(num int) OptionProvider[*WorkerGroupConf]
WorkerGroupConfNumWorkers sets the number of workers configured. It is not possible to set this value to less than 1: negative values and 0 are always ignored.
func WorkerGroupConfSet ¶ added in v0.10.0
func WorkerGroupConfSet(opt *WorkerGroupConf) OptionProvider[*WorkerGroupConf]
WorkerGroupConfSet overrides the option with the provided option.
func WorkerGroupConfWithErrorCollector ¶ added in v0.10.0
func WorkerGroupConfWithErrorCollector( ec interface { Add(error) Resolve() error }, ) OptionProvider[*WorkerGroupConf]
WorkerGroupConfWithErrorCollector sets an error collector implementation for later use in the WorkerGroupOptions. The resulting function will only error if the collector is nil, however, this method will override an existing error collector.
The ErrorCollector interface is typically provided by the `erc.Collector` type.
ErrorCollectors are used by some operations to collect, aggregate, and distribute errors from operations to the caller.
func WorkerGroupConfWorkerPerCPU ¶ added in v0.10.0
func WorkerGroupConfWorkerPerCPU() OptionProvider[*WorkerGroupConf]
WorkerGroupConfWorkerPerCPU sets the number of workers to the number of detected CPUs by the runtime (e.g. runtime.NumCPU()).
func (OptionProvider[T]) Apply ¶ added in v0.10.0
func (op OptionProvider[T]) Apply(in T) error
Apply applies the current Operation Provider to the configuration, and if the type T implements a Validate() method, calls that. All errors are aggregated.
func (OptionProvider[T]) Build ¶ added in v0.10.3
func (op OptionProvider[T]) Build(conf T) (out T, err error)
Build processes a configuration object, returning a modified version (or a zero value, in the case of an error).
func (OptionProvider[T]) Join ¶ added in v0.10.0
func (op OptionProvider[T]) Join(opps ...OptionProvider[T]) OptionProvider[T]
Join aggregates a collection of Option Providers into a single option provider. The amalgamated operation is panic safe and omits all nil providers.
type Processor ¶ added in v0.10.0
Processor are generic functions that take an argument (and a context) and return an error. They're the type of function used by the itertool.Process/itertool.ParallelForEach and useful in other situations as a compliment to fun.Worker and Operation.
In general the implementations of the methods for processing functions are wrappers around their similarly named fun.Worker analogues.
func MakeHandlerProcessor ¶ added in v0.10.7
MakeHandlerProcessor converts a handler-type function to a processor.
func MakeProcessor ¶ added in v0.10.2
MakeProcessor converts a function with the Processor signature (minus the context) for easy conversion.
func NewProcessor ¶ added in v0.10.7
NewProcessors returns a processor as a convenience function to avoid the extra cast when creating new function objects.
func ProcessifyHandler
deprecated
added in
v0.10.3
func ProcessorGroup ¶ added in v0.10.8
ProcessorGroup takes a collection of Processor functions and merges them into a single chain, eliding any nil processors.
func (Processor[T]) Add ¶ added in v0.10.3
Add begins running the process in a different goroutine, using the provided arguments to manage the operation.
func (Processor[T]) After ¶ added in v0.10.0
After produces a Processor that will execute after the provided timestamp.
func (Processor[T]) Background ¶ added in v0.10.3
Background processes an item in a separate goroutine and returns a worker that will block until the underlying operation is complete.
func (Processor[T]) Capture ¶ added in v0.10.6
Capture creates a handler function that like, Processor.Force, passes a background context and ignores the processors error.
func (Processor[T]) Check ¶ added in v0.10.0
Check processes the input and returns true when the error is nil, and false when there was an error.
func (Processor[T]) Delay ¶ added in v0.10.0
Delay wraps a Processor in a function that will always wait for the specified duration before running.
If the value is negative, then there is always zero delay.
func (Processor[T]) Filter ¶ added in v0.10.5
Filter returns a wrapping processor that takes a function a function that only calls the processor when the filter function returns true, and returns ers.ErrCurrentOpSkip otherwise.
func (Processor[T]) Force ¶ added in v0.10.0
func (pf Processor[T]) Force(in T)
Force processes the input, but discards the error and uses a context that will not expire.
func (Processor[T]) Handler ¶ added in v0.10.2
Handler converts a processor into an observer, handling the error with the error observer and using the provided context.
func (Processor[T]) If ¶ added in v0.10.0
If runs the processor function if, and only if the condition is true. Otherwise the function does not run and the processor returns nil.
The resulting processor can be used more than once.
func (Processor[T]) Ignore ¶ added in v0.10.0
Ignore runs the process function and discards the error.
func (Processor[T]) Iterator ¶ added in v0.10.0
Iterator creates a worker function that processes the iterator with the processor function, merging/collecting all errors, and respecting the worker's context. The processing does not begin until the worker is called.
func (Processor[T]) Jitter ¶ added in v0.10.0
Jitter wraps a Processor that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the processor.
If the function produces a negative duration, there is no delay.
func (Processor[T]) Join ¶ added in v0.10.0
Join combines a sequence of processors on the same input, calling each function in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated.
func (Processor[T]) Limit ¶ added in v0.10.0
Limit ensures that the processor is called at most n times.
func (Processor[T]) Lock ¶ added in v0.10.0
Lock wraps the Processor and protects its execution with a mutex.
func (Processor[T]) Once ¶ added in v0.10.0
Once make a processor that can only run once. Subsequent calls to the processor return the cached error of the original run.
func (Processor[T]) Operation ¶ added in v0.10.0
Operation converts a processor into a worker that will process the input provided when executed.
func (Processor[T]) Parallel ¶ added in v0.10.4
Parallel takes a variadic number of items and returns a worker that processes them concurrently. All panics are converted to errors and all errors are aggregated.
func (Processor[T]) PostHook ¶ added in v0.10.0
PostHook produces an amalgamated processor that runs after the processor completes. Panics are caught, converted to errors, and aggregated with the processors error. The hook operation is unconditionally called after the processor function (except in the case of a processor panic.)
func (Processor[T]) PreHook ¶ added in v0.10.0
PreHook creates an amalgamated Processor that runs the operation before the root processor. If the operation panics that panic is converted to an error and merged with the processor's error. Use with Operation.Once() to create an "init" function that runs once before a processor is called the first time.
func (Processor[T]) ReadAll ¶ added in v0.10.0
ReadAll reads elements from the producer until an error is encountered and passes them to a producer, until the first error is encountered. The work
func (Processor[T]) ReadOne ¶ added in v0.10.0
ReadOne returns a future (Worker) that calls the processor function on the output of the provided producer function. ReadOne uses the fun.Pipe() operation for the underlying implementation.
func (Processor[T]) Retry ¶ added in v0.10.4
Retry makes a worker function that takes runs the processor function with the provied input until the return value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.)
Context cancellation errors are returned to the caller, other terminating errors are not, with any other errors encountered during retries. ErrIteratorSkip is always ignored and not aggregated. All errors are discarded if the retry operation succeeds in the provided number of retries.
Except for ErrIteratorSkip, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails. It's possible to return a nil error without successfully completing the underlying operation, if the processor only returned ErrIteratorSkip values.
func (Processor[T]) Run ¶ added in v0.10.0
Run executes the Processors but creates a context within the function (decended from the context provided in the arguments,) that is canceled when Run() returns to avoid leaking well behaved resources outside of the scope of the function execution. Run can also be passed as a Processor func.
func (Processor[T]) TTL ¶ added in v0.10.0
TTL returns a Processor that runs once in the specified window, and returns the error from the last run in between this interval. While the executions of the underlying function happen in isolation, in between, the processor is concurrently accessible.
func (Processor[T]) Wait ¶ added in v0.10.6
Wait runs the Processor with a context that will never be canceled.
func (Processor[T]) When ¶ added in v0.10.0
When returns a processor function that runs if the conditional function returns true, and does not run otherwise. The conditional function is evaluated every time the returned processor is run.
func (Processor[T]) WithCancel ¶ added in v0.10.0
func (pf Processor[T]) WithCancel() (Processor[T], context.CancelFunc)
WithCancel creates a Processor and a cancel function which will terminate the context that the root proccessor is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Processor is canceled.)
func (Processor[T]) WithErrorCheck ¶ added in v0.10.6
WithErrorCheck takes an error future, and checks it before executing the processor operation. If the error future returns an error (any error), the processor propagates that error, rather than running the underying processor. Useful for injecting an abort into an existing pipleine or chain.
The error future is called before running the underlying processor, to short circuit the operation, and also a second time when processor has returned in case an error has occurred during the operation of the processor.
func (Processor[T]) WithErrorFilter ¶ added in v0.10.5
WithErrorFilter uses an ers.Filter to process the error respose from the processor.
func (Processor[T]) WithLock ¶ added in v0.10.0
WithLock wraps the Processor and ensures that the mutex is always held while the root Processor is called.
func (Processor[T]) WithRecover ¶ added in v0.10.5
WithRecover runs the producer, converted all panics into errors. WithRecover is itself a processor.
func (Processor[T]) WithoutErrors ¶ added in v0.10.0
WithoutErrors returns a producer that will convert a non-nil error of the provided types to a nil error.
type Producer ¶ added in v0.10.0
Producer is a function type that is a failrly common constructor. It's signature is used to create iterators, as a generator, and functions like a Future.
func CheckProducer ¶ added in v0.10.4
CheckProducer wraps a function object that uses the second ("OK") value to indicate that no more values will be produced. Errors returned from the resulting produce are always either the context cancellation error or io.EOF.
func ConsistentProducer ¶ added in v0.10.0
ConsistentProducer constructs a wrapper around a similar function type that does not return an error or take a context. The resulting function will never error.
func MakeProducer ¶ added in v0.10.3
MakeProducer constructs a producer that wraps a similar function that does not take a context.
func NewProducer ¶ added in v0.10.7
NewProducer returns a producer as a convenience function to avoid the extra cast when creating new function objects.
func StaticProducer ¶ added in v0.10.0
StaticProducer returns a producer function that always returns the provided values.
func ValueProducer ¶ added in v0.10.0
ValueProducer returns a producer function that always returns the provided value, and a nill error.
func (Producer[T]) After ¶ added in v0.10.0
After will return a Producer that will block until the provided time is in the past, and then execute normally.
func (Producer[T]) Background ¶ added in v0.10.0
Background constructs a worker that runs the provided Producer in a background thread and passes the produced value to the observe.
The worker function's return value captures the procuder's error, and will block until the producer has completed.
func (Producer[T]) Check ¶ added in v0.10.0
Check converts the error into a boolean, with true indicating success and false indicating (but not propagating it.).
func (Producer[T]) CheckForce ¶ added in v0.10.3
func (Producer[T]) Delay ¶ added in v0.10.0
Delay wraps a Producer in a function that will always wait for the specified duration before running.
If the value is negative, then there is always zero delay.
func (Producer[T]) Filter ¶ added in v0.10.5
Filter creates a function that passes the output of the producer to the filter function, which, if it returns true. is returned to the caller, otherwise the Producer returns the zero value of type T and ers.ErrCurrentOpSkip error (e.g. continue), which iterators and other producer-consuming functions can respect.
func (Producer[T]) Force ¶ added in v0.10.0
Force combines the semantics of Must and Wait as a future: when the future is resolved, the producer executes with a context that never expires and panics in the case of an error.
func (Producer[T]) Future ¶ added in v0.10.0
Future creates a future function using the context provided and error observer to collect the error.
func (Producer[T]) GenerateParallel ¶ added in v0.10.0
func (pf Producer[T]) GenerateParallel( optp ...OptionProvider[*WorkerGroupConf], ) *Iterator[T]
ParallelGenerate creates an iterator using a generator pattern which produces items until the generator function returns io.EOF, or the context (passed to the first call to Next()) is canceled. Parallel operation, continue on error/continue-on-panic semantics are available and share configuration with the ParallelProcess and Map operations.
func (Producer[T]) If ¶ added in v0.10.0
If returns a producer that will execute the root producer only if the cond value is true. Otherwise, If will return the zero value for T and a nil error.
func (Producer[T]) Ignore ¶ added in v0.10.0
Ignore creates a future that runs the producer and returns the value, ignoring the error.
func (Producer[T]) Iterator ¶ added in v0.10.0
Iterator creates an iterator that calls the Producer function once for every iteration, until it errors. Errors that are not context cancellation errors or io.EOF are propgated to the iterators Close method.
func (Producer[T]) IteratorWithErrorCollector ¶ added in v0.10.5
func (Producer[T]) IteratorWithHook ¶ added in v0.10.0
IteratorWithHook constructs an Iterator from the producer. The provided hook function will run during the Iterators Close() method.
func (Producer[T]) Jitter ¶ added in v0.10.0
Jitter wraps a Producer that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Producer.
If the function produces a negative duration, there is no delay.
func (Producer[T]) Join ¶ added in v0.10.0
Join, on successive calls, runs the first producer until it returns an io.EOF error, and then returns the results of the second producer. If either producer returns another error (context cancelation or otherwise,) those errors are returned.
When the second function returns io.EOF, all successive calls will return io.EOF.
func (Producer[T]) Launch ¶ added in v0.10.3
Launch runs the producer in the background, and returns a producer that and returns a producer which, when called, blocks until the original producer returns.
func (Producer[T]) Limit ¶ added in v0.10.0
Limit runs the producer a specified number of times, and caches the result of the last execution and returns that value for any subsequent executions.
func (Producer[T]) Lock ¶ added in v0.10.0
Lock creates a producer that runs the root mutex as per normal, but under the protection of a mutex so that there's only one execution of the producer at a time.
func (Producer[T]) Must ¶ added in v0.10.0
Must returns a future that resolves the producer returning the constructed value and panicing if the producer errors.
func (Producer[T]) Once ¶ added in v0.10.0
Once returns a producer that only executes ones, and caches the return values, so that subsequent calls to the output producer will return the same values.
func (Producer[T]) Operation ¶ added in v0.10.0
Operation produces a wait function, using two observers to handle the output of the Producer.
func (Producer[T]) PostHook ¶ added in v0.10.0
PostHook appends a function to the execution of the producer. If the function panics it is converted to an error and aggregated with the error of the producer.
Useful for calling context.CancelFunc, closers, or incrementing counters as necessary.
func (Producer[T]) PreHook ¶ added in v0.10.0
PreHook configures an operation function to run before the returned producer. If the pre-hook panics, it is converted to an error which is aggregated with the (potential) error from the producer, and returned with the producer's output.
func (Producer[T]) Retry ¶ added in v0.10.4
Retry constructs a worker function that takes runs the underlying producer until the error value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.) In all cases, unless the error value is nil (e.g. the retry succeeds)
Context cancellation errors are returned to the caller, other terminating errors are not, with any other errors encountered during retries. ErrIteratorSkip is always ignored and not aggregated. All errors are discarded if the retry operation succeeds in the provided number of retries.
Except for ErrIteratorSkip, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails. It's possible to return a nil error and a zero value, if the producer only returned ErrIteratorSkip values.
func (Producer[T]) SendAll ¶ added in v0.10.0
SendAll provides a form of iteration, by construction a future (Worker) that consumes the values of the producer with the processor until either function returns an error. SendAll respects ErrIteratorSkip and io.EOF
func (Producer[T]) SendOne ¶ added in v0.10.0
SendOne makes a Worker function that, as a future, calls the producer once and then passes the output, if there are no errors, to the processor function. Provides the inverse operation of Processor.ReadOne.
func (Producer[T]) TTL ¶ added in v0.10.0
TTL runs the producer only one time per specified interval. The interval must me greater than 0.
func (Producer[T]) Wait ¶ added in v0.10.6
Wait runs the producer with a context that will ever expire.
func (Producer[T]) When ¶ added in v0.10.0
When constructs a producer that will call the cond upon every execution, and when true, will run and return the results of the root producer. Otherwise When will return the zero value of T and a nil error.
func (Producer[T]) WithCancel ¶ added in v0.10.0
func (pf Producer[T]) WithCancel() (Producer[T], context.CancelFunc)
WithCancel creates a Producer and a cancel function which will terminate the context that the root Producer is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Producer is canceled.)
func (Producer[T]) WithErrorCheck ¶ added in v0.10.6
WithErrorCheck takes an error future, and checks it before executing the producer function. If the error future returns an error (any error), the producer propagates that error, rather than running the underying producer. Useful for injecting an abort into an existing pipleine or chain.
func (Producer[T]) WithErrorFilter ¶ added in v0.10.5
WithErrorFilter passes the error of the root Producer function with the ers.Filter.
func (Producer[T]) WithLock ¶ added in v0.10.0
WithLock uses the provided mutex to protect the execution of the producer.
func (Producer[T]) WithRecover ¶ added in v0.10.5
WithRecover returns a wrapped producer with a panic handler that converts any panic to an error.
func (Producer[T]) WithoutErrors ¶ added in v0.10.0
WithoutErrors returns a Producer function that wraps the root producer and, after running the root producer, and makes the error value of the producer nil if the error returned is in the error list. The produced value in these cases is almost always the zero value for the type.
type RuntimeInvariant ¶ added in v0.10.0
type RuntimeInvariant struct{}
RuntimeInvariant is a type defined to create a namespace, callable (typically) via the Invariant symbol. Access these functions as in:
fun.Invariant.IsTrue(len(slice) > 0, "slice must have elements", len(slice))
func (RuntimeInvariant) Failure ¶ added in v0.10.1
func (RuntimeInvariant) Failure(args ...any)
Failure unconditionally raises an invariant failure error and processes the arguments as with the other invariant failures: extracting errors and aggregating constituent errors.
func (RuntimeInvariant) IsFalse ¶ added in v0.10.0
func (RuntimeInvariant) IsFalse(cond bool, args ...any)
IsFalse provides a runtime assertion that the condition is false, and annotates panic object, which is an error rooted in the ErrInvariantViolation. In all other cases the operation is a noop.
func (RuntimeInvariant) IsTrue ¶ added in v0.10.0
func (RuntimeInvariant) IsTrue(cond bool, args ...any)
IsTrue provides a runtime assertion that the condition is true, and annotates panic object, which is an error rooted in the ErrInvariantViolation. In all other cases the operation is a noop.
func (RuntimeInvariant) Must ¶ added in v0.10.0
func (RuntimeInvariant) Must(err error, args ...any)
Must raises an invariant error if the error is not nil. The content of the panic is both--via wrapping--an ErrInvariantViolation and the error itself.
func (RuntimeInvariant) Ok ¶ added in v0.10.9
func (RuntimeInvariant) Ok(cond bool, args ...any)
Ok panics if the condition is false, passing an error that is rooted in InvariantViolation. Otherwise the operation is a noop.
type Transform ¶ added in v0.8.5
Transform is a function type that converts T objects int objects of type O.
func Converter ¶ added in v0.10.0
Converter builds a Transform function out of an equivalent function that doesn't take a context or return an error.
func ConverterErr ¶ added in v0.10.0
ConverterErr constructs a Transform function from an analogous function that does not take a context.
func ConverterOk ¶ added in v0.10.9
ConverterOk builds a Transform function from a function that converts between types T and O, but that returns a boolean/check value. When the converter function returns false the transform function returns a ErrIteratorSkip error.
func (Transform[T, O]) CheckWait ¶ added in v0.10.4
CheckWait calls the function with a context that cannot be canceled. The second value is true as long as the transform function returns a nil error and false in all other cases
func (Transform[T, O]) Convert ¶ added in v0.10.0
Convert returns a Producer function which will translate the input value. The execution is lazy, to provide a future-like interface.
func (Transform[T, O]) ConvertFuture ¶ added in v0.10.5
Convert returns a Producer function which will translate value of the input future as the input value of the translation operation. The execution is lazy, to provide a future-like interface and neither the resolution of the future or the transformation itself is done until the Producer is executed.
func (Transform[T, O]) ConvertProducer ¶ added in v0.10.5
ConvertProducer takes an input-typed producer function and converts it to an output-typed producer function.
func (Transform[T, O]) Lock ¶ added in v0.10.0
Lock returns a Transform function that's executed the root function inside of the sope of a mutex.
func (Transform[T, O]) Pipe ¶ added in v0.10.0
Pipe creates a Processor (input)/ Producer (output) pair that has data processed by the Transform function. The pipe has a buffer of one item and is never closed, and both input and output operations are blocking. The closer function will abort the connection and cause all successive operations to return io.EOF.
func (Transform[T, O]) Process ¶ added in v0.10.5
Process takes an iterator and runs the transformer over every item, producing a new iterator with the output values. The processing is performed serially and lazily and respects ErrIteratorSkip.
func (Transform[T, O]) ProcessParallel ¶ added in v0.10.0
func (mpf Transform[T, O]) ProcessParallel( iter *Iterator[T], optp ...OptionProvider[*WorkerGroupConf], ) *Iterator[O]
ProcessParallel runs the input iterator through the transform operation and produces an output iterator, much like convert. However, the ProcessParallel implementation has configurable parallelism, and error handling with the WorkerGroupConf options.
func (Transform[T, O]) ProcessPipe ¶ added in v0.10.5
ProcessPipe collects a Produer and Processor pair, and returns a worker that, when run processes the input collected by the Processorand returns it to the Producer. This operation runs until the producer, transformer, or processor returns an error. ErrIteratorSkip errors are respected, while io.EOF errors cause the ProcessPipe to abort but are not propogated to the caller.
func (Transform[T, O]) Producer ¶ added in v0.10.0
Producer processes an input producer function with the Transform function. Each call to the output producer returns one value from the input producer after processing the item with the transform function applied. The output producer returns any error encountered during these operations (input, transform, output) to its caller *except* ErrIteratorSkip, which is respected.
func (Transform[T, O]) Run ¶ added in v0.10.5
Run executes the transform function with the provided output.
func (Transform[T, O]) Wait ¶ added in v0.10.4
Wait calls the transform function passing a context that cannot expire.
func (Transform[T, O]) WithLock ¶ added in v0.10.0
WithLock returns a Transform function inside of the scope of the provided mutex.
func (Transform[T, O]) WithRecover ¶ added in v0.10.4
WithRecover returns a Transform function that catches a panic, converts the panic object to an error if needed, and aggregates that with the Transform function's error.
type WaitGroup ¶ added in v0.6.3
type WaitGroup struct {
// contains filtered or unexported fields
}
WaitGroup works like sync.WaitGroup, except that the Wait method takes a context (and can be passed as a fun.Operation). The implementation is exceptionally simple. The only constraint, like sync.WaitGroup, is that you can never modify the value of the internal counter such that it is negative, event transiently. The implementation does not require background resources aside from Wait, which creates a single goroutine that lives for the entire time that Wait is running, but no other background resources are created. Multiple copies of Wait can be safely called at once, and the WaitGroup is reusable more than once.
This implementation is about 50% slower than sync.WaitGroup after informal testing. It provides a little extra flexiblity and introspection, with similar semantics, that may be worth the additional performance hit.
func (*WaitGroup) Add ¶ added in v0.8.0
Add modifies the internal counter. Raises an ErrInvariantViolation error if any modification causes the internal coutner to be less than 0.
func (*WaitGroup) DoTimes ¶ added in v0.10.0
DoTimes uses the WaitGroup to launch an operation in a worker pool of the specified size, and does not block until the operation returns.
func (*WaitGroup) Done ¶ added in v0.8.0
func (wg *WaitGroup) Done()
Done marks a single operation as done.
func (*WaitGroup) Inc ¶ added in v0.10.8
func (wg *WaitGroup) Inc()
Inc adds one item to the wait group.
func (*WaitGroup) IsDone ¶ added in v0.8.0
IsDone returns true if there is pending work, and false otherwise.
func (*WaitGroup) Launch ¶ added in v0.10.0
Launch increments the WaitGroup and starts the operation in a go routine.
func (*WaitGroup) Operation ¶ added in v0.10.0
Operation returns with WaitGroups Wait method as a Operation.
func (*WaitGroup) Wait ¶ added in v0.8.0
Wait blocks until either the context is canceled or all items have completed.
Wait is pasable or usable as a fun.Operation.
In many cases, callers should not rely on the Wait operation returning after the context expires: If Done() calls are used in situations that respect a context cancellation, aborting the Wait on a context cancellation, particularly when Wait gets a context that has the same lifecycle as the operations its waiting on, the result is that worker routines will leak. Nevertheless, in some situations, when workers may take a long time to respond to a context cancellation, being able to set a second deadline on Waiting may be useful.
Consider using `fun.Operation(wg.Wait).Block()` if you want blocking semantics with the other features of this WaitGroup implementation.
type Worker ¶ added in v0.10.0
Worker represents a basic function used in worker pools and other similar situations
func MakeWorker ¶ added in v0.10.4
MakeWorker converts a non-context worker function into a worker for compatibility with tooling.
func Pipe ¶ added in v0.10.0
Pipe sends the output of a producer into the processor as if it were a pipe. As a Worker this operation is delayed until the worker is called.
If both operations succeed, then the worker will return nil. If either function returns an error, it is cached, and successive calls to the worker will return the same error. The only limitation is that there is no way to distinguish between an error encountered by the Producer and one by the processor.
If the producer returns ErrIteratorSkip, it will be called again.
func WorkerFuture ¶ added in v0.10.0
WorkerFuture constructs a worker from an error channel. The resulting worker blocks until an error is produced in the error channel, the error channel is closed, or the worker's context is canceled. If the channel is closed, the worker will return a nil error, and if the context is canceled, the worker will return a context error. In all other cases the work will propagate the error (or nil) received from the channel.
You can call the resulting worker function more than once: if there are multiple errors produced or passed to the channel, they will be propogated; however, after the channel is closed subsequent calls to the worker function will return nil.
func (Worker) After ¶ added in v0.10.0
After returns a Worker that blocks until the timestamp provided is in the past. Additional calls to this worker will run immediately. If the timestamp is in the past the resulting worker will run immediately.
func (Worker) Background ¶ added in v0.10.0
Background starts the worker function in a go routine, passing the error to the provided observer function.
func (Worker) Check ¶ added in v0.10.0
Check runs the worker and returns true (ok) if there was no error, and false otherwise.
func (Worker) Delay ¶ added in v0.10.0
Delay wraps a Worker in a function that will always wait for the specified duration before running.
If the value is negative, then there is always zero delay.
func (Worker) Group ¶ added in v0.10.3
Group makes a worker that runs n copies of the underlying worker, in different go routines and aggregates their output. Work does not start until the resulting worker is called
func (Worker) If ¶ added in v0.10.0
If returns a Worker function that runs only if the condition is true. The error is always nil if the condition is false. If-ed functions may be called more than once, and will run multiple times potentiall.y
func (Worker) Ignore ¶ added in v0.10.0
Ignore converts the worker into a Operation that discards the error produced by the worker.
func (Worker) Interval ¶ added in v0.10.3
Interval runs the worker with a timer that resets to the provided duration. The worker runs immediately, and then the time is reset to the specified interval after the base worker has. Which is to say that the runtime of the worker's operation is effectively added to the interval.
The interval worker will run until the context is canceled or the worker returns an error.
func (Worker) Jitter ¶ added in v0.10.0
Jitter wraps a Worker that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Worker.
If the function produces a negative duration, there is no delay.
func (Worker) Join ¶ added in v0.10.0
Join combines a sequence of workers, calling the workers in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated.
func (Worker) Launch ¶ added in v0.10.3
Launch runs the worker function in a go routine and returns a new fun.Worker which will block for the context to expire or the background worker to completes, which returns the error from the background request.
The underlying worker begins executing before future returns.
func (Worker) Limit ¶ added in v0.10.0
Limit produces a worker than runs exactly n times. Each execution is isolated from every other, but once the limit is exceeded, the result of the *last* worker to execute is cached concurrent access to that value is possible.
func (Worker) Lock ¶ added in v0.10.0
Lock produces a Worker that will be executed within the scope of a (managed) mutex.
func (Worker) Must ¶ added in v0.10.0
Must converts a Worker function into a wait function; however, if the worker produces an error Must converts the error into a panic.
func (Worker) Observe ¶ added in v0.10.0
Observe runs the worker function, and observes the error (or nil response). Panics are not caught.
func (Worker) Once ¶ added in v0.10.0
Once wraps the Worker in a function that will execute only once. The return value (error) is cached, and can be accessed many times without re-running the worker.
func (Worker) Operation ¶ added in v0.10.0
Operation converts a worker function into a wait function, passing any error to the observer function. Only non-nil errors are observed.
func (Worker) PostHook ¶ added in v0.10.0
PostHook runs hook operation after the worker function completes. If the hook panics it is converted to an error and aggregated with workers's error.
func (Worker) PreHook ¶ added in v0.10.0
PreHook returns a Worker that runs an operatio unconditionally before running the underlying worker. If the hook function panics it is converted to an error and aggregated with the worker's error.
func (Worker) Retry ¶ added in v0.10.4
Retry constructs a worker function that takes runs the underlying worker until the return value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.) Context cancellation errors are returned to the caller with any other errors encountered in previous retries, other terminating errors are not. All errors are discarded if the retry operation succeeds in the provided number of retries.
Except for ErrIteratorSkip, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails.
func (Worker) Signal ¶ added in v0.10.0
Signal runs the worker function in a background goroutine and returns the error in an error channel, that returns when the worker function returns. If Signal is called with a canceled context the worker is still executed (with that context.)
A value, possibly nil, is always sent through the channel. Panics are not caught or handled.
func (Worker) StartGroup ¶ added in v0.10.0
StartGroup starts n copies of the worker operation and returns a future/worker that returns the aggregated errors from all workers
The operation is fundamentally continue-on-error. To get abort-on-error semantics, use the Filter() method on the input worker, that cancels the context on when it sees an error.
func (Worker) TTL ¶ added in v0.10.0
TTL produces a worker that will only run once during every specified duration, when called more than once. During the interval between calls, the previous error is returned. While each execution of the root worker is protected by a mutex, the resulting worker can be used in parallel during the intervals between calls.
func (Worker) Wait ¶ added in v0.10.3
Wait runs the worker with a background context and returns its error.
func (Worker) When ¶ added in v0.10.0
When wraps a Worker function that will only run if the condition function returns true. If the condition is false the worker does not execute. The condition function is called in between every operation.
When worker functions may be called more than once, and will run multiple times potentially.
func (Worker) While ¶ added in v0.10.0
While runs the Worker in a continuous while loop, returning only if the underlying worker returns an error or if the context is cancled.
func (Worker) WithCancel ¶ added in v0.10.0
func (wf Worker) WithCancel() (Worker, context.CancelFunc)
WithCancel creates a Worker and a cancel function which will terminate the context that the root Worker is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Worker is canceled.)
func (Worker) WithErrorCheck ¶ added in v0.10.6
WithErrorCheck takes an error future, and checks it before executing the worker function. If the error future returns an error (any error), the worker propagates that error, rather than running the underying producer. Useful for injecting an abort into an existing pipleine or chain.
The error future is called before running the underlying worker, to short circuit the operation, and also a second time when worker has returned in case an error has occurred during the operation of the worker.
func (Worker) WithErrorFilter ¶ added in v0.10.5
Filter wraps the worker with a Worker that passes the output of the root Worker's error and returns the output of the filter.
The ers package provides a number of filter implementations but any function in the following form works:
func(error) error
func (Worker) WithLock ¶ added in v0.10.0
Lock produces a Worker that will be executed within the scope of the provided mutex.
func (Worker) WithRecover ¶ added in v0.10.4
WithRecover produces a worker function that converts the worker function's panics to errors.
func (Worker) WithoutErrors ¶ added in v0.10.0
WithoutErrors returns a worker that will return nil if the error returned by the worker is one of the errors passed to WithoutErrors.
type WorkerGroupConf ¶ added in v0.10.0
type WorkerGroupConf struct { // NumWorkers describes the number of parallel workers // processing the incoming iterator items and running the map // function. All values less than 1 are converted to 1. Any // value greater than 1 will result in out-of-sequence results // in the output iterator. NumWorkers int // ContinueOnPanic prevents the operations from halting when a // single processing function panics. In all modes mode panics // are converted to errors and propagated to the output // iterator's Close() method,. ContinueOnPanic bool // ContinueOnError allows a processing function to return an // error and allow the work of the broader operation to // continue. Errors are aggregated propagated to the output // iterator's Close() method. ContinueOnError bool // IncludeContextExpirationErrors changes the default handling // of context cancellation errors. By default all errors // rooted in context cancellation are not propagated to the // Close() method, however, when true, these errors are // captured. All other error handling semantics // (e.g. ContinueOnError) are applicable. IncludeContextExpirationErrors bool // ExcludedErrors is a list of that should not be included // in the collected errors of the // output. fun.ErrRecoveredPanic is always included and io.EOF // is never included. ExcludedErrors []error // ErrorHandler is used to collect and aggregate errors in // the collector. For operations with shorter runtime // `erc.Collector.Add` is a good choice, though different // strategies may make sense in different // cases. (erc.Collector has a mutex and stories collected // errors in memory.) ErrorHandler Handler[error] // ErrorResolver should return an aggregated error collected // during the execution of worker // threads. `erc.Collector.Resolve` suffices when collecting // with an erc.Collector. ErrorResolver func() error }
WorkerGroupConf describes the runtime options to several operations operations. The zero value of this struct provides a usable strict operation.
func (WorkerGroupConf) CanContinueOnError ¶ added in v0.10.0
func (o WorkerGroupConf) CanContinueOnError(err error) bool
CanContinueOnError checks an error, collecting it as needed using the WorkerGroupConf, and then returning true if processing should continue and false otherwise.
Neither io.EOF nor EerrIteratorSkip errors are ever observed. All panic errors are observed. Context cancellation errors are observed only when configured. as well as context cancellation errors when configured.
func (*WorkerGroupConf) Validate ¶ added in v0.10.0
func (o *WorkerGroupConf) Validate() error
Validate ensures that the configuration is valid, and returns an error if there are impossible configurations
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package adt provides "atomic data types" as strongly-typed generic helpers for simple atomic operations (including sync.Map, sync.Pool, and a typed value).
|
Package adt provides "atomic data types" as strongly-typed generic helpers for simple atomic operations (including sync.Map, sync.Pool, and a typed value). |
Package assert provides an incredibly simple assertion framework, that relies on generics and simplicity.
|
Package assert provides an incredibly simple assertion framework, that relies on generics and simplicity. |
check
GENERATED FILE FROM ASSERTION PACKAGE
|
GENERATED FILE FROM ASSERTION PACKAGE |
package dt provides container type implementations and interfaces.
|
package dt provides container type implementations and interfaces. |
cmp
Package cmp provides comparators for sorting linked lists.
|
Package cmp provides comparators for sorting linked lists. |
hdrhist
Package hdrhistogram provides an implementation of Gil Tene's HDR Histogram data structure.
|
Package hdrhistogram provides an implementation of Gil Tene's HDR Histogram data structure. |
is
Package is contains a simple assertion library for the fun/ensure testing framework.
|
Package is contains a simple assertion library for the fun/ensure testing framework. |
Package erc provides a simple/fast error aggregation tool for collecting and aggregating errors.
|
Package erc provides a simple/fast error aggregation tool for collecting and aggregating errors. |
Package ers provides some very basic error aggregating and handling tools, as a companion to erc.
|
Package ers provides some very basic error aggregating and handling tools, as a companion to erc. |
Package intish provides a collection of strongly type integer arithmetic operations, to make it possible to avoid floating point math for simple operations when desired.
|
Package intish provides a collection of strongly type integer arithmetic operations, to make it possible to avoid floating point math for simple operations when desired. |
Package itertool provides a set of functional helpers for managinging and using fun.Iterators, including a parallel processing, generators, Map/Reduce, Merge, and other convenient tools.
|
Package itertool provides a set of functional helpers for managinging and using fun.Iterators, including a parallel processing, generators, Map/Reduce, Merge, and other convenient tools. |
Package pubsub provides a message broker for one-to-many or many-to-many message distribution.
|
Package pubsub provides a message broker for one-to-many or many-to-many message distribution. |
Package risky contains a bunch of bad ideas for APIs and operations that will definitely lead to panics and deadlocks and incorrect behavior when used incorrectly.
|
Package risky contains a bunch of bad ideas for APIs and operations that will definitely lead to panics and deadlocks and incorrect behavior when used incorrectly. |
Package srv provides a framework and toolkit for service orchestration.
|
Package srv provides a framework and toolkit for service orchestration. |
Package testt (for test tools), provides a couple of useful helpers for common test patterns.
|
Package testt (for test tools), provides a couple of useful helpers for common test patterns. |