promise

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

README

go-typed-promise

A type-safe generic promise library for Go 1.19.

Why?

Why

Go has channels and goroutines, which are great concurrency tools. You may not need a promise library at all. But introduction of generics makes it possible to play with some more abstractions on top of channels.

One useful abstraction is a promise. You have a task that you want to run in a separate goroutine, and you want to reuse the result of the task.

A use case

Here is our task:

  • Make a request to a remote server
  • Then save the result to a database and publish to a queue at the same time (each of these tasks are running in separate goroutines).
  • If one of the operations fail stop the task.
  • The whole operation shouldn't take more than 1 second, so set a timeout.

One might think you wouldn't run http call in a goroutine at all but let's think that you will need to run this flow n times in parallel later.

Mock functions for our tasks (uncomment the time.Sleep to see the timeout works):

// make http request
func httpCall() (string, error) {
    //time.Sleep(2 * time.Second)
    return "Hello World", nil
}

// publish message to a queue
func publishMessage(_ string) (string, error) {
    //time.Sleep(2 * time.Second)
    return "success", nil
}

// save to database
func saveToDB(_ string) (string, error) {
    //time.Sleep(2 * time.Second)
    return "success", nil
}
Implementing with errgroup and promise
Click to see implementation with errgroup
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
	timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	g, ctx := errgroup.WithContext(timeoutCtx)

	httpResChan := make(chan string, 2)
	publishChan := make(chan string)
	saveChan := make(chan string)

	// make http request
	g.Go(func() error {
		defer close(httpResChan)

		internalResChan := make(chan string)
		internalErrChan := make(chan error)

		// make http request and send to internal channels so that it can timeout and if result is available sent to other channels
		go func() {
			defer close(internalResChan)
			defer close(internalErrChan)

			res, err := httpCall()
			if err != nil {
				internalErrChan <- err
				return
			}
			internalResChan <- res
		}()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case res := <-internalResChan:
			httpResChan <- res
			httpResChan <- res // broadcast
			return nil
		case err := <-internalErrChan:
			return err
		}
	})

	// use this function to publish message and save to db, they both receive the http result and pretty much do the same thing
	runWithHttpRes := func(out chan<- string, task func(string) (string, error)) {
		// publish message to a queue
		g.Go(func() error {
			defer close(out)

			internalResChan := make(chan string)
			internalErrChan := make(chan error)

			go func() {
				defer close(internalResChan)
				defer close(internalErrChan)

				res, err := task(<-httpResChan)
				if err != nil {
					internalErrChan <- err
					return
				}
				internalResChan <- res
			}()

			select {
			case <-ctx.Done():
				return ctx.Err()
			case res := <-internalResChan:
				out <- res
				return nil
			case err := <-internalErrChan:
				return err
			}
		})
	}

	// publish message to a queue
	runWithHttpRes(publishChan, publishMessage)
	// save to database
	runWithHttpRes(saveChan, saveToDB)

	results := make([]string, 2)

	// collect results
	g.Go(func() error {
		var counter int32
		for counter < 2 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case publishRes := <-publishChan:
				counter++
				results[0] = publishRes
				publishChan = nil
			case saveRes := <-saveChan:
				counter++
				results[1] = saveRes
				saveChan = nil
			}
		}
		return nil
	})

	// wait for all goroutines to finish.
	if err := g.Wait(); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(results)
	}
}
Click to see implementation using promise
func main() {
    ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
    defer cancel()

    httpCallPromise, _ := promise.New(ctx, httpCall)

    publishMessagePromise := httpCallPromise.Map(func (data string, httpErr error) (string, error) {
        if httpErr != nil {
            return "", httpErr
        }
        return publishMessage(data)
    })

    saveToDBPromise := httpCallPromise.Map(func (data string, httpErr error) (string, error) {
        if httpErr != nil {
            return "", httpErr
        }
        return saveToDB(data)
    })

    resultPromise, _ := promise.All(ctx, publishMessagePromise, saveToDBPromise)
    result, err := resultPromise.Await()
    if err != nil {
        fmt.Println(err)
    }

    fmt.Println(result)
}

Installation

It is still experimental and not intended to be used in production.

go get github.com/ulasakdeniz/go-typed-promise

Usage

Errors are ignored in the examples below for simplicity.

Creating a promise

Create a promise with New function. It is a generic function that takes a context.Context and a func() (T, error) as parameters. The function returns a *Promise[T].

package main

import (
	"context"
	"fmt"
	"time"
	"github.com/ulasakdeniz/go-typed-promise"
)

func main() {
	// the type of p is Promise[string]
	p, err := promise.New(context.TODO(), func() (string, error) {
		// simulate a long running task that makes a network call
		time.Sleep(1 * time.Second)
		return "Hello", nil
	})

	// err is not nil if an error occurs while creating the promise
	if err != nil {
		// handle error
	}

	// await the result
	result, err := p.Await()
	if err != nil {
		// handle error
	}

	fmt.Println(result) // prints "Hello"
}

Using Await

Await function returns the result of the promise. Calling Await multiple times will return the same result. The promise will be resolved only once. Note that Await is a blocking call. It waits until the promise is resolved.

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"

	"github.com/ulasakdeniz/go-typed-promise"
)

func main() {
    ipPromise, _ := promise.New(context.TODO(), func() (string, error) {
		res, _ := http.Get("https://api.ipify.org")
		ip, _ := io.ReadAll(res.Body)
		return string(ip), nil
	})

	ip, err := ipPromise.Await()
	if err != nil {
		fmt.Println("error", err)
	}

	fmt.Println("ip", ip)
	// Output: ip <YOUR_IP>
}
Creating a promise with a timeout

A context.Context is used to create a promise with a timeout. If the promise is not resolved within the timeout, the promise will be rejected with an error.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ulasakdeniz/go-typed-promise"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	timeoutPromise, _ := promise.New(ctx, func() (string, error) {
		time.Sleep(2 * time.Second)
		return "Hello", nil
	})

	_, timeoutErr := timeoutPromise.Await()
	fmt.Println(timeoutErr)
	// Output: context error while waiting for promise: context deadline exceeded
}
Chaining promises

Promises can be chained with Map function. It takes a func(T) (T, error) as a parameter. The function returns a Promise[T] which is a new promise created from the result of the previous promise.

Note: Because of Golang generics limitations, you cannot change the type of the promise with Map. If you want to change the type of the promise, you can use promise.FromPromise function.

Using Promise.All

Promise.All takes a number of promises and returns a promise that resolves when all the promises in the slice are resolved. The result of the promise is a slice of the results of the promises.

Using Promise.Any

Promise.Any takes a number of promises and returns a promise that resolves when a promise successfully completes. The result of the promise is the result of the first promise that completes. If all the promises fail, the promise will have error.

Callbacks: OnComplete, OnSuccess, OnFailure

OnComplete is called when the promise is resolved or rejected. OnSuccess is called when the promise is resolved. OnFailure is called when the promise is rejected.

Piping promises to channels

ToChannel function pipes the result of the promise to a channel.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ulasakdeniz/go-typed-promise"
)

func main() {
	stringChannel := make(chan string)
	errChannel := make(chan error)
	p, _ := promise.New(context.TODO(), func() (string, error) {
		time.Sleep(1 * time.Second)
		return "Hello", nil
	})

	p.ToChannel(stringChannel, errChannel)

    fmt.Println(<-stringChannel) // Output: "Hello"
}

If the promise results in error, the error will be sent to the error channel.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ulasakdeniz/go-typed-promise"
)

func main() {
    stringChannel := make(chan string)
    errChannel := make(chan error)
    p, _ := promise.New(context.TODO(), func() (string, error) {
        time.Sleep(1 * time.Second)
        return "", fmt.Errorf("error")
    })

    p.ToChannel(stringChannel, errChannel)

    fmt.Println(<-errChannel)
	// Output: "error"
}

Documentation

Index

Constants

View Source
const (
	ContextError = "context error while waiting for promise: %s"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Promise

type Promise[T any] struct {
	// contains filtered or unexported fields
}

Promise is a wrapper for a task that will be executed asynchronously.

func All

func All[T any](ctx context.Context, promises ...*Promise[T]) (*Promise[[]T], error)

All collects all results from a list of promises and returns them in a slice of Promise.

func Any

func Any[T any](ctx context.Context, promises ...*Promise[T]) (*Promise[T], error)

Any returns a promise that is completed with the first successful result of the given promises. If all promises fail, the returned promise is failed with an error.

func Completed

func Completed[T any](value T) *Promise[T]

Completed returns a promise that is already completed with the given value.

func Failed

func Failed[T any](err error) *Promise[T]

Failed returns a promise that is already completed with the given error.

func FromPromise

func FromPromise[T any, R any](promise *Promise[T], mapper func(T, error) (R, error)) (*Promise[R], error)

FromPromise maps the result of the given promise to a new promise.

func New

func New[T any](ctx context.Context, task func() (T, error)) (*Promise[T], error)

New returns a new Promise[T] for the given task.

func (*Promise[T]) Await

func (p *Promise[T]) Await() (T, error)

Await waits for the promise to complete and returns the result or error.

func (*Promise[T]) IsCompleted

func (p *Promise[T]) IsCompleted() bool

IsCompleted returns true if the promise is completed.

func (*Promise[T]) Map

func (p *Promise[T]) Map(f func(T, error) (T, error)) *Promise[T]

Map maps the result of the promise to a new value.

func (*Promise[T]) OnComplete

func (p *Promise[T]) OnComplete(success func(T), failure func(error))

OnComplete is a callback function that is called when the promise is completed.

func (*Promise[T]) OnFailure

func (p *Promise[T]) OnFailure(failure func(error))

OnFailure is a callback function that is called when the promise is failed.

func (*Promise[T]) OnSuccess

func (p *Promise[T]) OnSuccess(success func(T))

OnSuccess is a callback function that is called when the promise is completed successfully.

func (*Promise[T]) ToChannel

func (p *Promise[T]) ToChannel(success chan<- T, failure chan<- error)

ToChannel sends the result or error of the promise to the given channels.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL