goasync

v2.1.2
Published: Mar 11, 2021 License: MIT Imports: 0 Imported by: 0

README

goasync


Package goasync is a helper framework for writing asynchronous code in Go. Its primary goal is to reduce the amount of boilerplate code one has to write to do concurrent tasks.

Looking for v1? See the master branch.

Quick Start

go get -u github.com/brad-jones/goasync/v2/...

package main

import (
	"fmt"
	"time"

	"github.com/brad-jones/goasync/v2/await"
	"github.com/brad-jones/goasync/v2/task"
)

func doSomeWorkAsync() *task.Task {
	return task.New(func(t *task.Internal) {
		time.Sleep(1 * time.Second)
		fmt.Println("doing work")
	})
}

func main() {
	start := time.Now()
	fmt.Println("START", start)

	task1 := doSomeWorkAsync()
	task2 := doSomeWorkAsync()
	await.All(task1, task2)

	fmt.Println("END", time.Since(start))
}

Running the above will output something similar to:

START 2020-09-11 18:44:14.2406928 +1000 AEST m=+0.003027901
doing work
doing work
END 1.0013651s

Also see further working examples under: https://github.com/brad-jones/goasync/tree/v2/examples

Documentation

Overview

Package goasync is a helper framework for writing asynchronous code in Go. Its primary goal is to reduce the amount of boilerplate code one has to write to do concurrent tasks.

TLDR: If you don't care about my journey through Go's concurrency model and just want to know how this library works, skip down to The Task API.

Prior Art

https://github.com/chebyrash/promise

https://github.com/fanliao/go-promise

https://github.com/capitalone/go-future-context

https://github.com/rafaeldias/async

Other Reading

http://www.golangpatterns.info/concurrency/futures

https://stackoverflow.com/questions/35926173/implementing-promise-with-channels-in-go

https://medium.com/strava-engineering/futures-promises-in-the-land-of-golang-1453f4807945

https://www.reddit.com/r/golang/comments/3qhgsr/futurespromises_in_the_land_of_golang

Async Functions in Go

Sure, you can prefix any function call with `go` and it will run asynchronously, but that is rarely the full story. More often than not you have to deal with the results of that function call, including an error handling strategy, and at the very least you need some way of ensuring it actually runs to completion, not to mention cancelation.
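As a minimal sketch of that point, using only the standard library: the hypothetical `runAndWait` below starts one goroutine per input and uses a `sync.WaitGroup` to make sure every call has finished before the results are read. The names `slowDouble` and `runAndWait` are illustrative assumptions and are not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// slowDouble is a hypothetical stand-in for any function you might run with `go`.
func slowDouble(n int) int { return n * 2 }

// runAndWait starts slowDouble in a goroutine for every input, waits for all
// of them to finish and returns the results in input order.
func runAndWait(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1)
		go func(i, n int) {
			defer wg.Done()
			results[i] = slowDouble(n) // each goroutine writes to its own slot
		}(i, n)
	}
	wg.Wait() // without this the function could return before the work is done
	return results
}

func main() {
	fmt.Println(runAndWait([]int{1, 2, 3})) // prints: [2 4 6]
}
```

Even this small example needs a wait group, an index per goroutine and a shared slice, and it does not yet handle errors or cancelation.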

What is a pipeline?

"Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function."

https://blog.golang.org/pipelines
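To make the quoted definition concrete, here is a small two-stage pipeline sketch modelled on the style of the linked article. The stage names `gen` and `sq` are used here purely for illustration.

```go
package main

import "fmt"

// gen is the first stage: it emits the given numbers on a channel and closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq is the second stage: it squares every number it receives.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range sq(gen(2, 3, 4)) {
		fmt.Println(v) // prints 4, 9 and 16 on separate lines
	}
}
```

Each stage owns its output channel and closes it when done, which is what lets the consumer use a plain `range` loop.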

If you are writing some sort of server or service (which is what Go is really awesome at), you might have the same async pipeline run per request. In that case you can construct all of this once and it's not really a major issue.

But when you need to create many different pipelines, or pipelines that have a stage that runs many different functions in goroutines, a different solution is required. This is what I consider to be the basic async function:

func fooAsync(bar <-chan string) (<-chan string, <-chan error) {
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	go func() {
		v, err := doSomething(<-bar)
		if err != nil {
			rejector <- err
		} else {
			resolver <- v
		}
	}()
	return resolver, rejector
}

OK, so let's pull that apart:

* Let's say it's idiomatic to add the `Async` suffix to any function name, much like you might prefix a function name with `Must` to denote that it might panic. This follows other languages such as C#.

* Any inputs to an async function must be channels. This way, when an input comes from another async function, its channel can simply be passed in and read inside the goroutine without blocking anyone else.

* Similarly, all returned values from an async function must be channels. Effectively we have something similar to a JavaScript promise, but broken into two parts: the resolver and the rejector.

* Resolvers and rejectors should be buffered so that they can be sent to before anything has been set up to read from the channels.

* A resolver or rejector will only ever send a single value to the channel.

* A resolver or rejector can only be read once. A simple trick to "tee" a channel:

ch2 := make(chan string, 1)
ch3 := make(chan string, 1)
go func() {
	v := <-ch1
	ch2 <- v
	ch3 <- v
}()
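The tee trick can be wrapped in a small helper. The following is a runnable sketch; the function name `tee` is an assumption made for this example and does not come from the package.

```go
package main

import "fmt"

// tee reads the single value from in and copies it to two new buffered channels.
func tee(in <-chan string) (<-chan string, <-chan string) {
	a := make(chan string, 1)
	b := make(chan string, 1)
	go func() {
		v := <-in
		a <- v
		b <- v
	}()
	return a, b
}

func main() {
	ch1 := make(chan string, 1)
	ch1 <- "hello"
	ch2, ch3 := tee(ch1)
	fmt.Println(<-ch2, <-ch3) // prints: hello hello
}
```

Because both outputs are buffered with capacity 1, the copying goroutine can finish even if only one of the two readers ever shows up.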

Adding Cancelation

func fooAsync(bar <-chan string) (<-chan string, <-chan error, chan<- struct{}) {
	// Create bidirectional channels locally; the return types restrict
	// the direction available to the caller.
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	stopper := make(chan struct{}, 1)
	go func() {
		var value string
		// Create the timeout once, otherwise every loop iteration resets it.
		timeout := time.After(5 * time.Second)
		for {
			select {
			case <-stopper:
				return
			case <-timeout:
				resolver <- value
				return
			case in := <-bar:
				v, err := doSomething(in)
				if err != nil {
					rejector <- err
					return
				}
				value = value + v
			}
		}
	}()
	return resolver, rejector, stopper
}

* So we have just written roughly two dozen lines of code and only one of them actually does anything of any importance.

* While there are some cases where having the resolver, rejector & stopper separate makes sense, more often than not it gets hard to keep track of the pipeline with so many variables. What if fooAsync above wanted to see the error of bar? You would have to pass that in too.

* Why not use context.Context? There is plenty of reading about that: https://dave.cheney.net/2017/08/20/context-isnt-for-cancellation
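From the caller's side, cancelation with a stopper channel might look like the following sketch. `countAsync` is a hypothetical async function invented for this example; it counts ticks until it is asked to stop and then resolves with the count.

```go
package main

import (
	"fmt"
	"time"
)

// countAsync is a hypothetical async function that counts ticks until it is
// asked to stop, then resolves with the number of ticks it saw.
func countAsync(interval time.Duration) (<-chan int, chan<- struct{}) {
	resolver := make(chan int, 1)
	stopper := make(chan struct{}, 1)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		count := 0
		for {
			select {
			case <-stopper:
				resolver <- count
				return
			case <-ticker.C:
				count++
			}
		}
	}()
	return resolver, stopper
}

func main() {
	resolver, stopper := countAsync(10 * time.Millisecond)
	time.Sleep(55 * time.Millisecond)
	stopper <- struct{}{} // ask the goroutine to stop
	fmt.Println("ticks before stop:", <-resolver)
}
```

The caller now has to hold on to two channels for a single call, which is the bookkeeping problem described above.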

Awaiting in Go

Your friend is `select`; basically, consider it to be the replacement for the `await` keyword used in other languages.

resolver, rejector := barAsync()
select {
case v := <-resolver:
	fmt.Println("we got a value", v)
case err := <-rejector:
	fmt.Println("we got an error", err)
}
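A runnable variant of the snippet above, with a timeout case added via `time.After`, might look like this. Here `barAsync` is given a hypothetical body and parameters so the example is self-contained, and the helper name `await` is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// barAsync is a hypothetical async function that resolves after delay,
// or rejects when fail is true.
func barAsync(delay time.Duration, fail bool) (<-chan string, <-chan error) {
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	go func() {
		time.Sleep(delay)
		if fail {
			rejector <- errors.New("bar failed")
			return
		}
		resolver <- "bar"
	}()
	return resolver, rejector
}

// await blocks until the call resolves, rejects or the timeout elapses.
func await(resolver <-chan string, rejector <-chan error, timeout time.Duration) (string, error) {
	select {
	case v := <-resolver:
		return v, nil
	case err := <-rejector:
		return "", err
	case <-time.After(timeout):
		return "", errors.New("timed out")
	}
}

func main() {
	resolver, rejector := barAsync(10*time.Millisecond, false)
	fmt.Println(await(resolver, rejector, time.Second)) // prints: bar <nil>
}
```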

Await Any: This is how you might await "any" of the results from a collection of async calls.

resolver1, rejector1 := fooAsync()
resolver2, rejector2 := barAsync()
resolver3, rejector3 := bazAsync()

var value string
var err error
select {
case v := <-resolver1:
	value = v
case v := <-resolver2:
	value = v
case v := <-resolver3:
	value = v
case e := <-rejector1:
	err = e
case e := <-rejector2:
	err = e
case e := <-rejector3:
	err = e
}

if err != nil {
	panic(err)
}
fmt.Println("we got a value", value)
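When the number of channels is not known at compile time, a `select` statement cannot be written out by hand. One possible approach, sketched here under the assumption that all channels carry strings, is `reflect.Select` from the standard library. The helper `awaitAny` is hypothetical and only handles resolvers; rejectors would need the same treatment.

```go
package main

import (
	"fmt"
	"reflect"
)

// awaitAny blocks until any one of the channels yields a value and returns
// the index of that channel together with the value.
func awaitAny(chans ...<-chan string) (int, string) {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	chosen, recv, _ := reflect.Select(cases)
	return chosen, recv.String()
}

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)
	b <- "from b"
	i, v := awaitAny(a, b)
	fmt.Println(i, v) // prints: 1 from b
}
```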

Await All: This is how you might await "all" of the results from a collection of async calls.

resolver1, rejector1 := fooAsync()
resolver2, rejector2 := barAsync()
resolver3, rejector3 := bazAsync()

values := []string{}
errs := []error{}
for i := 0; i < 3; i++ { // 3, not 6: each call sends on its resolver or its rejector, never both
	select {
	case v := <-resolver1:
		values = append(values, v)
	case v := <-resolver2:
		values = append(values, v)
	case v := <-resolver3:
		values = append(values, v)
	case e := <-rejector1:
		errs = append(errs, e)
	case e := <-rejector2:
		errs = append(errs, e)
	case e := <-rejector3:
		errs = append(errs, e)
	}
}

if len(errs) > 0 {
	for _, err := range errs {
		fmt.Println("oh no an error", err)
	}
	panic("we got errors")
}
for _, value := range values {
	fmt.Println("we got a value", value)
}
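Since every call sends exactly one value on either its resolver or its rejector, the same result can be collected by waiting on each call in turn. The sketch below assumes a hypothetical `result` struct that pairs the two channels; `awaitAll` and `settled` are also names invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// result pairs the two channels returned by an async function.
type result struct {
	resolver <-chan string
	rejector <-chan error
}

// awaitAll waits for every call to either resolve or reject and collects
// the outcomes. Values and errors are returned in call order.
func awaitAll(calls ...result) ([]string, []error) {
	values := []string{}
	errs := []error{}
	for _, c := range calls {
		select {
		case v := <-c.resolver:
			values = append(values, v)
		case e := <-c.rejector:
			errs = append(errs, e)
		}
	}
	return values, errs
}

// settled is a hypothetical helper that builds an already completed call.
func settled(v string, err error) result {
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	if err != nil {
		rejector <- err
	} else {
		resolver <- v
	}
	return result{resolver, rejector}
}

func main() {
	values, errs := awaitAll(settled("a", nil), settled("", errors.New("boom")), settled("c", nil))
	fmt.Println(values, errs) // prints: [a c] [boom]
}
```

Waiting in call order does not slow anything down, because the goroutines keep running concurrently while earlier calls are awaited.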

Chaining Async Functions into a Pipeline

Normally you will want to chain additional actions so that they take place as quickly as possible. Usually those functions consume the outputs of the previous functions, so you can just pass a channel into the next function and it will wait on the channel inside its own goroutine.

aCh := make(chan string, 1)
aCh <- "aFile"
aTag, aErr := BuildDockerImageAsync(aCh)

bCh := make(chan string, 1)
bCh <- "bFile"
bTag, bErr := BuildDockerImageAsync(bCh)

cCh := make(chan string, 1)
cCh <- "cFile"
cTag, cErr := BuildDockerImageAsync(cCh)

aPubDone, aPubErr := PublishDockerImageAsync(aTag)
bPubDone, bPubErr := PublishDockerImageAsync(bTag)
cPubDone, cPubErr := PublishDockerImageAsync(cTag)

aDeployDone, aDeployErr := DeployDockerImageAsync(aPubDone)
bDeployDone, bDeployErr := DeployDockerImageAsync(bPubDone)
cDeployDone, cDeployErr := DeployDockerImageAsync(cPubDone)

done := make(chan struct{}, 1)
go func(){
	defer close(done)
	for i := 0; i < 3; i++ {
		select {
		case <-aDeployDone:
		case <-bDeployDone:
		case <-cDeployDone:
		}
	}
}()

var err error
select {
case <-done:
case e := <-aErr:
	err = e
case e := <-bErr:
	err = e
case e := <-cErr:
	err = e
case e := <-aPubErr:
	err = e
case e := <-bPubErr:
	err = e
case e := <-cPubErr:
	err = e
case e := <-aDeployErr:
	err = e
case e := <-bDeployErr:
	err = e
case e := <-cDeployErr:
	err = e
}
if err != nil {
	panic(err)
}
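A scaled-down, runnable version of such a chain is sketched below. `stageAsync` is a hypothetical helper that wraps a synchronous step in the channel-in, channel-out shape described earlier, and the two steps are stubs that only manipulate strings; nothing here builds or publishes a real image.

```go
package main

import (
	"fmt"
	"strings"
)

// stageAsync is a hypothetical helper that wraps a synchronous step so that
// it reads its input from a channel and reports on a resolver or a rejector.
func stageAsync(in <-chan string, step func(string) (string, error)) (<-chan string, <-chan error) {
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	go func() {
		v, err := step(<-in)
		if err != nil {
			rejector <- err
			return
		}
		resolver <- v
	}()
	return resolver, rejector
}

// runPipeline chains two stages and awaits the final result or the first error.
func runPipeline(file string) (string, error) {
	in := make(chan string, 1)
	in <- file
	tag, buildErr := stageAsync(in, func(f string) (string, error) {
		return strings.TrimSuffix(f, ".dockerfile") + ":latest", nil
	})
	published, pubErr := stageAsync(tag, func(t string) (string, error) {
		return "registry.example/" + t, nil
	})
	select {
	case v := <-published:
		return v, nil
	case err := <-buildErr:
		return "", err
	case err := <-pubErr:
		return "", err
	}
}

func main() {
	fmt.Println(runPipeline("app.dockerfile")) // prints: registry.example/app:latest <nil>
}
```

Note that if the first stage rejects, the second stage's goroutine stays blocked on its input channel forever. Cleaning that up would need the stopper channels from the cancelation section, which adds yet more variables to track.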

Just write a Synchronous API and let the consumer use it Asynchronously if they want

My issue with this is the boilerplate code one has to write to do this. Whatever happened to DRY? I believe the methodologies outlined by this library strike a reasonable balance between writing idiomatic go and going insane repeating yourself everywhere.

What's more, this is just one way things can be done. If you need something more powerful for a particular use case, the full power of Go's concurrency model is still there; this library doesn't take any of that away.

Up to this point all I have shown you is how to do some things with vanilla Go. I hope that it not only illustrates some of my frustrations with the language but also acts as a useful reference to come back to when you need to do something more complex.

The Task API

Here is an example of `fooAsync` but this time it uses https://github.com/brad-jones/goasync/task

func fooAsync(bar *task.Task) *task.Task {
	return task.New(func(t *task.Internal){
		// normally you would call this at the start of any long running loop
		if t.ShouldStop() {
			return
		}

		res, err := bar.Result()
		if err != nil {
			t.Reject(err) // or you could do something based on the error
			return
		}

		v, err := intToString(res.(int))
		if err != nil {
			t.Reject(err)
		} else {
			t.Resolve(v)
		}
	})
}

t := fooAsync(task.Resolved(1))
v, err := t.Result()
castedV := v.(string)
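To give a feel for why a single task value is easier to pass around than separate resolver and rejector channels, here is a deliberately simplified sketch built only on the standard library. `miniTask` and `newTask` are hypothetical names. This is not how the library is implemented and it omits stopping entirely.

```go
package main

import (
	"fmt"
	"strconv"
)

// miniTask is a hypothetical, simplified stand-in for a task; it is not the
// library's implementation.
type miniTask struct {
	done  chan struct{}
	value interface{}
	err   error
}

// newTask runs fn in a goroutine; fn reports its outcome by returning it.
func newTask(fn func() (interface{}, error)) *miniTask {
	t := &miniTask{done: make(chan struct{})}
	go func() {
		defer close(t.done) // closing done publishes value and err to readers
		t.value, t.err = fn()
	}()
	return t
}

// Result blocks until the task has finished; it can be called many times.
func (t *miniTask) Result() (interface{}, error) {
	<-t.done
	return t.value, t.err
}

func main() {
	num := newTask(func() (interface{}, error) { return 42, nil })
	str := newTask(func() (interface{}, error) {
		v, err := num.Result() // the upstream error travels with the task
		if err != nil {
			return nil, err
		}
		return strconv.Itoa(v.(int)), nil
	})
	v, err := str.Result()
	fmt.Println(v, err) // prints: 42 <nil>
}
```

Unlike a buffered channel, a closed `done` channel can be waited on any number of times, so the result can be read more than once without a tee.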

The Await API

Tasks can be awaited using https://github.com/brad-jones/goasync/await

values, errs := await.All(task1, task2, task3)
values, err := await.AllOrError(task1, task2, task3)
value, err := await.Any(task1, task2, task3)

The awaiters that return early (before all tasks are complete), such as Any, will cooperatively stop the remaining tasks, so cancelation happens automatically. If you do not care about cancelation and wish to have the awaiter return as soon as possible, you may use the `Fast` awaiters.

`Fast` awaiters will still ask any remaining tasks to cooperatively stop but they do so asynchronously.

value, err := await.FastAny(task1, task2, task3)

Or perhaps you might like to use a timeout.

value, err := await.AnyWithTimeout(5*time.Second, task1, task2, task3)

Not Type Safe

Due to Go's lack of generics, the only sane way this package can be created is by using the `interface{}` type. This means that all values returned from a task's `Result()` method or from an awaiter must be type-asserted correctly by the caller.
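A plain type assertion such as `v.(string)` panics when the value has a different type. The two-value form of the assertion avoids that, as this short sketch shows. The helper `asString` is a hypothetical name and not part of the package.

```go
package main

import "fmt"

// asString converts an untyped result into a string without panicking.
func asString(v interface{}) (string, error) {
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("expected string, got %T", v)
	}
	return s, nil
}

func main() {
	fmt.Println(asString("hello")) // prints: hello <nil>

	_, err := asString(42)
	fmt.Println(err) // prints: expected string, got int
}
```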

Directories

Path      Synopsis
await     Package await contains await helper functions for use with tasks.
examples
stop      Package stop is a means of stopping many tasks in bulk.
task      Package task is an asynchronous utility inspired by JS Promises & C# Tasks.
