routines

package module
v0.0.3
Warning

This package is not in the latest version of its module.

Published: Jul 23, 2021 License: MPL-2.0 Imports: 6 Imported by: 0

README

Some helpers for writing common routines.


Helpers here can:

  • Prevent your crawler from getting banned (RunAtLeast(duration, task))
  • Run a task repeatedly in the background (InfiniteLoop(task))
  • Retry a task until the first successful attempt (Retry(task))

and more.

Race conditions

Values returned by this library are thread-safe. However, the thread-safety of the functions you pass in is not covered.

Consider this example:

f := Recorded(yourFunc)

f is thread-safe iff yourFunc is thread-safe.
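The "iff" can be made concrete with a small sketch. It assumes that a Recorded-style wrapper hands out indexes with an atomic counter. The names recorded and sumIndexes are hypothetical, and this is not the package's source. The counter is safe for concurrent use, but the wrapped function still has to guard its own shared state, here with a sync.Mutex. Without that mutex, the concurrent writes to sum would be a data race.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recorded is a hypothetical stand-in for Recorded: each call passes a
// unique, increasing index to f. The counter itself is concurrency-safe.
func recorded(f func(idx uint64) error) func() error {
	var n uint64
	return func() error {
		// AddUint64 returns the new value, so subtract 1 to start at 0.
		idx := atomic.AddUint64(&n, 1) - 1
		return f(idx)
	}
}

// sumIndexes calls a recorded function from many goroutines. The wrapped
// function protects its own state (sum) with a mutex; that part is the
// caller's responsibility, not the wrapper's.
func sumIndexes(calls int) uint64 {
	var (
		mu  sync.Mutex
		sum uint64
		wg  sync.WaitGroup
	)
	f := recorded(func(idx uint64) error {
		mu.Lock()
		sum += idx
		mu.Unlock()
		return nil
	})
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f()
		}()
	}
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(sumIndexes(100)) // 0+1+...+99 = 4950
}
```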

A simple WebSocket client as an example

Using gorilla/websocket:

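package main

import (
	"log"
	"net/http"
	"time"

	"github.com/gorilla/websocket"
	// plus this package, imported as "routines"
)

type ws struct {
	conn *websocket.Conn
	Ctrl routines.InfiniteLoopControl
}

// update handles one data message. ReadMessage only returns text or binary
// messages; ping and pong frames go to the handlers registered in New.
func (w *ws) update() error {
	typ, buf, err := w.conn.ReadMessage()
	if err != nil {
		return err
	}

	switch typ {
	case websocket.TextMessage:
		log.Print("got Text: ", string(buf))
	default:
		log.Print("got Message: ", buf)
	}

	return nil
}

// ping sends a ping frame; WriteControl may be called concurrently with ReadMessage.
func (w *ws) ping() error {
	return w.conn.WriteControl(
		websocket.PingMessage,
		[]byte(`test`),
		time.Now().Add(time.Second),
	)
}

func New(url string) (ret *ws, err error) {
	conn, _, err := websocket.DefaultDialer.Dial(url, http.Header{})
	if err != nil {
		return
	}

	// control frames are reported through handlers, not through ReadMessage
	conn.SetPongHandler(func(string) error {
		log.Print("got Pong")
		return nil
	})
	conn.SetPingHandler(func(data string) error {
		log.Print("got Ping")
		// replacing the default handler means we must answer with a pong ourselves
		return conn.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(time.Second))
	})

	ret = &ws{conn: conn}
	ret.Ctrl = routines.AnyErr(
		// send a ping frame every 30s
		routines.InfiniteLoop(routines.RunAtLeast(30*time.Second, ret.ping)),
		// handle messages
		routines.InfiniteLoop(ret.update),
	)
	return
}

func main() {
	cl, err := New("wss://example.com")
	if err != nil {
		log.Fatal("cannot connect: ", err)
	}
	defer cl.conn.Close()  // runs last: unblocks a pending ReadMessage
	defer cl.Ctrl.Cancel() // runs first: stops both loops

	for e := range cl.Ctrl.Err {
		log.Print("caught an error: ", e)
	}
}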

License

Copyright Chung-Ping Jen ronmi.ren@gmail.com 2021-

MPL v2.0

Documentation

Overview

Package routines provides various tools for writing common routines.

Example
// our function
x := 0
f := func() error {
	// do some time-consuming task, like calling an external API
	x++
	fmt.Printf("%d ", x)

	return nil
}

// - run the function 3 times
// - at least 50ms between two executions
f1 := OnceAtMost(50*time.Millisecond, f)
f1() // 1
f1() // 2
f1() // 3

// - run the function as many times as possible for 50ms
// - at least 10ms between two executions
//
// WARNING: THIS IS NOT A GOOD APPROACH IF f() RUNS FAST.
begin := time.Now()
f2 := OnceWithin(10*time.Millisecond, f)
for time.Since(begin) < 50*time.Millisecond {
	f2() // 4 5 6 7 8
}

// the function always takes at least 10ms
RunAtLeast(10*time.Millisecond, f)() // 9
Output:

1 2 3 4 5 6 7 8 9


Constants

This section is empty.

Variables

var ErrRunning = errors.New("StatefulFunc: function is running")

Functions

func IgnoreErr

func IgnoreErr(ch chan error)

IgnoreErr drops all errors in ch asynchronously.

If you need to drain ch synchronously, just use "for range ch {}".

Example
f := Recorded(func(idx uint64) error {
	fmt.Println(idx)
	if idx <= 1 {
		// handles error here
		return errors.New("error")
	}
	return nil
})

// errors are handled in f(), no need to process again
IgnoreErr(Retry(f))

// wait til goroutines exit
time.Sleep(10 * time.Millisecond)
Output:

0
1
2

func OnceAtMost

func OnceAtMost(dur time.Duration, f func() error) func() error

OnceAtMost ensures every call to f() is executed, but not too often.

It guarantees that:

  • the function runs as many times as you call it
  • only one call runs at a time
  • calls start no more than once within the duration

The time is recorded before calling the real function, which means the duration includes the function's execution time.

Say you have an f() that prints "test" and takes 0.1s per call:

x := OnceAtMost(time.Second, f)
go x()
go x()
go x()

You'll see a "test" every second for 3 seconds.

If x() is not called from another goroutine, it acts much like RunAtLeast:

x() // runs immediately, returns at 0.1s
x() // waits until 1s after the first call started, returns at 1.1s
Example (Async)
f := func() error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("done")
	return nil
}
g := OnceAtMost(50*time.Millisecond, f)

go f() // you'll see message at 10ms
go f() // another message at 10ms
go g() // another message at 10ms
go g() // waits for the first g() plus 40ms, then runs f(), so the message is printed at 60ms

// wait til goroutines return
time.Sleep(100 * time.Millisecond)
Output:

done
done
done
done
Example (Sync)
f := func() error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("done")
	return nil
}
g := OnceAtMost(50*time.Millisecond, f)

f() // you'll see message at 10ms
f() // message at 20ms
g() // message at 30ms
g() // blocks 40ms, then runs f(), so the message is printed at 80ms
Output:

done
done
done
done

func OnceSuccessAtMost

func OnceSuccessAtMost(dur time.Duration, f func() error) func() error

OnceSuccessAtMost is identical to OnceAtMost, but only a successful call starts the waiting period; after a failed call, the next call runs right away.

Say you have an f() that prints "test" whether it succeeds or fails, and takes 0.1s per call:

x := OnceSuccessAtMost(time.Second, f)
go x() // assume it fails
go x() // assume it succeeds
go x() // waits for the 1s window started by the successful call

You'll see:

  • a "test" at 0.1s
  • another "test" at 0.2s (0.1s after previous "test")
  • another "test" at 1.2s (1s after previous "test")

func OnceSuccessWithin

func OnceSuccessWithin(dur time.Duration, f func() error) func() error

OnceSuccessWithin is identical to OnceWithin, but only a successful call starts the window in which later calls are ignored.

Say you have an f() that prints "test" and takes 0.5s per call:

x := OnceSuccessWithin(time.Second, f)
go x() // f is executed; assume it fails
go x() // f is executed; assume it succeeds and prints "test"
go x() // this should be ignored
time.Sleep(time.Second)
go x() // f is executed
Example
cnt := 0
f := func() error {
	cnt++
	if cnt == 1 {
		fmt.Println("failed")
		return errors.New("error")
	}
	fmt.Println("test")
	return nil
}

x := OnceSuccessWithin(50*time.Millisecond, f)
go x() // f is executed; assume it fails
go x() // f is executed; assume it succeeds and prints "test"
go x() // this should be ignored
time.Sleep(50 * time.Millisecond)
go x() // f is executed

// wait til goroutines return
time.Sleep(50 * time.Millisecond)
Output:

failed
test
test

func OnceWithin

func OnceWithin(dur time.Duration, f func() error) func() error

OnceWithin is identical to OnceAtMost, but calls made within the duration are ignored instead of delayed.

Say you have an f() that prints "test" and takes 0.5s per call:

x := OnceWithin(time.Second, f)
go x() // this should be executed and print "test"
go x() // this should be ignored
go x() // this should also be ignored
time.Sleep(time.Second)
go x() // this should be executed and print "test"
Example
f := func() error {
	fmt.Println("test")
	return nil
}

x := OnceWithin(50*time.Millisecond, f)
go x() // this should be executed and print "test"
go x() // this should be ignored
go x() // this should also be ignored
time.Sleep(50 * time.Millisecond)
go x() // this should be executed and print "test"

// wait til goroutines return
time.Sleep(50 * time.Millisecond)
Output:

test
test

func Recorded

func Recorded(f func(idx uint64) error) (ret func() error)

Recorded creates a function that remembers how many times it has been called.

For example:

x := Recorded(f)
x() // equals f(0)
x() // equals f(1)
x() // equals f(2)
Example
f := func(idx uint64) error {
	fmt.Println(idx)
	return nil
}

x := Recorded(f)
x() // equals f(0)
x() // equals f(1)
x() // equals f(2)
Output:

0
1
2

func Retry

func Retry(f func() error) (err chan error)

Retry runs f() until it returns nil.

err is closed when f() returns nil.

Common use case:

ch := Retry(RunAtLeast(time.Minute, f)) // retries every minute
idx := 1
for e := range ch {
    log.Printf("#%d attempt failed: %v", idx, e)
    idx++
}
log.Printf("#%d attempt succeeded", idx)

WARNING: err is unbuffered, so the next attempt does not start until the previous error has been received from err.

Example
f := Recorded(func(idx uint64) error {
	fmt.Println(idx)
	if idx <= 1 {
		return errors.New("error")
	}
	return nil
})

errs := make([]error, 0, 2)
for e := range Retry(f) {
	errs = append(errs, e)
}

for _, e := range errs {
	fmt.Println(e)
}
Output:

0
1
2
error
error

func RunAtLeast

func RunAtLeast(dur time.Duration, f func() error) func() error

RunAtLeast ensures the execution time is at least the duration.

It blocks until dur has elapsed and f() has returned.

Example (LongTask)
crawler := func() error {
	// grab some web page, say it costs you 15ms
	time.Sleep(15 * time.Millisecond)
	fmt.Println("done")
	return nil
}

task := RunAtLeast(10*time.Millisecond, crawler)

task() // blocks 15ms
task() // blocks another 15ms
Output:

done
done
Example (ShortTask)
crawler := func() error {
	// grab some web page, say it costs you 1ms
	time.Sleep(time.Millisecond)
	fmt.Println("done")
	return nil
}

task := RunAtLeast(10*time.Millisecond, crawler)

task() // blocks 10ms
task() // blocks another 10ms
Output:

done
done

func RunFailedAtLeast

func RunFailedAtLeast(dur time.Duration, f func() error) func() error

RunFailedAtLeast is identical to RunAtLeast, but the minimum duration is only enforced for failed calls.

Say your f() takes 0.1s whether it succeeds or fails:

x := RunFailedAtLeast(time.Second, f)
x() // costs   1s if this attempt fails
x() // costs 0.1s if this attempt succeeds
Example
cnt := 0
f := func() error {
	cnt++
	if cnt <= 1 || cnt >= 3 {
		fmt.Println("failed")
		return errors.New("failed")
	}
	fmt.Println("done")
	return nil
}

task := RunFailedAtLeast(10*time.Millisecond, f)

task() // blocks 10ms, returns an error
task() // does not block
task() // blocks 10ms, returns an error
Output:

failed
done
failed

func RunSuccessAtLeast

func RunSuccessAtLeast(dur time.Duration, f func() error) func() error

RunSuccessAtLeast is identical to RunAtLeast, but the minimum duration is only enforced for successful calls.

Say your f() takes 0.1s whether it succeeds or fails:

x := RunSuccessAtLeast(time.Second, f)
x() // costs 0.1s if this attempt fails
x() // costs   1s if this attempt succeeds
Example
cnt := 0
f := func() error {
	cnt++
	if cnt <= 1 {
		fmt.Println("failed")
		return errors.New("failed")
	}
	fmt.Println("done")
	return nil
}

task := RunSuccessAtLeast(10*time.Millisecond, f)

task() // does not block, returns an error
task() // blocks 10ms
task() // blocks 10ms
Output:

failed
done
done

func TilErr added in v0.0.2

func TilErr(funcs ...func() error) (err error)

TilErr runs funcs in order until the first error.

It guarantees that the funcs are executed one by one, in order, and it stops at the first error. See the example for a common use case.

Example
// say you have to initialize following resources in order
//
//   - db connection
//   - redis connection
//   - prefill data into redis
//
// instead of using one init() to run them all, it's better to write
initDB := func() error {
	fmt.Println("db")
	return nil
}
initRedis := func() error {
	fmt.Println("redis")
	return errors.New("cannot connect to redis")
}
prefill := func() error {
	fmt.Println("prefill")
	return nil
}

err := TilErr(initDB, initRedis, prefill)
if err != nil {
	fmt.Println("an error occurred:", err)
}
Output:

db
redis
an error occurred: cannot connect to redis

func TilErrAsync added in v0.0.2

func TilErrAsync(funcs ...func() error) (err error)

TilErrAsync runs all funcs in the background, blocks until they are all done, and returns the first error.

It guarantees that every f in funcs is executed. Only the first error is returned; the others are ignored. See the example for a common use case.

Example
// say you have to connect to 3 independent APIs
initConn1 := func() error {
	// wait some time to simulate api call
	time.Sleep(10 * time.Millisecond)
	fmt.Println("api1")
	return nil
}
initConn2 := func() error {
	time.Sleep(20 * time.Millisecond)
	fmt.Println("api2")
	return errors.New("failed to connect to api2")
}
initConn3 := func() error {
	time.Sleep(30 * time.Millisecond)
	fmt.Println("api3")
	return errors.New("failed to connect to api3")
}

err := TilErrAsync(initConn1, initConn2, initConn3)
if err != nil {
	fmt.Println("an error occurred:", err)
}
Output:

api1
api2
api3
an error occurred: failed to connect to api2

func TriesAtMost

func TriesAtMost(n uint64, f func() error) (err chan error)

TriesAtMost retries f, making at most n attempts.

Suppose your f() takes 1s to run and always fails: TriesAtMost(10, f) runs f() 10 times (costing you 10s), and err is closed at 10s.

func TryAtMost added in v0.0.3

func TryAtMost(n uint64, f func() error) (err error)

TryAtMost wraps TriesAtMost and returns the last error if and only if all attempts fail.

Types

type InfiniteLoopControl

type InfiniteLoopControl struct {
	Cancel context.CancelFunc
	Err    chan error
}

InfiniteLoopControl is the control structure of InfiniteLoop()

Err is closed by InfiniteLoop()

func AllErr

func AllErr(ctrls ...InfiniteLoopControl) (ret InfiniteLoopControl)

AllErr merges several InfiniteLoopControl values into one and returns all errors.

The Cancel functions are called when either:

  • an error occurs, or
  • an infinite loop exits.

All errors are returned, in an order determined by reflect.Select().

It can be used as a sort of sync.WaitGroup:

ctrl := AllErr(
    InfiniteLoop(crawlsSite),
    InfiniteLoop(crawlsAnotherSite),
)
defer ctrl.Cancel()

if err := <-ctrl.Err; err != nil {
    log.Print("an error occurred: ", err)
}

// graceful shutdown: wait for all tasks to stop
for range ctrl.Err {
}
log.Print("all tasks are stopped, program exits")
Example
// crawler 1: banned at 3rd try
crawl1 := Recorded(func(idx uint64) error {
	time.Sleep(10 * time.Millisecond)
	if idx > 1 {
		fmt.Println("banned1")
		return errors.New("banned")
	}
	fmt.Println("done1")
	return nil
})
// crawler 2: banned at 9th try
crawl2 := Recorded(func(idx uint64) error {
	time.Sleep(12 * time.Millisecond)
	if idx > 7 {
		fmt.Println("banned2")
		return errors.New("banned")
	}
	fmt.Println("done2")
	return nil
})

ctrl := AllErr(
	InfiniteLoop(crawl2),
	InfiniteLoop(crawl1),
)
defer ctrl.Cancel()

fmt.Println(<-ctrl.Err)

// will get context canceled error from crawl2
for err := range ctrl.Err {
	fmt.Println(err)
}
Output:

done1
done2
done1
done2
banned1
banned
done2
context canceled

func AnyErr

func AnyErr(ctrls ...InfiniteLoopControl) (ret InfiniteLoopControl)

AnyErr merges several InfiniteLoopControl values into one.

Only the first error is returned (it may be nil if one of the error channels in ctrls is closed); later errors ARE DISCARDED IN THE BACKGROUND.

Example
// crawler 1: banned at 3rd try
crawl1 := Recorded(func(idx uint64) error {
	time.Sleep(10 * time.Millisecond)
	if idx > 1 {
		fmt.Println("banned1")
		return errors.New("banned")
	}
	fmt.Println("done1")
	return nil
})
// crawler 2: banned at 9th try
crawl2 := Recorded(func(idx uint64) error {
	time.Sleep(12 * time.Millisecond)
	if idx > 7 {
		fmt.Println("banned2")
		return errors.New("banned")
	}
	fmt.Println("done2")
	return nil
})

ctrl := AnyErr(
	InfiniteLoop(crawl2),
	InfiniteLoop(crawl1),
)
defer ctrl.Cancel()

fmt.Println(<-ctrl.Err)

// will not get anything
for err := range ctrl.Err {
	fmt.Println(err)
}
Output:

done1
done2
done1
done2
banned1
banned
done2

func InfiniteLoop

func InfiniteLoop(task func() error) (ret InfiniteLoopControl)

InfiniteLoop runs your function in a loop and can be cancelled on demand.

There are a few things you should take care of:

  • Cancelling does not interrupt the iteration that is currently running.
  • It does not wait between iterations.

A common use case is InfiniteLoop(RunAtLeast(someDuration, task)).

type StatefulFunc added in v0.0.3

type StatefulFunc interface {
	IsRunning() bool
	// try to run the function now, return ErrRunning immediately if it is running
	TryRun() (err error)
	// run this function, blocks until it has run
	Run() (err error)
	// blocks until it's free to run, and holds the lock to prevent others from running
	// calling Run() without releasing the lock causes a deadlock! use with care.
	//
	// It's safe to call release multiple times, only first time is executed.
	Lock() (release func())
}

StatefulFunc is a function whose current running state can be queried.

Example (IsRunning)
f := func() error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("run")
	return nil
}

sf := NewStatefulFunc(f)
fmt.Println(sf.IsRunning()) // false
go sf.Run()                 // "run" at 10ms
time.Sleep(5 * time.Millisecond)
fmt.Println(sf.IsRunning()) // "true" at 5ms
time.Sleep(6 * time.Millisecond)
fmt.Println(sf.IsRunning()) // "false" at 11ms
Output:

false
true
run
false
Example (Lock)
f := func() error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("run")
	return nil
}

sf := NewStatefulFunc(f)

go func() {
	sf.Run() // "run" at 10ms
	sf.Run() // blocked by Lock() for 5ms, so print "run" at 25ms
	sf.Run() // "run" at 35ms
}()

time.Sleep(1 * time.Millisecond)
release := sf.Lock() // returns at 10ms
fmt.Println("lock")  // printed at 10ms, just after first sf.Run() returns
time.Sleep(5 * time.Millisecond)
release() // release the lock so second Run() can be executed

time.Sleep(30 * time.Millisecond)
Output:

run
lock
run
run
Example (TryRun)
f := func() error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("run")
	return nil
}

sf := NewStatefulFunc(f)
go sf.TryRun() // "run" at 10ms
time.Sleep(time.Millisecond)
go sf.Run() // "run" at 20ms
time.Sleep(5 * time.Millisecond)
fmt.Println(sf.TryRun()) // ErrRunning at 6ms
time.Sleep(10 * time.Millisecond)
fmt.Println(sf.TryRun()) // ErrRunning at 16ms
time.Sleep(5 * time.Millisecond)
fmt.Println(sf.TryRun()) // called at 21ms, so print "run" and "<nil>" at 31ms
Output:

StatefulFunc: function is running
run
StatefulFunc: function is running
run
run
<nil>

func NewStatefulFunc added in v0.0.3

func NewStatefulFunc(f func() error) (ret StatefulFunc)

NewStatefulFunc creates a new StatefulFunc
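One way to satisfy this interface is a sync.Mutex (TryLock needs Go 1.18 or later) plus an atomic running flag. The following is a minimal sketch under two assumptions. First, "running" and "locked" share one lock, so TryRun also refuses while someone holds Lock(). Second, release is made idempotent with sync.Once. statefulFunc, newStatefulFunc and errRunning are hypothetical names, and this is not the package's source.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errRunning plays the role of ErrRunning in this sketch.
var errRunning = errors.New("StatefulFunc: function is running")

// statefulFunc is a hypothetical sketch of a StatefulFunc implementation.
type statefulFunc struct {
	mu      sync.Mutex
	running int32 // 1 while f is executing; read atomically by IsRunning
	f       func() error
}

func newStatefulFunc(f func() error) *statefulFunc { return &statefulFunc{f: f} }

func (s *statefulFunc) IsRunning() bool { return atomic.LoadInt32(&s.running) == 1 }

// call runs f and tracks the running flag; the caller must hold s.mu.
func (s *statefulFunc) call() error {
	atomic.StoreInt32(&s.running, 1)
	defer atomic.StoreInt32(&s.running, 0)
	return s.f()
}

// Run blocks until the lock is free, then runs f.
func (s *statefulFunc) Run() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.call()
}

// TryRun runs f only if nobody holds the lock; otherwise it returns errRunning at once.
func (s *statefulFunc) TryRun() error {
	if !s.mu.TryLock() {
		return errRunning
	}
	defer s.mu.Unlock()
	return s.call()
}

// Lock holds the lock until release is called; extra release calls are no-ops.
func (s *statefulFunc) Lock() (release func()) {
	s.mu.Lock()
	var once sync.Once
	return func() { once.Do(s.mu.Unlock) }
}

func main() {
	sf := newStatefulFunc(func() error { return nil })
	release := sf.Lock()
	fmt.Println(sf.TryRun()) // StatefulFunc: function is running
	release()
	release()                // safe: only the first call unlocks
	fmt.Println(sf.TryRun()) // <nil>
}
```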
