async

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2023 License: MIT Imports: 4 Imported by: 1

Documentation

Overview

Package async provides functions to run functions in other goroutines and wait for their results.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Queue deprecated added in v0.3.4

func Queue[T any](callback func(data T) (fin bool)) func(data T)

Queue processes data sequentially by the given callback function that prevents concurrency conflicts, it returns a new function that pushes data into the queue.

The callback function returns a boolean value indicates whether the queue has finished, once true, the internal channel will be closed and no more data shall be pushed.

Deprecated: use `goext.Queue` instead.

Example
package main

import (
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	out := make(chan []string)
	list := []string{}
	push := async.Queue(func(str string) (fin bool) {
		list = append(list, str)
		fin = len(list) == 2

		if fin {
			// The order of the `list` is not stable, but we can guarantee that all two strings have
			// been appended and stored to the `list`.
			//
			// Without concurrency control, we may end up `list` only has one item left or
			// len(list) == 2 but when we trying to print or send it, it becomes 1.
			out <- list
		}

		return fin
	})

	go func() {
		push("foo")
	}()

	go func() {
		push("bar")
	}()

	fmt.Println(len(<-out))
}
Output:
2

func Wait

func Wait[R any](fn func() (R, error)) (R, error)

Wait runs the given function in another goroutine and waits its return value.

Example
package main

import (
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	res, err := async.Wait(func() (string, error) {
		// this function runs in another goroutine
		return "Hello, World!", nil
	})
	fmt.Println(res)
	fmt.Println(err)
}
Output:
Hello, World!
<nil>
Example (Error)
package main

import (
	"errors"
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	res, err := async.Wait(func() (string, error) {
		// this function runs in another goroutine
		return "", errors.New("something went wrong")
	})
	fmt.Println(res)
	fmt.Println(err)
}
Output:

something went wrong

func WaitAfter added in v0.3.5

func WaitAfter[R any](fn func() (R, error), duration time.Duration) (R, error)

WaitAfter runs the given function in another goroutine and returns its result only after the given duration.

Example
package main

import (
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	start := time.Now()
	res, err := async.WaitAfter(func() (string, error) {
		return "Hello, World!", nil
	}, time.Millisecond*10)

	fmt.Println(res)
	fmt.Println(err)
	fmt.Println(time.Since(start).Milliseconds() >= 10) // may exceed 10 due to context change
}
Output:
Hello, World!
<nil>
true

func WaitAll

func WaitAll[F func() (R, error), R any](fns ...F) ([]R, error)

WaitAll runs a series of functions in different goroutines and wait for all return successfully or anyone fails.

If all functions succeed (returned without error), all the results will be grouped in a single slice and ordered accordingly.

Example
package main

import (
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	results, err := async.WaitAll(func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 10)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 20)
		return "你好,世界!", nil
	})

	fmt.Println(results)
	fmt.Println(err)
}
Output:
[Hello, World! Hi, World! 你好,世界!]
<nil>
Example (Error)
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	results, err := async.WaitAll(func() (string, error) {
		time.Sleep(time.Microsecond * 20)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 10)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "你好,世界!", errors.New("something went wrong")
	})

	fmt.Printf("%#v\n", results)
	fmt.Println(err)
}
Output:
[]string(nil)
something went wrong

func WaitAny

func WaitAny[F func() (R, error), R any](fns ...F) (R, []error)

WaitAny runs a series of functions in different goroutines and wait for anyone returns successfully without error.

If all functions failed (returned with error), all the errors will be grouped in a single slice and ordered accordingly.

Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	res, errors := async.WaitAny(func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 10)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 20)
		return "", errors.New("something went wrong")
	})

	fmt.Println(res)
	fmt.Printf("%#v\n", errors)
}
Output:
Hello, World!
[]error(nil)
Example (Error)
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	res, errors := async.WaitAny(func() (string, error) {
		time.Sleep(time.Microsecond * 20)
		return "", errors.New("something went wrong")
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 10)
		return "", errors.New("something went wrong")
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "", errors.New("something went wrong")
	})

	fmt.Printf("%#v\n", res)
	fmt.Println(errors)
}
Output:
""
[something went wrong something went wrong something went wrong]

func WaitRace

func WaitRace[F func() (R, error), R any](fns ...F) (R, error)

WaitRace runs a series of functions in different goroutines and wait for anyone returns, either with or without error.

Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	res, err := async.WaitRace(func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 15)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 30)
		return "", errors.New("something went wrong")
	})

	fmt.Println(res)
	fmt.Printf("%#v\n", err)
}
Output:
Hello, World!
<nil>
Example (Error)
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	res, errors := async.WaitRace(func() (string, error) {
		time.Sleep(time.Microsecond * 30)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 15)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "", errors.New("something went wrong")
	})

	fmt.Printf("%#v\n", res)
	fmt.Println(errors)
}
Output:
""
something went wrong

func WaitTimeout added in v0.3.5

func WaitTimeout[R any](fn func() (R, error), duration time.Duration) (R, error)

WaitTimeout runs the given function in another goroutine and shall return its result before the timeout limit.

Example
package main

import (
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	res1, err1 := async.WaitTimeout(func() (string, error) {
		return "Hello, World!", nil
	}, time.Millisecond*10)
	res2, err2 := async.WaitTimeout(func() (string, error) {
		time.Sleep(time.Millisecond * 20)
		return "Hello, World!", nil
	}, time.Millisecond*10)

	fmt.Println(res1)
	fmt.Println(err1)
	fmt.Printf("%#v\n", res2)
	fmt.Println(err2)
}
Output:
Hello, World!
<nil>
""
context deadline exceeded

func WaitUntil added in v0.4.0

func WaitUntil(test func() bool)

Blocks the context until the test is passed.

Example
package main

import (
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	result := 0
	async.WaitUntil(func() bool {
		result++
		return result == 10
	})
	fmt.Println(result)
}
Output:
10

Types

type AsyncTask added in v0.4.0

type AsyncTask[T any] struct {
	// contains filtered or unexported fields
}

AsyncTask represents the eventual completion (or failure) of an asynchronous operation and its resulting value.

Unlike channel, whose value can be consumed only once, AsyncTask caches the result so that it can be retrieved as many times as we want.

Example
package main

import (
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	task := &async.AsyncTask[string]{}

	go func() {
		task.Resolve("Hello, World!")
	}()

	res, _ := task.Result()

	fmt.Println(res)
}
Output:
Hello, World!

func (*AsyncTask[T]) Reject added in v0.4.0

func (task *AsyncTask[T]) Reject(err error)

Reject settles the task with a failure reason.

Example
package main

import (
	"errors"
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	task := &async.AsyncTask[string]{}

	go func() {
		task.Reject(errors.New("something went wrong"))

		// Resolve and Reject can only be called once and once one of them has been called,
		// calling the other will not effect.
		task.Resolve("Hello, World!")
	}()

	res, err := task.Result()

	fmt.Printf("%#v\n", res)
	fmt.Println(err)
}
Output:
""
something went wrong

func (*AsyncTask[T]) Resolve added in v0.4.0

func (task *AsyncTask[T]) Resolve(value T)

Resolve settles the task successfully with a given value.

Example
package main

import (
	"errors"
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	task := &async.AsyncTask[string]{}

	go func() {
		task.Resolve("Hello, World!")

		// Resolve and Reject can only be called once and once one of them has been called,
		// calling the other will not effect.
		task.Reject(errors.New("something went wrong"))
	}()

	res, err := task.Result()

	fmt.Println(res)
	fmt.Println(err)
}
Output:
Hello, World!
<nil>

func (*AsyncTask[T]) Result added in v0.4.0

func (task *AsyncTask[T]) Result() (T, error)

Result returns the result of the task.

Successive calling this function returns the same result.

Example
package main

import (
	"fmt"

	"github.com/ayonli/goext/async"
)

func main() {
	task := &async.AsyncTask[string]{}

	go func() {
		task.Resolve("Hello, World!")
	}()

	res1, _ := task.Result()
	res2, _ := task.Result() // successive calls returns the same result

	fmt.Println(res1)
	fmt.Println(res2)
}
Output:
Hello, World!
Hello, World!

type WaitResult

type WaitResult[R any] struct {
	Value R
	Error error
}

WaitResult represents a single result of the function passed to the `Wait` family functions.

func WaitAllSettled

func WaitAllSettled[F func() (R, error), R any](fns ...F) []WaitResult[R]

WaitAllSettled runs a series of functions in different goroutines and wait for all of them are settled, all the results are grouped in a single slice and ordered accordingly.

Example
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/ayonli/goext/async"
)

func main() {
	results := async.WaitAllSettled(func() (string, error) {
		time.Sleep(time.Microsecond * 1)
		return "Hello, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 10)
		return "Hi, World!", nil
	}, func() (string, error) {
		time.Sleep(time.Microsecond * 20)
		return "", errors.New("something went wrong")
	})

	for _, result := range results {
		if result.Error != nil {
			fmt.Println(result.Error)
		} else {
			fmt.Println(result.Value)
		}
	}
}
Output:
Hello, World!
Hi, World!
something went wrong

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL