gotasks

package module
v0.0.0-...-367260e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

README

Go Report Card LICENSE

gotasks

A production-inspired task parallelization library

Install
go get -u github.com/monime-lab/gotasks
Sample
package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/monime-lab/gotasks"
	"github.com/monime-lab/gotries"
	"log"
	"time"
)

// saveToStore1 is a placeholder for persisting data to a first store.
// This sample implementation does no work and always reports success.
func saveToStore1() error {
	var err error // stays nil: the placeholder never fails
	return err
}

// saveToStore2 is a placeholder for persisting data to a second store.
// This sample implementation does no work and always reports success.
func saveToStore2() error {
	var err error // stays nil: the placeholder never fails
	return err
}

// getUserByID simulates a user lookup for the sample.
// It returns the string "user-<id>" as the user value and never fails.
func getUserByID(id int) (interface{}, error) {
	user := fmt.Sprintf("user-%d", id)
	return user, nil
}

// main runs every example in order: the two task-runner examples first,
// then the two scheduler examples.
func main() {
	examples := []func(){
		runnerExampleOne,
		runnerExampleTwo,
		schedulerExampleOne,
		schedulerExampleTwo,
	}
	for _, run := range examples {
		run()
	}
}

func runnerExampleOne() {
	_, err := gotasks.NewTaskRunner( /* Options here... */).
		AddRunnableTask(func(ctx context.Context) error {
			return saveToStore1()
		}).
		AddRunnableTask(func(ctx context.Context) error {
			return saveToStore2()
		}).RunAndWaitAny(context.TODO())
	if err != nil {
		panic(err)
	}
	log.Printf("At least one of them succeeds!!!")
}

// runnerExampleTwo builds a task runner that showcases every runner option,
// queues five user lookups as named callable tasks, and waits for all of them
// with RunAndWaitAll.
//
// If RunAndWaitAll reports an error, the error is logged and the process exits
// via log.Fatalf; otherwise the collected results are logged.
func runnerExampleTwo() {
	runner := gotasks.NewTaskRunner(
		// Fail-fast switch, useful when calling runner.RunAndWaitAll():
		// the call returns on the first failure.
		gotasks.WithEagerFail(true),
		// The maximum parallelism. This is a concurrency rate-limiter
		// for when the number of tasks can be high: at any point, at
		// most `max` tasks (goroutines) run concurrently.
		// A value < 1 means unbounded parallelism.
		gotasks.WithMaxParallelism(10),
		// Syntactic sugar for WithMaxParallelism(1).
		// Useful for executing multiple tasks serially.
		// NOTE(review): shown together with WithMaxParallelism(10) only to
		// list the available options; which of the two takes effect is
		// not visible here — confirm before copying both.
		gotasks.WithSequentialParallelism(),
		// Default retry options for all the added tasks.
		gotasks.WithRetryOptions(
			// Retry all tasks twice...
			gotries.WithMaxAttempts(2),
		),
	)
	for i := 1; i <= 5; i++ {
		// The immediately-invoked function hands each task its own copy of
		// the loop value, so both the task body and its name use `id`
		// rather than the shared loop variable.
		func(id int) {
			runner.AddCallableTask(func(ctx context.Context) (interface{}, error) {
				return getUserByID(id)
			}, gotries.WithTaskName(fmt.Sprintf("RunnerTask-%d", id)))
		}(i)
	}
	users, err := runner.RunAndWaitAll(context.TODO())
	if err != nil {
		// The error(s) are composed using https://github.com/uber-go/multierr
		log.Fatalf("At least one failed. Error: %v", err)
	}
	// %v rather than %s: users is a []interface{}, and its elements are not
	// guaranteed to be strings.
	log.Printf("Users: %v", users)
}

// schedulerExampleOne schedules three one-off tasks on the default scheduler
// with delays of 1, 2 and 5 seconds, then waits on the last one.
//
// The third task deliberately returns an error to show how a failure surfaces
// through Future.Wait. That error is logged without terminating the process,
// so the examples that follow in main still get to run.
func schedulerExampleOne() {
	// The futures of the first two tasks are intentionally discarded: this
	// example only waits on the task with the longest delay.
	_ = gotasks.DefaultScheduler().Schedule(context.Background(), func(ctx context.Context) error {
		println("Printed after 1 second")
		return nil
	}, 1*time.Second)
	_ = gotasks.DefaultScheduler().Schedule(context.Background(), func(ctx context.Context) error {
		println("Printed after 2 seconds")
		return nil
	}, 2*time.Second)
	future3 := gotasks.DefaultScheduler().Schedule(context.Background(), func(ctx context.Context) error {
		println("Printed after 5 seconds")
		return errors.New("error after printing: 'Printed after 5 seconds'")
	}, 5*time.Second)
	if err := future3.Wait(); err != nil {
		// Not log.Fatal: that would exit the program on this expected,
		// demonstrative error and skip the remaining examples.
		log.Printf("Scheduled task failed: %v", err)
	}
}

// schedulerExampleTwo schedules a repeating task on the default scheduler
// (no initial delay, one-second interval) that prints the current time and
// returns an error on every run.
//
// A background goroutine cancels the returned future after ten seconds while
// the calling goroutine waits on the future and logs the error Wait returns.
func schedulerExampleTwo() {
	const (
		interval = 1 * time.Second
		lifetime = 10 * time.Second
	)

	tick := func(ctx context.Context) error {
		fmt.Printf("Running at: %s\n", time.Now().Format(time.RFC3339))
		return errors.New("oops!!! What's wrong")
	}
	future := gotasks.DefaultScheduler().ScheduleAtFixedRate(context.Background(), tick, 0, interval)

	// Stop the repeating task from a separate goroutine once its lifetime is up.
	go func() {
		time.Sleep(lifetime)
		println("Stopping the scheduled action")
		future.Cancel()
	}()

	waitErr := future.Wait()
	log.Printf(":::::::::::::::::::: Stopped. Err: %v", waitErr)
}

Contribute

For issues, comments, recommendations, or feedback, please post them here.

Contributions are highly welcome.

👍

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors naming scheduler lifecycle states.
// NOTE(review): the conditions under which each one is returned are not
// visible in this listing — confirm against the scheduler implementation.
var (
	// ErrSchedulerStopping carries the message "scheduler is stopping".
	ErrSchedulerStopping = errors.New("scheduler is stopping")
	// ErrSchedulerStopped carries the message "scheduler is stopped".
	ErrSchedulerStopped  = errors.New("scheduler is stopped")
)
View Source
// Sentinel error for the task runner.
var (
	// ErrNoTasks carries the message "no tasks in runner".
	// NOTE(review): presumably returned when a TaskRunner is run before any
	// task has been added — confirm against the runner implementation.
	ErrNoTasks = errors.New("no tasks in runner")
)

Functions

func ScheduleTaskFunc

func ScheduleTaskFunc(task func() error)

func ScheduleTaskFuncWithContext

func ScheduleTaskFuncWithContext(ctx context.Context, task func(ctx context.Context) error)

Types

type Callable

// Callable is a task function that takes a context and returns a result
// value together with an error. TaskRunner.AddCallableTask accepts values of
// this type.
type Callable func(ctx context.Context) (interface{}, error)

func (Callable) Name

func (c Callable) Name() string

func (Callable) Run

func (c Callable) Run(ctx context.Context) (interface{}, error)

type CallableCallback

// CallableCallback is a function that receives a value and an error.
// NOTE(review): presumably invoked with the outcome of a Callable; no caller
// is visible in this listing — confirm.
type CallableCallback func(interface{}, error)

type Future

// Future is the handle returned by Scheduler.Schedule and
// Scheduler.ScheduleAtFixedRate for a scheduled task.
//
// Every method except Wait returns a Future, so calls can be chained.
type Future interface {
	// OnError registers a callback that receives an error.
	// NOTE(review): presumably called when the scheduled task fails — confirm.
	OnError(callback func(err error)) Future
	// OnComplete registers a callback that takes no arguments.
	// NOTE(review): presumably called when the task completes — confirm.
	OnComplete(callback func()) Future
	// OnCancel registers a callback that takes no arguments.
	// NOTE(review): presumably called when the future is cancelled — confirm.
	OnCancel(callback func()) Future
	// OnFinally registers a callback that takes no arguments.
	// NOTE(review): presumably called after any outcome — confirm.
	OnFinally(callback func()) Future
	// Cancel is used in the README sample to stop a task scheduled at a
	// fixed rate.
	Cancel() Future
	// Wait returns an error. The README sample calls it to wait on a
	// scheduled task and inspects the returned error.
	// NOTE(review): exactly which error is reported after Cancel is not
	// shown here — confirm.
	Wait() error
}

type OnStopCallback

// OnStopCallback is a function that takes no arguments and returns nothing.
// NOTE(review): presumably invoked when a scheduler stops; no caller is
// visible in this listing — confirm.
type OnStopCallback func()

type Permits

// Permits pairs a context-aware Acquire, which can fail, with a Release.
// NOTE(review): presumably the mechanism used to bound concurrency (compare
// WithMaxParallelism); no user of this interface is visible here — confirm.
type Permits interface {
	// Acquire takes a context and returns an error.
	Acquire(ctx context.Context) error
	// Release takes a context and returns nothing.
	Release(ctx context.Context)
}

type Runnable

// Runnable is a task function that takes a context and returns only an
// error. TaskRunner.AddRunnableTask, Scheduler.Schedule and
// Scheduler.ScheduleAtFixedRate accept values of this type.
type Runnable func(ctx context.Context) error

func (Runnable) Name

func (r Runnable) Name() string

func (Runnable) Run

func (r Runnable) Run(ctx context.Context) (interface{}, error)

type RunnableCallback

// RunnableCallback is a function that receives an error.
// NOTE(review): presumably invoked with the outcome of a Runnable; no caller
// is visible in this listing — confirm.
type RunnableCallback func(error)

type RunnerOption

type RunnerOption interface {
	// contains filtered or unexported methods
}

func WithEagerFail

func WithEagerFail(enable bool) RunnerOption

WithEagerFail is a fail-fast switch, useful when calling runner.RunAndWaitAll(): the call will return on the first failure.

func WithMaxParallelism

func WithMaxParallelism(max int) RunnerOption

WithMaxParallelism sets the maximum parallelism for the runner; if max is less than 1, no parallelism limit is set. This is a concurrency rate-limiter for when the number of tasks can be high: at any point, at most `max` tasks (goroutines) run concurrently.

func WithRetryOptions

func WithRetryOptions(options ...gotries.Option) RunnerOption

WithRetryOptions sets the default retry options for all the added tasks

func WithSequentialParallelism

func WithSequentialParallelism() RunnerOption

WithSequentialParallelism is syntactic sugar for WithMaxParallelism(1). Useful for executing multiple tasks serially.

type RunnerTask

// RunnerTask is the unit of work accepted by TaskRunner.AddTask.
// Both Callable and Runnable declare Name and Run methods with these
// signatures.
type RunnerTask interface {
	// Name returns a string.
	// NOTE(review): presumably the task's display name — confirm.
	Name() string
	// Run takes a context and returns a result value and an error.
	Run(context.Context) (interface{}, error)
}

type Scheduler

// Scheduler accepts Runnable tasks for later execution and returns a Future
// for each one. Instances are obtained from DefaultScheduler or NewScheduler.
type Scheduler interface {
	// Schedule takes a runnable and a delay and returns its Future.
	// The README sample uses delays of 1, 2 and 5 seconds for tasks that
	// print "Printed after N second(s)".
	Schedule(ctx context.Context, runnable Runnable, delay time.Duration) Future
	// ScheduleAtFixedRate takes a runnable, an initial delay and an interval
	// and returns its Future. The README sample passes an initial delay of 0
	// and an interval of one second, and stops the task with Future.Cancel.
	// NOTE(review): whether the interval is measured start-to-start or
	// end-to-start is not shown here — confirm.
	ScheduleAtFixedRate(ctx context.Context, runnable Runnable, initialDelay, interval time.Duration) Future
}

func DefaultScheduler

func DefaultScheduler() Scheduler

func NewScheduler

func NewScheduler(options ...SchedulerOption) Scheduler

type SchedulerOption

type SchedulerOption interface {
	// contains filtered or unexported methods
}

func WithSchedulerMaxPoolSize

func WithSchedulerMaxPoolSize(max int) SchedulerOption

func WithSchedulerName

func WithSchedulerName(name string) SchedulerOption

func WithSchedulerPoolSize

func WithSchedulerPoolSize(size int) SchedulerOption

type TaskRunner

// TaskRunner collects tasks and runs them. Instances are created with
// NewTaskRunner.
//
// The Add* methods return a TaskRunner, so they can be chained as in the
// README sample. Each accepts optional gotries options for that task.
type TaskRunner interface {
	// AddTask adds a RunnerTask.
	AddTask(task RunnerTask, options ...gotries.Option) TaskRunner
	// AddRunnableTask adds a Runnable.
	AddRunnableTask(runnable Runnable, options ...gotries.Option) TaskRunner
	// AddCallableTask adds a Callable.
	AddCallableTask(callable Callable, options ...gotries.Option) TaskRunner
	// RunAndWaitAll returns a slice of results and an error. Per the
	// WithEagerFail documentation, with eager fail enabled the call returns
	// on the first failure; the README notes that errors are composed using
	// multierr.
	RunAndWaitAll(ctx context.Context) ([]interface{}, error)
	// RunAndWaitAny returns a single result and an error. The README sample
	// treats a nil error as "at least one of them succeeds".
	// NOTE(review): which task's result is returned is not shown — confirm.
	RunAndWaitAny(ctx context.Context) (interface{}, error)
}

func NewTaskRunner

func NewTaskRunner(options ...RunnerOption) TaskRunner

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL