async

Version: v0.4.0
Published: Apr 13, 2024 License: MIT Imports: 10 Imported by: 0

README

go-async

A collection of utilities for asynchronous workflow control, inspired by the JavaScript Promise object and the Node.js async package.

Installation and Requirement

Run the following command to install the library. It requires Go 1.18 or later.

go get -u github.com/ghosind/go-async

Getting Started

The function to run

Most utility functions in this library accept any type of function to run; its parameters and return values can be of any type and number. As a best practice, however, we recommend making the first parameter a context.Context so the function can receive cancellation signals, and making the last return value an error so the utilities can tell whether the function failed.

Run all functions until they are finished

The All function executes all the functions asynchronously. If every function completes without returning an error or panicking, it wraps all return values in a two-dimensional slice of any and returns it.

If any function returns an error or panics, All terminates immediately and returns the error. It also sends a cancel signal through the context to the functions that have not completed, provided they accept a context as their first parameter.

out, err := async.All(func (ctx context.Context) (int, error) {
  return 0, nil
}, func (ctx context.Context) (string, error) {
  return "hello", nil
}/*, ...*/)
// out: [][]any{{0, <nil>}, {"hello", <nil>}}
// err: <nil>

out, err := async.All(func (ctx context.Context) (int, error) {
  return 0, nil
}, func (ctx context.Context) (string, error) {
  return "", errors.New("some error")
}/*, ...*/)
// out: [][]any{{}, {"", some error}}
// err: function 1 error: some error

If you do not want to terminate the execution when a function returns an error or panics, use the AllCompleted function. AllCompleted runs until every function has finished or panicked. It returns a list of the functions' return values, and an error that indicates whether any function returned an error or panicked.

out, err := async.AllCompleted(func (ctx context.Context) (int, error) {
  return 0, nil
}, func (ctx context.Context) (string, error) {
  return "", errors.New("some error")
}/*, ...*/)
// out: [][]any{{0, <nil>}, {"", some error}}
// err: function 1 error: some error

Get first output

If you want to run a list of functions and get the output of the first one to finish, use the Race function. Race runs all functions asynchronously and returns as soon as one of them finishes or panics.

The Race function returns three values:

  • 1st value: the output list of the first function to finish.
  • 2nd value: the index of the first function to finish.
  • 3rd value: the execution error from the first function to finish or panic.

out, index, err := async.Race(func (ctx context.Context) (int, error) {
  request.Get("https://example.com")
  return 0, nil
}, func (ctx context.Context) (string, error) {
  time.Sleep(time.Second)
  return "test", nil
})
// If the first function is faster than the second one:
// out: []any{0, <nil>}, index: 0, err: <nil>
//
// Otherwise:
// out: []any{"test", <nil>}, index: 1, err: <nil>

Run all functions with concurrency limit

To run all functions asynchronously with a limit on concurrency, use the Parallel function. It accepts the concurrency limit followed by the list of functions to run. The limit must be greater than or equal to 0; a limit of 0 means no limit, which has the same effect as the All function.

// Run at most 2 functions at a time.
out, err := async.Parallel(2, func (ctx context.Context) (int, error) {
  // Do something
  return 1, nil
}, func (ctx context.Context) (string, error) {
  // Do something
  return "hello", nil
}, func (ctx context.Context) error {
  // Do something
  return nil
}/* , ... */)
// out: [][]any{{1, <nil>}, {"hello", <nil>}, {<nil>}}
// err: <nil>

Parallel also terminates if any function panics or returns an error. If you do not want to terminate the execution of the other functions, use ParallelCompleted, which runs until all functions have finished. It returns the output list and an error indicating whether any function failed.

Run a function forever until it returns an error or panic

The Forever function runs a function repeatedly until it returns an error or panics. It takes a function of type ForeverFn, which is described after the following example.

err := async.Forever(func(ctx context.Context, next func(context.Context)) error {
  v := ctx.Value("key")
  if v != nil {
    vi := v.(int)
    if vi == 5 {
      return errors.New("finish")
    }

    fmt.Printf("value: %d\n", vi)

    next(context.WithValue(ctx, "key", vi+1))
  } else {
    next(context.WithValue(ctx, "key", 1))
  }

  return nil
})
fmt.Printf("err: %v\n", err)
// value: 1
// value: 2
// value: 3
// value: 4
// err: finish

A ForeverFn accepts two parameters. The first is a context that comes from the caller or from the previous invocation. The second is a function that sets the context passed to the next invocation. Calling next is optional, and only the first call within an invocation takes effect.

Customize context

Every utility function has an XXXWithContext variant (such as AllWithContext, RaceWithContext, ...) that lets you supply your own context.

Documentation

Variables

var (
	// ErrContextCanceled to indicate the context was canceled or timed out.
	ErrContextCanceled error = errors.New("context canceled")
	// ErrInvalidConcurrency to indicate the number of the concurrency limitation is an invalid
	// value.
	ErrInvalidConcurrency error = errors.New("invalid concurrency")
	// ErrNotFunction indicates the value is not a function.
	ErrNotFunction error = errors.New("not function")
	// ErrUnmatchedParam indicates the function's parameter list does not match to the list from the
	// caller.
	ErrUnmatchedParam error = errors.New("parameters are unmatched")
	// ErrInvalidTestFunc indicates the test function is invalid.
	ErrInvalidTestFunc error = errors.New("invalid test function")
)

Functions

func All

func All(funcs ...AsyncFn) ([][]any, error)

All executes the functions asynchronously until all of them have finished. If any function returns an error or panics, All immediately returns an execution error and sends a cancel signal to all other functions through the context.

If all functions complete without an error or panic, the returned error is nil.

out, err := async.All(func() (int, error) {
  return 1, nil
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
}, func(ctx context.Context) error {
  time.Sleep(50 * time.Millisecond)
  return nil
})
// out: [][]any{{1, nil}, {"hello", nil}, {nil}}
// err: nil

_, err = async.All(func() (int, error) {
  return 0, errors.New("some error")
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
})
// err: function 0 error: some error

Example

out, err := async.All(func() int {
	time.Sleep(100 * time.Millisecond)
	return 1
}, func() int {
	return 2
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [2]]
<nil>

func AllCompleted

func AllCompleted(funcs ...AsyncFn) ([][]any, error)

AllCompleted executes the functions asynchronously until all of them have finished. It returns the functions' return values, ordered as the functions were passed, and an error that indicates whether any function returned an error or panicked.

out, err := async.AllCompleted(func() (int, error) {
  return 1, nil
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
}, func(ctx context.Context) error {
  time.Sleep(50 * time.Millisecond)
  return errors.New("some error")
})
// out: [][]any{{1, nil}, {"hello", nil}, {some error}}
// err: function 2 error: some error

Example

out, err := async.AllCompleted(func() (int, error) {
	time.Sleep(100 * time.Millisecond)
	return 1, nil
}, func() error {
	return errors.New("expected error")
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1 <nil>] [expected error]]
function 1 error: expected error

func AllCompletedWithContext

func AllCompletedWithContext(
	ctx context.Context,
	funcs ...AsyncFn,
) ([][]any, error)

AllCompletedWithContext executes the functions asynchronously until all of them have finished, or until the context is done (canceled or timed out). It returns the functions' return values, ordered as the functions were passed, and an error that indicates whether any function returned an error or panicked.

func AllWithContext

func AllWithContext(ctx context.Context, funcs ...AsyncFn) ([][]any, error)

AllWithContext executes the functions asynchronously until all of them have finished, or until the context is done (canceled or timed out). If any function returns an error or panics, it immediately returns an execution error and sends a cancel signal to all other functions through the context.

The index of the function will be -1 if all functions have been completed without error or panic, or the context has been canceled (or timeout) before all functions finished.

func Forever added in v0.3.0

func Forever(fn ForeverFn) error

Forever runs the function indefinitely until the function panics or returns an error.

You can use the context and call the next function to pass values to the next invocation. The next function can be invoked one time only, and it will have no effect if it is invoked again.

err := Forever(func(ctx context.Context, next func(context.Context)) error {
  v := ctx.Value("key")
  if v != nil {
    vi := v.(int)
    if vi == 3 {
      return errors.New("finish")
    }

    fmt.Printf("value: %d\n", vi)

    next(context.WithValue(ctx, "key", vi+1))
  } else {
    next(context.WithValue(ctx, "key", 1))
  }

  return nil
})
fmt.Printf("err: %v\n", err)
// value: 1
// value: 2
// err: finish

Example

err := async.Forever(func(ctx context.Context, next func(context.Context)) error {
	val := ctx.Value("key")
	if val == nil {
		//lint:ignore SA1029 for test case only
		next(context.WithValue(ctx, "key", 1))
	} else if v := val.(int); v < 5 {
		//lint:ignore SA1029 for test case only
		next(context.WithValue(ctx, "key", v+1))
	} else {
		return errors.New("value is 5")
	}

	return nil
})
fmt.Println(err)
Output:

value is 5

func ForeverWithContext added in v0.3.0

func ForeverWithContext(ctx context.Context, fn ForeverFn) error

ForeverWithContext runs the function indefinitely until the function panics or returns an error.

You can use the context and call the next function to pass values to the next invocation. The next function can be invoked one time only, and it will have no effect if it is invoked again.

func Parallel added in v0.2.0

func Parallel(concurrency int, funcs ...AsyncFn) ([][]any, error)

Parallel runs the functions asynchronously with the specified concurrency limit. If any function returns an error or panics, it sends a cancel signal through the context, terminates immediately, and returns an execution error describing the failure.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

// Run at most 2 functions at a time.
out, err := Parallel(2, func(ctx context.Context) (int, error) {
  // Do something
  return 1, nil
}, func(ctx context.Context) (string, error) {
  // Do something
  return "hello", nil
}, func(ctx context.Context) error {
  // Do something
  return nil
} /* , ... */)
// out: [][]any{{1, <nil>}, {"hello", <nil>}, {<nil>}}
// err: <nil>

Example

out, err := async.Parallel(2, func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 2
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 3
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [2] [3]]
<nil>

func ParallelCompleted added in v0.2.0

func ParallelCompleted(concurrency int, funcs ...AsyncFn) ([][]any, error)

ParallelCompleted runs the functions asynchronously with the specified concurrency limit. It does not return until all of the functions have finished. It returns the functions' return values, ordered by the functions' positions in the parameter list, and an error that indicates whether any function panicked or returned an error.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

Example

out, err := async.ParallelCompleted(2, func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() error {
	time.Sleep(50 * time.Millisecond)
	return errors.New("expected error")
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 3
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [expected error] [3]]
function 1 error: expected error

func ParallelCompletedWithContext added in v0.2.0

func ParallelCompletedWithContext(
	ctx context.Context,
	concurrency int,
	funcs ...AsyncFn,
) ([][]any, error)

ParallelCompletedWithContext runs the functions asynchronously with the specified concurrency limit and context. It does not return until all of the functions have finished. It returns the functions' return values, ordered by the functions' positions in the parameter list, and an error that indicates whether any function panicked or returned an error.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

func ParallelWithContext added in v0.2.0

func ParallelWithContext(
	ctx context.Context,
	concurrency int,
	funcs ...AsyncFn,
) ([][]any, error)

ParallelWithContext runs the functions asynchronously with the specified concurrency limit. If any function returns an error or panics, it sends a cancel signal through the context, terminates immediately, and returns an execution error describing the failure. If the context is canceled or times out before all functions have finished, it sends a cancel signal to all uncompleted functions and returns a context canceled error.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

func Race

func Race(funcs ...AsyncFn) ([]any, int, error)

Race executes the functions asynchronously and returns the result and the index of the first function to finish (including by panicking). It does not send a cancel signal to the other functions.

out, index, err := Race(func(ctx context.Context) (int, error) {
  request.Get("https://example.com")
  return 0, nil
}, func(ctx context.Context) (string, error) {
  time.Sleep(500 * time.Millisecond)
  return "test", nil
})
// If the first function is faster than the second one:
// out: []any{0, <nil>}, index: 0, err: <nil>
//
// Otherwise:
// out: []any{"test", <nil>}, index: 1, err: <nil>

Example

out, index, err := async.Race(func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() int {
	time.Sleep(20 * time.Millisecond)
	return 2
})
fmt.Println(out)
fmt.Println(index)
fmt.Println(err)
Output:

[2]
1
<nil>
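The "first function to finish wins" behaviour can be sketched with the standard library alone. The helper below is hypothetical and is not how the library is implemented: it handles only func() int tasks, needs at least one task, and does no panic handling. It uses a buffered channel so that slower goroutines can still deliver their result and exit after the winner has been taken.

```go
package main

import "fmt"

// first is a hypothetical helper, not part of go-async. It runs every task in
// its own goroutine and returns the value and index of whichever finishes
// first. It requires at least one task.
func first(tasks ...func() int) (val int, idx int) {
	type result struct{ idx, val int }
	// Buffered so that slower goroutines never block on send.
	ch := make(chan result, len(tasks))
	for i, t := range tasks {
		go func(i int, t func() int) { ch <- result{i, t()} }(i, t)
	}
	r := <-ch // the first result to arrive wins
	return r.val, r.idx
}

func main() {
	release := make(chan struct{})
	val, idx := first(
		func() int { <-release; return 1 }, // blocks until released
		func() int { return 2 },            // finishes immediately
	)
	close(release) // let the slower task finish
	fmt.Println(val, idx) // 2 1
}
```

As in the description of Race, the losing task is not cancelled here; it simply runs to completion and its result is discarded.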

func RaceWithContext

func RaceWithContext(ctx context.Context, funcs ...AsyncFn) ([]any, int, error)

RaceWithContext executes the functions asynchronously and returns the result and the index of the first function to finish (including by panicking). It does not send a cancel signal to the other functions.

func Retry added in v0.4.0

func Retry(fn AsyncFn, opts ...RetryOptions) ([]any, error)

Retry calls the function until it succeeds, making no more than the specified number of attempts before returning an error. If an attempt succeeds, Retry returns its result. If all attempts fail, it returns the result and the error of the final attempt.

Retry(func() error {
  // Do something
  return err
}) // Run the function up to 5 times with no interval, stopping once it succeeds

Retry(func() error {
  // Do something
  return err
}, RetryOptions{
  Times: 3,
  Interval: 100,
}) // Run the function up to 3 times with a 100ms interval, stopping once it succeeds

Example

i := 0

out, err := async.Retry(func() error {
	i++
	if i != 3 {
		return errors.New("i != 3")
	} else {
		return nil
	}
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[<nil>]
<nil>

func RetryWithContext added in v0.4.0

func RetryWithContext(ctx context.Context, fn AsyncFn, opts ...RetryOptions) ([]any, error)

RetryWithContext runs the function with the specified context and calls it until it succeeds, making no more than the specified number of attempts before returning an error. If an attempt succeeds, it returns that attempt's result. If all attempts fail, it returns the result and the error of the final attempt.

func Until added in v0.4.0

func Until(testFn, fn AsyncFn) ([]any, error)

Until repeatedly calls the function until the test function returns true. A valid test function must match the following requirements.

- The first return value of the test function must be a boolean value.
- The number of parameters of the test function must equal the number of return values of the execution function (excluding the context).
- The parameter types of the test function must be the same as, or convertible from, the return value types of the execution function.

c := 0
Until(func() bool {
  return c == 5
}, func() {
  c++
})

Example

i := 0

out, err := async.Until(func(n int) bool {
	return n < 3
}, func() int {
	i++
	return i
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[3]
<nil>

func UntilWithContext added in v0.4.0

func UntilWithContext(ctx context.Context, testFn, fn AsyncFn) ([]any, error)

UntilWithContext repeatedly calls the function with the specified context until the test function returns true.

func While added in v0.4.0

func While(testFn, fn AsyncFn) ([]any, error)

While repeatedly calls the function while the test function returns true. A valid test function must match the following requirements.

- The first return value of the test function must be a boolean value.
- It should have no parameters or accept a context only.

c := 0
While(func() bool {
  return c < 5
}, func() {
  c++
})

Example

i := 0

out, err := async.While(func() bool {
	return i < 3
}, func() {
	i++
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[]
<nil>

func WhileWithContext added in v0.4.0

func WhileWithContext(ctx context.Context, testFn, fn AsyncFn) ([]any, error)

WhileWithContext repeatedly calls the function with the specified context while the test function returns true.

Types

type AsyncFn added in v0.2.0

type AsyncFn any

AsyncFn is the function to run. It can be any function, with no restriction on its parameters or return values. As a best practice, define the function in one of the following styles:

func(context.Context) error
func(context.Context) (out_type, error)
func(context.Context, in_type) error
func(context.Context, in_type) (out_type, error)
func(context.Context, in_type1, in_type2/*, ...*/) (out_type1, out_type2,/* ...,*/ error)

type ExecutionError added in v0.3.0

type ExecutionError interface {
	// Index returns the function's index in the parameters list that the function had returned an
	// error or panicked.
	Index() int
	// Err returns the original error that was returned or panicked by the function.
	Err() error
	// Error returns the execution error message.
	Error() string
}

type ExecutionErrors added in v0.3.0

type ExecutionErrors []ExecutionError

ExecutionErrors is a slice of ExecutionError.

func (ExecutionErrors) Error added in v0.3.0

func (ee ExecutionErrors) Error() string

Error combines and returns the messages of all the execution errors.

type ForeverFn added in v0.3.0

type ForeverFn func(ctx context.Context, next func(context.Context)) error

ForeverFn is the function to run in the Forever function.

type RetryOptions added in v0.4.0

type RetryOptions struct {
	// Times is the number of attempts to make before giving up, the default is 5.
	Times int
	// Interval is the time to wait between retries in milliseconds, the default is 0.
	Interval int
	// IntervalFunc is the function to calculate the time to wait between retries in milliseconds, it
	// accepts an int value to indicate the retry count.
	IntervalFunc func(int) int
	// ErrorFilter is a function that is invoked on an error result. Retry will continue the retry
	// attempts if it returns true, and it will abort the workflow and return the current attempt's
	// result and error if it returns false.
	ErrorFilter func(error) bool
}
