async

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: MIT Imports: 10 Imported by: 0

README

go-async

test Go Report Card codecov Version Badge License Badge Go Reference

English | 简体中文

It's a powerful asynchronous utility library inspired by JavaScript Promise Object and Node.js async package.

Installation and Requirement

Run the following command to install this library, and Go 1.18 and later versions required.

go get -u github.com/ghosind/go-async

And then, import the library into your own code.

import "github.com/ghosind/go-async"

This library is not stable yet, anything may change in the later versions.

Getting Started

For the following example, it runs the functions concurrently and returns the return values until all functions have been completed.

out, err := async.All(func (ctx context.Context) (int, error) {
  return 0, nil
}, func () (string, error)) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
})
// out: [][]any{{0, <nil>}, {"hello", <nil>}}
// err: <nil>

There are over 10 asynchronous control flow functions available, please visit Go Reference to see the documentation and examples.

The function to run

The most of utility functions of this library accept any type of function to run, you can set the parameters and the return values as any type and any number of return values that you want. However, for best practice, we recommend you to set the first parameter as context.Context to receive the signals and make the type of the last return value as an error to let the utilities know whether an error happened or not.

Customize context

For all functions, you can use the XXXWithContext function (like AllWithContext, RaceWithContext, ...) to set the context by yourself.

Available Functions

License

The library published under MIT License, please see license file for more details.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrContextCanceled to indicate the context was canceled or timed out.
	ErrContextCanceled error = errors.New("context canceled")
	// ErrInvalidConcurrency to indicate the number of the concurrency limitation is an invalid
	// value.
	ErrInvalidConcurrency error = errors.New("invalid concurrency")
	// ErrNotFunction indicates the value is not a function.
	ErrNotFunction error = errors.New("not function")
	// ErrUnmatchedParam indicates the function's parameter list does not match to the list from the
	// caller.
	ErrUnmatchedParam error = errors.New("parameters are unmatched")
	// ErrInvalidTestFunc indicates the test function is invalid.
	ErrInvalidTestFunc error = errors.New("invalid test function")
	// ErrInvalidSeqFuncs indicates the functions in the Seq lists are not match.
	ErrInvalidSeqFuncs error = errors.New("invalid seq functions")
)

Functions

func All

func All(funcs ...AsyncFn) ([][]any, error)

All executes the functions asynchronously until all functions have been finished. If some function returns an error or panic, it will immediately return an execution error, and send a cancel signal to all other functions by context.

The index of the function will be -1 if all functions have been completed without error or panic.

out, err := async.All(func() (int, error) {
  return 1, nil
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
}, func(ctx context.Context) error {
  time.Sleep(50 * time.Millisecond)
  return nil
})
// out: [][]any{{1, nil}, {"hello", nil}, {nil}}
// err: nil

_, err = async.All(func() (int, error) {
  return 0, errors.New("some error")
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
})
// err: function 0 error: some error
Example
out, err := async.All(func() int {
	time.Sleep(100 * time.Millisecond)
	return 1
}, func() int {
	return 2
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [2]]
<nil>

func AllCompleted

func AllCompleted(funcs ...AsyncFn) ([][]any, error)

AllCompleted executes the functions asynchronously until all functions have been finished. It will return an error slice that is ordered by the functions order, and a boolean value to indicate whether any functions return an error or panic.

out, err := async.AllCompleted(func() (int, error) {
  return 1, nil
}, func() (string, error) {
  time.Sleep(100 * time.Millisecond)
  return "hello", nil
}, func(ctx context.Context) error {
  time.Sleep(50 * time.Millisecond)
  return errors.New("some error")
})
// out: [][]any{{1, nil}, {"hello", nil}, {some error}}
// err: function 2 error: some error
Example
out, err := async.AllCompleted(func() (int, error) {
	time.Sleep(100 * time.Millisecond)
	return 1, nil
}, func() error {
	return errors.New("expected error")
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1 <nil>] [expected error]]
function 1 error: expected error

func AllCompletedWithContext

func AllCompletedWithContext(
	ctx context.Context,
	funcs ...AsyncFn,
) ([][]any, error)

AllCompletedWithContext executes the functions asynchronously until all functions have been finished, or the context is done (canceled or timeout). It will return an error slice that is ordered by the functions order, and a boolean value to indicate whether any functions return an error or panic.

func AllWithContext

func AllWithContext(ctx context.Context, funcs ...AsyncFn) ([][]any, error)

AllWithContext executes the functions asynchronously until all functions have been finished, or the context is done (canceled or timeout). If some function returns an error or panic, it will immediately return an execution error and send a cancel signal to all other functions by context.

The index of the function will be -1 if all functions have been completed without error or panic, or the context has been canceled (or timeout) before all functions finished.

func Forever added in v0.3.0

func Forever(fn ForeverFn) error

Forever runs the function indefinitely until the function panics or returns an error.

You can use the context and call the next function to pass values to the next invocation. The next function can be invoked one time only, and it will have no effect if it is invoked again.

err := async.Forever(func(ctx context.Context, next func(context.Context)) error {
  v := ctx.Value("key")
  if v != nil {
    vi := v.(int)
    if vi == 3 {
      return errors.New("finish")
    }

    fmt.Printf("value: %d\n", vi)

    next(context.WithValue(ctx, "key", vi+1))
  } else {
    next(context.WithValue(ctx, "key", 1))
  }

  return nil
})
fmt.Printf("err: %v\n", err)
// value: 1
// value: 2
// err: finish
Example
err := async.Forever(func(ctx context.Context, next func(context.Context)) error {
	val := ctx.Value("key")
	if val == nil {
		//lint:ignore SA1029 for test case only
		next(context.WithValue(ctx, "key", 1))
	} else if v := val.(int); v < 5 {
		//lint:ignore SA1029 for test case only
		next(context.WithValue(ctx, "key", v+1))
	} else {
		return errors.New("value is 5")
	}

	return nil
})
fmt.Println(err)
Output:

value is 5

func ForeverWithContext added in v0.3.0

func ForeverWithContext(ctx context.Context, fn ForeverFn) error

ForeverWithContext runs the function indefinitely until the function panics or returns an error.

You can use the context and call the next function to pass values to the next invocation. The next function can be invoked one time only, and it will have no effect if it is invoked again.

func Parallel added in v0.2.0

func Parallel(concurrency int, funcs ...AsyncFn) ([][]any, error)

Parallel runs the functions asynchronously with the specified concurrency limitation. It will send a cancel sign to context and terminate immediately if any function returns an error or panic, and also returns an execution error to indicate the error.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

// Run 2 functions asynchronously at the time.
out, err := async.Parallel(2, func(ctx context.Context) (int, error) {
  // Do something
  return 1, nil
}, func(ctx context.Context) (string, error) {
  // Do something
  return "hello", nil
}, func(ctx context.Context) error {
// Do something
  return nil
} /* , ... */)
// out: [][]any{{1, <nil>}, {"hello", <nil>}, {<nil>}}
// err: <nil>
Example
out, err := async.Parallel(2, func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 2
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 3
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [2] [3]]
<nil>

func ParallelCompleted added in v0.2.0

func ParallelCompleted(concurrency int, funcs ...AsyncFn) ([][]any, error)

ParallelCompleted runs the functions asynchronously with the specified concurrency limitation. It returns an error array and a boolean value to indicate whether any function panics or returns an error, and you can get the error details from the error array by the indices of the functions in the parameter list. It will return until all of the functions are finished.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

Example
out, err := async.ParallelCompleted(2, func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() error {
	time.Sleep(50 * time.Millisecond)
	return errors.New("expected error")
}, func() int {
	time.Sleep(50 * time.Millisecond)
	return 3
})
fmt.Println(out)
fmt.Println(err)
Output:

[[1] [expected error] [3]]
function 1 error: expected error

func ParallelCompletedWithContext added in v0.2.0

func ParallelCompletedWithContext(
	ctx context.Context,
	concurrency int,
	funcs ...AsyncFn,
) ([][]any, error)

ParallelCompletedWithContext runs the functions asynchronously with the specified concurrency limitation and the context. It returns an error array and a boolean value to indicate whether any function panics or returns an error, and you can get the error details from the error array by the indices of the functions in the parameter list. It will return until all of the functions are finished.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

func ParallelWithContext added in v0.2.0

func ParallelWithContext(
	ctx context.Context,
	concurrency int,
	funcs ...AsyncFn,
) ([][]any, error)

ParallelWithContext runs the functions asynchronously with the specified concurrency limitation. It will send a cancel sign to context and terminate immediately if any function returns an error or panic, and also returns an execution error to indicate the error. If the context was canceled or timed out before all functions finished executing, it will send a cancel sign to all uncompleted functions, and return a context canceled error.

The number of concurrency must be greater than or equal to 0, and it means no concurrency limitation if the number is 0.

func Race

func Race(funcs ...AsyncFn) ([]any, int, error)

Race executes the functions asynchronously, it will return the index and the result of the first of the finished function (including panic), and it will not send a cancel signal to other functions.

out, index, err := async.Race(func(ctx context.Context) (int, error) {
  request.Get("https://example.com")
  return 0, nil
}, func(ctx context.Context) (string, error) {
  time.Sleep(500 * time.Millisecond)
  return "test", nil
})
// If the first function faster than the second one:
// out: []any{0, <nil>}, index: 0, err: <nil>
//
// Otherwise:
// out: []any{"test", <nil>}, index: 1, err: <nil>
Example
out, index, err := async.Race(func() int {
	time.Sleep(50 * time.Millisecond)
	return 1
}, func() int {
	time.Sleep(20 * time.Millisecond)
	return 2
})
fmt.Println(out)
fmt.Println(index)
fmt.Println(err)
Output:

[2]
1
<nil>

func RaceWithContext

func RaceWithContext(ctx context.Context, funcs ...AsyncFn) ([]any, int, error)

RaceWithContext executes the functions asynchronously, it will return the index and the result of the first of the finished function (including panic), and it will not send a cancel signal to other functions.

func Retry added in v0.4.0

func Retry(fn AsyncFn, opts ...RetryOptions) ([]any, error)

Retry attempts to get a successful response from the function with no more than the specific retry times before returning an error. If the task is successful, it will return the result of the successful task. If all attempts fail, it will return the result and the error of the final attempt.

async.Retry(func() error {
  // Do something
  return err
}) // Run the function 5 times without interval time or it succeed

async.Retry(func() error {
  // Do something
  return err
}, RetryOptions{
  Times: 3,
  Interval: 100,
}) // Run the function 3 times with 100ms interval or it succeed
Example
i := 0

out, err := async.Retry(func() error {
	i++
	if i != 3 {
		return errors.New("i != 3")
	} else {
		return nil
	}
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[<nil>]
<nil>

func RetryWithContext added in v0.4.0

func RetryWithContext(ctx context.Context, fn AsyncFn, opts ...RetryOptions) ([]any, error)

RetryWithContext runs the function with the specified context, and attempts to get a successful response from the function with no more than the specific retry times before returning an error. If the task is successful, it will return the result of the successful task. If all attempts fail, it will return the result and the error of the final attempt.

func Seq added in v0.5.0

func Seq(funcs ...AsyncFn) ([]any, error)

Seq runs the functions in order, and each function consumes the returns value of the previous function. It returns the result of the last function, or it terminates and returns the error that panics or returns by the function in the list.

out, err := async.Seq(func () int {
	return 1
}, func (n int) int {
	return n + 1
})
// out: [2]
// err: <nil>
Example
out, err := async.Seq(func() int {
	return 1
}, func(n int) int {
	return n + 1
})
fmt.Println(out)
fmt.Println(err)
Output:

[2]
<nil>

func SeqGroups added in v0.5.1

func SeqGroups(groups ...[]AsyncFn) error

SeGroups runs the functions group in order, and it will be terminated if any function returns error.

err := async.SeqGroups([]async.AsyncFn{func () {
		// fn 1.a
	}, func () {
		// fn 1.b
	}}, []async.AsyncFn{func() {
		// fn 2
	}})
Example
flags := [][]bool{{false, false}, {false}, {true, true}}

err := async.SeqGroups([]async.AsyncFn{
	func() error {
		flags[0][0] = true
		return nil
	},
	func() error {
		flags[0][1] = true
		return nil
	},
}, []async.AsyncFn{
	func() error {
		if !flags[0][0] || !flags[0][1] {
			return errors.New("unexpected error")
		}
		flags[1][0] = true
		return nil
	},
}, []async.AsyncFn{
	func() error {
		if !flags[1][0] {
			return errors.New("unexpected error")
		}
		flags[2][0] = true
		return nil
	},
	func() error {
		if !flags[1][0] {
			return errors.New("unexpected error")
		}
		flags[2][1] = true
		return nil
	},
})
fmt.Println(err)
fmt.Println(flags)
Output:

<nil>
[[true true] [true] [true true]]

func SeqGroupsWithContext added in v0.5.1

func SeqGroupsWithContext(ctx context.Context, groups ...[]AsyncFn) error

SeqGroupsWithContext runs the functions group in order with the specified context, and it will be terminated if any function returns error.

func SeqWithContext added in v0.5.0

func SeqWithContext(ctx context.Context, funcs ...AsyncFn) ([]any, error)

SeqWithContext runs the functions in order with the specified context, and each function consumes the returns value of the previous function. It returns the result of the last function, or it terminates and returns the error that panics or returns by the function in the list.

func Series added in v0.5.0

func Series(funcs ...AsyncFn) ([][]any, error)

Series runs the functions by order and returns the results when all functions are completed. Each one runs once the previous function has been completed. If any function panics or returns an error, no more functions are run and it will return immediately.

async.Series(func () {
	// do first thing
}, func () {
	// do second thing
}/*, ...*/)
Example
i := 0
async.Series(func() {
	fmt.Println(i)
	i++
}, func() {
	fmt.Println(i)
	i++
}, func() {
	fmt.Println(i)
	i++
})
Output:

0
1
2

func SeriesWithContext added in v0.5.0

func SeriesWithContext(ctx context.Context, funcs ...AsyncFn) ([][]any, error)

SeriesWithContext runs the functions by order with the specified context and returns the results when all functions are completed. Each one runs once the previous function has been completed. If any function panics or returns an error, no more functions are run and it will return immediately.

func Times added in v0.5.0

func Times(n int, fn AsyncFn) ([][]any, error)

Times executes the function n times, and returns the results. It'll terminate if any function panics or returns an error.

// Calls api 5 times.
async.Times(5, func () => error {
	return CallAPI()
})
Example
i := atomic.Int32{}
async.Times(5, func() {
	i.Add(1)
})
fmt.Println(i.Load())
Output:

5

func TimesLimit added in v0.5.0

func TimesLimit(n, concurrency int, fn AsyncFn) ([][]any, error)

TimesLimit executes the function n times with the specified concurrency limit, and returns the results. It'll terminate if any function panics or returns an error.

// Calls api 5 times with 2 concurrency.
async.TimesLimit(5, 2, func () {
	return CallAPI()
})
Example
i := atomic.Int32{}
async.TimesLimit(5, 2, func() {
	i.Add(1)
})
fmt.Println(i.Load())
Output:

5

func TimesLimitWithContext added in v0.5.0

func TimesLimitWithContext(ctx context.Context, n, concurrency int, fn AsyncFn) ([][]any, error)

TimesLimitWithContext executes the function n times with the specified concurrency limit and the context, and returns the results. It'll terminate if any function panics or returns an error.

func TimesSeries added in v0.5.0

func TimesSeries(n int, fn AsyncFn) ([][]any, error)

TimesSeries executes the function n times with only a single invocation at a time, and returns the results. It'll terminate if any function panics or returns an error.

// Calls api 5 times but runs one at a time.
async.TimesSeries(5, func () {
	return CallAPI()
})
Example
i := atomic.Int32{}
out, err := async.TimesSeries(5, func() int32 {
	return i.Add(1)
})
fmt.Println(i.Load())
fmt.Println(out)
fmt.Println(err)
Output:

5
[[1] [2] [3] [4] [5]]
<nil>

func TimesSeriesWithContext added in v0.5.0

func TimesSeriesWithContext(ctx context.Context, n int, fn AsyncFn) ([][]any, error)

TimesSeriesWithContext executes the function n times with the context and only a single invocation at a time, and returns the results. It'll terminate if any function panics or returns an error.

func TimesWithContext added in v0.5.0

func TimesWithContext(ctx context.Context, n int, fn AsyncFn) ([][]any, error)

TimesWithContext executes the function n times with the context, and returns the results. It'll terminate if any function panics or returns an error.

func Until added in v0.4.0

func Until(testFn, fn AsyncFn) ([]any, error)

Until repeatedly calls the function until the test function returns true. A valid test function must match the following requirements.

- The first return value of the test function must be a boolean value. - The parameters' number of the test function must be equal to the return values' number of the execution function (exclude context). - The parameters' types of the test function must be the same or convertible to the return values' types of the execution function.

c := 0
async.Until(func() bool {
  return c == 5
}, func() {
  c++
})
Example
i := 0

out, err := async.Until(func(n int) bool {
	return n < 3
}, func() int {
	i++
	return i
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[3]
<nil>

func UntilWithContext added in v0.4.0

func UntilWithContext(ctx context.Context, testFn, fn AsyncFn) ([]any, error)

UntilWithContext repeatedly calls the function with the specified context until the test function returns true.

func While added in v0.4.0

func While(testFn, fn AsyncFn) ([]any, error)

While repeatedly calls the function while the test function returns true. A valid test function must match the following requirements.

- The first return value of the test function must be a boolean value. - It should have no parameters or accept a context only.

c := 0
async.While(func() bool {
  return c == 5
}, func() {
  c++
})
Example
i := 0

out, err := async.While(func() bool {
	return i < 3
}, func() {
	i++
})
fmt.Println(i)
fmt.Println(out)
fmt.Println(err)
Output:

3
[]
<nil>

func WhileWithContext added in v0.4.0

func WhileWithContext(ctx context.Context, testFn, fn AsyncFn) ([]any, error)

WhileWithContext repeatedly calls the function with the specified context while the test function returns true.

Types

type AsyncFn added in v0.2.0

type AsyncFn any

AsyncFn is the function to run, the function can be a function without any restriction that accepts any parameters and any return values. For the best practice, please define the function like the following styles:

func(context.Context) error
func(context.Context) (out_type, error)
func(context.Context, in_type) error
func(context.Context, in_type) (out_type, error)
func(context.Context, in_type1, in_type2/*, ...*/) (out_type1, out_type_2,/* ...,*/ error)

type ExecutionError added in v0.3.0

type ExecutionError interface {
	// Index returns the function's index in the parameters list that the function had returned an
	// error or panicked.
	Index() int
	// Err returns the original error that was returned or panicked by the function.
	Err() error
	// Error returns the execution error message.
	Error() string
}

type ExecutionErrors added in v0.3.0

type ExecutionErrors []ExecutionError

ExecutionErrors is an array of ExecutionError.

func (ExecutionErrors) Error added in v0.3.0

func (ee ExecutionErrors) Error() string

Error combines and returns all of the execution errors' message.

func (ExecutionErrors) Unwrap added in v0.5.2

func (ee ExecutionErrors) Unwrap() []error

Unwrap returns the execution errors.

type ForeverFn added in v0.3.0

type ForeverFn func(ctx context.Context, next func(context.Context)) error

ForeverFn is the function to run in the Forever function.

type Paralleler added in v0.5.0

type Paralleler struct {
	// contains filtered or unexported fields
}

Paralleler is a tool to run the tasks with the specific concurrency, default no concurrency limitation.

Example
p := new(async.Paralleler)

p.Add(func() int {
	return 1
}).Add(func() int {
	return 2
}).Add(func() string {
	return "Hello"
})

ret, err := p.Run()
fmt.Println(ret)
fmt.Println(err)
Output:

[[1] [2] [Hello]]
<nil>

func (*Paralleler) Add added in v0.5.0

func (p *Paralleler) Add(funcs ...AsyncFn) *Paralleler

Add adds the functions into the pending tasks list.

func (*Paralleler) Clear added in v0.5.0

func (p *Paralleler) Clear() *Paralleler

Clear clears the paralleler's pending tasks list.

func (*Paralleler) Run added in v0.5.0

func (p *Paralleler) Run() ([][]any, error)

Run runs the tasks in the paralleler's pending list, it'll clear the pending list and return the results of the tasks.

func (*Paralleler) RunCompleted added in v0.5.0

func (p *Paralleler) RunCompleted() ([][]any, error)

RunCompleted runs the tasks in the paralleler's pending list until all functions are finished, it'll clear the pending list and return the results of the tasks.

func (*Paralleler) WithConcurrency added in v0.5.0

func (p *Paralleler) WithConcurrency(concurrency int) *Paralleler

WithConcurrency sets the number of concurrency limitation.

func (*Paralleler) WithContext added in v0.5.0

func (p *Paralleler) WithContext(ctx context.Context) *Paralleler

WithContext sets the context that passes to the tasks.

type RetryOptions added in v0.4.0

type RetryOptions struct {
	// Times is the number of attempts to make before giving up, the default is 5.
	Times int
	// Interval is the time to wait between retries in milliseconds, the default is 0.
	Interval int
	// IntervalFunc is the function to calculate the time to wait between retries in milliseconds, it
	// accepts an int value to indicate the retry count.
	IntervalFunc func(int) int
	// ErrorFilter is a function that is invoked on an error result. Retry will continue the retry
	// attempts if it returns true, and it will abort the workflow and return the current attempt's
	// result and error if it returns false.
	ErrorFilter func(error) bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL