fun

package module
v0.10.2
Published: Jul 26, 2023 License: Apache-2.0 Imports: 16 Imported by: 17

README

fun -- Go Generic Functions and Tools

fun is a simple, well-tested, zero-dependency collection of packages with generic functions, tools, patterns, and the kind of thing you could write one-offs for but shouldn't.

Packages:

  • erc (error collection utilities.)
  • ers (error and panic handling utilities.)
  • dt (generic container datatypes, including ordered and unordered sets, singly and doubly linked lists, as well as wrappers around maps and slices.)
  • ft (function tools: simple tools for handling function objects.)
  • adt (strongly typed atomic data structures, wrappers, tools, and operations.)
  • itertool (iterator tools.)
  • pubsub (message broker and concurrency-safe queue and deque.)
  • srv (service orchestration and management framework.)
  • assert (minimal generic-based assertion library, in the tradition of testify.) The assertions in assert abort the flow of the test, while check provides non-critical assertions.
  • testt (testy) is a collection of "nice to have" test helpers and utilities.
  • ensure (an experimental test harness and orchestration tool.)

For more information, see the documentation, but of general interest:

  • The root fun package contains a few generic function types, with a collection of methods for interacting with, managing, and manipulating these operations. The fun.Iterator provides a framework for interacting with sequences, including some powerful high-level parallel processing tools.
  • In itertool and with fun.Iterator, an iterator framework and tools for interacting with iterators and generators.
  • In srv, a service orchestration toolkit and lifecycle tools.
  • In ft a number of low-level function-manipulation tools.
  • In pubsub, a channel-based message broker (for one-to-many channel patterns), with several backend patterns for dealing with load-shedding and message distribution patterns.
  • Queue and Deque implementations (in pubsub) that provide thread-safe linked-list based implementations and Wait methods to block until new items are added.
  • In dt a collection of data types and tools for manipulating different container types, as well as implementations of linked lists and sets. dt also provides fun-idiomatic wrappers around generic slices and maps, which complement the tools in the fun package.
  • In adt, a collection of Atomic/Pool/Map operations that use generics to provide strongly typed interfaces for common operations.
  • In erc, an error collector implementation for threadsafe error aggregation and introspection, particularly in worker-pool applications. ers provides related functionality.
  • fun includes a number of lightweight testing tools:
    • assert and assert/check provide "testify-style" assertions with simpler output, leveraging generics.
    • testt provides context, logging, and timer helpers for use in tests.
    • ensure and ensure/is provide a somewhat experimental "chain"-centered API for assertions.

Contributions are welcome. The general goals of the project are:

  • superior API ergonomics.
  • great high-level abstractions.
  • obvious and clear implementations.
  • minimal dependencies.

Have fun!

Documentation

Overview

Package fun is a zero-dependency collection of tools and idioms that takes advantage of generics. It includes iterators, error handling, a native-feeling Set type, and a simple pub-sub framework for distributing messages in fan-out patterns.

Constants

const ErrInvariantViolation ers.Error = ers.ErrInvariantViolation

ErrInvariantViolation is the root error of the error object that is the content of all panics produced by the Invariant helper.

const ErrIteratorSkip ers.Error = ers.Error("skip-iteration")

ErrIteratorSkip instructs consumers of Iterators, and related processors that run groups, to skip the current item and continue. It is equivalent to the "continue" keyword in other contexts.

const ErrNonBlockingChannelOperationSkipped ers.Error = ers.Error("non-blocking channel operation skipped")

ErrNonBlockingChannelOperationSkipped is returned when a channel operation in a non-blocking context is skipped: a send because the channel was full, or a receive because the channel was empty.

const ErrRecoveredPanic ers.Error = ers.ErrRecoveredPanic

ErrRecoveredPanic is at the root of any error returned by a function in the fun package that recovers from a panic.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChanOp added in v0.10.0

type ChanOp[T any] struct {
	// contains filtered or unexported fields
}

ChanOp is a wrapper around a channel, to make it easier to write clear code that uses and handles basic operations with single channels. From a high level an operation might look like:

ch := make(chan string)
// ctx is a context.Context supplied by the caller.
err := fun.Blocking(ch).Send().Write(ctx, "hello")

func Blocking added in v0.8.5

func Blocking[T any](ch chan T) ChanOp[T]

Blocking produces a blocking ChanOp instance. All Send/Check/Ignore operations will block until the context is canceled, the channel is closed, or the send succeeds.

func NonBlocking added in v0.8.5

func NonBlocking[T any](ch chan T) ChanOp[T]

NonBlocking produces a ChanOp instance that performs non-blocking operations.

For non-blocking sends, the Write() method of the ChanSend returned by Send() will return ErrNonBlockingChannelOperationSkipped if the channel was full and the object was not sent.

func (ChanOp[T]) Channel added in v0.10.0

func (op ChanOp[T]) Channel() chan T

Channel returns the underlying channel.

func (ChanOp[T]) Close added in v0.10.0

func (op ChanOp[T]) Close()

Close closes the underlying channel.

func (ChanOp[T]) Iterator added in v0.10.0

func (op ChanOp[T]) Iterator() *Iterator[T]

Iterator returns the "receive" aspect of the channel as an iterator. This is equivalent to fun.ChannelIterator(), but may be more accessible in some contexts.

func (ChanOp[T]) Pipe added in v0.10.0

func (op ChanOp[T]) Pipe() (Processor[T], Producer[T])

Pipe creates a linked pair of functions for transmitting data via these interfaces.

func (ChanOp[T]) Processor added in v0.10.0

func (op ChanOp[T]) Processor() Processor[T]

Processor exposes the "send" aspect of the channel as a Processor function.

func (ChanOp[T]) Producer added in v0.10.0

func (op ChanOp[T]) Producer() Producer[T]

Producer exposes the "receive" aspect of the channel as a Producer function.

func (ChanOp[T]) Receive added in v0.10.0

func (op ChanOp[T]) Receive() ChanReceive[T]

Receive returns a ChanReceive object that acts on the same underlying channel.

func (ChanOp[T]) Send added in v0.10.0

func (op ChanOp[T]) Send() ChanSend[T]

Send returns a ChanSend object that acts on the same underlying channel.

type ChanReceive added in v0.10.0

type ChanReceive[T any] struct {
	// contains filtered or unexported fields
}

ChanReceive wraps a channel for <-chan T operations. It is the type returned by the Receive() method on ChanOp. The primary method is Read(), with other methods provided as "self-documenting" helpers.

func BlockingReceive added in v0.10.0

func BlockingReceive[T any](ch <-chan T) ChanReceive[T]

BlockingReceive is the equivalent of Blocking(ch).Receive(), except that it accepts a receive-only channel.

func NonBlockingReceive added in v0.10.0

func NonBlockingReceive[T any](ch <-chan T) ChanReceive[T]

NonBlockingReceive is the equivalent of NonBlocking(ch).Receive(), except that it accepts a receive-only channel.

func (ChanReceive[T]) Check added in v0.10.0

func (ro ChanReceive[T]) Check(ctx context.Context) (T, bool)

Check performs the read operation and converts the error into an "ok" value, returning true if receive was successful and false otherwise.

func (ChanReceive[T]) Consume added in v0.10.0

func (ro ChanReceive[T]) Consume(op Processor[T]) Worker

Consume returns a Worker function that processes the output of data from the channel with the Processor function. If the processor function returns ErrIteratorSkip, the processing will continue. All other Processor errors (and problems reading from the channel) abort the iteration. io.EOF errors are not propagated to the caller.

func (ChanReceive[T]) Drop added in v0.10.0

func (ro ChanReceive[T]) Drop(ctx context.Context) bool

Drop performs a read operation and drops the response. If no item was dropped (e.g. Read would return an error), Drop() returns false; it returns true when the Drop was successful.

func (ChanReceive[T]) Force added in v0.10.0

func (ro ChanReceive[T]) Force(ctx context.Context) (out T)

Force ignores the error, returning only the value from Read. This is either the value sent through the channel, or the zero value for T. Because zero values can be sent through channels, Force does not provide a way to distinguish between "channel-closed" and "received a zero value".

func (ChanReceive[T]) Ignore added in v0.10.0

func (ro ChanReceive[T]) Ignore(ctx context.Context)

Ignore reads one item from the channel and discards it.

func (ChanReceive[T]) Iterator added in v0.10.0

func (ro ChanReceive[T]) Iterator() *Iterator[T]

Iterator exposes the contents of the channel as an iterator.

func (ChanReceive[T]) Ok added in v0.10.0

func (ro ChanReceive[T]) Ok() bool

Ok attempts to read from a channel and returns true either when the channel is blocked or an item is read from the channel, and false when the channel has been closed.

func (ChanReceive[T]) Producer added in v0.10.0

func (ro ChanReceive[T]) Producer() Producer[T]

Producer returns the Read method as a producer for integration into existing tools.

func (ChanReceive[T]) Read added in v0.10.0

func (ro ChanReceive[T]) Read(ctx context.Context) (T, error)

Read performs the read operation according to the blocking/non-blocking semantics of the receive operation.

In general, errors are one of: io.EOF if the channel is closed; a context cancellation error if the context passed to Read() is canceled; or ErrNonBlockingChannelOperationSkipped in the non-blocking case if the channel was empty.

In all cases when Read() returns an error, the return value is the zero value for T.

type ChanSend added in v0.10.0

type ChanSend[T any] struct {
	// contains filtered or unexported fields
}

ChanSend provides access to channel send operations, and is constructed by the Send() method on the ChanOp. The primary method is Write(), with other methods provided for clarity.

func BlockingSend added in v0.10.0

func BlockingSend[T any](ch chan<- T) ChanSend[T]

BlockingSend is equivalent to Blocking(ch).Send() except that it accepts a send-only channel.

func NonBlockingSend added in v0.10.0

func NonBlockingSend[T any](ch chan<- T) ChanSend[T]

NonBlockingSend is equivalent to NonBlocking(ch).Send() except that it accepts a send-only channel.

func (ChanSend[T]) Check added in v0.10.0

func (sm ChanSend[T]) Check(ctx context.Context, it T) bool

Check performs a send and returns true when the send was successful and false otherwise.

func (ChanSend[T]) Consume added in v0.10.0

func (sm ChanSend[T]) Consume(iter *Iterator[T]) Worker

Consume returns a worker that, when executed, pushes the content from the iterator into the channel.

func (ChanSend[T]) Ignore added in v0.10.0

func (sm ChanSend[T]) Ignore(ctx context.Context, it T)

Ignore performs a send and omits the error.

func (ChanSend[T]) Processor added in v0.10.0

func (sm ChanSend[T]) Processor() Processor[T]

Processor returns the Write method as a processor for integration into existing tools.

func (ChanSend[T]) Signal added in v0.10.0

func (sm ChanSend[T]) Signal(ctx context.Context)

Signal attempts to send the zero value of T through the channel and returns when: the send succeeds, the channel is full and this is a non-blocking send, the context is canceled, or the channel is closed.

func (ChanSend[T]) Write added in v0.10.0

func (sm ChanSend[T]) Write(ctx context.Context, it T) (err error)

Write sends the item into the channel captured by Blocking/NonBlocking returning the appropriate error.

The returned error is nil if the send was successful, and io.EOF if the channel is closed (or nil), rather than a panic (as with the equivalent direct operation). The error value is a context cancellation error when the context is canceled. For non-blocking sends, if the channel did not accept the write, ErrNonBlockingChannelOperationSkipped is returned.

func (ChanSend[T]) Zero added in v0.10.0

func (sm ChanSend[T]) Zero(ctx context.Context) error

Zero sends the zero value of T through the channel.

type Future added in v0.10.0

type Future[T any] func() T

Future is a fun-style function type for a function object that will produce an object of the specified type.

func AsFuture added in v0.10.0

func AsFuture[T any](in T) Future[T]

AsFuture wraps a value and returns a future object that, when called, will return the provided value.

func Futurize added in v0.10.0

func Futurize[T any](f func() T) Future[T]

Futurize is a simple wrapper to convert a function object to a Future[T] object.

func Translate added in v0.10.1

func Translate[T any, O any](in Future[T], tfn func(T) O) Future[O]

Translate converts a future from one type to another.

func (Future[T]) If added in v0.10.0

func (f Future[T]) If(cond bool) Future[T]

If produces a future that only runs when the condition value is true. If the condition is false, the future will return the zero value of T.

func (Future[T]) Ignore added in v0.10.0

func (f Future[T]) Ignore() func()

Ignore produces a function that will call the Future but discards the output.

func (Future[T]) Join added in v0.10.0

func (f Future[T]) Join(merge func(T, T) T, ops ...Future[T]) Future[T]

Join iteratively merges a collection of future operations.

func (Future[T]) Limit added in v0.10.1

func (f Future[T]) Limit(in int) Future[T]

func (Future[T]) Lock added in v0.10.0

func (f Future[T]) Lock() Future[T]

Lock returns a wrapped future that ensures that all calls to the future are protected by a mutex.

func (Future[T]) Not added in v0.10.0

func (f Future[T]) Not(cond bool) Future[T]

Not produces a future that only runs when the condition value is false. If the condition is true, the future will return the zero value of T.

func (Future[T]) Once added in v0.10.0

func (f Future[T]) Once() Future[T]

Once returns a future that will only run the underlying future exactly once.
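As an illustration of the run-once pattern, the sketch below wraps a function with sync.Once. The future type and once method are local stand-ins, and the sketch assumes that later calls return the cached first result, which the description above does not state explicitly.

```go
package main

import (
	"fmt"
	"sync"
)

// future is a local stand-in mirroring the shape of Future[T].
type future[T any] func() T

// once caches the first result; later calls return it without re-running f.
// Returning the cached value is an assumption made for this sketch.
func (f future[T]) once() future[T] {
	var (
		guard sync.Once
		out   T
	)
	return func() T {
		guard.Do(func() { out = f() })
		return out
	}
}

func main() {
	calls := 0
	f := future[int](func() int { calls++; return calls * 10 }).once()
	a, b := f(), f()
	fmt.Println(a, b, calls) // 10 10 1
}
```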

func (Future[T]) PostHook added in v0.10.0

func (f Future[T]) PostHook(fn func()) Future[T]

PostHook unconditionally runs the provided function after running the future. The hook runs in a defer statement.

func (Future[T]) PreHook added in v0.10.0

func (f Future[T]) PreHook(fn func()) Future[T]

PreHook unconditionally runs the provided function before running the future and returning its result.

func (Future[T]) Producer added in v0.10.0

func (f Future[T]) Producer() Producer[T]

Producer returns a producer function that wraps the future.

func (Future[T]) Reduce added in v0.10.0

func (f Future[T]) Reduce(merge func(T, T) T, next Future[T]) Future[T]

Reduce takes the input future, the next future, and merges the results using the merge function.
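The relationship between Reduce and Join can be sketched as a fold: Join applies Reduce once per additional future. The code below uses a local future type with hypothetical lowercase methods, and assumes lazy evaluation (nothing runs until the resulting future is called).

```go
package main

import "fmt"

// future is a local stand-in mirroring the shape of Future[T].
type future[T any] func() T

// reduce returns a future that runs f, then next, and merges both results.
func (f future[T]) reduce(merge func(T, T) T, next future[T]) future[T] {
	return func() T { return merge(f(), next()) }
}

// join folds reduce over any number of additional futures.
func (f future[T]) join(merge func(T, T) T, ops ...future[T]) future[T] {
	for _, op := range ops {
		f = f.reduce(merge, op)
	}
	return f
}

func main() {
	add := func(a, b int) int { return a + b }
	one := future[int](func() int { return 1 })
	two := future[int](func() int { return 2 })
	three := future[int](func() int { return 3 })
	fmt.Println(one.join(add, two, three)()) // 6
}
```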

func (Future[T]) Run added in v0.10.0

func (f Future[T]) Run() T

Run executes the future.

func (Future[T]) Slice added in v0.10.0

func (f Future[T]) Slice() func() []T

Slice returns a future-like function that wraps the output of the future as the first element in a slice.

func (Future[T]) TTL added in v0.10.1

func (f Future[T]) TTL(dur time.Duration) Future[T]

func (Future[T]) When added in v0.10.0

func (f Future[T]) When(c func() bool) Future[T]

When produces a new future wrapping the input future that executes when the condition function returns true, returning the zero value for T when the condition is false. The condition value is checked every time the future is called.

func (Future[T]) WithLock added in v0.10.0

func (f Future[T]) WithLock(m *sync.Mutex) Future[T]

WithLock returns a future that is protected with the provided mutex.

type Handler added in v0.10.2

type Handler[T any] func(T)

Handler describes a function that operates on a single object, but returns no output, and is used primarily for side effects, particularly around handling errors or collecting metrics. The Handler implementation here makes it possible to provide panic-safety for these kinds of functions or easily convert to other related types.

func Handle added in v0.10.2

func Handle[T any](in func(T)) Handler[T]

Handle produces a Handler[T] function as a helper.

func (Handler[T]) Capture added in v0.10.2

func (of Handler[T]) Capture(in T) func()

Capture returns a function that observes the specified variable, but only when executed later.

func (Handler[T]) Check added in v0.10.2

func (of Handler[T]) Check(in T) error

Check runs the observer function with a panic handler and converts a possible panic to an error.

func (Handler[T]) Filter added in v0.10.2

func (of Handler[T]) Filter(filter func(T) T) Handler[T]

Filter creates an observer that passes its input through the filter function before executing the root observer. Use this to process or transform the input before it is passed to the underlying observer. Use in combination with the Skip function to filter out non-actionable inputs.

func (Handler[T]) If added in v0.10.2

func (of Handler[T]) If(cond bool) Handler[T]

If returns an observer that only executes the root observer if the condition is true.

func (Handler[T]) Iterator added in v0.10.2

func (of Handler[T]) Iterator(iter *Iterator[T]) Worker

Iterator produces a worker that processes every item in the iterator with the observer function.

func (Handler[T]) Join added in v0.10.2

func (of Handler[T]) Join(next Handler[T]) Handler[T]

Join creates an observer function that runs both the root observer and the "next" observer.

func (Handler[T]) Lock added in v0.10.2

func (of Handler[T]) Lock() Handler[T]

Lock returns an observer that is protected by a mutex. All executions of the observer are isolated.

func (Handler[T]) Once added in v0.10.2

func (of Handler[T]) Once() Handler[T]

Once produces an observer function that runs exactly once, and successive executions of the observer are noops.

func (Handler[T]) Operation added in v0.10.2

func (of Handler[T]) Operation(in T) Operation

Operation captures a variable and converts a Handler into a wait function that observes the value when the Operation runs.

func (Handler[T]) Processor added in v0.10.2

func (of Handler[T]) Processor() Processor[T]

Processor converts the observer to a Processor function. The Processor will always return nil, and the context is ignored.

func (Handler[T]) Safe added in v0.10.2

func (of Handler[T]) Safe(oe Handler[error]) Handler[T]

Safe handles any panic encountered during the observer's execution and converts it to an error.

func (Handler[T]) Skip added in v0.10.2

func (of Handler[T]) Skip(hook func(T) bool) Handler[T]

Skip runs a check before passing the object to the observer: when the condition function, which can inspect the input object, returns true, the underlying Handler is called; otherwise, the observation is a noop.

func (Handler[T]) When added in v0.10.2

func (of Handler[T]) When(cond func() bool) Handler[T]

When returns an observer function that only executes the observer function if the condition function returns true. The condition function is run every time the observer function runs.

func (Handler[T]) WithLock added in v0.10.2

func (of Handler[T]) WithLock(mtx *sync.Mutex) Handler[T]

WithLock protects the action of the observer with the provided mutex.

func (Handler[T]) Worker added in v0.10.2

func (of Handler[T]) Worker(in T) Worker

Worker captures a variable and returns a worker function which will, when executed, observe the input value. These worker functions use the Safe-mode of execution.

type Handlers added in v0.10.0

type Handlers struct{}

The Handlers type serves to namespace constructors of common operations and specializations of generic functions provided by this package.

var HF Handlers = Handlers{}

HF provides namespaced access to the handlers and constructors provided by the Handlers type.

func (Handlers) Atoi added in v0.10.1

func (Handlers) Atoi() Transform[string, int]

Atoi produces a Transform function that converts strings into integers.

func (Handlers) Counter added in v0.10.1

func (Handlers) Counter(max int) *Iterator[int]

Counter produces an iterator that, starting at 1, yields monotonically increasing integers until the maximum is reached.

func (Handlers) ErrorCollector added in v0.10.0

func (Handlers) ErrorCollector() (ob Handler[error], prod Producer[[]error])

ErrorCollector provides a basic error aggregation facility that collects non-nil errors and adds them to a slice internally, which is accessible via the producer. The operations of the observer and producer are protected by a shared mutex.

func (Handlers) ErrorHandler added in v0.10.2

func (Handlers) ErrorHandler(of Handler[error]) Handler[error]

ErrorHandler constructs an error observer that only calls the wrapped observer when the error passed is non-nil.

func (Handlers) ErrorHandlerWithoutEOF added in v0.10.2

func (Handlers) ErrorHandlerWithoutEOF(of Handler[error]) Handler[error]

ErrorHandlerWithoutEOF wraps an error observer and propagates all non-nil and non-io.EOF errors to the underlying observer.

func (Handlers) ErrorHandlerWithoutTerminating added in v0.10.2

func (Handlers) ErrorHandlerWithoutTerminating(of Handler[error]) Handler[error]

ErrorHandlerWithoutTerminating wraps an error observer and only calls the underlying observer if the input error is non-nil and is not one of the "terminating" errors used by this package (e.g. io.EOF and the context cancellation errors).

func (Handlers) ErrorProcessor added in v0.10.0

func (Handlers) ErrorProcessor(pf Processor[error]) Processor[error]

ErrorProcessor produces an error Processor function for errors that only calls the input Processor if the error is non-nil.

func (Handlers) ErrorUnwindTransformer added in v0.10.0

func (Handlers) ErrorUnwindTransformer(filter ers.Filter) Transform[error, []error]

ErrorUnwindTransformer provides the ers.Unwind operation as a transform method, which consumes an error and produces a slice of its component errors. All errors are processed by the provided filter, and the transformer's context is not used. The error value of the Transform function is always nil.

func (Handlers) Itoa added in v0.10.1

func (Handlers) Itoa() Transform[int, string]

Itoa produces a Transform function that converts integers into strings.

func (Handlers) Lines added in v0.10.1

func (Handlers) Lines(reader io.Reader) *Iterator[string]

Lines provides fun.Iterator access over the contents of a (presumably plaintext) io.Reader, using the bufio.Scanner. During iteration, leading and trailing space is also trimmed.

func (Handlers) OperationPool added in v0.10.0

func (Handlers) OperationPool(iter *Iterator[Operation]) Operation

OperationPool returns an Operation that, when called, processes the incoming iterator of Operations, starts a goroutine running each, and then blocks until all operations have returned, or the context passed to the output function has been canceled.

func (Handlers) ProcessOperation added in v0.10.0

func (Handlers) ProcessOperation() Processor[Operation]

ProcessOperation constructs a Processor function for running Operation functions. Use in combination with Process and ProcessParallel, and to build worker pools.

The Handlers type serves to namespace these constructors, for interface clarity purposes. Use the HF variable to access this method as in:

fun.HF.ProcessOperation()

func (Handlers) ProcessWorker added in v0.10.0

func (Handlers) ProcessWorker() Processor[Worker]

ProcessWorker constructs a Processor function for running Worker functions. Use in combination with Process and ProcessParallel, and to build worker pools.

The Handlers type serves to namespace these constructors, for interface clarity purposes. Use the HF variable to access this method as in:

fun.HF.ProcessWorker()

func (Handlers) Sprint added in v0.10.0

func (Handlers) Sprint(args ...any) Future[string]

Sprint constructs a future that calls fmt.Sprint over the given variadic arguments.

func (Handlers) Sprintf added in v0.10.0

func (Handlers) Sprintf(tmpl string, args ...any) Future[string]

Sprintf produces a future that calls and returns fmt.Sprintf for the provided arguments when the future is called.

func (Handlers) Sprintln added in v0.10.0

func (Handlers) Sprintln(args ...any) Future[string]

Sprintln constructs a future that calls fmt.Sprintln over the given variadic arguments.

func (Handlers) Str added in v0.10.0

func (Handlers) Str(args []any) Future[string]

Str provides a future that calls fmt.Sprint over a slice of any objects. Use fun.HF.Sprint for a variadic alternative.

func (Handlers) StrConcatinate added in v0.10.0

func (Handlers) StrConcatinate(strs ...string) Future[string]

StrConcatinate produces a future that joins a variadic sequence of strings into a single string.

func (Handlers) StrJoin added in v0.10.0

func (Handlers) StrJoin(strs []string) Future[string]

StrJoin produces a future that combines a slice of strings into a single string, joined without spaces.

func (Handlers) StrJoinWith added in v0.10.0

func (Handlers) StrJoinWith(input []string, sep string) Future[string]

StrJoinWith produces a future for strings.Join(), concatenating the elements in the input slice with the provided separator.

func (Handlers) Strf added in v0.10.0

func (Handlers) Strf(tmpl string, args []any) Future[string]

Strf produces a future that calls fmt.Sprintf for the given template string and arguments.

func (Handlers) Stringer added in v0.10.0

func (Handlers) Stringer(op fmt.Stringer) Future[string]

Stringer converts a fmt.Stringer object/method call into a string-formatter.

func (Handlers) Strln added in v0.10.0

func (Handlers) Strln(args []any) Future[string]

Strln constructs a future that calls fmt.Sprintln for the given arguments.

type Iterator

type Iterator[T any] struct {
	// contains filtered or unexported fields
}

Iterator provides a safe, context-respecting iteration/sequence paradigm, and an entire toolkit for consumer functions, converters, and generation options.

As the basis and heart of a programming model, iterators make it possible to think about groups or sequences of objects or work, that can be constructed and populated lazily, and provide a collection of interfaces for processing and manipulating data.

Beyond the iterator interactive tools provided in this package, the itertool package provides some additional helpers and tools, while the adt and dt packages provide simple iterations and tooling around iterators.

The canonical way to use an iterator is with the core Next(), Value(), and Close() methods. Next takes a context and advances the iterator. Next, which is typically called in a single-clause for loop (i.e. as in a while loop), returns false when the iterator has no items, after which the iterator should be closed and cannot be restarted. When Next() returns true, the iterator is advanced, and the output of Value() will provide the value at the current position in the iterator. Next() will block if the iterator has not been closed and the operation which produces or generates new items for the iterator blocks (or continues iterating) until the iterator is exhausted or closed.

However, additional methods, such as ReadOne and the Producer() function (which is a wrapper around ReadOne), provide a different interaction paradigm: they combine the Next() and Value() operations into a single function call. When the iterator is exhausted these methods return the `io.EOF` error.

In all cases, checking the Close() value of the iterator makes it possible to see any errors encountered during the operation of the iterator.

Next/Value cannot be used concurrently, as there is no way to synchronize the Next/Value calls with respect to each other: in this mode it is possible to miss values or get duplicate values from the iterator. If the generator/producer function in the iterator is safe for concurrent use, then ReadOne can be used safely. As a rule, all tooling in the fun package uses ReadOne except in a few cases where a caller has exclusive access to the iterator.

func ChannelIterator added in v0.10.0

func ChannelIterator[T any](ch <-chan T) *Iterator[T]

ChannelIterator exposes access to an existing "receive" channel as an iterator.

func ConvertIterator added in v0.10.0

func ConvertIterator[T, O any](iter *Iterator[T], op Transform[T, O]) *Iterator[O]

ConvertIterator processes the input iterator of type T into an output iterator of type O. Its implementation uses the Generator, and will continue producing values as long as the input iterator produces values and the context isn't canceled.

func Generator added in v0.8.5

func Generator[T any](op Producer[T]) *Iterator[T]

Generator creates an iterator that produces new values, using the generator function provided. This implementation does not create any background go routines, and the iterator will produce values until the function returns an error or the Close() method is called. Any non-nil error returned by the generator function is propagated to the close method, as long as it is not a context cancellation error or an io.EOF error.

func Map added in v0.9.3

func Map[T any, O any](
	input *Iterator[T],
	mpf Transform[T, O],
	optp ...OptionProvider[*WorkerGroupConf],
) *Iterator[O]

Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous/streaming manner, so the output Iterator must be consumed. The zero values of Options provide reasonable defaults for abort-on-error and single-threaded map operation.

If the mapper function errors, the result isn't included, but the errors are aggregated and propagated to the `Close()` method of the resulting iterator. The mapping operation respects the fun.ErrIteratorSkip error. If there is more than one error (as is the case with a panic or with ContinueOnError semantics), the error can be unwrapped or converted to a slice with the fun.Unwind function. Panics in the map function are converted to errors and always collected but may abort the operation if ContinueOnPanic is not set.

func MergeIterators added in v0.10.0

func MergeIterators[T any](iters ...*Iterator[T]) *Iterator[T]

MergeIterators takes a collection of iterators of the same type of objects and provides a single iterator over these items.

A collection of background threads iterates over the inputs and provides the items to the output iterator. These threads start on the first iteration and will exit if this context is canceled.

The iterator will continue to produce items until all input iterators have been consumed, the initial context is canceled, the Close method is called, or all of the input iterators have returned an error.

func SliceIterator added in v0.10.0

func SliceIterator[T any](in []T) *Iterator[T]

SliceIterator provides Iterator access to the elements in a slice.

func VariadicIterator added in v0.10.0

func VariadicIterator[T any](in ...T) *Iterator[T]

VariadicIterator produces an iterator from an arbitrary collection of objects, passed into the constructor.

func (*Iterator[T]) AddError added in v0.10.0

func (i *Iterator[T]) AddError(e error)

AddError can be used by calling code to add errors to the iterator, which are merged.

AddError is not safe for concurrent use (with regards to other AddError calls or Close).

func (*Iterator[T]) Any added in v0.10.0

func (i *Iterator[T]) Any() *Iterator[any]

Any, as a special case of Transform, converts an iterator of any type to an iterator of any (i.e. interface{}) values.

func (*Iterator[T]) Buffer added in v0.10.0

func (i *Iterator[T]) Buffer(n int) *Iterator[T]

Buffer adds a buffer to the iterator, using a channel as the buffer, to smooth out iteration performance. If the iteration (producer) and the consumer both take time, even a small buffer will improve the throughput of the system and prevent the two components from blocking on each other.

The ordering of elements in the output iterator is the same as the order of elements in the input iterator.

func (*Iterator[T]) BufferedChannel added in v0.10.0

func (i *Iterator[T]) BufferedChannel(ctx context.Context, size int) <-chan T

BufferedChannel provides access to the content of the iterator with a buffered channel that is closed when the iterator is exhausted.

func (*Iterator[T]) Channel added in v0.10.0

func (i *Iterator[T]) Channel(ctx context.Context) <-chan T

Channel provides access to the contents of the iterator as a channel. The channel is closed when the iterator is exhausted.

func (*Iterator[T]) Close

func (i *Iterator[T]) Close() error

Close terminates the iterator and returns any errors collected during iteration. If the iterator allocates resources, this will typically release them, but close may not block until all resources are released.

func (*Iterator[T]) Count added in v0.10.0

func (i *Iterator[T]) Count(ctx context.Context) int

Count returns the number of items observed by the iterator. Callers should still manually call Close on the iterator.

func (*Iterator[T]) ErrorHandler added in v0.10.2

func (i *Iterator[T]) ErrorHandler() Handler[error]

ErrorHandler provides access to the AddError method as an error observer.

func (*Iterator[T]) Filter added in v0.10.0

func (i *Iterator[T]) Filter(check func(T) bool) *Iterator[T]

Filter passes every item in the iterator to the check function and, if it returns true, propagates the item to the output iterator. There is no buffering, and check functions should return quickly. For more advanced use, consider using itertool.Map().

func (*Iterator[T]) Join added in v0.10.0

func (i *Iterator[T]) Join(iters ...*Iterator[T]) *Iterator[T]

Join merges multiple iterators, processing and producing their results sequentially, without starting any goroutines. It is otherwise similar to MergeIterators (which processes each iterator in parallel).

func (*Iterator[T]) MarshalJSON added in v0.10.0

func (i *Iterator[T]) MarshalJSON() ([]byte, error)

MarshalJSON is useful for implementing json.Marshaler methods from iterator-supporting types, wrapping the standard library's JSON encoding tools.

The contents of the iterator are marshaled as elements in a JSON array.

func (*Iterator[T]) Next

func (i *Iterator[T]) Next(ctx context.Context) bool

Next advances the iterator (using ReadOne) and caches the current value for access with the Value() method. When Next is true, the Value() will return the next item. When false, either the iterator has been exhausted (e.g. the Producer function has returned io.EOF) or the context passed to Next has been canceled.

Next/Value cannot be used safely if the iterator is accessed from multiple goroutines concurrently. In these cases use ReadOne directly, or use Split to create an iterator that safely draws items from the parent iterator.

func (*Iterator[T]) Observe added in v0.10.0

func (i *Iterator[T]) Observe(ctx context.Context, fn Handler[T]) (err error)

Observe processes an iterator, calling the observer function for every element in the iterator and returning when the iterator is exhausted. Take care to ensure that the observer function does not block.

The error returned captures any panics encountered as an error, as well as the output of the Close() operation. Observe will not add a context cancelation error to its error, though the observed iterator may return one in its close method.

func (*Iterator[T]) ParallelBuffer added in v0.10.0

func (i *Iterator[T]) ParallelBuffer(n int) *Iterator[T]

ParallelBuffer, like Buffer, processes the input queue and stores those items in a channel; however, unlike Buffer, multiple workers consume the input iterator. As a result, the order of the elements in the output iterator is not the same as the input order.

Otherwise, the two Buffer methods are equivalent and serve the same purpose: process the items from an iterator without blocking the consumer of the iterator.

func (*Iterator[T]) Process added in v0.10.0

func (i *Iterator[T]) Process(fn Processor[T]) Worker

Process provides a function that consumes all items in the iterator with the provided processor function.

All panics are converted to errors and propagated in the response of the worker, and abort the processing. If the processor function returns ErrIteratorSkip, processing continues. All other errors abort processing and are returned by the worker.

func (*Iterator[T]) ProcessParallel added in v0.10.0

func (i *Iterator[T]) ProcessParallel(
	fn Processor[T],
	optp ...OptionProvider[*WorkerGroupConf],
) Worker

ProcessParallel produces a worker that, when executed, will iteratively process the contents of the iterator. The options control the error handling and parallelism semantics of the operation.

This is the workhorse operation of the package, and can be used as the basis of worker pools, event processing, or message dispatching for pubsub queues and related systems.
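As an illustration of the worker-pool pattern this operation enables, here is a minimal sketch using only the standard library. `processParallel` and `sumParallel` are hypothetical names, a slice stands in for the iterator, and the continue-on-error behavior shown is an assumption made for the example rather than the package's default.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// processParallel is a hypothetical stand-in for a parallel processing
// operation: a fixed number of workers pull items from a shared queue, and
// every error is collected rather than aborting the run.
func processParallel[T any](ctx context.Context, items []T, workers int, fn func(context.Context, T) error) error {
	if workers < 1 {
		workers = 1
	}
	queue := make(chan T)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue {
				if err := fn(ctx, item); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}()
	}
	for _, item := range items {
		queue <- item
	}
	close(queue)
	wg.Wait()
	// errors.Join returns nil when no errors were collected.
	return errors.Join(errs...)
}

// sumParallel adds the non-negative items concurrently and reports an error
// for each negative input.
func sumParallel(items []int64, workers int) (int64, error) {
	var total atomic.Int64
	err := processParallel(context.Background(), items, workers, func(_ context.Context, n int64) error {
		if n < 0 {
			return fmt.Errorf("negative input: %d", n)
		}
		total.Add(n)
		return nil
	})
	return total.Load(), err
}

func main() {
	total, err := sumParallel([]int64{1, 2, 3, 4}, 3)
	fmt.Println(total, err) // 10 <nil>
}
```

The shared state (the running total and the error slice) is guarded by an atomic and a mutex respectively, since the processor function runs on several goroutines at once.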

func (*Iterator[T]) Producer added in v0.10.0

func (i *Iterator[T]) Producer() Producer[T]

Producer provides access to the contents of the iterator as a Producer function.

func (*Iterator[T]) ReadOne added in v0.10.0

func (i *Iterator[T]) ReadOne(ctx context.Context) (out T, err error)

ReadOne advances the iterator and returns the value as a single operation. This operation IS safe for concurrent use.

ReadOne returns the io.EOF error when the iterator has been exhausted, a context expiration error or the underlying error produced by the iterator. All errors produced by ReadOne are terminal and indicate that no further iteration is possible.

func (*Iterator[T]) Reduce added in v0.10.0

func (i *Iterator[T]) Reduce(
	reducer func(T, T) (T, error),
) Producer[T]

Reduce processes an iterator with a reducer function. The output function is a Producer operation which runs synchronously, and no processing happens before the producer is called. If the reducer function returns ErrIteratorSkip, the output value is ignored, and the reducer operation continues. io.EOF errors are not propagated to the caller, and in all situations, the last value produced by the reducer is returned with an error.

The "previous" value for the first reduce option is the zero value for the type T.
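The folding semantics can be sketched with a slice in place of the iterator. The `reduce` helper, the `errSkip` sentinel, and `sumEven` are all hypothetical names introduced for illustration; they only model the described behavior (zero-value start, skip handling, and returning the last accepted value alongside an error).

```go
package main

import (
	"errors"
	"fmt"
)

// errSkip is a hypothetical sentinel standing in for the package's skip error.
var errSkip = errors.New("skip")

// reduce folds items with reducer, starting from the zero value of T.
// A skip error discards that step's output; any other error stops the fold
// and is returned together with the last accepted value.
func reduce[T any](items []T, reducer func(prev, next T) (T, error)) (T, error) {
	var acc T // the zero value is the "previous" value for the first call
	for _, item := range items {
		out, err := reducer(acc, item)
		switch {
		case err == nil:
			acc = out
		case errors.Is(err, errSkip):
			continue
		default:
			return acc, err
		}
	}
	return acc, nil
}

// sumEven adds even numbers to the running total and skips odd ones.
func sumEven(prev, next int) (int, error) {
	if next%2 != 0 {
		return 0, errSkip
	}
	return prev + next, nil
}

func main() {
	total, err := reduce([]int{1, 2, 3, 4}, sumEven)
	fmt.Println(total, err) // 6 <nil>
}
```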

func (*Iterator[T]) Slice added in v0.10.0

func (i *Iterator[T]) Slice(ctx context.Context) (out []T, _ error)

Slice converts an iterator to a slice of its values, and closes the iterator when the iterator has been exhausted.

In the case of an error in the underlying iterator the output slice will have the values encountered before the error.

func (*Iterator[T]) Split added in v0.10.0

func (i *Iterator[T]) Split(num int) []*Iterator[T]

Split produces an arbitrary number of iterators which divide the input. The division is lazy and depends on the rate of consumption of output iterators, but every item from the input iterator is sent to exactly one output iterator, each of which can be safely used from a different go routine.

The input iterator is not closed after the output iterators are exhausted. There is one background go routine that reads items off of the input iterator, which starts when the first output iterator is advanced: be aware that canceling this context will effectively cancel all iterators.

func (*Iterator[T]) Transform added in v0.10.1

func (i *Iterator[T]) Transform(op Transform[T, T]) *Iterator[T]

Transform processes an iterator passing each element through a transform function. The type of the iterator is the same for the output. Use Convert iterator to change the type of the value.

func (*Iterator[T]) UnmarshalJSON added in v0.10.0

func (i *Iterator[T]) UnmarshalJSON(in []byte) error

UnmarshalJSON reads a byte-array of input data that contains a JSON array and then processes and returns that data iteratively.

To handle streaming data from an io.Reader that contains a stream of line-separated json documents, use itertool.JSON.

func (*Iterator[T]) Value

func (i *Iterator[T]) Value() T

Value returns the object at the current position in the iterator. It's often used with Next() for looping over the iterator.

Value and Next cannot be used safely when the iterator is being used concurrently. Use ReadOne or the Producer instead.

type Operation added in v0.10.0

type Operation func(context.Context)

Operation is a type of function object that will block until an operation returns or the context is canceled.

func BlockingOperation added in v0.10.0

func BlockingOperation(in func()) Operation

BlockingOperation converts a function that takes no arguments into an Operation.

func WaitChannel added in v0.6.3

func WaitChannel[T any](ch <-chan T) Operation

WaitChannel converts a channel (typically, a `chan struct{}`) to an Operation. The Operation blocks until its context is canceled or the channel is either closed or returns one item.

func WaitContext added in v0.6.3

func WaitContext(ctx context.Context) Operation

WaitContext waits for the context to be canceled before returning. The Operation that's returned also respects its own context. Use this Operation and its own context to wait for a context to be canceled with a timeout, for instance.

func WaitForGroup added in v0.8.0

func WaitForGroup(wg *sync.WaitGroup) Operation

WaitForGroup converts a sync.WaitGroup into a fun.Operation.

This operation will leak a go routine if the WaitGroup never returns and the context is canceled. To avoid a leaked goroutine, use the fun.WaitGroup type.

func (Operation) Add added in v0.10.0

func (wf Operation) Add(ctx context.Context, wg *WaitGroup)

Add starts a goroutine that waits for the Operation to return, incrementing and decrementing the WaitGroup as appropriate. The execution of the wait function blocks on Add's context.

func (Operation) After added in v0.10.0

func (wf Operation) After(ts time.Time) Operation

After provides an operation that will only run after the specified clock time. When called before this time, the operation blocks until that time passes (or the context is canceled).

func (Operation) Background added in v0.10.0

func (wf Operation) Background(wg *WaitGroup) Operation

Background creates a new operation which, when called, starts the root operation in the background and returns immediately. Use the wait group, or its Operation, to block on the completion of the background execution.

func (Operation) Block added in v0.10.0

func (wf Operation) Block()

Block runs the Operation with a context that will never be canceled.

func (Operation) Delay added in v0.10.0

func (wf Operation) Delay(dur time.Duration) Operation

Delay wraps an Operation in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Operation) Future added in v0.10.0

func (wf Operation) Future(ctx context.Context) Operation

Future starts the operation in a background goroutine and returns an operation which blocks until its context is canceled or the underlying operation returns.

func (Operation) Go added in v0.10.0

func (wf Operation) Go(ctx context.Context)

Go launches the operation in a go routine. There is no panic-safety provided.

func (Operation) If added in v0.10.0

func (wf Operation) If(cond bool) Operation

If provides a static version of When that only runs the operation if the condition is true, and is otherwise a noop.

func (Operation) Jitter added in v0.10.0

func (wf Operation) Jitter(dur func() time.Duration) Operation

Jitter wraps an Operation in a function that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Operation.

If the function produces a negative duration, there is no delay.

func (Operation) Join added in v0.10.0

func (wf Operation) Join(op Operation) Operation

Join runs the first operation, and then if the context has not expired, runs the second operation.
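The sequencing rule can be sketched as a small decorator. The `Operation` type is re-declared locally, and the lowercase `join` method and the `runJoined` helper are hypothetical names used only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Operation mirrors the documented signature; it is re-declared locally so
// the sketch compiles without the package.
type Operation func(context.Context)

// join is a hypothetical re-implementation of the documented behavior: run
// the receiver, then run next only if the context is still live.
func (op Operation) join(next Operation) Operation {
	return func(ctx context.Context) {
		op(ctx)
		if ctx.Err() != nil {
			return
		}
		next(ctx)
	}
}

// runJoined records which operations ran. When cancelFirst is true, the
// first operation cancels the shared context before returning.
func runJoined(cancelFirst bool) []string {
	var calls []string
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	first := Operation(func(context.Context) {
		calls = append(calls, "first")
		if cancelFirst {
			cancel()
		}
	})
	second := Operation(func(context.Context) {
		calls = append(calls, "second")
	})

	first.join(second)(ctx)
	return calls
}

func main() {
	fmt.Println(runJoined(false)) // [first second]
	fmt.Println(runJoined(true))  // [first]
}
```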

func (Operation) Launch added in v0.10.0

func (wf Operation) Launch() Operation

Launch provides access to the Go method (i.e. starting this operation in a goroutine) as a method that can be used as an operation itself.

func (Operation) Limit added in v0.10.0

func (wf Operation) Limit(in int) Operation

Limit provides an operation that, no matter how many times it is called, runs the underlying operation at most the specified number of times. The resulting operation is safe for concurrent use. Operations are launched serially (to maintain the counter), but the operations themselves can run concurrently.

func (Operation) Lock added in v0.10.0

func (wf Operation) Lock() Operation

Lock constructs a mutex that ensures that the underlying operation (when called through the output operation) only runs within the scope of the lock.

func (Operation) Once added in v0.10.0

func (wf Operation) Once() Operation

Once produces an operation that will only execute the root operation once, no matter how many times it's called.

func (Operation) PostHook added in v0.10.0

func (wf Operation) PostHook(hook func()) Operation

PostHook unconditionally runs the post-hook operation after the operation returns. Use the hook to run cleanup operations.

func (Operation) PreHook added in v0.10.0

func (wf Operation) PreHook(hook Operation) Operation

PreHook unconditionally runs the hook operation before the underlying operation. Use Operation.Once() operations for the hook to initialize resources for use by the operation, or without Once to reset.

func (Operation) Run added in v0.10.0

func (wf Operation) Run(ctx context.Context)

Run is equivalent to calling the operation directly.

func (Operation) Safe added in v0.10.0

func (wf Operation) Safe() Worker

Safe converts the Operation into a Worker function that catches panics and returns them as errors using fun.Check.

func (Operation) Signal added in v0.10.0

func (wf Operation) Signal(ctx context.Context) <-chan struct{}

Signal starts the operation in a goroutine, and provides a signal channel which will be closed when the operation is complete.

func (Operation) StartGroup added in v0.10.0

func (wf Operation) StartGroup(ctx context.Context, wg *WaitGroup, n int)

StartGroup runs n copies of the operation, incrementing the WaitGroup as appropriate.

func (Operation) TTL added in v0.10.0

func (wf Operation) TTL(dur time.Duration) Operation

TTL runs an operation, and if the operation is called again before the specified duration has elapsed, that call is a noop.

func (Operation) When added in v0.10.0

func (wf Operation) When(cond func() bool) Operation

When runs the condition function, and if it returns true, runs the operation; otherwise the resulting operation is a noop.

func (Operation) WithCancel added in v0.10.0

func (wf Operation) WithCancel() (Operation, context.CancelFunc)

WithCancel creates an Operation and a cancel function which will terminate the context that the root Operation is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Operation is canceled).

func (Operation) WithLock added in v0.10.0

func (wf Operation) WithLock(mtx *sync.Mutex) Operation

WithLock ensures that the underlying operation, when called through the output operation, will hold the mutex while running.

func (Operation) Worker added in v0.10.0

func (wf Operation) Worker() Worker

Worker converts a wait function into a fun.Worker. If the context is canceled, the worker function returns the context's error.

type OptionProvider added in v0.10.0

type OptionProvider[T any] func(T) error

OptionProvider is a function type for building functional arguments, and is used for the parallel iterator processing (map, transform, for-each, etc.) in the fun and itertool packages, and available with tooling for use in other contexts.

The type T should always be mutable (e.g. a map, or a pointer).
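The functional-options pattern behind this type can be sketched as follows. `OptionProvider` is re-declared locally to match the documented definition; `poolConf`, `withWorkers`, `withContinueOnError`, and `newPoolConf` are hypothetical names invented for the example and do not correspond to the package's WorkerGroupConf helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// OptionProvider mirrors the documented type; it is re-declared locally so
// the sketch compiles without the package.
type OptionProvider[T any] func(T) error

// poolConf is a hypothetical configuration struct used only for this sketch.
type poolConf struct {
	NumWorkers      int
	ContinueOnError bool
}

// withWorkers sets the worker count, rejecting values below one. Returning
// an error here is a choice made for the sketch.
func withWorkers(n int) OptionProvider[*poolConf] {
	return func(c *poolConf) error {
		if n < 1 {
			return errors.New("number of workers must be positive")
		}
		c.NumWorkers = n
		return nil
	}
}

// withContinueOnError enables the continue-on-error flag.
func withContinueOnError() OptionProvider[*poolConf] {
	return func(c *poolConf) error {
		c.ContinueOnError = true
		return nil
	}
}

// newPoolConf applies each provider in order. T is a pointer, so every
// provider mutates the same configuration value.
func newPoolConf(opts ...OptionProvider[*poolConf]) (*poolConf, error) {
	conf := &poolConf{NumWorkers: 1}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if err := opt(conf); err != nil {
			return nil, err
		}
	}
	return conf, nil
}

func main() {
	conf, err := newPoolConf(withWorkers(4), withContinueOnError())
	fmt.Println(conf.NumWorkers, conf.ContinueOnError, err) // 4 true <nil>
}
```

If T were a plain struct value instead of a pointer, each provider would modify its own copy and the changes would be lost, which is why the type should be mutable.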

func JoinOptionProviders added in v0.10.0

func JoinOptionProviders[T any](op ...OptionProvider[T]) OptionProvider[T]

JoinOptionProviders takes zero or more option providers and produces a single combined option provider. With zero or nil arguments, the operation becomes a noop.

func WorkerGroupConfAddExcludeErrors added in v0.10.0

func WorkerGroupConfAddExcludeErrors(errs ...error) OptionProvider[*WorkerGroupConf]

WorkerGroupConfAddExcludeErrors appends the provided errors to the ExcludedErrors value. The provider will return an error if any of the input errors is ErrRecoveredPanic.

func WorkerGroupConfContinueOnError added in v0.10.0

func WorkerGroupConfContinueOnError() OptionProvider[*WorkerGroupConf]

WorkerGroupConfContinueOnError toggles the option that allows the operation to continue when the operation encounters an error. Otherwise, any error will lead to an abort.

func WorkerGroupConfContinueOnPanic added in v0.10.0

func WorkerGroupConfContinueOnPanic() OptionProvider[*WorkerGroupConf]

WorkerGroupConfContinueOnPanic toggles the option that allows the operation to continue when encountering a panic.

func WorkerGroupConfErrorCollectorPair added in v0.10.0

func WorkerGroupConfErrorCollectorPair(ob Handler[error], resolver Producer[[]error]) OptionProvider[*WorkerGroupConf]

WorkerGroupConfErrorCollectorPair uses a Handler/Producer pair to collect errors. A basic implementation, accessible via HF.ErrorCollector(), is suitable for this purpose.

func WorkerGroupConfErrorHandler added in v0.10.2

func WorkerGroupConfErrorHandler(observer Handler[error]) OptionProvider[*WorkerGroupConf]

WorkerGroupConfErrorHandler saves an error observer to the configuration. Typically, implementations will provide some default error collection tool, and will only call the observer for non-nil errors. ErrorHandlers should be safe for concurrent use.

func WorkerGroupConfErrorResolver added in v0.10.0

func WorkerGroupConfErrorResolver(resolver func() error) OptionProvider[*WorkerGroupConf]

WorkerGroupConfErrorResolver reports the errors collected by the observer. If the ErrorHandler is not set the resolver may be overridden. ErrorHandlers should be safe for concurrent use.

func WorkerGroupConfIncludeContextErrors added in v0.10.0

func WorkerGroupConfIncludeContextErrors() OptionProvider[*WorkerGroupConf]

WorkerGroupConfIncludeContextErrors toggles the option that forces the operation to include context errors in the output. By default they are not included.

func WorkerGroupConfNumWorkers added in v0.10.0

func WorkerGroupConfNumWorkers(num int) OptionProvider[*WorkerGroupConf]

WorkerGroupConfNumWorkers sets the number of workers configured. It is not possible to set this value to less than 1: negative values and 0 are always ignored.

func WorkerGroupConfSet added in v0.10.0

func WorkerGroupConfSet(opt *WorkerGroupConf) OptionProvider[*WorkerGroupConf]

WorkerGroupConfSet overrides the option with the provided option.

func WorkerGroupConfWithErrorCollector added in v0.10.0

func WorkerGroupConfWithErrorCollector(
	ec interface {
		Add(error)
		Resolve() error
	},
) OptionProvider[*WorkerGroupConf]

WorkerGroupConfWithErrorCollector sets an error collector implementation for later use in the WorkerGroupOptions. The resulting function will only error if the collector is nil; however, this method will override an existing error collector.

The ErrorCollector interface is typically provided by the `erc.Collector` type.

ErrorCollectors are used by some operations to collect, aggregate, and distribute errors from operations to the caller.

func WorkerGroupConfWorkerPerCPU added in v0.10.0

func WorkerGroupConfWorkerPerCPU() OptionProvider[*WorkerGroupConf]

WorkerGroupConfWorkerPerCPU sets the number of workers to the number of detected CPUs by the runtime (e.g. runtime.NumCPU()).

func (OptionProvider[T]) Apply added in v0.10.0

func (op OptionProvider[T]) Apply(in T) error

Apply applies the current Option Provider to the configuration, and if the type T implements a Validate() method, calls that. All errors are aggregated.

func (OptionProvider[T]) Join added in v0.10.0

func (op OptionProvider[T]) Join(opps ...OptionProvider[T]) OptionProvider[T]

Join aggregates a collection of Option Providers into a single option provider. The amalgamated operation is panic safe and omits all nil providers.

type Processor added in v0.10.0

type Processor[T any] func(context.Context, T) error

Processors are generic functions that take an argument (and a context) and return an error. They're the type of function used by itertool.Process/itertool.ParallelForEach and are useful in other situations as a complement to fun.Worker and Operation.

In general the implementations of the methods for processing functions are wrappers around their similarly named fun.Worker analogues.

func MakeProcessor added in v0.10.2

func MakeProcessor[T any](fn func(T) error) Processor[T]

MakeProcessor converts a function with the Processor signature (minus the context) into a Processor.

func Processify added in v0.10.2

func Processify[T any](fn func(context.Context, T) error) Processor[T]

Processify provides an easy converter/constructor for Processor-type functions.

func (Processor[T]) After added in v0.10.0

func (pf Processor[T]) After(ts time.Time) Processor[T]

After produces a Processor that will execute after the provided timestamp.

func (Processor[T]) Block added in v0.10.0

func (pf Processor[T]) Block(in T) error

Block runs the Processor with a context that will never be canceled.

func (Processor[T]) Check added in v0.10.0

func (pf Processor[T]) Check(ctx context.Context, in T) bool

Check processes the input and returns true when the error is nil, and false when there was an error.

func (Processor[T]) Delay added in v0.10.0

func (pf Processor[T]) Delay(dur time.Duration) Processor[T]

Delay wraps a Processor in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Processor[T]) FilterErrors added in v0.10.0

func (pf Processor[T]) FilterErrors(ef ers.Filter) Processor[T]

FilterErrors uses an ers.Filter to process the error response from the processor.

func (Processor[T]) Force added in v0.10.0

func (pf Processor[T]) Force(in T)

Force processes the input, but discards the error and uses a context that will not expire.

func (Processor[T]) Future added in v0.10.0

func (pf Processor[T]) Future(ctx context.Context, in T) Worker

Future begins processing the input immediately and returns a worker function that returns the processor's error, and will block until the processor returns.

func (Processor[T]) Handler added in v0.10.2

func (pf Processor[T]) Handler(ctx context.Context, oe Handler[error]) Handler[T]

Handler converts a processor into an observer, handling the error with the error observer and using the provided context.

func (Processor[T]) If added in v0.10.0

func (pf Processor[T]) If(c bool) Processor[T]

If runs the processor function if, and only if, the condition is true. Otherwise the function does not run and the processor returns nil.

The resulting processor can be used more than once.

func (Processor[T]) Ignore added in v0.10.0

func (pf Processor[T]) Ignore(ctx context.Context, in T)

Ignore runs the process function and discards the error.

func (Processor[T]) Iterator added in v0.10.0

func (pf Processor[T]) Iterator(iter *Iterator[T]) Worker

Iterator creates a worker function that processes the iterator with the processor function, merging/collecting all errors, and respecting the worker's context. The processing does not begin until the worker is called.

func (Processor[T]) Jitter added in v0.10.0

func (pf Processor[T]) Jitter(jf func() time.Duration) Processor[T]

Jitter wraps a Processor that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the processor.

If the function produces a negative duration, there is no delay.

func (Processor[T]) Join added in v0.10.0

func (pf Processor[T]) Join(next Processor[T]) Processor[T]

Join wraps the processor and executes both the root processor and then the next processor, assuming the root processor is not canceled and the context has not expired.

func (Processor[T]) Limit added in v0.10.0

func (pf Processor[T]) Limit(n int) Processor[T]

Limit ensures that the processor is called at most n times.

func (Processor[T]) Lock added in v0.10.0

func (pf Processor[T]) Lock() Processor[T]

Lock wraps the Processor and protects its execution with a mutex.

func (Processor[T]) Once added in v0.10.0

func (pf Processor[T]) Once() Processor[T]

Once makes a processor that can only run once. Subsequent calls to the processor return the cached error of the original run.

func (Processor[T]) Operation added in v0.10.0

func (pf Processor[T]) Operation(in T, of Handler[error]) Operation

Operation converts a processor into an operation that will process the input provided when executed, passing any error to the provided error handler.

func (Processor[T]) PostHook added in v0.10.0

func (pf Processor[T]) PostHook(op func()) Processor[T]

PostHook produces an amalgamated processor that runs the hook after the processor completes. Panics are caught, converted to errors, and aggregated with the processor's error. The hook operation is unconditionally called after the processor function (except in the case of a processor panic).

func (Processor[T]) PreHook added in v0.10.0

func (pf Processor[T]) PreHook(op Operation) Processor[T]

PreHook creates an amalgamated Processor that runs the operation before the root processor. If the operation panics that panic is converted to an error and merged with the processor's error. Use with Operation.Once() to create an "init" function that runs once before a processor is called the first time.

func (Processor[T]) ReadAll added in v0.10.0

func (pf Processor[T]) ReadAll(prod Producer[T]) Worker

ReadAll reads elements from the producer and passes them to the processor, until the first error is encountered. The work

func (Processor[T]) ReadOne added in v0.10.0

func (pf Processor[T]) ReadOne(prod Producer[T]) Worker

ReadOne returns a future (Worker) that calls the processor function on the output of the provided producer function. ReadOne uses the fun.Pipe() operation for the underlying implementation.

func (Processor[T]) Run added in v0.10.0

func (pf Processor[T]) Run(ctx context.Context, in T) error

Run executes the Processor but creates a context within the function (descended from the context provided in the arguments) that is canceled when Run() returns, to avoid leaking well-behaved resources outside of the scope of the function execution. Run can also be passed as a Processor func.

func (Processor[T]) Safe added in v0.10.0

func (pf Processor[T]) Safe() Processor[T]

Safe runs the processor, converting all panics into errors. Safe is itself a processor.

func (Processor[T]) TTL added in v0.10.0

func (pf Processor[T]) TTL(dur time.Duration) Processor[T]

TTL returns a Processor that runs once in the specified window, and returns the error from the last run in between this interval. While the executions of the underlying function happen in isolation, in between, the processor is concurrently accessible.

func (Processor[T]) When added in v0.10.0

func (pf Processor[T]) When(c func() bool) Processor[T]

When returns a processor function that runs if the conditional function returns true, and does not run otherwise. The conditional function is evaluated every time the returned processor is run.

func (Processor[T]) WithCancel added in v0.10.0

func (pf Processor[T]) WithCancel() (Processor[T], context.CancelFunc)

WithCancel creates a Processor and a cancel function which will terminate the context that the root processor is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Processor is canceled).

func (Processor[T]) WithLock added in v0.10.0

func (pf Processor[T]) WithLock(mtx *sync.Mutex) Processor[T]

WithLock wraps the Processor and ensures that the mutex is always held while the root Processor is called.

func (Processor[T]) WithoutErrors added in v0.10.0

func (pf Processor[T]) WithoutErrors(errs ...error) Processor[T]

WithoutErrors returns a processor that will convert a non-nil error of the provided types to a nil error.

func (Processor[T]) Worker added in v0.10.0

func (pf Processor[T]) Worker(in T) Worker

Worker converts the processor into a worker, passing the provided input into the root processor function. The Processor is not run until the worker is called.

type Producer added in v0.10.0

type Producer[T any] func(context.Context) (T, error)

Producer is a function type that is a fairly common constructor. Its signature is used to create iterators, as a generator, and functions like a Future.

func BlockingProducer added in v0.10.0

func BlockingProducer[T any](fn func() (T, error)) Producer[T]

BlockingProducer constructs a producer that wraps a similar function that does not take a context.

func ConsistentProducer added in v0.10.0

func ConsistentProducer[T any](fn func() T) Producer[T]

ConsistentProducer constructs a wrapper around a similar function type that does not return an error or take a context. The resulting function will never error.

func MakeFuture added in v0.10.0

func MakeFuture[T any](ch <-chan T) Producer[T]

MakeFuture constructs a producer that blocks to receive one item from the specified channel. Subsequent calls to the producer will block/yield additional items from the channel. The producer will only return an error if the channel is closed (io.EOF) or the context has expired.

func StaticProducer added in v0.10.0

func StaticProducer[T any](val T, err error) Producer[T]

StaticProducer returns a producer function that always returns the provided values.

func ValueProducer added in v0.10.0

func ValueProducer[T any](val T) Producer[T]

ValueProducer returns a producer function that always returns the provided value, and a nil error.

func (Producer[T]) After added in v0.10.0

func (pf Producer[T]) After(ts time.Time) Producer[T]

After will return a Producer that will block until the provided time is in the past, and then execute normally.

func (Producer[T]) Background added in v0.10.0

func (pf Producer[T]) Background(ctx context.Context, of Handler[T]) Worker

Background constructs a worker that runs the provided Producer in a background goroutine and passes the produced value to the observer.

The worker function's return value captures the producer's error, and will block until the producer has completed.

func (Producer[T]) Block added in v0.10.0

func (pf Producer[T]) Block() (T, error)

Block runs the producer with a context that will never expire.

func (Producer[T]) Check added in v0.10.0

func (pf Producer[T]) Check(ctx context.Context) (T, bool)

Check runs the producer and returns its value along with a boolean that is true when the producer returned a nil error, and false otherwise.

func (Producer[T]) CheckBlock added in v0.10.0

func (pf Producer[T]) CheckBlock() (T, bool)

func (Producer[T]) Delay added in v0.10.0

func (pf Producer[T]) Delay(d time.Duration) Producer[T]

Delay wraps a Producer in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Producer[T]) FilterErrors added in v0.10.0

func (pf Producer[T]) FilterErrors(ef ers.Filter) Producer[T]

FilterErrors passes the error of the root Producer function with the ers.Filter.

func (Producer[T]) Force added in v0.10.0

func (pf Producer[T]) Force() T

Force combines the semantics of Must and Block: runs the producer with a context that never expires and panics in the case of an error.

func (Producer[T]) Future added in v0.10.0

func (pf Producer[T]) Future(ctx context.Context, ob Handler[error]) Future[T]

Future creates a future function using the context provided and error observer to collect the error.

func (Producer[T]) GenerateParallel added in v0.10.0

func (pf Producer[T]) GenerateParallel(
	optp ...OptionProvider[*WorkerGroupConf],
) *Iterator[T]

GenerateParallel creates an iterator using a generator pattern which produces items until the generator function returns io.EOF, or the context (passed to the first call to Next()) is canceled. Parallel operation and continue-on-error/continue-on-panic semantics are available and share configuration with the Map and Process operations.

func (Producer[T]) If added in v0.10.0

func (pf Producer[T]) If(cond bool) Producer[T]

If returns a producer that will execute the root producer only if the cond value is true. Otherwise, If will return the zero value for T and a nil error.

func (Producer[T]) Ignore added in v0.10.0

func (pf Producer[T]) Ignore(ctx context.Context) T

Ignore runs the producer function and returns the value, ignoring the error.

func (Producer[T]) Iterator added in v0.10.0

func (pf Producer[T]) Iterator() *Iterator[T]

Iterator creates an iterator that calls the Producer function once for every iteration, until it errors. Errors that are not context cancellation errors or io.EOF are propagated to the iterator's Close method.

func (Producer[T]) IteratorWithHook added in v0.10.0

func (pf Producer[T]) IteratorWithHook(hook func(*Iterator[T])) *Iterator[T]

IteratorWithHook constructs an Iterator from the producer. The provided hook function will run during the Iterator's Close() method.

func (Producer[T]) Jitter added in v0.10.0

func (pf Producer[T]) Jitter(jf func() time.Duration) Producer[T]

Jitter wraps a Producer that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Producer.

If the function produces a negative duration, there is no delay.

func (Producer[T]) Join added in v0.10.0

func (pf Producer[T]) Join(next Producer[T]) Producer[T]

Join, on successive calls, runs the first producer until it returns an io.EOF error, and then returns the results of the second producer. If either producer returns another error (context cancellation or otherwise), that error is returned.

When the second function returns io.EOF, all successive calls will return io.EOF.

func (Producer[T]) LaunchFuture added in v0.10.0

func (pf Producer[T]) LaunchFuture(ctx context.Context) Producer[T]

LaunchFuture runs the producer in the background when the function is called, and returns a producer which, when called, blocks until the original producer returns.

func (Producer[T]) Limit added in v0.10.0

func (pf Producer[T]) Limit(in int) Producer[T]

Limit runs the producer a specified number of times, and caches the result of the last execution and returns that value for any subsequent executions.

func (Producer[T]) Lock added in v0.10.0

func (pf Producer[T]) Lock() Producer[T]

Lock creates a producer that runs the root producer as per normal, but under the protection of a mutex so that there's only one execution of the producer at a time.

func (Producer[T]) Must added in v0.10.0

func (pf Producer[T]) Must(ctx context.Context) T

Must runs the producer, returning the constructed value and panicking if the producer errors.

func (Producer[T]) Once added in v0.10.0

func (pf Producer[T]) Once() Producer[T]

Once returns a producer that only executes once, and caches the return values, so that subsequent calls to the output producer will return the same values.

func (Producer[T]) Operation added in v0.10.0

func (pf Producer[T]) Operation(of Handler[T], eo Handler[error]) Operation

Operation produces a wait function, using two observers to handle the output of the Producer.

func (Producer[T]) PostHook added in v0.10.0

func (pf Producer[T]) PostHook(op func()) Producer[T]

PostHook appends a function to the execution of the producer. If the function panics it is converted to an error and aggregated with the error of the producer.

Useful for calling context.CancelFunc, closers, or incrementing counters as necessary.

func (Producer[T]) PreHook added in v0.10.0

func (pf Producer[T]) PreHook(op Operation) Producer[T]

PreHook configures an operation function to run before the returned producer. If the pre-hook panics, it is converted to an error which is aggregated with the (potential) error from the producer, and returned with the producer's output.

func (Producer[T]) Run added in v0.10.0

func (pf Producer[T]) Run(ctx context.Context) (T, error)

Run executes the producer.

func (Producer[T]) Safe added in v0.10.0

func (pf Producer[T]) Safe() Producer[T]

Safe returns a wrapped producer with a panic handler that converts any panic to an error.

func (Producer[T]) SendAll added in v0.10.0

func (pf Producer[T]) SendAll(proc Processor[T]) Worker

SendAll provides a form of iteration, by constructing a future (Worker) that consumes the values of the producer with the processor until either function returns an error. SendAll respects ErrIteratorSkip and io.EOF.

func (Producer[T]) SendOne added in v0.10.0

func (pf Producer[T]) SendOne(proc Processor[T]) Worker

SendOne makes a Worker function that, as a future, calls the producer once and then passes the output, if there are no errors, to the processor function. Provides the inverse operation of Processor.ReadOne.

func (Producer[T]) TTL added in v0.10.0

func (pf Producer[T]) TTL(dur time.Duration) Producer[T]

TTL runs the producer only one time per specified interval. The interval must be greater than 0.

func (Producer[T]) When added in v0.10.0

func (pf Producer[T]) When(cond func() bool) Producer[T]

When constructs a producer that will call the cond upon every execution, and when true, will run and return the results of the root producer. Otherwise When will return the zero value of T and a nil error.

func (Producer[T]) WithCancel added in v0.10.0

func (pf Producer[T]) WithCancel() (Producer[T], context.CancelFunc)

WithCancel creates a Producer and a cancel function which will terminate the context that the root Producer is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Producer is canceled.)

func (Producer[T]) WithLock added in v0.10.0

func (pf Producer[T]) WithLock(mtx *sync.Mutex) Producer[T]

WithLock uses the provided mutex to protect the execution of the producer.

func (Producer[T]) WithoutErrors added in v0.10.0

func (pf Producer[T]) WithoutErrors(errs ...error) Producer[T]

WithoutErrors returns a Producer function that wraps the root producer and, after running it, makes the error value of the producer nil if the error returned is in the error list. The produced value in these cases is almost always the zero value for the type.

func (Producer[T]) Worker added in v0.10.0

func (pf Producer[T]) Worker(of Handler[T]) Worker

Worker passes the produced value to an observer and returns a worker that runs the producer, calls the observer, and returns the error.

type RuntimeInvariant added in v0.10.0

type RuntimeInvariant struct{}

RuntimeInvariant is a type defined to create a namespace, callable (typically) via the Invariant symbol. Access these functions as in:

fun.Invariant.IsTrue(len(slice) > 0, "slice must have elements", len(slice))

Invariant provides a namespace for making runtime invariant assertions. These all raise panics, passing error objects to panic, which can be more easily handled. These helpers are syntactic sugar around Invariant.OK; when the invariant is violated, the error is rooted in ErrInvariantViolation.

func (RuntimeInvariant) Failure added in v0.10.1

func (RuntimeInvariant) Failure(args ...any)

Failure unconditionally raises an invariant failure error and processes the arguments as with the other invariant failures: extracting errors and aggregating constituent errors.

func (RuntimeInvariant) IsFalse added in v0.10.0

func (RuntimeInvariant) IsFalse(cond bool, args ...any)

IsFalse provides a runtime assertion that the condition is false, and annotates the panic object, which is an error rooted in the ErrInvariantViolation. In all other cases the operation is a noop.

func (RuntimeInvariant) IsTrue added in v0.10.0

func (RuntimeInvariant) IsTrue(cond bool, args ...any)

IsTrue provides a runtime assertion that the condition is true, and annotates the panic object, which is an error rooted in the ErrInvariantViolation. In all other cases the operation is a noop.

func (RuntimeInvariant) Must added in v0.10.0

func (RuntimeInvariant) Must(err error, args ...any)

Must raises an invariant error if the error is not nil. The content of the panic is both--via wrapping--an ErrInvariantViolation and the error itself.

func (RuntimeInvariant) OK added in v0.10.0

func (RuntimeInvariant) OK(cond bool, args ...any)

OK panics if the condition is false, passing an error that is rooted in ErrInvariantViolation. Otherwise the operation is a noop.

type Transform added in v0.8.5

type Transform[T any, O any] func(context.Context, T) (O, error)

Transform is a function type that converts T objects into objects of type O.

func Converter added in v0.10.0

func Converter[T any, O any](op func(T) O) Transform[T, O]

Converter builds a Transform function out of an equivalent function that doesn't take a context or return an error.

func ConverterErr added in v0.10.0

func ConverterErr[T any, O any](op func(T) (O, error)) Transform[T, O]

ConverterErr constructs a Transform function from an analogous function that does not take a context.

func ConverterOK added in v0.10.0

func ConverterOK[T any, O any](op func(T) (O, bool)) Transform[T, O]

ConverterOK builds a Transform function from a function that converts between types T and O, but that returns a boolean/check value. When false the transform function returns an ErrIteratorSkip error.

func (Transform[T, O]) Block added in v0.10.0

func (mpf Transform[T, O]) Block() func(T) (O, error)

Block calls the transform function passing a context that cannot expire.

func (Transform[T, O]) BlockCheck added in v0.10.0

func (mpf Transform[T, O]) BlockCheck() func(T) (O, bool)

BlockCheck calls the function with a context that cannot be canceled. The second value is true as long as the transform function returns a nil error and false in all other cases.

func (Transform[T, O]) Convert added in v0.10.0

func (mpf Transform[T, O]) Convert(iter *Iterator[T]) *Iterator[O]

Convert takes an iterator and runs the transformer over every item, producing a new iterator with the output values. The processing operation respects ErrIteratorSkip.

func (Transform[T, O]) Lock added in v0.10.0

func (mpf Transform[T, O]) Lock() Transform[T, O]

Lock returns a Transform function that executes the root function inside the scope of a mutex.

func (Transform[T, O]) Pipe added in v0.10.0

func (mpf Transform[T, O]) Pipe() (in Processor[T], out Producer[O], closer func())

Pipe creates a Processor (input) / Producer (output) pair that has data processed by the Transform function. The pipe has a buffer of one item and is never closed, and both input and output operations are blocking. The closer function will abort the connection and cause all successive operations to return io.EOF.

func (Transform[T, O]) ProcessParallel added in v0.10.0

func (mpf Transform[T, O]) ProcessParallel(
	iter *Iterator[T],
	optp ...OptionProvider[*WorkerGroupConf],
) *Iterator[O]

ProcessParallel runs the input iterator through the transform operation and produces an output iterator, much like Convert. However, the ProcessParallel implementation has configurable parallelism and error handling via the WorkerGroupConf options.

func (Transform[T, O]) Producer added in v0.10.0

func (mpf Transform[T, O]) Producer(prod Producer[T]) Producer[O]

Producer processes an input producer with the Transform function. Each call to the producer returns one value from the input producer after processing the item with the transform function. The Producer returns any error encountered during these operations (input, transform, output) to its caller *except* ErrIteratorSkip, which is respected.

func (Transform[T, O]) Safe added in v0.10.0

func (mpf Transform[T, O]) Safe() Transform[T, O]

Safe returns a Transform function that catches a panic, converts the panic object to an error if needed, and aggregates that with the Transform function's error.

func (Transform[T, O]) WithLock added in v0.10.0

func (mpf Transform[T, O]) WithLock(mu *sync.Mutex) Transform[T, O]

WithLock returns a Transform function that executes inside the scope of the provided mutex.

func (Transform[T, O]) Worker added in v0.10.0

func (mpf Transform[T, O]) Worker(in Producer[T], out Processor[O]) Worker

Worker takes a Producer and Processor pair, and returns a worker that transforms the input read from the Producer and passes the result to the Processor. This operation runs until the producer, transformer, or processor returns an error. ErrIteratorSkip errors are respected, while io.EOF errors cause the Worker to abort but are not propagated to the caller.

type WaitGroup added in v0.6.3

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup works like sync.WaitGroup, except that the Wait method takes a context (and can be passed as a fun.Operation). The implementation is exceptionally simple. The only constraint, like sync.WaitGroup, is that you can never modify the value of the internal counter such that it is negative, even transiently. The implementation does not require background resources aside from Wait, which creates a single goroutine that lives for the entire time that Wait is running. Multiple copies of Wait can be safely called at once, and the WaitGroup is reusable.

This implementation is about 50% slower than sync.WaitGroup after informal testing. It provides a little extra flexibility and introspection, with similar semantics, that may be worth the additional performance hit.

func (*WaitGroup) Add added in v0.8.0

func (wg *WaitGroup) Add(num int)

Add modifies the internal counter. Raises an ErrInvariantViolation error if any modification causes the internal counter to be less than 0.

func (*WaitGroup) DoTimes added in v0.10.0

func (wg *WaitGroup) DoTimes(ctx context.Context, n int, op Operation)

DoTimes uses the WaitGroup to launch an operation in a worker pool of the specified size, and blocks until all operations return or the context is canceled.

func (*WaitGroup) Done added in v0.8.0

func (wg *WaitGroup) Done()

Done marks a single operation as done.

func (*WaitGroup) IsDone added in v0.8.0

func (wg *WaitGroup) IsDone() bool

IsDone returns true if there is no pending work, and false otherwise.

func (*WaitGroup) Launch added in v0.10.0

func (wg *WaitGroup) Launch(ctx context.Context, op Operation)

Launch adds an operation to the WaitGroup and starts the operation in a goroutine.

func (*WaitGroup) Num added in v0.8.0

func (wg *WaitGroup) Num() int

Num returns the number of pending workers.

func (*WaitGroup) Operation added in v0.10.0

func (wg *WaitGroup) Operation() Operation

Operation returns the WaitGroup's Wait method as an Operation.

func (*WaitGroup) Wait added in v0.8.0

func (wg *WaitGroup) Wait(ctx context.Context)

Wait blocks until either the context is canceled or all items have completed.

Wait is passable or usable as a fun.Operation.

In many cases, callers should not rely on the Wait operation returning after the context expires. If Done() calls are made in code that respects a context cancellation, then aborting the Wait on a context cancellation (particularly when Wait gets a context that has the same lifecycle as the operations it's waiting on) means that worker routines will leak. Nevertheless, in some situations, when workers may take a long time to respond to a context cancellation, being able to set a second deadline on waiting may be useful.

Consider using `fun.Operation(wg.Wait).Block()` if you want blocking semantics with the other features of this WaitGroup implementation.

func (*WaitGroup) Worker added in v0.10.0

func (wg *WaitGroup) Worker() Worker

Worker returns a worker that will block on the wait group returning and return the context's error if one exists.

type Worker added in v0.10.0

type Worker func(context.Context) error

Worker represents a basic function used in worker pools and other similar situations.

func Pipe added in v0.10.0

func Pipe[T any](from Producer[T], to Processor[T]) Worker

Pipe sends the output of a producer into the processor as if it were a pipe. As a Worker this operation is delayed until the worker is called.

If both operations succeed, then the worker will return nil. If either function returns an error, it is cached, and successive calls to the worker will return the same error. The only limitation is that there is no way to distinguish between an error encountered by the Producer and one by the processor.

If the producer returns ErrIteratorSkip, it will be called again.

func WorkerFuture added in v0.10.0

func WorkerFuture(ch <-chan error) Worker

WorkerFuture constructs a worker from an error channel. The resulting worker blocks until an error is produced in the error channel, the error channel is closed, or the worker's context is canceled. If the channel is closed, the worker will return a nil error, and if the context is canceled, the worker will return a context error. In all other cases the worker will propagate the error (or nil) received from the channel.

You can call the resulting worker function more than once: if there are multiple errors produced or passed to the channel, they will be propagated; however, after the channel is closed subsequent calls to the worker function will return nil.

func (Worker) After added in v0.10.0

func (wf Worker) After(ts time.Time) Worker

After returns a Worker that blocks until the timestamp provided is in the past. Additional calls to this worker will run immediately. If the timestamp is in the past the resulting worker will run immediately.

func (Worker) Background added in v0.10.0

func (wf Worker) Background(ctx context.Context, ob Handler[error])

Background starts the worker function in a goroutine, passing the error to the provided observer function.

func (Worker) Block added in v0.10.0

func (wf Worker) Block() error

Block executes the worker function with a context that will never expire and returns the error. Use with caution.

func (Worker) Check added in v0.10.0

func (wf Worker) Check(ctx context.Context) bool

Check runs the worker and returns true (ok) if there was no error, and false otherwise.

func (Worker) Delay added in v0.10.0

func (wf Worker) Delay(dur time.Duration) Worker

Delay wraps a Worker in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Worker) FilterErrors added in v0.10.0

func (wf Worker) FilterErrors(ef ers.Filter) Worker

FilterErrors wraps the worker with a Worker that passes the root Worker's error through the filter and returns the output of the filter.

The ers package provides a number of filter implementations but any function in the following form works:

func(error) error

func (Worker) Future added in v0.10.0

func (wf Worker) Future(ctx context.Context) Worker

Future runs the worker function in a goroutine and returns a new fun.Worker which will block until the context expires or the background worker completes, and which returns the error from the background worker.

The underlying worker begins executing before Future returns.

func (Worker) If added in v0.10.0

func (wf Worker) If(cond bool) Worker

If returns a Worker function that runs only if the condition is true. The error is always nil if the condition is false. If-ed functions may be called more than once, and will potentially run multiple times.

func (Worker) Ignore added in v0.10.0

func (wf Worker) Ignore() Operation

Ignore converts the worker into an Operation that discards the error produced by the worker.

func (Worker) Jitter added in v0.10.0

func (wf Worker) Jitter(jf func() time.Duration) Worker

Jitter wraps a Worker that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Worker.

If the function produces a negative duration, there is no delay.

func (Worker) Join added in v0.10.0

func (wf Worker) Join(next Worker) Worker

Join melds two workers, calling the second worker if the first succeeds and the context hasn't been canceled.

func (Worker) Limit added in v0.10.0

func (wf Worker) Limit(n int) Worker

Limit produces a worker that runs exactly n times. Each execution is isolated from every other, but once the limit is exceeded, the result of the *last* worker to execute is cached, and concurrent access to that value is possible.

func (Worker) Lock added in v0.10.0

func (wf Worker) Lock() Worker

Lock produces a Worker that will be executed within the scope of a (managed) mutex.

func (Worker) Must added in v0.10.0

func (wf Worker) Must() Operation

Must converts a Worker function into a wait function; however, if the worker produces an error Must converts the error into a panic.

func (Worker) Observe added in v0.10.0

func (wf Worker) Observe(ctx context.Context, ob Handler[error])

Observe runs the worker function, and observes the error (or nil response). Panics are converted to errors for the worker function but not the observer function.

func (Worker) Once added in v0.10.0

func (wf Worker) Once() Worker

Once wraps the Worker in a function that will execute only once. The return value (error) is cached, and can be accessed many times without re-running the worker.

func (Worker) Operation added in v0.10.0

func (wf Worker) Operation(ob Handler[error]) Operation

Operation converts a worker function into a wait function, passing any error to the observer function. Only non-nil errors are observed.

func (Worker) PostHook added in v0.10.0

func (wf Worker) PostHook(op func()) Worker

PostHook runs the hook operation after the worker function completes. If the hook panics it is converted to an error and aggregated with the worker's error.

func (Worker) PreHook added in v0.10.0

func (wf Worker) PreHook(op Operation) Worker

PreHook returns a Worker that runs an operation unconditionally before running the underlying worker. If the hook function panics it is converted to an error and aggregated with the worker's error.

func (Worker) Run added in v0.10.0

func (wf Worker) Run(ctx context.Context) error

Run is equivalent to calling the worker function directly.

func (Worker) Safe added in v0.10.0

func (wf Worker) Safe() Worker

Safe produces a worker function that converts the worker function's panics to errors.

func (Worker) Signal added in v0.10.0

func (wf Worker) Signal(ctx context.Context) <-chan error

Signal runs the worker function in a background goroutine and returns an error channel that receives the worker's error when the worker function returns. If Signal is called with a canceled context the worker is still executed (with that context).

A value, possibly nil, is always sent through the channel. Although the fun.Worker runs in a different goroutine, a panic handler will convert panics to errors.

func (Worker) StartGroup added in v0.10.0

func (wf Worker) StartGroup(ctx context.Context, n int) Worker

StartGroup starts n copies of the worker operation and returns a future/worker that returns the aggregated errors from all workers.

The operation is fundamentally continue-on-error. To get abort-on-error semantics, use the FilterErrors() method on the input worker with a filter that cancels the context when it sees an error.

func (Worker) TTL added in v0.10.0

func (wf Worker) TTL(dur time.Duration) Worker

TTL produces a worker that will only run once during every specified duration, when called more than once. During the interval between calls, the previous error is returned. While each execution of the root worker is protected by a mutex, the resulting worker can be used in parallel during the intervals between calls.

func (Worker) When added in v0.10.0

func (wf Worker) When(cond func() bool) Worker

When wraps a Worker function that will only run if the condition function returns true. If the condition is false the worker does not execute. The condition function is called in between every operation.

Worker functions wrapped by When may be called more than once, and will potentially run multiple times.

func (Worker) While added in v0.10.0

func (wf Worker) While() Worker

func (Worker) WithCancel added in v0.10.0

func (wf Worker) WithCancel() (Worker, context.CancelFunc)

WithCancel creates a Worker and a cancel function which will terminate the context that the root Worker is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Worker is canceled.)

func (Worker) WithLock added in v0.10.0

func (wf Worker) WithLock(mtx *sync.Mutex) Worker

WithLock produces a Worker that will be executed within the scope of the provided mutex.

func (Worker) WithoutErrors added in v0.10.0

func (wf Worker) WithoutErrors(errs ...error) Worker

WithoutErrors returns a worker that will return nil if the error returned by the worker is one of the errors passed to WithoutErrors.

type WorkerGroupConf added in v0.10.0

type WorkerGroupConf struct {
	// NumWorkers describes the number of parallel workers
	// processing the incoming iterator items and running the map
	// function. All values less than 1 are converted to 1. Any
	// value greater than 1 will result in out-of-sequence results
	// in the output iterator.
	NumWorkers int
	// ContinueOnPanic prevents the operations from halting when a
	// single processing function panics. In all modes panics
	// are converted to errors and propagated to the output
	// iterator's Close() method.
	ContinueOnPanic bool
	// ContinueOnError allows a processing function to return an
	// error and allow the work of the broader operation to
	// continue. Errors are aggregated and propagated to the output
	// iterator's Close() method.
	ContinueOnError bool
	// IncludeContextExpirationErrors changes the default handling
	// of context cancellation errors. By default all errors
	// rooted in context cancellation are not propagated to the
	// Close() method, however, when true, these errors are
	// captured. All other error handling semantics
	// (e.g. ContinueOnError) are applicable.
	IncludeContextExpirationErrors bool
	// ExcludedErrors is a list of errors that should not be
	// included in the collected errors of the
	// output. fun.ErrRecoveredPanic is always included and io.EOF
	// is never included.
	ExcludedErrors []error
	// ErrorHandler is used to collect and aggregate errors in
	// the collector. For operations with shorter runtime
	// `erc.Collector.Add` is a good choice, though different
	// strategies may make sense in different
	// cases. (erc.Collector has a mutex and stores collected
	// errors in memory.)
	ErrorHandler Handler[error]
	// ErrorResolver should return an aggregated error collected
	// during the execution of worker
	// threads. `erc.Collector.Resolve` suffices when collecting
	// with an erc.Collector.
	ErrorResolver func() error
}

WorkerGroupConf describes the runtime options to several operations. The zero value of this struct provides a usable strict operation.

func (WorkerGroupConf) CanContinueOnError added in v0.10.0

func (o WorkerGroupConf) CanContinueOnError(err error) bool

CanContinueOnError checks an error, collecting it as needed using the WorkerGroupConf, and then returning true if processing should continue and false otherwise.

Neither io.EOF nor ErrIteratorSkip errors are ever observed. All panic errors are observed. Context cancellation errors are observed only when configured.

func (*WorkerGroupConf) Validate added in v0.10.0

func (o *WorkerGroupConf) Validate() error

Validate ensures that the configuration is valid, and returns an error if there are impossible configurations.

Directories

Path Synopsis
Package adt provides "atomic data types" as strongly-typed generic helpers for simple atomic operations (including sync.Map, sync.Pool, and a typed value).
Package assert provides an incredibly simple assertion framework, that relies on generics and simplicity.
check
GENERATED FILE FROM ASSERTION PACKAGE
dt
package dt provides container type implementations and interfaces.
cmp
Package cmp provides comparators for sorting linked lists.
is
Package is contains a simple assertion library for the fun/ensure testing framework.
Package erc provides a simple/fast error aggregation tool for collecting and aggregating errors.
Package ers provides some very basic error aggregating and handling tools, as a companion to erc.
Package intish provides a collection of strongly typed integer arithmetic operations, to make it possible to avoid floating point math for simple operations when desired.
Package itertool provides a set of functional helpers for managing and using fun.Iterators, including parallel processing, generators, Map/Reduce, Merge, and other convenient tools.
Package pubsub provides a message broker for one-to-many or many-to-many message distribution.
Package risky contains a bunch of bad ideas for APIs and operations that will definitely lead to panics and deadlocks and incorrect behavior when used incorrectly.
Package srv provides a framework and toolkit for service orchestration.
Package testt (for test tools), provides a couple of useful helpers for common test patterns.
