README

fun -- Core Library for Go Programming

fun is a simple, well-tested, zero-dependency "Core Library" for Go, with support for common patterns and paradigms: stream processing, error handling, pubsub/message queues, and service architectures. If you've maintained a piece of Go software, you've probably written one-off versions of some of these tools: let's avoid having to keep writing them.

fun aims to be easy to adopt: the interfaces (and implementations!) are simple and (hopefully!) easy to use. There are no external dependencies and all of the code is well-tested. You can (and should!) always adopt the tools that make the most sense to use for your project.

Use Cases and Highlights

  • Error Handling tools. The erc.Collector type allows you to wrap, annotate, and aggregate errors. This makes continue-on-error patterns simple to implement. Because erc.Collector can be used concurrently, handling errors in goroutines becomes much less of a headache. In addition, the ers.Error type, as a string alias, permits const definition of errors.

  • Service Management. srv.Service handles the lifecycle of "background" processes inside of an application. You can now start services like HTTP servers, background monitoring and workloads, and sub-processes, and ensure that they exit cleanly (at the right time!) and that their errors propagate clearly back to the "main" thread.

  • Streams. The fun.Stream[T] type provides a stream/iterator interface and toolkit for developing Go applications that lean heavily into streaming data and message-passing patterns. The associated fun.Generator[T] and fun.Handler[T] functions provide a comfortable abstraction for interacting with this data.

  • Pubsub. The pubsub package provides queue and message broker primitives for writing concurrent applications that use message-passing patterns within a single process. The pubsub.Broker[T] provides one-to-many and many-to-many (e.g. broadcast) communication, while the deque and queue structures provide configurable queue abstractions for many workloads.

  • High-level data types: dt (data type) hosts a collection of wrappers and helpers: the dt.Ring[T] type provides a simple ring buffer, in addition to singly (dt.Stack[T]) and doubly (dt.List[T]) linked lists. The adt package provides type-specific helpers and wrappers around Go's atomic/synchronization primitives, including the adt.Pool[T], adt.Map[T], and adt.Atomic[T] types. Finally, the adt/shard.Map[T] provides a logical thread-safe hash map that is sharded across a collection of maps to reduce mutex contention.

Packages

  • Data Types and Function Tools:
    • dt (generic container data-types, including ordered and unordered sets, singly and doubly linked lists, as well as wrappers around maps and slices.)
    • adt (strongly typed atomic data structures, wrappers, tools, and operations.)
    • shard (a "sharded map" to reduce contention for multi-threaded access of synchronized maps.)
    • ft (function tools: simple tools for handling function objects.)
    • itertool (stream/iteration tools and helpers.)
  • Service Architecture:
    • srv (service orchestration and management framework.)
    • pubsub (message broker and concurrency-safe queue and deque.)
  • Error Handling:
    • erc (error collection, annotation, and panic-handling utilities for aggregating errors and managing panics in concurrent contexts.)
    • ers (constant errors and low level error primitives used throughout the package.)
  • Test Infrastructure:
    • assert (minimal generic-based assertion library, in the tradition of testify.) The assertions in assert abort the flow of the test, while check (https://pkg.go.dev/github.com/tychoish/fun/assert/check) provides non-critical assertions.
    • testt (testy; a collection of "nice to have" test helpers and utilities.)
    • ensure (an experimental test harness and orchestration tool with more "natural" assertions.)

Examples

(coming soon.)

Version History

There are no plans for a 1.0 release; major backward-breaking changes increment the second number of the version (which functions as the major release value), and limited maintenance releases are possible/considered to maintain compatibility.

  • v0.12.0: go1.24 and greater. Major API impacting release. (current)
  • v0.11.0: go1.23 and greater. (maintained)
  • v0.10.0: go1.20 and greater. (maintained)
  • v0.9.0: go1.19 and greater.

There may be small changes to exported APIs within a release series, although major API changes will increment the major release value.

Contribution

Contributions are welcome. The general goals of the project are:

  • superior API ergonomics.
  • great high-level abstractions.
  • obvious and clear implementations.
  • minimal dependencies.

Please feel free to open issues or file pull requests.

Have fun!

Documentation

Overview

Package fun is a zero-dependency collection of tools and idioms that takes advantage of generics: streams, error handling, a native-feeling Set type, and a simple pub-sub framework for distributing messages in fan-out patterns.

Constants

const ErrInvariantViolation ers.Error = ers.ErrInvariantViolation

ErrInvariantViolation is the root error of the error object that is the content of all panics produced by the Invariant helper.

const ErrNonBlockingChannelOperationSkipped ers.Error = ers.ErrCurrentOpSkip

ErrNonBlockingChannelOperationSkipped is returned when sending into a channel, in a non-blocking context, when the channel was full and the send or receive was therefore skipped.

const ErrRecoveredPanic ers.Error = ers.ErrRecoveredPanic

ErrRecoveredPanic is at the root of any error returned by a function in the fun package that recovers from a panic.

const ErrStreamContinue ers.Error = ers.ErrCurrentOpSkip

ErrStreamContinue instructs consumers of Streams and related processors to skip the current item and continue processing. Equivalent to the "continue" keyword in other contexts.

Variables

var Invariant = RuntimeInvariant{}

Invariant provides a namespace for making runtime invariant assertions. These all raise panics, passing error objects from panic, which can be more easily handled. These helpers are syntactic sugar around Invariant.OK; when the invariant is violated, the panic wraps ErrInvariantViolation.

var MAKE = Constructors{}

MAKE provides namespaced access to the constructors provided by the Constructors type.
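
For example, a minimal sketch of using the MAKE namespace with the Atoi constructor and the Converter.Convert method documented below (assuming a ctx context.Context and the usual imports are in scope):

atoi := fun.MAKE.Atoi()
n, err := atoi.Convert(ctx, "42")
fmt.Println(n, err) // 42 <nil>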

Functions

This section is empty.

Types

type ChanOp added in v0.10.0

type ChanOp[T any] struct {
	// contains filtered or unexported fields
}

ChanOp is a wrapper around a channel, to make it easier to write clear code that uses and handles basic operations with single channels. From a high level an operation might look like:

ch := make(chan string)
err := fun.Blocking(ch).Send().Write(ctx, "hello world")

Methods on ChanOp and related structures are not pointer receivers; ensure that the output values are recorded as needed. Typically it's also reasonable to avoid creating ChanOp objects in a loop.

func Blocking added in v0.8.5

func Blocking[T any](ch chan T) ChanOp[T]

Blocking produces a blocking Send instance. All Send/Check/Ignore operations will block until the context is canceled, the channel is closed, or the send succeeds.

func Chan added in v0.10.4

func Chan[T any](args ...int) ChanOp[T]

Chan constructs a channel op, like "make(chan T)", with the optionally specified length. Operations (like read from and write to a channel) on the channel are blocking by default, but can be switched to non-blocking mode with the NonBlocking() method.

func DefaultChan added in v0.10.4

func DefaultChan[T any](input chan T, args ...int) ChanOp[T]

DefaultChan takes a channel value and if it is non-nil, returns it; otherwise it constructs a new ChanOp of the specified type with the optionally provided length and returns it.

func NonBlocking added in v0.8.5

func NonBlocking[T any](ch chan T) ChanOp[T]

NonBlocking produces a send instance that performs a non-blocking send.

The Send() method, for non-blocking sends, will return ErrNonBlockingChannelOperationSkipped if the channel was full and the object was not sent.
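
A minimal sketch of the non-blocking semantics, using the Send().Check() helper documented below (assuming a ctx context.Context is in scope):

ch := make(chan int, 1)
op := fun.NonBlocking(ch)

fmt.Println(op.Send().Check(ctx, 1)) // true: the buffered slot was free
fmt.Println(op.Send().Check(ctx, 2)) // false: the channel is full, so the send was skipped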

func (ChanOp[T]) Blocking added in v0.10.4

func (op ChanOp[T]) Blocking() ChanOp[T]

Blocking returns a version of the ChanOp in blocking mode. This is not an atomic operation.

func (ChanOp[T]) Cap added in v0.10.5

func (op ChanOp[T]) Cap() int

Cap returns the current capacity of the channel.

func (ChanOp[T]) Channel added in v0.10.0

func (op ChanOp[T]) Channel() chan T

Channel returns the underlying channel.

func (ChanOp[T]) Close added in v0.10.0

func (op ChanOp[T]) Close()

Close closes the underlying channel.

This swallows any panic encountered when calling close() on the underlying channel, which makes it safe to call on nil or already-closed channels: in all cases, the channel is closed when Close() returns.

func (ChanOp[T]) Generator added in v0.12.0

func (op ChanOp[T]) Generator() Generator[T]

Generator exposes the "receive" aspect of the channel as a Generator function.

func (ChanOp[T]) Handler added in v0.12.0

func (op ChanOp[T]) Handler() Handler[T]

Handler exposes the "send" aspect of the channel as a Handler function.

func (ChanOp[T]) Iterator added in v0.10.0

func (op ChanOp[T]) Iterator(ctx context.Context) iter.Seq[T]

func (ChanOp[T]) Len added in v0.10.5

func (op ChanOp[T]) Len() int

Len returns the current length of the channel.

func (ChanOp[T]) NonBlocking added in v0.10.4

func (op ChanOp[T]) NonBlocking() ChanOp[T]

NonBlocking returns a version of the ChanOp in non-blocking mode. This is not an atomic operation.

func (ChanOp[T]) Pipe added in v0.10.0

func (op ChanOp[T]) Pipe() (Handler[T], Generator[T])

Pipe creates a linked pair of functions for transmitting data via these interfaces.
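
A brief sketch of a Pipe over a buffered channel created with Chan (assuming a ctx context.Context is in scope):

send, recv := fun.Chan[string](1).Pipe()

if err := send(ctx, "hello"); err != nil {
	// handle the send error
}

val, err := recv(ctx)
fmt.Println(val, err) // hello <nil>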

func (ChanOp[T]) Receive added in v0.10.0

func (op ChanOp[T]) Receive() ChanReceive[T]

Receive returns a ChanReceive object that acts on the same underlying sender.

func (ChanOp[T]) Send added in v0.10.0

func (op ChanOp[T]) Send() ChanSend[T]

Send returns a ChanSend object that acts on the same underlying channel.

func (ChanOp[T]) Seq2 added in v0.11.0

func (op ChanOp[T]) Seq2(ctx context.Context) iter.Seq2[int, T]

func (ChanOp[T]) Stream added in v0.12.0

func (op ChanOp[T]) Stream() *Stream[T]

Stream returns the "receive" aspect of the channel as a stream. This is equivalent to fun.ChannelStream(), but may be more accessible in some contexts.

type ChanReceive added in v0.10.0

type ChanReceive[T any] struct {
	// contains filtered or unexported fields
}

ChanReceive wraps a channel for <-chan T operations. It is the type returned by the Receive() method on ChanOp. The primary method is Read(), with other methods provided as "self-documenting" helpers.

func BlockingReceive added in v0.10.0

func BlockingReceive[T any](ch <-chan T) ChanReceive[T]

BlockingReceive is the equivalent of Blocking(ch).Receive(), except that it accepts a receive-only channel.

func NonBlockingReceive added in v0.10.0

func NonBlockingReceive[T any](ch <-chan T) ChanReceive[T]

NonBlockingReceive is the equivalent of NonBlocking(ch).Receive(), except that it accepts a receive-only channel.

func (ChanReceive[T]) Check added in v0.10.0

func (ro ChanReceive[T]) Check(ctx context.Context) (T, bool)

Check performs the read operation and converts the error into an "ok" value, returning true if receive was successful and false otherwise.

func (ChanReceive[T]) Drop added in v0.10.0

func (ro ChanReceive[T]) Drop(ctx context.Context) bool

Drop performs a read operation and discards the result. Drop() returns false if no item was read (e.g. Read would return an error), and true when the Drop was successful.

func (ChanReceive[T]) Filter added in v0.10.5

func (ro ChanReceive[T]) Filter(ctx context.Context, filter func(T) bool) ChanReceive[T]

Filter consumes the output of the channel and returns a NEW ChanReceive whose channel only contains the elements for which the filter function returns true.

func (ChanReceive[T]) Force added in v0.10.0

func (ro ChanReceive[T]) Force(ctx context.Context) (out T)

Force ignores the error returning only the value from Read. This is either the value sent through the channel, or the zero value for T. Because zero values can be sent through channels, Force does not provide a way to distinguish between "channel-closed" and "received a zero value".

func (ChanReceive[T]) Generator added in v0.12.0

func (ro ChanReceive[T]) Generator() Generator[T]

Generator returns the Read method as a generator for integration into existing tools.

func (ChanReceive[T]) Handle added in v0.12.0

func (ro ChanReceive[T]) Handle(op Handler[T]) Worker

Handle returns a Worker function that processes the output of data from the channel with the Handler function. If the Handler function returns ErrStreamContinue, processing will continue. All other Handler errors (and problems reading from the channel) abort the stream. io.EOF errors are not propagated to the caller.

func (ChanReceive[T]) Ignore added in v0.10.0

func (ro ChanReceive[T]) Ignore(ctx context.Context)

Ignore reads one item from the channel and discards it.

func (ChanReceive[T]) Iterator added in v0.10.0

func (ro ChanReceive[T]) Iterator(ctx context.Context) iter.Seq[T]

Iterator provides access to the contents of the channel as a standard library iterator (iter.Seq). For ChanReceive objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.

func (ChanReceive[T]) IteratorIndexed added in v0.12.0

func (ro ChanReceive[T]) IteratorIndexed(ctx context.Context) iter.Seq2[int, T]

IteratorIndexed provides access to the contents of the channel as a standard library iterator (iter.Seq2). For ChanReceive objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.

The first value is the index/counter of the item in the iterator.

func (ChanReceive[T]) Ok added in v0.10.0

func (ro ChanReceive[T]) Ok() bool

Ok attempts to read from the channel and returns true either when the channel is blocked or an item is read from the channel, and false when the channel has been closed.

func (ChanReceive[T]) Read added in v0.10.0

func (ro ChanReceive[T]) Read(ctx context.Context) (T, error)

Read performs the read operation according to the blocking/non-blocking semantics of the receive operation.

In general errors are either: io.EOF if the channel is closed; a context cancellation error if the context passed to Read() is canceled; or ErrNonBlockingChannelOperationSkipped in the non-blocking case if the channel was empty.

In all cases when Read() returns an error, the return value is the zero value for T.
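
A small sketch of these semantics for a blocking receive (assuming a ctx context.Context is in scope):

ch := make(chan int, 1)
ch <- 7
close(ch)

rx := fun.Blocking(ch).Receive()

val, err := rx.Read(ctx)
fmt.Println(val, err) // 7 <nil>

_, err = rx.Read(ctx)
fmt.Println(err) // EOF: the channel is closed and empty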

func (ChanReceive[T]) Stream added in v0.12.0

func (ro ChanReceive[T]) Stream() *Stream[T]

Stream provides access to the contents of the channel as a fun-style stream. For ChanReceive objects in non-blocking mode, iteration ends when there are no items in the channel. In blocking mode, iteration ends when the context is canceled or the channel is closed.

type ChanSend added in v0.10.0

type ChanSend[T any] struct {
	// contains filtered or unexported fields
}

ChanSend provides access to channel send operations, and is constructed by the Send() method on ChanOp. The primary method is Write(), with other methods provided for clarity.

func BlockingSend added in v0.10.0

func BlockingSend[T any](ch chan<- T) ChanSend[T]

BlockingSend is equivalent to Blocking(ch).Send() except that it accepts a send-only channel.

func NonBlockingSend added in v0.10.0

func NonBlockingSend[T any](ch chan<- T) ChanSend[T]

NonBlockingSend is equivalent to NonBlocking(ch).Send() except that it accepts a send-only channel.

func (ChanSend[T]) Check added in v0.10.0

func (sm ChanSend[T]) Check(ctx context.Context, it T) bool

Check performs a send and returns true when the send was successful and false otherwise.

func (ChanSend[T]) Consume added in v0.10.0

func (sm ChanSend[T]) Consume(iter *Stream[T]) Worker

Consume returns a worker that, when executed, pushes the content from the stream into the channel.

func (ChanSend[T]) Handler added in v0.12.0

func (sm ChanSend[T]) Handler() Handler[T]

Handler returns the Write method as a Handler function for integration into existing tools.

func (ChanSend[T]) Ignore added in v0.10.0

func (sm ChanSend[T]) Ignore(ctx context.Context, it T)

Ignore performs a send and omits the error.

func (ChanSend[T]) Signal added in v0.10.0

func (sm ChanSend[T]) Signal(ctx context.Context)

Signal attempts to send the zero value of T through the channel and returns when: the send succeeds, the channel is full and this is a non-blocking send, the context is canceled, or the channel is closed.

func (ChanSend[T]) Write added in v0.10.0

func (sm ChanSend[T]) Write(ctx context.Context, it T) (err error)

Write sends the item into the channel captured by Blocking/NonBlocking returning the appropriate error.

The returned error is nil if the send was successful, and io.EOF if the channel is closed (or nil) rather than a panic (as with the equivalent direct operation.) The error value is a context cancellation error when the context is canceled, and for non-blocking sends, if the channel did not accept the write, ErrNonBlockingChannelOperationSkipped is returned.

func (ChanSend[T]) Zero added in v0.10.0

func (sm ChanSend[T]) Zero(ctx context.Context) error

Zero sends the zero value of T through the channel.

type Constructors added in v0.12.0

type Constructors struct{}

The Constructors type serves to namespace constructors of common operations and specializations of generic functions provided by this package.

func (Constructors) Atoi added in v0.12.0

func (Constructors) Atoi() Converter[string, int]

Atoi produces a Transform function that converts strings into integers.

func (Constructors) ConvertErrorsToStrings added in v0.12.0

func (Constructors) ConvertErrorsToStrings() Converter[[]error, []string]

ConvertErrorsToStrings makes a Converter function that translates slices of errors to slices of strings.

func (Constructors) Counter added in v0.12.0

func (Constructors) Counter(maxVal int) *Stream[int]

Counter produces a stream that, starting at 1, yields monotonically increasing integers until the maximum is reached.

func (Constructors) ErrorChannelWorker added in v0.12.0

func (Constructors) ErrorChannelWorker(ch <-chan error) Worker

ErrorChannelWorker constructs a worker from an error channel. The resulting worker blocks until an error is produced in the error channel, the error channel is closed, or the worker's context is canceled. If the channel is closed, the worker will return a nil error, and if the context is canceled, the worker will return a context error. In all other cases the work will propagate the error (or nil) received from the channel.

You can call the resulting worker function more than once: if there are multiple errors produced or passed to the channel, they will be propagated; however, after the channel is closed subsequent calls to the worker function will return nil.
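
A rough sketch of the behavior described above (assuming a ctx context.Context is in scope, and that the Worker type is called as wait(ctx) and returns an error):

errCh := make(chan error, 1)
wait := fun.MAKE.ErrorChannelWorker(errCh)

errCh <- errors.New("background failure")
fmt.Println(wait(ctx)) // background failure

close(errCh)
fmt.Println(wait(ctx)) // <nil>: the channel is closed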

func (Constructors) ErrorHandler added in v0.12.0

func (Constructors) ErrorHandler(of fn.Handler[error]) fn.Handler[error]

ErrorHandler constructs an error observer that only calls the wrapped observer when the error passed is non-nil.

func (Constructors) ErrorHandlerWithAbort added in v0.12.0

func (Constructors) ErrorHandlerWithAbort(cancel context.CancelFunc) fn.Handler[error]

ErrorHandlerWithAbort creates a new error handler that--ignoring nil and context expiration errors--will call the provided context cancellation function when it receives an error.

Use the Chain and Join methods of handlers to further process the error.

func (Constructors) ErrorHandlerWithoutCancelation added in v0.12.0

func (Constructors) ErrorHandlerWithoutCancelation(of fn.Handler[error]) fn.Handler[error]

ErrorHandlerWithoutCancelation wraps and returns an error handler that filters all nil errors and errors that are rooted in context Cancellation from the wrapped Handler.

func (Constructors) ErrorHandlerWithoutTerminating added in v0.12.0

func (Constructors) ErrorHandlerWithoutTerminating(of fn.Handler[error]) fn.Handler[error]

ErrorHandlerWithoutTerminating wraps an error observer and only calls the underlying observer if the input error is non-nil and is not one of the "terminating" errors used by this package (e.g. io.EOF and similar errors). Context cancellation errors can and should be filtered separately.

func (Constructors) ErrorStream added in v0.12.0

func (Constructors) ErrorStream(ec *erc.Collector) *Stream[error]

ErrorStream provides a stream with access to the contents of the error collector.

func (Constructors) ErrorUnwindTransformer added in v0.12.0

func (Constructors) ErrorUnwindTransformer(filter erc.Filter) Converter[error, []error]

ErrorUnwindTransformer provides the ers.Unwind operation as a transform method, which consumes an error and produces a slice of its component errors. All errors are processed by the provided filter, and the transformer's context is not used. The error value of the Transform function is always nil.

func (Constructors) Itoa added in v0.12.0

func (Constructors) Itoa() Converter[int, string]

Itoa produces a Transform function that converts integers into strings.

func (Constructors) Lines added in v0.12.0

func (Constructors) Lines(reader io.Reader) *Stream[string]

Lines provides fun.Stream access over the contents of a (presumably plaintext) io.Reader, using bufio.Scanner.

func (Constructors) LinesWithSpaceTrimed added in v0.12.0

func (Constructors) LinesWithSpaceTrimed(reader io.Reader) *Stream[string]

LinesWithSpaceTrimed provides a stream with access to the line-separated content of an io.Reader, like Lines(), but with the leading and trailing space trimmed from each line.

func (Constructors) OperationPool added in v0.12.0

func (Constructors) OperationPool(iter *Stream[Operation]) Operation

OperationPool returns an Operation that, when called, processes the incoming stream of Operations, starts a go routine for running each element in the stream (without any throttling or rate limiting), and then blocks until all operations have returned or the context passed to the output function has been canceled.

For more configurable options, use the itertool.Worker() function, which provides more configurability and supports both Operation and Worker functions.

func (Constructors) ProcessOperation added in v0.12.0

func (Constructors) ProcessOperation() Handler[Operation]

ProcessOperation constructs a Handler function for running Operation functions. Use in combination with Process and ProcessParallel, and to build worker pools.

The Constructors type serves to namespace these constructors, for interface clarity purposes. Use the MAKE variable to access this method, as in:

fun.MAKE.ProcessOperation()

func (Constructors) Recover added in v0.12.0

func (Constructors) Recover(ob fn.Handler[error])

Recover catches a panic, turns it into an error and passes it to the provided observer function.

func (Constructors) Signal added in v0.12.0

func (Constructors) Signal() (func(), Worker)

Signal is a wrapper around the common pattern where signal channels are closed to pass termination and blocking notifications between go routines. The constructor returns two functions: a closer operation--func()--and a Worker that waits for the closer to be triggered.

The closer is safe to call multiple times. The worker ALWAYS returns the context cancellation error if its context has been canceled, even if the signal channel was closed.
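
A minimal sketch of the signal pattern (assuming a ctx context.Context is in scope, and that the returned Worker is called as wait(ctx)):

closer, wait := fun.MAKE.Signal()

go func() {
	defer closer() // safe to call more than once
	// ... do background work ...
}()

if err := wait(ctx); err != nil {
	// the context was canceled before (or while) waiting on the signal
}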

func (Constructors) Sprint added in v0.12.0

func (Constructors) Sprint(args ...any) fn.Future[string]

Sprint constructs a future that calls fmt.Sprint over the given variadic arguments.

func (Constructors) Sprintf added in v0.12.0

func (Constructors) Sprintf(tmpl string, args ...any) fn.Future[string]

Sprintf produces a future that calls and returns fmt.Sprintf for the provided arguments when the future is called.

func (Constructors) Sprintln added in v0.12.0

func (Constructors) Sprintln(args ...any) fn.Future[string]

Sprintln constructs a future that calls fmt.Sprintln over the given variadic arguments.

func (Constructors) Str added in v0.12.0

func (Constructors) Str(args []any) fn.Future[string]

Str provides a future that calls fmt.Sprint over a slice of any objects. Use fun.MAKE.Sprint for a variadic alternative.

func (Constructors) StrConcatinate added in v0.12.0

func (Constructors) StrConcatinate(strs ...string) fn.Future[string]

StrConcatinate produces a future that joins a variadic sequence of strings into a single string.

func (Constructors) StrJoin added in v0.12.0

func (Constructors) StrJoin(args []any) fn.Future[string]

StrJoin, like Strln and Sprintln, creates a concatenated string representation of a sequence of values; however, StrJoin omits the final newline character that Sprintln adds. This is similar in functionality to MAKE.Sprint() or MAKE.Str(), but ALWAYS adds a space between elements.

func (Constructors) StrSliceConcatinate added in v0.12.0

func (Constructors) StrSliceConcatinate(input []string) fn.Future[string]

StrSliceConcatinate produces a future for strings.Join(), concatenating the elements in the input slice with the provided separator.

func (Constructors) Strf added in v0.12.0

func (Constructors) Strf(tmpl string, args []any) fn.Future[string]

Strf produces a future that calls fmt.Sprintf for the given template string and arguments.

func (Constructors) Stringer added in v0.12.0

func (Constructors) Stringer(op fmt.Stringer) fn.Future[string]

Stringer converts a fmt.Stringer object/method call into a string-formatter.

func (Constructors) StringsJoin added in v0.12.0

func (Constructors) StringsJoin(strs []string, sep string) fn.Future[string]

StringsJoin produces a future that combines a slice of strings into a single string, joined with the separator.

func (Constructors) Strln added in v0.12.0

func (Constructors) Strln(args []any) fn.Future[string]

Strln constructs a future that calls fmt.Sprintln for the given arguments.

func (Constructors) WorkerPool added in v0.12.0

func (Constructors) WorkerPool(iter *Stream[Worker]) Worker

WorkerPool creates a Worker that processes a stream of worker functions, for simple and short total-duration operations. Every worker in the pool runs in its own go routine, and there are no limits or throttling on the number of go routines. All errors are aggregated in a single collector (erc.Stack), which is returned by the worker when the operation ends (if many Workers error, this may create memory pressure), and there's no special handling of panics.

For more configurable options, use the itertool.Worker() function, which provides more configurability and supports both Operation and Worker functions.

type Converter added in v0.10.0

type Converter[T any, O any] func(context.Context, T) (O, error)

Converter is a function type that converts T objects into objects of type O.

func MakeConverter added in v0.12.0

func MakeConverter[T any, O any](op func(T) O) Converter[T, O]

MakeConverter builds a Transform function out of an equivalent function that doesn't take a context or return an error.

func MakeConverterErr added in v0.12.0

func MakeConverterErr[T any, O any](op func(T) (O, error)) Converter[T, O]

MakeConverterErr constructs a Transform function from an analogous function that does not take a context.

func MakeCovnerterOk added in v0.12.0

func MakeCovnerterOk[T any, O any](op func(T) (O, bool)) Converter[T, O]

MakeCovnerterOk builds a Transform function from a function that converts between types T and O, but that returns a boolean/check value. When the converter function returns false, the transform function returns an ErrStreamContinue error.

func (Converter[T, O]) Convert added in v0.12.0

func (mpf Converter[T, O]) Convert(ctx context.Context, in T) (O, error)

Convert uses the converter function to transform a value from one type (T) to another (O).
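
A brief sketch combining MakeConverter and Convert (assuming a ctx context.Context and the usual imports are in scope):

toUpper := fun.MakeConverter(strings.ToUpper)

out, err := toUpper.Convert(ctx, "hello world")
fmt.Println(out, err) // HELLO WORLD <nil>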

func (Converter[T, O]) Generator added in v0.12.0

func (mpf Converter[T, O]) Generator(prod Generator[T]) Generator[O]

Generator processes an input generator function with the Transform function. Each call to the output generator returns one value from the input generator after processing the item with the transform function applied. The output generator returns any error encountered during these operations (input, transform, output) to its caller *except* ErrStreamContinue, which is respected.

func (Converter[T, O]) Lock added in v0.12.0

func (mpf Converter[T, O]) Lock() Converter[T, O]

Lock returns a Transform function that executes the root function inside the scope of a mutex.

func (Converter[T, O]) Parallel added in v0.12.0

func (mpf Converter[T, O]) Parallel(
	iter *Stream[T],
	opts ...OptionProvider[*WorkerGroupConf],
) *Stream[O]

Parallel runs the input stream through the transform operation and produces an output stream, much like Stream(). However, the Parallel implementation has configurable parallelism and error handling via the WorkerGroupConf options.

func (Converter[T, O]) Stream added in v0.12.0

func (mpf Converter[T, O]) Stream(iter *Stream[T]) *Stream[O]

Stream takes an input stream of one type and converts it to a stream of another type. All errors from the original stream are propagated to the output stream.

func (Converter[T, O]) Wait added in v0.12.0

func (mpf Converter[T, O]) Wait(in T) (O, error)

Wait calls the transform function passing a context that cannot expire.

func (Converter[T, O]) WithLock added in v0.12.0

func (mpf Converter[T, O]) WithLock(mu *sync.Mutex) Converter[T, O]

WithLock returns a Transform function inside of the scope of the provided mutex.

func (Converter[T, O]) WithRecover added in v0.12.0

func (mpf Converter[T, O]) WithRecover() Converter[T, O]

WithRecover returns a Transform function that catches a panic, converts the panic object to an error if needed, and aggregates that with the Transform function's error.

type Generator added in v0.8.5

type Generator[T any] func(context.Context) (T, error)

Generator is a fairly common constructor function type. Its signature is used to create iterators/streams, as a generator, and it functions like a Future.

func CheckedGenerator added in v0.12.0

func CheckedGenerator[T any](op func() (T, bool)) Generator[T]

CheckedGenerator wraps a function object that uses the second ("OK") value to indicate that no more values will be produced. Errors returned from the resulting generator are always either the context cancellation error or io.EOF.
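
A small sketch that adapts slice iteration into a Generator (assuming a ctx context.Context is in scope):

vals := []int{1, 2, 3}
idx := 0

next := fun.CheckedGenerator(func() (int, bool) {
	if idx >= len(vals) {
		return 0, false
	}
	v := vals[idx]
	idx++
	return v, true
})

for {
	v, err := next(ctx)
	if err != nil {
		break // io.EOF once the values are exhausted
	}
	fmt.Println(v) // 1, 2, 3
}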

func FutureGenerator added in v0.12.0

func FutureGenerator[T any](f fn.Future[T]) Generator[T]

FutureGenerator creates a generator for the fn.Future function. The underlying Future's panics are converted to errors.

func MakeGenerator added in v0.12.0

func MakeGenerator[T any](fn func() (T, error)) Generator[T]

MakeGenerator constructs a generator that wraps a similar function that does not take a context.

func NewGenerator added in v0.12.0

func NewGenerator[T any](fn func(ctx context.Context) (T, error)) Generator[T]

NewGenerator returns a generator as a convenience function to avoid the extra cast when creating new function objects.

func PtrGenerator added in v0.12.0

func PtrGenerator[T any](fn func() *T) Generator[T]

PtrGenerator uses a function that returns a pointer to a value and converts that into a generator that de-references and returns non-nil values of the pointer, and returns io.EOF for nil values of the pointer.

func StaticGenerator added in v0.12.0

func StaticGenerator[T any](val T, err error) Generator[T]

StaticGenerator returns a generator function that always returns the provided values.

func ValueGenerator added in v0.12.0

func ValueGenerator[T any](val T) Generator[T]

ValueGenerator returns a generator function that always returns the provided value and a nil error.

func (Generator[T]) After added in v0.12.0

func (pf Generator[T]) After(ts time.Time) Generator[T]

After will return a Generator that will block until the provided time is in the past, and then execute normally.

func (Generator[T]) Capture added in v0.12.0

func (pf Generator[T]) Capture() fn.Future[T]

func (Generator[T]) Check added in v0.12.0

func (pf Generator[T]) Check(ctx context.Context) (T, bool)

Check converts the error into a boolean, with true indicating success and false indicating an error (which is not propagated).

func (Generator[T]) Delay added in v0.12.0

func (pf Generator[T]) Delay(d time.Duration) Generator[T]

Delay wraps a Generator in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Generator[T]) Filter added in v0.12.0

func (pf Generator[T]) Filter(fl func(T) bool) Generator[T]

Filter creates a function that passes the output of the generator to the filter function; if the filter returns true, the value is returned to the caller, otherwise the Generator returns the zero value of type T and an ers.ErrCurrentOpSkip error (e.g. continue), which streams and other generator-consuming functions can respect.

func (Generator[T]) Force added in v0.12.0

func (pf Generator[T]) Force() fn.Future[T]

Force combines the semantics of Must and Wait as a future: when the future is resolved, the generator executes with a context that never expires and panics in the case of an error.

func (Generator[T]) Future added in v0.12.0

func (pf Generator[T]) Future(ctx context.Context, ob fn.Handler[error]) fn.Future[T]

Future creates a future function using the context provided and error observer to collect the error.

func (Generator[T]) If added in v0.12.0

func (pf Generator[T]) If(cond bool) Generator[T]

If returns a generator that will execute the root generator only if the cond value is true. Otherwise, If will return the zero value for T and a nil error.

func (Generator[T]) Ignore added in v0.12.0

func (pf Generator[T]) Ignore(ctx context.Context) fn.Future[T]

Ignore creates a future that runs the generator and returns the value, ignoring the error.

func (Generator[T]) Jitter added in v0.12.0

func (pf Generator[T]) Jitter(jf func() time.Duration) Generator[T]

Jitter wraps a Generator that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Generator.

If the function produces a negative duration, there is no delay.

func (Generator[T]) Join added in v0.12.0

func (pf Generator[T]) Join(next Generator[T]) Generator[T]

Join, on successive calls, runs the first generator until it returns an io.EOF error, and then returns the results of the second generator. If either generator returns another error (context cancellation or otherwise), those errors are returned.

When the second function returns io.EOF, all successive calls will return io.EOF.

func (Generator[T]) Limit added in v0.12.0

func (pf Generator[T]) Limit(in int) Generator[T]

Limit runs the generator a specified number of times, and caches the result of the last execution and returns that value for any subsequent executions.

func (Generator[T]) Lock added in v0.12.0

func (pf Generator[T]) Lock() Generator[T]

Lock creates a generator that runs the root generator as normal, but under the protection of a mutex so that there's only one execution of the generator at a time.

func (Generator[T]) Must added in v0.12.0

func (pf Generator[T]) Must(ctx context.Context) fn.Future[T]

Must returns a future that resolves the generator, returning the constructed value and panicking if the generator errors.

func (Generator[T]) Once added in v0.12.0

func (pf Generator[T]) Once() Generator[T]

Once returns a generator that only executes once, and caches the return values, so that subsequent calls to the output generator will return the same values.
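
A minimal sketch of the caching behavior, using MakeGenerator and Wait:

calls := 0
gen := fun.MakeGenerator(func() (int, error) { calls++; return calls, nil }).Once()

first, _ := gen.Wait()
second, _ := gen.Wait()
fmt.Println(first, second, calls) // 1 1 1: the second call returns the cached value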

func (Generator[T]) Parallel added in v0.12.0

func (pf Generator[T]) Parallel(
	opts ...OptionProvider[*WorkerGroupConf],
) Generator[T]

Parallel returns a wrapped generator that produces items until the generator function returns io.EOF, or the context is canceled. Parallel operation, continue-on-error, and continue-on-panic semantics are available and share configuration with the ParallelProcess and Map operations.

You must specify a number of workers in the options greater than one to get parallel operation. Otherwise, there is only one worker.

The operation returns results from a buffer that can hold a number of items equal to the number of workers. Buffered items may not be returned to the caller in the case of early termination.

func (Generator[T]) PostHook added in v0.12.0

func (pf Generator[T]) PostHook(op func()) Generator[T]

PostHook appends a function to the execution of the generator. If the function panics it is converted to an error and aggregated with the error of the generator.

Useful for calling context.CancelFunc, closers, or incrementing counters as necessary.

func (Generator[T]) PreHook added in v0.12.0

func (pf Generator[T]) PreHook(op Operation) Generator[T]

PreHook configures an operation function to run before the returned generator. If the pre-hook panics, it is converted to an error which is aggregated with the (potential) error from the generator, and returned with the generator's output.

func (Generator[T]) Read added in v0.12.0

func (pf Generator[T]) Read(ctx context.Context) (T, error)

Read executes the generator and returns the result.

func (Generator[T]) Retry added in v0.12.0

func (pf Generator[T]) Retry(n int) Generator[T]

Retry constructs a generator that runs the underlying generator until the error value is nil, it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation,) or the specified number of attempts is exhausted.

Context cancellation errors are returned to the caller, other terminating errors are not, with any other errors encountered during retries. ErrStreamContinue is always ignored and not aggregated. All errors are discarded if the retry operation succeeds in the provided number of retries.

Except for ErrStreamContinue, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails. It's possible to return a nil error and a zero value, if the generator only returned ErrStreamContinue values.

func (Generator[T]) Stream added in v0.12.0

func (pf Generator[T]) Stream() *Stream[T]

Stream creates a stream that calls the Generator function once for every iteration, until it errors. Errors that are not context cancellation errors or io.EOF are propagated to the stream's Close method.

func (Generator[T]) TTL added in v0.12.0

func (pf Generator[T]) TTL(dur time.Duration) Generator[T]

TTL runs the generator only one time per specified interval. The interval must be greater than 0.

func (Generator[T]) Wait added in v0.12.0

func (pf Generator[T]) Wait() (T, error)

Wait runs the generator with a context that will never expire.

func (Generator[T]) When added in v0.12.0

func (pf Generator[T]) When(cond func() bool) Generator[T]

When constructs a generator that will call the cond upon every execution, and when true, will run and return the results of the root generator. Otherwise When will return the zero value of T and a nil error.

func (Generator[T]) WithCancel added in v0.12.0

func (pf Generator[T]) WithCancel() (Generator[T], context.CancelFunc)

WithCancel creates a Generator and a cancel function which will terminate the context that the root Generator is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Generator is canceled.)

func (Generator[T]) WithErrorCheck added in v0.12.0

func (pf Generator[T]) WithErrorCheck(ef fn.Future[error]) Generator[T]

WithErrorCheck takes an error future, and checks it before executing the generator function. If the error future returns an error (any error), the generator propagates that error, rather than running the underlying generator. Useful for injecting an abort into an existing pipeline or chain.

func (Generator[T]) WithErrorFilter added in v0.12.0

func (pf Generator[T]) WithErrorFilter(ef erc.Filter) Generator[T]

WithErrorFilter passes the error of the root Generator function through the erc.Filter.

func (Generator[T]) WithErrorHandler added in v0.12.0

func (pf Generator[T]) WithErrorHandler(handler fn.Handler[error], resolver fn.Future[error]) Generator[T]

func (Generator[T]) WithLock added in v0.12.0

func (pf Generator[T]) WithLock(mtx *sync.Mutex) Generator[T]

WithLock uses the provided mutex to protect the execution of the generator.

func (Generator[T]) WithLocker added in v0.12.0

func (pf Generator[T]) WithLocker(mtx sync.Locker) Generator[T]

WithLocker uses the provided mutex to protect the execution of the generator.

func (Generator[T]) WithRecover added in v0.12.0

func (pf Generator[T]) WithRecover() Generator[T]

WithRecover returns a wrapped generator with a panic handler that converts any panic to an error.

func (Generator[T]) WithoutErrors added in v0.12.0

func (pf Generator[T]) WithoutErrors(errs ...error) Generator[T]

WithoutErrors returns a Generator function that wraps the root generator and, after running it, makes the error value nil if the error returned is in the error list. The produced value in these cases is almost always the zero value for the type.

type Handler added in v0.10.2

type Handler[T any] func(context.Context, T) error

Handlers are generic functions that take an argument (and a context) and return an error. They're the type of function used in various stream methods to implement worker pools, service management, and stream processing.

func FromHandler added in v0.12.0

func FromHandler[T any](f fn.Handler[T]) Handler[T]

FromHandler up-converts an fn.Handler (e.g. a simpler handler function that doesn't return an error or take a context) into a fun.Handler. The underlying function runs with a panic handler (so fn.Handler panics are converted to errors.)

func JoinHandlers added in v0.12.0

func JoinHandlers[T any](pfs ...Handler[T]) Handler[T]

JoinHandlers takes a collection of Handler functions and merges them into a single chain, eliding any nil processors.

func MakeHandler added in v0.12.0

func MakeHandler[T any](fn func(T) error) Handler[T]

MakeHandler converts a function with the Handler signature (minus the context) into a Handler.

func NewHandler added in v0.12.0

func NewHandler[T any](fn func(context.Context, T) error) Handler[T]

NewHandler returns a Handler Function. This is a convenience function to avoid the extra cast when creating new function objects.

func (Handler[T]) After added in v0.12.0

func (pf Handler[T]) After(ts time.Time) Handler[T]

After produces a Handler that will execute after the provided timestamp.

func (Handler[T]) Capture added in v0.10.2

func (pf Handler[T]) Capture() fn.Handler[T]

Capture creates a handler function that, like Handler.Force, passes a background context and ignores the processor's error.

func (Handler[T]) Check added in v0.10.2

func (pf Handler[T]) Check(ctx context.Context, in T) bool

Check processes the input and returns true when the error is nil, and false when there was an error.
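
A brief sketch using MakeHandler and Check (assuming a ctx context.Context is in scope):

seen := []string{}

h := fun.MakeHandler(func(s string) error {
	seen = append(seen, s)
	return nil
})

if h.Check(ctx, "hello") {
	fmt.Println(seen) // [hello]
}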

func (Handler[T]) Delay added in v0.12.0

func (pf Handler[T]) Delay(dur time.Duration) Handler[T]

Delay wraps a Handler in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Handler[T]) Filter added in v0.10.2

func (pf Handler[T]) Filter(fl func(T) bool) Handler[T]

Filter returns a wrapping processor that only calls the root processor when the filter function returns true, and returns ers.ErrCurrentOpSkip otherwise.

func (Handler[T]) Force added in v0.12.0

func (pf Handler[T]) Force(in T)

Force processes the input, but discards the error and uses a context that will not expire.

func (Handler[T]) Handler added in v0.12.0

func (pf Handler[T]) Handler(ctx context.Context, oe fn.Handler[error]) fn.Handler[T]

Handler converts a processor into an observer, handling the error with the error observer and using the provided context.

func (Handler[T]) If added in v0.10.2

func (pf Handler[T]) If(c bool) Handler[T]

If runs the processor function if, and only if the condition is true. Otherwise the function does not run and the processor returns nil.

The resulting processor can be used more than once.

func (Handler[T]) Ignore added in v0.12.0

func (pf Handler[T]) Ignore(ctx context.Context, in T)

Ignore runs the process function and discards the error.

func (Handler[T]) Jitter added in v0.12.0

func (pf Handler[T]) Jitter(jf func() time.Duration) Handler[T]

Jitter wraps a Handler that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the processor.

If the function produces a negative duration, there is no delay.

func (Handler[T]) Join added in v0.10.2

func (pf Handler[T]) Join(pfs ...Handler[T]) Handler[T]

Join combines a sequence of processors on the same input, calling each function in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated.

func (Handler[T]) Limit added in v0.12.0

func (pf Handler[T]) Limit(n int) Handler[T]

Limit ensures that the processor is called at most n times.

func (Handler[T]) Lock added in v0.10.2

func (pf Handler[T]) Lock() Handler[T]

Lock wraps the Handler and protects its execution with a mutex.

func (Handler[T]) Must added in v0.12.0

func (pf Handler[T]) Must(ctx context.Context, in T)

func (Handler[T]) Once added in v0.10.2

func (pf Handler[T]) Once() Handler[T]

Once makes a processor that can only run once. Subsequent calls to the processor return the cached error of the original run.

func (Handler[T]) PostHook added in v0.12.0

func (pf Handler[T]) PostHook(op func()) Handler[T]

PostHook produces an amalgamated processor that runs the hook after the processor completes. Panics are caught, converted to errors, and aggregated with the processor's error. The hook operation is unconditionally called after the processor function (except in the case of a processor panic.)

func (Handler[T]) PreHook added in v0.10.5

func (pf Handler[T]) PreHook(op Operation) Handler[T]

PreHook creates an amalgamated Handler that runs the operation before the root processor. If the operation panics that panic is converted to an error and merged with the processor's error. Use with Operation.Once() to create an "init" function that runs once before a processor is called the first time.
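
A rough sketch of the "init" pattern described above, pairing PreHook with a MakeOperation(...).Once() hook and the Wait helper:

setup := fun.MakeOperation(func() { fmt.Println("init resources") }).Once()

h := fun.MakeHandler(func(item string) error {
	fmt.Println("processing", item)
	return nil
}).PreHook(setup)

_ = h.Wait("one") // prints "init resources" and then "processing one"
_ = h.Wait("two") // the Once hook does not run again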

func (Handler[T]) Read added in v0.12.0

func (pf Handler[T]) Read(ctx context.Context, in T) error

Read executes the Handler once.

func (Handler[T]) ReadAll added in v0.12.0

func (pf Handler[T]) ReadAll(st *Stream[T]) Worker

ReadAll reads elements from the stream and passes them to the Handler until the first error is encountered. The returned Worker is blocking.

func (Handler[T]) TTL added in v0.12.0

func (pf Handler[T]) TTL(dur time.Duration) Handler[T]

TTL returns a Handler that runs once in the specified window, and returns the error from the last run in between this interval. While the executions of the underlying function happen in isolation, in between, the processor is concurrently accessible.

func (Handler[T]) Wait added in v0.12.0

func (pf Handler[T]) Wait(in T) error

Wait runs the Handler with a context that will never be canceled.

func (Handler[T]) When added in v0.10.2

func (pf Handler[T]) When(c func() bool) Handler[T]

When returns a processor function that runs if the conditional function returns true, and does not run otherwise. The conditional function is evaluated every time the returned processor is run.

func (Handler[T]) WithCancel added in v0.12.0

func (pf Handler[T]) WithCancel() (Handler[T], context.CancelFunc)

WithCancel creates a Handler and a cancel function which will terminate the context that the root processor is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Handler is canceled.)

func (Handler[T]) WithErrorCheck added in v0.12.0

func (pf Handler[T]) WithErrorCheck(ef fn.Future[error]) Handler[T]

WithErrorCheck takes an error future, and checks it before executing the processor operation. If the error future returns an error (any error), the processor propagates that error, rather than running the underlying processor. Useful for injecting an abort into an existing pipeline or chain.

The error future is called before running the underlying processor, to short circuit the operation, and also a second time when processor has returned in case an error has occurred during the operation of the processor.

func (Handler[T]) WithErrorFilter added in v0.12.0

func (pf Handler[T]) WithErrorFilter(ef erc.Filter) Handler[T]

WithErrorFilter uses an erc.Filter to process the error response from the processor.

func (Handler[T]) WithLock added in v0.10.2

func (pf Handler[T]) WithLock(mtx *sync.Mutex) Handler[T]

WithLock wraps the Handler and ensures that the mutex is always held while the root Handler is called.

func (Handler[T]) WithLocker added in v0.12.0

func (pf Handler[T]) WithLocker(mtx sync.Locker) Handler[T]

func (Handler[T]) WithRecover added in v0.10.5

func (pf Handler[T]) WithRecover() Handler[T]

WithRecover runs the processor, converting all panics into errors. WithRecover is itself a processor.

func (Handler[T]) WithoutErrors added in v0.12.0

func (pf Handler[T]) WithoutErrors(errs ...error) Handler[T]

WithoutErrors returns a processor that will convert a non-nil error of the provided types to a nil error.

type Operation added in v0.10.0

type Operation func(context.Context)

Operation is a type of function object that will block until an operation returns or the context is canceled.

func MakeOperation added in v0.10.4

func MakeOperation(in func()) Operation

MakeOperation converts a function that takes no arguments into an Operation.

func WaitChannel added in v0.6.3

func WaitChannel[T any](ch <-chan T) Operation

WaitChannel converts a channel (typically, a `chan struct{}`) to an Operation. The Operation blocks until its context is canceled or the channel is either closed or returns one item.
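
A minimal sketch of waiting on a signal channel (assuming a ctx context.Context is in scope):

done := make(chan struct{})
wait := fun.WaitChannel(done)

go func() {
	// ... background work ...
	close(done)
}()

wait(ctx) // returns when done is closed, or when ctx is canceled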

func WaitContext added in v0.6.3

func WaitContext(ctx context.Context) Operation

WaitContext waits for the context to be canceled before returning. The Operation that's returned also respects its own context. Use this Operation with its own context to wait for a context to be canceled with a timeout, for instance.

func (Operation) Add added in v0.10.0

func (wf Operation) Add(ctx context.Context, wg *WaitGroup)

Add starts the operation in a goroutine, incrementing and decrementing the WaitGroup as appropriate.

func (Operation) After added in v0.10.0

func (wf Operation) After(ts time.Time) Operation

After provides an operation that will only run if called after the specified clock time. When called after this time, the operation blocks until that time passes (or the context is canceled.)

func (Operation) Background added in v0.10.0

func (wf Operation) Background(ctx context.Context)

Background launches the operation in a go routine. There is no panic-safety provided.

func (Operation) Delay added in v0.10.0

func (wf Operation) Delay(dur time.Duration) Operation

Delay wraps a Operation in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Operation) Go added in v0.10.0

func (wf Operation) Go() Operation

Go provides access to the Go method (e.g. starting this operation in a go routine.) as a method that can be used as an operation itself.

func (Operation) Group added in v0.12.0

func (wf Operation) Group(n int) Operation

Group makes an operation that runs n copies of the underlying operation, in different go routines. Work does not start until the resulting operation is called.

func (Operation) If added in v0.10.0

func (wf Operation) If(cond bool) Operation

If provides a static version of the When that only runs if the condition is true, and is otherwise a noop.

func (Operation) Interval added in v0.10.3

func (wf Operation) Interval(dur time.Duration) Operation

Interval runs the operation with a timer that resets to the provided duration. The operation runs immediately, and then the time is reset to the specified interval after the base operation is completed. Which is to say that the runtime of the operation itself is effectively added to the interval.

func (Operation) Jitter added in v0.10.0

func (wf Operation) Jitter(dur func() time.Duration) Operation

Jitter wraps an Operation that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Operation.

If the function produces a negative duration, there is no delay.

func (Operation) Join added in v0.10.0

func (wf Operation) Join(ops ...Operation) Operation

Join combines a sequence of operations, calling the Operations in order as long as the context does not expire. If the context expires, the combined operation aborts early.

func (Operation) Launch added in v0.10.0

func (wf Operation) Launch(ctx context.Context) Operation

Launch starts the operation in a background go routine and returns an operation which blocks until its context is canceled or the underlying operation returns.

func (Operation) Limit added in v0.10.0

func (wf Operation) Limit(in int) Operation

Limit returns an operation that will only run the specified number of times. The resulting operation is safe for concurrent use, but operations can run concurrently.

func (Operation) Lock added in v0.10.0

func (wf Operation) Lock() Operation

Lock constructs an operation that ensures the underlying operation (when called through the output operation) only runs within the scope of a mutex.

func (Operation) Once added in v0.10.0

func (wf Operation) Once() Operation

Once produces an operation that will only execute the root operation once, no matter how many times it's called.

func (Operation) PostHook added in v0.10.0

func (wf Operation) PostHook(hook func()) Operation

PostHook unconditionally runs the post-hook operation after the operation returns, as a defer. Use the hook to run cleanup operations. The Operation returned from this method runs both the original operation and the hook function.

func (Operation) PreHook added in v0.10.0

func (wf Operation) PreHook(hook Operation) Operation

PreHook unconditionally runs the hook operation before the underlying operation. Use Operation.Once() operations for the hook to initialize resources for use by the operation, or without Once to provide reset semantics. The Operation returned from this method runs both the original operation and the hook function.

func (Operation) Run added in v0.10.0

func (wf Operation) Run(ctx context.Context)

Run is equivalent to calling the operation directly.

func (Operation) Signal added in v0.10.0

func (wf Operation) Signal(ctx context.Context) <-chan struct{}

Signal starts the operation in a go routine, and provides a signal channel which will be closed when the operation is complete.
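
A brief sketch (assuming a ctx context.Context is in scope):

op := fun.MakeOperation(func() {
	// ... do some work ...
})

<-op.Signal(ctx) // the channel is closed once the operation returns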

func (Operation) StartGroup added in v0.10.0

func (wf Operation) StartGroup(ctx context.Context, n int) Operation

StartGroup runs n operations, incrementing the WaitGroup to account for the job. Callers must wait on the WaitGroup independently.

func (Operation) TTL added in v0.10.0

func (wf Operation) TTL(dur time.Duration) Operation

TTL runs an operation; if the operation is called again before the specified duration has elapsed, the call is a noop.

func (Operation) Wait added in v0.10.3

func (wf Operation) Wait()

Wait runs the operation with a background context.

func (Operation) When added in v0.10.0

func (wf Operation) When(cond func() bool) Operation

When runs the condition function, and if it returns true, runs the underlying operation.

func (Operation) While added in v0.10.3

func (wf Operation) While() Operation

While runs the operation in a tight loop, until the context expires.

func (Operation) WithCancel added in v0.10.0

func (wf Operation) WithCancel() (Operation, context.CancelFunc)

WithCancel creates an Operation and a cancel function which will terminate the context that the root Operation is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Operation is canceled.)

func (Operation) WithErrorHook added in v0.12.0

func (wf Operation) WithErrorHook(ef fn.Future[error]) Worker

WithErrorHook runs the operation--potentially catching a panic and converting it to an error--and then aggregates that error with the output of the error future. The error future is always called.

func (Operation) WithLock added in v0.10.0

func (wf Operation) WithLock(mtx *sync.Mutex) Operation

WithLock ensures that the underlying operation, when called through the output operation, will hold the mutex while running.

func (Operation) WithRecover added in v0.10.4

func (wf Operation) WithRecover() Worker

WithRecover converts the Operation into a Worker function that catches panics and returns them as errors using fun.Check.

func (Operation) Worker added in v0.10.0

func (wf Operation) Worker() Worker

Worker converts a wait function into a fun.Worker. If the context is canceled, the worker function returns the context's error. It does not handle panics; use WithRecover() to convert panics to errors.
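
For illustration, here is a minimal, hedged sketch of composing Operation values with Once, PreHook, PostHook, and Signal. It assumes that fun.Operation is a func(context.Context) (consistent with the Run signature above) and that the module's import path is github.com/tychoish/fun; adjust for your build if either assumption is wrong.

package main

import (
	"context"
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()

	// Once ensures initialization runs a single time, no matter how
	// often the composed operation executes.
	setup := fun.Operation(func(context.Context) { fmt.Println("initialize") }).Once()

	task := fun.Operation(func(ctx context.Context) {
		fmt.Println("doing work")
	}).PreHook(setup).PostHook(func() { fmt.Println("cleanup") })

	// Signal starts the composed operation in a goroutine and closes
	// the returned channel when the operation completes.
	<-task.Signal(ctx)
}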

type OptionProvider added in v0.10.0

type OptionProvider[T any] func(T) error

OptionProvider is a function type for building functional arguments. It is used for parallel stream processing (map, transform, for-each, etc.) in the fun and itertool packages, and is available, with tooling, for use in other contexts.

The type T should always be mutable (e.g. a map, or a pointer).

func JoinOptionProviders added in v0.10.0

func JoinOptionProviders[T any](op ...OptionProvider[T]) OptionProvider[T]

JoinOptionProviders takes zero or more option providers and produces a single combined option provider. With zero or nil arguments, the operation becomes a noop.

func WorkerGroupConfAddExcludeErrors added in v0.10.0

func WorkerGroupConfAddExcludeErrors(errs ...error) OptionProvider[*WorkerGroupConf]

WorkerGroupConfAddExcludeErrors appends the provided errors to the ExcludedErrors value. The provider will return an error if any of the input errors is ErrRecoveredPanic.

func WorkerGroupConfContinueOnError added in v0.10.0

func WorkerGroupConfContinueOnError() OptionProvider[*WorkerGroupConf]

WorkerGroupConfContinueOnError toggles the option that allows the operation to continue when the operation encounters an error. Otherwise, any error will lead to an abort.

func WorkerGroupConfContinueOnPanic added in v0.10.0

func WorkerGroupConfContinueOnPanic() OptionProvider[*WorkerGroupConf]

WorkerGroupConfContinueOnPanic toggles the option that allows the operation to continue when encountering a panic.

func WorkerGroupConfIncludeContextErrors added in v0.10.0

func WorkerGroupConfIncludeContextErrors() OptionProvider[*WorkerGroupConf]

WorkerGroupConfIncludeContextErrors toggles the option that forces the operation to include context errors in the output. By default they are not included.

func WorkerGroupConfNumWorkers added in v0.10.0

func WorkerGroupConfNumWorkers(num int) OptionProvider[*WorkerGroupConf]

WorkerGroupConfNumWorkers sets the number of workers configured. It is not possible to set this value to less than 1: negative values and 0 are always ignored.

func WorkerGroupConfSet added in v0.10.0

func WorkerGroupConfSet(opt *WorkerGroupConf) OptionProvider[*WorkerGroupConf]

WorkerGroupConfSet overrides the configuration with the provided configuration value.

func WorkerGroupConfWithErrorCollector added in v0.10.0

func WorkerGroupConfWithErrorCollector(ec *erc.Collector) OptionProvider[*WorkerGroupConf]

WorkerGroupConfWithErrorCollector sets an error collector implementation for later use in the WorkerGroupOptions. The resulting function will only error if the collector is nil; however, this method will override an existing error collector.

The ErrorCollector interface is typically provided by the `erc.Collector` type.

ErrorCollectors are used by some operations to collect, aggregate, and distribute errors from operations to the caller.

func WorkerGroupConfWorkerPerCPU added in v0.10.0

func WorkerGroupConfWorkerPerCPU() OptionProvider[*WorkerGroupConf]

WorkerGroupConfWorkerPerCPU sets the number of workers to the number of detected CPUs by the runtime (e.g. runtime.NumCPU()).

func (OptionProvider[T]) Apply added in v0.10.0

func (op OptionProvider[T]) Apply(in T) error

Apply applies the current OptionProvider to the configuration, and if the type T implements a Validate() method, calls that. All errors are aggregated.

func (OptionProvider[T]) Build added in v0.10.3

func (op OptionProvider[T]) Build(conf T) (out T, err error)

Build processes a configuration object, returning a modified version (or a zero value, in the case of an error).

func (OptionProvider[T]) Join added in v0.10.0

func (op OptionProvider[T]) Join(opps ...OptionProvider[T]) OptionProvider[T]

Join aggregates a collection of Option Providers into a single option provider. The amalgamated operation is panic safe and omits all nil providers.
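
As a hedged sketch of how these providers compose, the following applies a joined set of WorkerGroupConf options to a configuration value. It assumes the module import path github.com/tychoish/fun and relies only on the JoinOptionProviders, Apply, and WorkerGroupConf* signatures shown on this page.

package main

import (
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	conf := &fun.WorkerGroupConf{}

	opts := fun.JoinOptionProviders(
		fun.WorkerGroupConfNumWorkers(4),
		fun.WorkerGroupConfContinueOnError(),
	)

	// Apply mutates the configuration in place and, because
	// WorkerGroupConf has a Validate method, validates it as well.
	if err := opts.Apply(conf); err != nil {
		fmt.Println("configuration error:", err)
		return
	}

	fmt.Println("workers:", conf.NumWorkers, "continue-on-error:", conf.ContinueOnError)
}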

type RuntimeInvariant added in v0.10.0

type RuntimeInvariant struct{}

RuntimeInvariant is a type defined to create a namespace, callable (typically) via the Invariant symbol. Access these functions as in:

fun.Invariant.IsTrue(len(slice) > 0, "slice must have elements", len(slice))

func (RuntimeInvariant) Failure added in v0.10.1

func (RuntimeInvariant) Failure(args ...any)

Failure unconditionally raises an invariant failure error and processes the arguments as with the other invariant failures: extracting errors and aggregating constituent errors.

func (RuntimeInvariant) IsFalse added in v0.10.0

func (RuntimeInvariant) IsFalse(cond bool, args ...any)

IsFalse provides a runtime assertion that the condition is false, and annotates the panic object, which is an error rooted in ErrInvariantViolation. In all other cases the operation is a noop.

func (RuntimeInvariant) IsTrue added in v0.10.0

func (RuntimeInvariant) IsTrue(cond bool, args ...any)

IsTrue provides a runtime assertion that the condition is true, and annotates the panic object, which is an error rooted in ErrInvariantViolation. In all other cases the operation is a noop.

func (RuntimeInvariant) Must added in v0.10.0

func (RuntimeInvariant) Must(err error, args ...any)

Must raises an invariant error if the error is not nil. The content of the panic is both--via wrapping--an ErrInvariantViolation and the error itself.

func (RuntimeInvariant) New added in v0.12.0

func (RuntimeInvariant) New(args ...any) error

New creates an error that is rooted in ers.ErrInvariantViolation, aggregating errors and annotating the error.

func (RuntimeInvariant) Ok added in v0.10.9

func (RuntimeInvariant) Ok(cond bool, args ...any)

Ok panics if the condition is false, passing an error that is rooted in ErrInvariantViolation. Otherwise the operation is a noop.
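
A brief, hedged sketch of these helpers in application code: failures panic with an error rooted in ErrInvariantViolation, so they are meant for violated preconditions rather than routine error handling. The import path github.com/tychoish/fun is an assumption here.

package example

import (
	"os"

	"github.com/tychoish/fun"
)

func divide(a, b int) int {
	// panics with an ErrInvariantViolation-rooted error when b == 0.
	fun.Invariant.IsTrue(b != 0, "divisor must be non-zero", b)
	return a / b
}

func mustReadFile(path string) []byte {
	data, err := os.ReadFile(path)
	// panics only when err is non-nil, wrapping both the invariant
	// error and the underlying error.
	fun.Invariant.Must(err, "reading file", path)
	return data
}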

type Stream added in v0.12.0

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream provides a safe, context-respecting iteration/sequence paradigm, and an entire toolkit of consumer functions, converters, and generation options.

As the basis and heart of a programming model, streams make it possible to think about groups or sequences of objects or work that can be constructed and populated lazily, and provide a collection of interfaces for processing and manipulating data.

Beyond the stream interaction tools provided in this package, the itertool package provides some additional helpers and tools, while the adt and dt packages provide simple types and tooling built around these streams.

The canonical way to use a stream is with the core Next(), Value(), and Close() methods: Next takes a context and advances the stream. Next, which is typically called in a single-clause for loop (i.e. as a while loop), returns false when the stream has no items, after which the stream should be closed and cannot be re-started. When Next() returns true, the stream has advanced, and the output of Value() will provide the value at the current position in the stream. Next() will block if the stream has not been closed and the operation that produces or generates new items for the stream blocks (or continues iterating), until the stream is exhausted or closed.

However, additional methods, such as ReadOne and the Generator() function (which is a wrapper around ReadOne), provide a different interaction paradigm: they combine the Next() and Value() operations into a single function call. When the stream is exhausted these methods return the `io.EOF` error.

In all cases, checking the Close() value of the stream makes it possible to see any errors encountered during the operation of the stream.

Next/Value cannot be used concurrently, as there is no way to synchronize the Next/Value calls with respect to each other: in this mode it is possible both to miss values and to receive duplicate values from the stream. If the generator function in the stream is safe for concurrent use, then ReadOne can be used safely. As a rule, all tooling in the fun package uses ReadOne, except in a few cases where a caller has exclusive access to the stream.
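
The canonical loop described above looks roughly like the following sketch, which uses SliceStream (documented below) and assumes the module import path github.com/tychoish/fun.

package main

import (
	"context"
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()
	st := fun.SliceStream([]int{1, 2, 3})

	// Next advances the stream; Value returns the item at the
	// current position.
	for st.Next(ctx) {
		fmt.Println(st.Value())
	}

	// Close reports any errors encountered during iteration.
	if err := st.Close(); err != nil {
		fmt.Println("stream error:", err)
	}
}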

func ChannelStream added in v0.12.0

func ChannelStream[T any](ch <-chan T) *Stream[T]

ChannelStream exposes access to an existing "receive" channel as a stream.

func JoinStreams added in v0.12.0

func JoinStreams[T any](iters ...*Stream[T]) *Stream[T]

JoinStreams takes a sequence of streams and produces a combined stream. JoinStreams processes items sequentially from each stream. By contrast, MergeStreams constructs a stream that reads all of the items from the input streams in parallel, and returns items in an arbitrary order.

Use JoinStreams or FlattenStreams if order is important. Use FlattenStream for larger numbers of streams. Use MergeStreams when producing an item takes a non-trivial amount of time.

func MakeStream added in v0.12.0

func MakeStream[T any](gen Generator[T]) *Stream[T]

MakeStream constructs a stream that calls the Generator function once for every item, until it errors. Errors other than context cancellation errors and io.EOF are propagated to the stream's Close method.
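
As a hedged sketch, the following builds a small stream from a generator that counts to three and then returns io.EOF. It assumes fun.Generator[T] has the shape func(context.Context) (T, error), which is not restated in this section, and the module import path github.com/tychoish/fun.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()

	count := 0
	st := fun.MakeStream(fun.Generator[int](func(context.Context) (int, error) {
		if count >= 3 {
			// io.EOF ends the stream and, per the documentation
			// above, is not propagated to Close.
			return 0, io.EOF
		}
		count++
		return count, nil
	}))

	for st.Next(ctx) {
		fmt.Println(st.Value()) // 1, 2, 3
	}
	fmt.Println(st.Close())
}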

func MergeStreams added in v0.12.0

func MergeStreams[T any](iters *Stream[*Stream[T]]) *Stream[T]

MergeStreams takes a collection of streams of the same type of objects and provides a single stream over these items.

There are a collection of background threads, one for each input stream, which will iterate over the inputs and will provide the items to the output stream. These threads start on the first iteration and will return if this context is canceled.

The stream will continue to produce items until all input streams have been consumed, the initial context is canceled, or the Close method is called, or all of the input streams have returned an error.

Use MergeStreams when producing an item takes a non-trivial amount of time. Use ChainStreams or FlattenStreams if order is important. Use FlattenStream for larger numbers of streams.

func SeqStream added in v0.12.0

func SeqStream[T any](it iter.Seq[T]) *Stream[T]

SeqStream wraps a native Go iterator (iter.Seq) as a fun.Stream[T].

func SliceStream added in v0.12.0

func SliceStream[T any](in []T) *Stream[T]

SliceStream provides Stream access to the elements in a slice.

func VariadicStream added in v0.12.0

func VariadicStream[T any](in ...T) *Stream[T]

VariadicStream produces a stream from an arbitrary collection of objects, passed into the constructor.

func (*Stream[T]) AddError added in v0.12.0

func (st *Stream[T]) AddError(e error)

AddError can be used by calling code to add errors to the stream, which are merged.

AddError is not safe for concurrent use (with regards to other AddError calls or Close).

func (*Stream[T]) Any added in v0.12.0

func (st *Stream[T]) Any() *Stream[any]

Any, as a special case of Transform, converts a stream of any type to a stream of any (e.g. interface{}) values.

func (*Stream[T]) Buffer added in v0.12.0

func (st *Stream[T]) Buffer(n int) *Stream[T]

Buffer adds a channel-backed buffer to the stream to smooth out iteration performance. If the iteration (generator) and the consumer both take time, even a small buffer will improve the throughput of the system and prevent both components from blocking on each other.

The ordering of elements in the output stream is the same as the order of elements in the input stream.

func (*Stream[T]) BufferParallel added in v0.12.0

func (st *Stream[T]) BufferParallel(n int) *Stream[T]

BufferParallel, like Buffer, processes the input stream and stores those items in a channel; however, unlike Buffer, multiple workers consume the input stream: as a result the order of the elements in the output stream is not the same as the input order.

Otherwise, the two Buffer methods are equivalent and serve the same purpose: process the items from a stream without blocking the consumer of the stream.

func (*Stream[T]) BufferedChannel added in v0.12.0

func (st *Stream[T]) BufferedChannel(ctx context.Context, size int) <-chan T

BufferedChannel provides access to the content of the stream with a buffered channel that is closed when the stream is exhausted.

func (*Stream[T]) Channel added in v0.12.0

func (st *Stream[T]) Channel(ctx context.Context) <-chan T

Channel provides access to the contents of the stream as a channel. The channel is closed when the stream is exhausted.

func (*Stream[T]) Close added in v0.12.0

func (st *Stream[T]) Close() error

Close terminates the stream and returns any errors collected during iteration. If the stream allocates resources, this will typically release them, but close may not block until all resources are released.

Close is safe to call more than once and always resolves the error handler (e.g. AddError).

func (*Stream[T]) CloseHook added in v0.12.0

func (st *Stream[T]) CloseHook() func(*Stream[T])

func (*Stream[T]) Count added in v0.12.0

func (st *Stream[T]) Count(ctx context.Context) (count int)

Count returns the number of items observed by the stream. Callers should still manually call Close on the stream.

func (*Stream[T]) ErrorHandler added in v0.12.0

func (st *Stream[T]) ErrorHandler() fn.Handler[error]

ErrorHandler provides access to the AddError method as an error observer.

func (*Stream[T]) Filter added in v0.12.0

func (st *Stream[T]) Filter(check func(T) bool) *Stream[T]

Filter checks every item in the stream and, if the check function returns true, propagates it to the output stream. There is no buffering, and check functions should return quickly. For more advanced use, consider using itertool.Map().

func (*Stream[T]) Generator added in v0.12.0

func (st *Stream[T]) Generator() Generator[T]

Generator provides access to the contents of the stream as a Generator function.

func (*Stream[T]) Iterator added in v0.12.0

func (st *Stream[T]) Iterator(ctx context.Context) iter.Seq[T]

Iterator converts a fun.Stream[T] into a native go iterator.

func (*Stream[T]) Join added in v0.12.0

func (st *Stream[T]) Join(iters ...*Stream[T]) *Stream[T]

Join merges multiple streams processing and producing their results sequentially, and without starting any go routines. Otherwise similar to Flatten (which processes each stream in parallel).

func (*Stream[T]) MarshalJSON added in v0.12.0

func (st *Stream[T]) MarshalJSON() ([]byte, error)

MarshalJSON is useful for implementing json.Marshaler methods for stream-supporting types, wrapping the standard library's JSON encoding tools.

The contents of the stream are marshaled as elements in a JSON array.

func (*Stream[T]) Next added in v0.12.0

func (st *Stream[T]) Next(ctx context.Context) bool

Next advances the stream (using ReadOne) and caches the current value for access with the Value() method. When Next returns true, Value() will return the next item. When it returns false, either the stream has been exhausted (e.g. the Generator function has returned io.EOF) or the context passed to Next has been canceled.

Using Next/Value cannot be done safely if stream is accessed from multiple go routines concurrently. In these cases use ReadOne directly, or use Split to create a stream that safely draws items from the parent stream.

func (*Stream[T]) Parallel added in v0.12.0

func (st *Stream[T]) Parallel(
	fn Handler[T],
	opts ...OptionProvider[*WorkerGroupConf],
) Worker

Parallel produces a worker that, when executed, iteratively processes the contents of the stream. The options control the error handling and parallelism semantics of the operation.

This is the workhorse operation of the package, and can be used as the basis of worker pools, event processing, or message dispatching for pubsub queues and related systems.
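
A hedged sketch of Parallel over a small slice-backed stream with worker-group options. It assumes fun.Handler[T] has the shape func(context.Context, T) error (not restated in this section) and the module import path github.com/tychoish/fun.

package main

import (
	"context"
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()
	st := fun.SliceStream([]string{"a", "b", "c", "d"})

	worker := st.Parallel(
		func(_ context.Context, item string) error {
			fmt.Println("processing", item)
			return nil
		},
		fun.WorkerGroupConfNumWorkers(2),
		fun.WorkerGroupConfContinueOnError(),
	)

	// Nothing runs until the returned Worker executes.
	if err := worker.Run(ctx); err != nil {
		fmt.Println("processing error:", err)
	}
}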

func (*Stream[T]) Read added in v0.12.0

func (st *Stream[T]) Read(ctx context.Context) (out T, err error)

Read returns a single value from the stream. This operation IS safe for concurrent use.

Read returns the io.EOF error when the stream has been exhausted, a context expiration error when the context is canceled, or the underlying error produced by the stream. All errors produced by Read are terminal and indicate that no further iteration is possible.

func (*Stream[T]) ReadAll added in v0.12.0

func (st *Stream[T]) ReadAll(fn fn.Handler[T]) Worker

ReadAll provides a worker that consumes all items in the stream with the provided processor function.

All panics are converted to errors and propagated in the response of the worker, and abort the processing. If the processor function returns ErrStreamContinue, processing continues. All other errors abort processing and are returned by the worker.

func (*Stream[T]) Reduce added in v0.12.0

func (st *Stream[T]) Reduce(reducer func(T, T) (T, error)) Generator[T]

Reduce processes a stream with a reducer function. The output function is a Generator operation which runs synchronously, and no processing happens before the generator is called. If the reducer function returns ErrStreamContinue, the output value is ignored and the reducer operation continues. io.EOF errors are not propagated to the caller, and in all situations the last value produced by the reducer is returned along with any error.

The "previous" value for the first reduce option is the zero value for the type T.

func (*Stream[T]) Slice added in v0.12.0

func (st *Stream[T]) Slice(ctx context.Context) (out []T, _ error)

Slice converts a stream to a slice of its values, and closes the stream once it has been exhausted.

In the case of an error in the underlying stream the output slice will have the values encountered before the error.

func (*Stream[T]) Split added in v0.12.0

func (st *Stream[T]) Split(num int) []*Stream[T]

Split produces an arbitrary number of streams which divide the input. The division is lazy and depends on the rate of consumption of output streams, but every item from the input stream is sent to exactly one output stream, each of which can be safely used from a different go routine.

The input stream is not closed after the output streams are exhausted. There is one background go routine that reads items off of the input stream, which starts when the first output stream is advanced: be aware that canceling this context will effectively cancel all streams.
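
A hedged sketch of draining a split stream from two goroutines, using the package's WaitGroup (documented later on this page). It assumes fun.Operation is a func(context.Context) and the module import path github.com/tychoish/fun.

package main

import (
	"context"
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()
	st := fun.SliceStream([]int{1, 2, 3, 4, 5, 6})

	wg := &fun.WaitGroup{}
	for _, sub := range st.Split(2) {
		sub := sub
		// Each output stream is used by exactly one goroutine, so the
		// Next/Value pattern is safe here.
		wg.Launch(ctx, func(ctx context.Context) {
			for sub.Next(ctx) {
				fmt.Println(sub.Value())
			}
		})
	}

	wg.Wait(ctx)
}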

func (*Stream[T]) Transform added in v0.12.0

func (st *Stream[T]) Transform(op Converter[T, T]) *Stream[T]

Transform processes a stream, passing each element through a transform function. The type of the output stream is the same as the input. Use a converting stream to change the type of the values.

func (*Stream[T]) UnmarshalJSON added in v0.12.0

func (st *Stream[T]) UnmarshalJSON(in []byte) error

UnmarshalJSON reads a byte-array of input data that contains a JSON array and then processes and returns that data iteratively.

To handle streaming data from an io.Reader that contains a stream of line-separated json documents, use itertool.JSON.

func (*Stream[T]) Value added in v0.12.0

func (st *Stream[T]) Value() T

Value returns the object at the current position in the stream. It's often used with Next() for looping over the stream.

Value and Next cannot be used safely when the stream is being used concurrently. Use ReadOne or the Generator method.

func (*Stream[T]) WithHook added in v0.12.0

func (st *Stream[T]) WithHook(hook fn.Handler[*Stream[T]]) *Stream[T]

WithHook returns a stream that wraps the underlying stream with a close hook. The provided hook function will run during the Stream's Close() method.

type WaitGroup added in v0.6.3

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup works like sync.WaitGroup, except that the Wait method takes a context (and can be passed as a fun.Operation). The implementation is exceptionally simple. The only constraint, as with sync.WaitGroup, is that you can never modify the internal counter such that its value becomes negative, even transiently. The implementation does not require background resources aside from Wait, which creates a single goroutine that lives for the entire time that Wait is running. Multiple Wait calls can safely run at once, and the WaitGroup is reusable more than once.

This implementation is about 50% slower than sync.WaitGroup in informal testing. It provides a little extra flexibility and introspection, with similar semantics, that may be worth the additional performance hit.
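
A hedged sketch of the WaitGroup in use, assuming its zero value is ready to use (as with sync.WaitGroup), that fun.Operation is a func(context.Context), and the module import path github.com/tychoish/fun.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tychoish/fun"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	wg := &fun.WaitGroup{}

	// Launch increments the counter and starts the operation in a
	// goroutine (and, presumably, marks it done when the operation
	// returns).
	for i := 0; i < 4; i++ {
		i := i
		wg.Launch(ctx, func(context.Context) {
			fmt.Println("worker", i, "finished")
		})
	}

	// Wait blocks until the counter reaches zero or the context expires.
	wg.Wait(ctx)
	fmt.Println("pending:", wg.Num())
}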

func (*WaitGroup) Add added in v0.8.0

func (wg *WaitGroup) Add(num int)

Add modifies the internal counter. It raises an ErrInvariantViolation error if any modification causes the internal counter to be less than 0.

func (*WaitGroup) Done added in v0.8.0

func (wg *WaitGroup) Done()

Done marks a single operation as done.

func (*WaitGroup) Group added in v0.12.0

func (wg *WaitGroup) Group(n int, op Operation) Operation

Group returns an operation that, when executed, starts <n> copies of the operation and blocks until all have finished.

func (*WaitGroup) Inc added in v0.10.8

func (wg *WaitGroup) Inc()

Inc adds one item to the wait group.

func (*WaitGroup) IsDone added in v0.8.0

func (wg *WaitGroup) IsDone() bool

IsDone returns true if there is no pending work, and false otherwise.

func (*WaitGroup) Launch added in v0.10.0

func (wg *WaitGroup) Launch(ctx context.Context, op Operation)

Launch increments the WaitGroup and starts the operation in a go routine.

func (*WaitGroup) Num added in v0.8.0

func (wg *WaitGroup) Num() int

Num returns the number of pending workers.

func (*WaitGroup) Operation added in v0.10.0

func (wg *WaitGroup) Operation() Operation

Operation returns the WaitGroup's Wait method as an Operation.

func (*WaitGroup) StartGroup added in v0.12.0

func (wg *WaitGroup) StartGroup(ctx context.Context, n int, op Operation) Operation

StartGroup starts <n> copies of the operation in separate threads and returns an operation that waits on the wait group.

func (*WaitGroup) Wait added in v0.8.0

func (wg *WaitGroup) Wait(ctx context.Context)

Wait blocks until either the context is canceled or all items have completed.

Wait is passable or usable as a fun.Operation.

In many cases, callers should not rely on the Wait operation returning after the context expires: if the Done() calls happen in code that respects context cancellation, then aborting Wait on that same cancellation, particularly when Wait receives a context with the same lifecycle as the operations it is waiting on, means that worker routines will leak. Nevertheless, in some situations, when workers may take a long time to respond to a context cancellation, being able to set a second deadline on waiting may be useful.

Consider using `fun.Operation(wg.Wait).Block()` if you want blocking semantics with the other features of this WaitGroup implementation.

func (*WaitGroup) Worker added in v0.10.0

func (wg *WaitGroup) Worker() Worker

Worker returns a worker that will block on the wait group returning, and returns the context's error if one exists.

type Worker added in v0.10.0

type Worker func(context.Context) error

Worker represents a basic function used in worker pools and other similar situations.

func MakeWorker added in v0.10.4

func MakeWorker(fn func() error) Worker

MakeWorker converts a non-context worker function into a worker for compatibility with tooling.

func (Worker) After added in v0.10.0

func (wf Worker) After(ts time.Time) Worker

After returns a Worker that blocks until the timestamp provided is in the past. Additional calls to this worker will run immediately. If the timestamp is in the past the resulting worker will run immediately.

func (Worker) Background added in v0.10.0

func (wf Worker) Background(ctx context.Context, ob fn.Handler[error]) Operation

Background starts the worker function in a go routine, passing the error to the provided observer function.

func (Worker) Check added in v0.10.0

func (wf Worker) Check(ctx context.Context) bool

Check runs the worker and returns true (ok) if there was no error, and false otherwise.

func (Worker) Delay added in v0.10.0

func (wf Worker) Delay(dur time.Duration) Worker

Delay wraps a Worker in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Worker) Group added in v0.10.3

func (wf Worker) Group(n int) Worker

Group makes a worker that runs n copies of the underlying worker, in different go routines and aggregates their output. Work does not start until the resulting worker is called.

func (Worker) If added in v0.10.0

func (wf Worker) If(cond bool) Worker

If returns a Worker function that runs only if the condition is true. The error is always nil if the condition is false. If-ed functions may be called more than once, and will potentially run multiple times.

func (Worker) Ignore added in v0.10.0

func (wf Worker) Ignore() Operation

Ignore converts the worker into an Operation that discards the error produced by the worker.

func (Worker) Interval added in v0.10.3

func (wf Worker) Interval(dur time.Duration) Worker

Interval runs the worker with a timer that resets to the provided duration. The worker runs immediately, and then the timer is reset to the specified interval after the base worker has completed; which is to say that the runtime of the worker's operation is effectively added to the interval.

The interval worker will run until the context is canceled or the worker returns an error.
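
A hedged sketch of a periodic worker built with MakeWorker (documented above) and Interval; the module import path github.com/tychoish/fun is an assumption.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tychoish/fun"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	heartbeat := fun.MakeWorker(func() error {
		fmt.Println("heartbeat at", time.Now().Format(time.RFC3339))
		return nil
	})

	// Runs immediately, then roughly once per second (plus the
	// worker's own runtime) until the context expires or the worker
	// returns an error.
	err := heartbeat.Interval(time.Second).Run(ctx)
	fmt.Println("interval worker exited:", err)
}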

func (Worker) Jitter added in v0.10.0

func (wf Worker) Jitter(jf func() time.Duration) Worker

Jitter wraps a Worker in a function that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Worker.

If the function produces a negative duration, there is no delay.

func (Worker) Join added in v0.10.0

func (wf Worker) Join(wfs ...Worker) Worker

Join combines a sequence of workers, calling the workers in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated.

func (Worker) Launch added in v0.10.3

func (wf Worker) Launch(ctx context.Context) Worker

Launch runs the worker function in a go routine and returns a new fun.Worker which blocks until the context expires or the background worker completes, and returns the error from the background execution.

The underlying worker begins executing before Launch returns.

func (Worker) Limit added in v0.10.0

func (wf Worker) Limit(n int) Worker

Limit produces a worker that runs exactly n times. Each execution is isolated from every other, but once the limit is exceeded, the result of the *last* worker to execute is cached; concurrent access to that value is possible.

func (Worker) Lock added in v0.10.0

func (wf Worker) Lock() Worker

Lock produces a Worker that will be executed within the scope of a (managed) mutex.

func (Worker) Must added in v0.10.0

func (wf Worker) Must() Operation

Must converts a Worker function into a wait function; however, if the worker produces an error Must converts the error into a panic.

func (Worker) Once added in v0.10.0

func (wf Worker) Once() Worker

Once wraps the Worker in a function that will execute only once. The return value (error) is cached, and can be accessed many times without re-running the worker.

func (Worker) Operation added in v0.10.0

func (wf Worker) Operation(ob fn.Handler[error]) Operation

Operation converts a worker function into a wait function, passing any error to the handler function.

func (Worker) PostHook added in v0.10.0

func (wf Worker) PostHook(post func()) Worker

PostHook runs the hook operation after the worker function completes. If the hook panics it is converted to an error and aggregated with the worker's error.

func (Worker) PreHook added in v0.10.0

func (wf Worker) PreHook(pre Operation) Worker

PreHook returns a Worker that runs an operation unconditionally before running the underlying worker. If the hook function panics it is converted to an error and aggregated with the worker's error.

func (Worker) Retry added in v0.10.4

func (wf Worker) Retry(n int) Worker

Retry constructs a worker function that runs the underlying worker until the return value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.) Context cancellation errors are returned to the caller along with any other errors encountered in previous retries; other terminating errors are not. All errors are discarded if the retry operation succeeds in the provided number of retries.

Except for ErrStreamContinue, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails.
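
A hedged sketch of retrying a flaky operation; it relies only on the MakeWorker, Retry, and Run signatures shown on this page, plus the assumed module import path github.com/tychoish/fun.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/tychoish/fun"
)

func main() {
	ctx := context.Background()

	attempts := 0
	flaky := fun.MakeWorker(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	})

	// Retries up to 5 times; earlier errors are discarded because the
	// operation eventually succeeds.
	if err := flaky.Retry(5).Run(ctx); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println("succeeded after", attempts, "attempts")
}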

func (Worker) Run added in v0.10.0

func (wf Worker) Run(ctx context.Context) error

Run is equivalent to calling the worker function directly.

func (Worker) Signal added in v0.10.0

func (wf Worker) Signal(ctx context.Context) <-chan error

Signal runs the worker function in a background goroutine and returns an error channel that receives the result when the worker function returns. If Signal is called with a canceled context the worker is still executed (with that context.)

A value, possibly nil, is always sent through the channel. Panics are not caught or handled.

func (Worker) StartGroup added in v0.10.0

func (wf Worker) StartGroup(ctx context.Context, n int) Worker

StartGroup starts n copies of the worker operation and returns a future/worker that returns the aggregated errors from all workers.

The operation is fundamentally continue-on-error. To get abort-on-error semantics, use an error filter on the input worker that cancels the context when it sees an error.

func (Worker) TTL added in v0.10.0

func (wf Worker) TTL(dur time.Duration) Worker

TTL produces a worker that will only run once during every specified duration, when called more than once. During the interval between calls, the previous error is returned. While each execution of the root worker is protected by a mutex, the resulting worker can be used in parallel during the intervals between calls.

func (Worker) Wait added in v0.10.3

func (wf Worker) Wait() error

Wait runs the worker with a background context and returns its error.

func (Worker) When added in v0.10.0

func (wf Worker) When(cond func() bool) Worker

When wraps a Worker function that will only run if the condition function returns true. If the condition is false the worker does not execute. The condition function is called before every execution.

When-wrapped worker functions may be called more than once, and will potentially run multiple times.

func (Worker) While added in v0.10.0

func (wf Worker) While() Worker

While runs the Worker in a continuous loop, returning only if the underlying worker returns an error or if the context is canceled.

func (Worker) WithCancel added in v0.10.0

func (wf Worker) WithCancel() (Worker, context.CancelFunc)

WithCancel creates a Worker and a cancel function which will terminate the context that the root Worker is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Worker is canceled.)

func (Worker) WithErrorFilter added in v0.10.5

func (wf Worker) WithErrorFilter(ef erc.Filter) Worker

WithErrorFilter wraps the worker with a Worker that passes the root Worker's error through the provided filter and returns the filter's output.

The ers package provides a number of filter implementations but any function in the following form works:

func(error) error

func (Worker) WithErrorHook added in v0.12.0

func (wf Worker) WithErrorHook(ef fn.Future[error]) Worker

WithErrorHook runs the worker, potentially catching a panic and joining that with the error produced by the error future. Both the worker and the error future always execute, and both errors are returned in aggregated form.

func (Worker) WithLock added in v0.10.0

func (wf Worker) WithLock(mtx sync.Locker) Worker

WithLock produces a Worker that will be executed within the scope of the provided mutex.

func (Worker) WithRecover added in v0.10.4

func (wf Worker) WithRecover() Worker

WithRecover produces a worker function that converts the worker function's panics to errors.

func (Worker) WithoutErrors added in v0.10.0

func (wf Worker) WithoutErrors(errs ...error) Worker

WithoutErrors returns a worker that will return nil if the error returned by the worker is one of the errors passed to WithoutErrors.

type WorkerGroupConf added in v0.10.0

type WorkerGroupConf struct {
	// NumWorkers describes the number of parallel workers
	// processing the incoming stream items and running the map
	// function. All values less than 1 are converted to 1. Any
	// value greater than 1 will result in out-of-sequence results
	// in the output stream.
	NumWorkers int
	// ContinueOnPanic prevents the operations from halting when a
	// single processing function panics. In all modes, panics
	// are converted to errors and propagated to the output
	// stream's Close() method.
	ContinueOnPanic bool
	// ContinueOnError allows a processing function to return an
	// error and allow the work of the broader operation to
	// continue. Errors are aggregated and propagated to the output
	// stream's Close() method.
	ContinueOnError bool
	// IncludeContextExpirationErrors changes the default handling
	// of context cancellation errors. By default all errors
	// rooted in context cancellation are not propagated to the
	// Close() method, however, when true, these errors are
	// captured. All other error handling semantics
	// (e.g. ContinueOnError) are applicable.
	IncludeContextExpirationErrors bool
	// ExcludedErrors is a list of errors that should not be included
	// in the collected errors of the
	// output. ers.ErrRecoveredPanic is always included and io.EOF
	// is never included.
	ExcludedErrors []error
	// ErrorCollector provides a way to connect an existing error
	// collector to a worker group.
	ErrorCollector *erc.Collector
}

WorkerGroupConf describes the runtime options for several operations. The zero value of this struct provides a usable strict configuration.

func (*WorkerGroupConf) CanContinueOnError added in v0.10.0

func (o *WorkerGroupConf) CanContinueOnError(err error) (out bool)

CanContinueOnError checks an error, collecting it as needed using the WorkerGroupConf, and then returning true if processing should continue and false otherwise.

Neither io.EOF nor ErrStreamContinue errors are ever observed. All panic errors are observed. Context cancellation errors are observed only when configured.

func (*WorkerGroupConf) Validate added in v0.10.0

func (o *WorkerGroupConf) Validate() error

Validate ensures that the configuration is valid, and returns an error if there are impossible configurations.

Directories

Path Synopsis
adt
Package adt provides "atomic data types" as strongly-typed generic helpers for simple atomic operations (including sync.Map, sync.Pool, and a typed value).
assert
Package assert provides an incredibly simple assertion framework, that relies on generics and simplicity.
dt
Package dt provides container type implementations and interfaces.
cmp
Package cmp provides comparators for sorting linked lists.
hdrhist
Package hdrhist provides an implementation of Gil Tene's HDR Histogram data structure.
is
Package is contains a simple assertion library for the fun/ensure testing framework.
erc
Package erc provides a simple/fast error aggregation tool for collecting and aggregating errors.
ers
Package ers provides some very basic error aggregating and handling tools, as a companion to erc.
intish
Package intish provides a collection of strongly typed integer arithmetic operations, to make it possible to avoid floating point math for simple operations when desired.
itertool
Package itertool provides a set of functional helpers for managing and using fun.Streams, including parallel processing, generators, Map/Reduce, Merge, and other convenient tools.
pubsub
Package pubsub provides a message broker for one-to-many or many-to-many message distribution.
risky
Package risky contains a bunch of bad ideas for APIs and operations that will definitely lead to panics and deadlocks and incorrect behavior when used incorrectly.
srv
Package srv provides a framework and toolkit for service orchestration.
testt
Package testt (for test tools) provides a couple of useful helpers for common test patterns.
