xprocess

package module
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2021 License: MIT Imports: 14 Imported by: 9

README

xprocess

一个通用的机制去管理goroutine的生命周期

  1. 通过xprocess.Go替换原生的直接启动goroutine的方式
  2. 通过context传递, 管理goroutine的生命周期
  3. GoLoop是对goroutine中有for的封装和替换
  4. Group是一个WaitGroup的wrap,不过增加了goroutine统计和可退出机制
  5. Stack会得到内存中所有在运行的goroutine, 以及数量

Examples

// Print the goroutine snapshot before anything is started.
fmt.Println(xprocess.Stack())

// The callback passed to Go takes only a context and has no result (see the
// Go entry in the function index), so it stops by returning once ctx is done.
cancel := xprocess.Go(func(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            time.Sleep(time.Millisecond * 100)
            fmt.Println("g1")
        }
    }
})

time.Sleep(time.Second)
fmt.Println(xprocess.Stack())
// context.CancelFunc returns no value, so it cannot be an argument to Println.
cancel()
time.Sleep(time.Second)
fmt.Println(xprocess.Stack())
fmt.Println(xprocess.Stack())
for {
    xprocess.Go(func(ctx context.Context) {
        time.Sleep(time.Second)
        fmt.Println("g2")
    })
    // Keep the cancel function: this loop runs forever, and dropping it would
    // leave one more looping goroutine behind on every pass.
    stopLoop := xprocess.GoLoop(func(ctx context.Context) {
        time.Sleep(time.Second)
        fmt.Println("g3")
    })

    // NOTE(review): the Group method signatures are not listed on this page;
    // confirm that g.Go still accepts a callback returning error.
    g := xprocess.NewGroup()
    g.Go(func(ctx context.Context) error {
        fmt.Println("g4")
        return nil
    })
    g.Go(func(ctx context.Context) error {
        fmt.Println("g5")
        return nil
    })
    g.Go(func(ctx context.Context) error {
        fmt.Println("g6")
        return xerror.Fmt("test error")
    })
    g.Wait()
    fmt.Println(g.Err())

    g.Cancel()

    fmt.Println(xprocess.Stack())
    time.Sleep(time.Second)
    stopLoop()
}
Timeout
// TestTimeout expects Timeout to return nil when the callback finishes inside
// the allowed duration and a non-nil error when it overruns it.
//
// The callbacks have no result because the index documents Timeout as taking
// func(ctx context.Context). The sleeps sit well away from the one-second
// limit so that scheduler jitter cannot flip either outcome.
func TestTimeout(t *testing.T) {
	err := xprocess.Timeout(time.Second, func(ctx context.Context) {
		time.Sleep(time.Millisecond * 500)
	})
	assert.Nil(t, err)

	err = xprocess.Timeout(time.Second, func(ctx context.Context) {
		time.Sleep(time.Second + time.Millisecond*500)
	})
	assert.NotNil(t, err)
}
Future

async, await, yield

package xprocess

import (
	"fmt"
	"net/http"
	"testing"

	"github.com/pubgo/xerror"
)

// handleReq prints "url" followed by i, then passes http.Get and a fixed URL
// to Async and returns the handle Async produces.
func handleReq(i int) Value {
	const target = "https://www.cnblogs.com"

	fmt.Println("url", i)
	pending := Async(http.Get, target)
	return pending
}

// getData returns a Future whose producer counts i down from 10 and stops
// before handling 3. For each i it awaits handleReq(i) with a callback that
// panics on error (via xerror.Panic) and adds a "test" header to the
// response, then yields a second Async GET of the same site.
func getData() IFuture {
	produce := func(y Yield) {
		for i := 10; i > 3; i-- {
			tagHeader := func(res *http.Response, reqErr error) (*http.Response, error) {
				xerror.Panic(reqErr)

				res.Header.Add("test", "11111")
				return res, reqErr
			}
			y.Await(handleReq(i), tagHeader)

			y.Yield(Async(http.Get, "https://www.cnblogs.com"))
		}
	}
	return Future(produce)
}

// handleData returns a Future that consumes every response produced by
// getData and yields only its header map.
func handleData() IFuture {
	return Future(func(y Yield) {
		getData().Value(func(resp *http.Response, err error) {
			// Surface a failed request instead of dereferencing a nil
			// response; this mirrors the check in getData's Await callback.
			xerror.Panic(err)
			// Only the headers are forwarded, so release the body here to
			// let the transport reuse the connection.
			defer resp.Body.Close()

			y.Yield(resp.Header)
		})
	})
}

// TestStream prints each header map delivered by handleData, prefixed with
// "dt".
func TestStream(t *testing.T) {
	printHeader := func(head http.Header) {
		fmt.Println("dt", head)
	}
	handleData().Value(printHeader)
}

func TestAsync(t *testing.T) {
	val1 := handleReq(1)
	val2 := handleReq(2)
	val3 := handleReq(3)
	val4 := handleReq(4)

	fmt.Printf("%#v, %#v, %#v, %#v\n", val1.Get(), val2.Get(), val3.Get(), val4.Get())
}

// TestGetData prints every response delivered by getData and closes its body
// afterwards so the underlying connection is not leaked.
func TestGetData(t *testing.T) {
	getData().Value(func(resp *http.Response, err error) {
		fmt.Println(resp)

		// resp is nil when the request failed; there is nothing to close then.
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close()
		}
	})
}

// handleData2 returns a Future whose producer yields the integers 10 down
// to 1, in that order.
func handleData2() IFuture {
	countdown := func(y Yield) {
		for step := 0; step < 10; step++ {
			y.Yield(10 - step)
		}
	}
	return Future(countdown)
}

// TestName11w prints every integer delivered by handleData2, one per line.
func TestName11w(t *testing.T) {
	printInt := func(v int) {
		fmt.Println(v)
	}
	handleData2().Value(printInt)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Async added in v0.1.1

func Async(fn interface{}, args ...interface{}) xprocess_abc.FutureValue

func Await added in v0.1.1

func Await(val xprocess_abc.FutureValue, fn interface{}) xprocess_abc.FutureValue

func Break added in v0.0.10

func Break()

func CostWith added in v0.1.3

func CostWith(fn func()) time.Duration

func Count added in v0.1.3

func Count(n int) <-chan int

func Go

func Go(fn func(ctx context.Context)) context.CancelFunc

Go 启动一个goroutine

func GoDelay added in v0.0.9

func GoDelay(dur time.Duration, fn func()) error

GoDelay 延迟goroutine

func GoLoop

func GoLoop(fn func(ctx context.Context)) context.CancelFunc

GoLoop 启动一个goroutine loop 是为了替换 `go func() {for{ }}()` 这类的代码

func MemStatsPrint added in v0.1.3

func MemStatsPrint()

func NewEvent added in v0.1.2

func NewEvent() *xprocess_event.Event

func NewGroup

func NewGroup(c ...uint16) *xprocess_group.Group

func Promise added in v0.1.1

func Promise(fn func(g xprocess_abc.Future)) xprocess_abc.IPromise

func Test added in v0.1.3

func Test(fn interface{}) *xprocess_test.Test

func Tick added in v0.1.3

func Tick(args ...interface{}) <-chan time.Time

Tick 简单定时器 Example: Tick(100, time.Second)

func Timeout added in v0.0.3

func Timeout(dur time.Duration, fn func(ctx context.Context)) error

Timeout 执行超时函数, 超时后, 函数自动退出

func Try added in v0.1.3

func Try(fn func()) error

Try try wrap

Types

type FutureValue added in v0.1.1

type FutureValue = xprocess_abc.FutureValue

type IPromise added in v0.1.1

type IPromise = xprocess_abc.IPromise

type Value added in v0.1.4

type Value = xprocess_abc.Value

type WaitGroup added in v0.1.1

type WaitGroup = xprocess_waitgroup.WaitGroup

func NewWaitGroup added in v0.1.2

func NewWaitGroup(check bool, c ...uint16) WaitGroup

Directories

Path Synopsis
Package gorilla/schema fills a struct with form values.
Package gorilla/schema fills a struct with form values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL