Async

Why you want to use this package

Package async simplifies the implementation of orchestration patterns for concurrent systems. It is similar to Java's Future or JavaScript's Promise, which makes life much easier when dealing with asynchronous operations and concurrent processing. Go is excellent at concurrent programming. However, dealing with goroutines and channels directly can become a big headache as business logic gets more complicated. Wrapping them in higher-level functions improves code readability significantly and makes it easier for engineers to reason about the system's behaviours.

Currently, this package includes:

  • Asynchronous tasks with cancellations, context propagation and state.
  • Task chaining by using continuations.
  • Worker pool - spawning a fixed-size pool of workers to execute tasks in parallel.
  • Fork/join pattern - running a batch of tasks in parallel and blocking until all finish.
  • Concurrency cap pattern - running a batch of tasks concurrently with a cap on max concurrency level.
  • Throttling pattern - throttling task execution at a specified rate.
  • Spread pattern - spreading tasks within a specified duration.
  • Repeat pattern - repeating a task on a pre-determined interval.
  • Batch pattern - batching many tasks to be processed together with individual continuations.
  • Partition pattern - dividing data into partitions concurrently.
  • Jitter pattern - adding a random jitter before executing a function to avoid thundering herds.

Concept

A Task is the basic building block, similar to a Future in Java. You create a Task from an executable function that takes in a context.Context and returns an error together with an optional result.

task := NewTask(func(context.Context) (animal, error) {
    // run the job
    return res, err
})

silentTask := NewSilentTask(func(context.Context) error {
    // run the job
    return err
})
Get the result

Once started, the given function is executed asynchronously. You can query whether it has completed by calling task.State(), which is non-blocking. Alternatively, you can wait for the result using task.Outcome() or silentTask.Wait(), which block until the task is done. These functions are similar to Future.isDone() and Future.get() in Java.
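
For instance, a minimal sketch using Invoke to create and start a task in one call (the values are illustrative):

task := Invoke(context.Background(), func(ctx context.Context) (int, error) {
    // run the job
    return 42, nil
})

// Non-blocking check: IsRunning, IsCompleted, etc.
fmt.Println(task.State())

// Blocks until the task is done
res, err := task.Outcome()
fmt.Println(res, err)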

Cancelling

There are cases where we no longer care about the result some time after execution has started. In such cases, a task can be aborted by invoking task.Cancel().
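
A minimal sketch (the long-running job is purely illustrative):

task := Invoke(context.Background(), func(ctx context.Context) (string, error) {
    time.Sleep(time.Minute) // some long-running job
    return "done", nil
})

// Some time later, the result is no longer needed
task.Cancel()

fmt.Println(task.State()) // IsCancelled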

Chaining

To have a follow-up action after a task is done, you can use the provided family of Continue functions. This is very useful for creating a chain of processing steps, or for running a teardown process at the end of a task.
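
For instance, a minimal sketch using ContinueWith (the values are illustrative):

first := NewTask(func(context.Context) (int, error) {
    return 21, nil
})

second := ContinueWith(first, func(_ context.Context, v int, err error) (int, error) {
    if err != nil {
        return 0, err
    }

    return v * 2, nil
})

// Running the continuation also triggers the first task
res, err := second.Run(context.Background()).Outcome()
fmt.Println(res, err) // 42 <nil>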

Features

Worker pool

Similar to a thread pool in Java, a worker pool is a fixed-size pool of goroutines that can execute any kind of task as it comes in. When the number of tasks grows larger than the number of available workers, new tasks are pushed into a waiting queue to be executed later.

Our implementation is an upgrade/retrofit of the popular https://github.com/gammazero/workerpool repo to match the behaviors of other features in this library.

wp := NewWorkerPool(
    WithMaxSize(5),
    WithBurst(10, 5),
)
defer wp.Stop()

task := NewTask(func(context.Context) (animal, error) {
    // run the job
    return res, err
})

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// The given context will be used to execute the task
wp.Submit(ctxWithTimeout, task)

// Block and wait for the outcome
result, err := task.Outcome()
Fork join

ForkJoin is meant for running multiple subtasks concurrently. They could be different parts of the main task which can be executed independently. The following code example illustrates how you can send files to S3 concurrently with a few lines of code.

func uploadFilesConcurrently(files []string) {
    var tasks []Task[string]
    for _, file := range files {
        f := file
        
        tasks = append(tasks, NewTask(func(ctx context.Context) (string, error) {
            return upload(ctx, f)
        }))
    }

    ForkJoin(context.Background(), tasks)
}

func upload(ctx context.Context, file string) (string, error) {
    // do file uploading
    return "", nil
}
Concurrency cap

ForkJoin is not suitable when the number of tasks is huge. In that scenario, the sheer number of concurrent goroutines would likely overwhelm a node and consume too many CPU resources. One solution is to put a cap on the maximum concurrency level, which is what RunWithConcurrencyLevelC and RunWithConcurrencyLevelS were created for. Internally, it's like maintaining a fixed-size worker pool that executes the given tasks as quickly as possible without violating the concurrency constraint.

// RunWithConcurrencyLevelC runs the given tasks up to the max concurrency level.
func RunWithConcurrencyLevelC[T SilentTask](ctx context.Context, concurrencyLevel int, tasks <-chan T) SilentTask

// RunWithConcurrencyLevelS runs the given tasks up to the max concurrency level.
func RunWithConcurrencyLevelS[T SilentTask](ctx context.Context, concurrencyLevel int, tasks []T) SilentTask
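
A minimal sketch using the slice-based variant (the work functions are illustrative):

works := make([]Work[int], 10)
for i := range works {
    j := i

    works[j] = func(context.Context) (int, error) {
        // process item j
        return j, nil
    }
}

tasks := NewTasks(works...)

// At most 3 tasks will be running concurrently at any point in time
RunWithConcurrencyLevelS(context.Background(), 3, tasks)

WaitAll(tasks)
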
Throttle

Sometimes you don't really care about the concurrency level but just want to execute the tasks at a particular rate. The Throttle function would come in handy in this case.

// Throttle runs the given tasks at the specified rate.
func Throttle[T SilentTask](ctx context.Context, tasks []T, rateLimit int, every time.Duration) SilentTask

For example, if you want to send 4 files every 2 seconds, the Throttle function will start a task every 0.5 seconds.
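
A minimal sketch of that scenario (`files` and `sendFile` are hypothetical):

var tasks []SilentTask
for _, file := range files {
    f := file

    tasks = append(tasks, NewSilentTask(func(ctx context.Context) error {
        return sendFile(ctx, f) // sendFile is a hypothetical helper
    }))
}

// Start at most 4 tasks every 2 seconds, i.e. one task every 0.5 seconds
Throttle(context.Background(), tasks, 4, 2*time.Second)

WaitAll(tasks)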

Spread

Instead of starting all tasks at once with ForkJoin, you can also spread the starting points of your tasks evenly within a certain duration using the Spread function.

// Spread evenly spreads the tasks within the specified duration.
func Spread[T SilentTask](ctx context.Context, tasks []T, within time.Duration) SilentTask

For example, if you want to send 50 files within 10 seconds, the Spread function would start a task every 0.2s.
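
A minimal sketch of that scenario, assuming `tasks` holds the 50 upload tasks (built the same way as in the Throttle sketch above):

// Start one task every 0.2s so that all 50 begin within 10 seconds,
// then wait for the whole batch to finish
Spread(context.Background(), tasks, 10*time.Second).Wait()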

Repeat

In cases where you need to repeat a background task on a pre-determined interval, Repeat is your friend. The returned SilentTask can then be used to cancel the repeating task at any time.

// Repeat executes the given SilentWork asynchronously on a pre-determined interval.
func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
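
A minimal sketch (the heartbeat job is illustrative):

// Print a heartbeat every second in the background
heartbeat := Repeat(context.Background(), time.Second, func(context.Context) error {
    fmt.Println("heartbeat")
    return nil
})

// ... some time later, stop repeating
heartbeat.Cancel()
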
Batch

Instead of executing a task immediately whenever you receive an input, it can sometimes be more efficient to collect a batch of inputs and process them all in one go.

out := make(chan int, taskCount) // taskCount: expected number of inputs, defined elsewhere

processBatch := func(nums []int) error {
    for _, number := range nums {
        out <- number * 10
    }

    return nil
}

// Auto process batch every 100ms
periodicBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
)

// Auto process batch when pending queue reaches 10
sizeBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessSize(10),
)

// Auto process batch every 100ms or when pending queue reaches 10
periodicSizeBatcher := NewSilentBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
    WithAutoProcessSize(10),
)

// Process batch manually on demand
manualBatcher := NewSilentBatcher(
    processBatch,
)

manualBatcher.Process(context.Background())

See batch_test.go for more detailed examples on how to use this feature.

Partition

When you receive a lot of data concurrently, it might be useful to divide the data into partitions asynchronously before consuming it.

partitionFunc := func(a animal) (string, bool) {
    if a.species == "" {
        return "", false
    }
    
    return a.species, true
}

p := NewPartitioner(context.Background(), partitionFunc)

input := []animal{
    {"dog", "name1"},
    {"snail", "name2"},
    {"dog", "name4"},
    {"cat", "name5"},
}

p.Take(input...)

res := p.Outcome()

See partition_test.go for a detailed example on how to use this feature.

Jitter

Using jitters to avoid thundering herds is a popular technique. We decided to make it simple for you.

t := InvokeInSilence(
    context.Background(), 
    func(ctx context.Context) error {
        DoJitter(func() {
            fmt.Println("do something after random jitter")
        }, 1000)

        return nil
    },
)

See jitter_test.go for a detailed example on how to use this feature.

Documentation

Constants

This section is empty.

Variables

var (
	ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)
var ErrCancelled = errors.New("task cancelled")

ErrCancelled is returned when a task gets cancelled.

Functions

func CancelAll

func CancelAll[T SilentTask](tasks []T)

CancelAll cancels all given tasks.

Note: task cannot be nil

func DoJitter

func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int

DoJitter adds a random jitter before executing doFn, then returns the jitter duration. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

Example
t1 := InvokeInSilence(
	context.Background(), func(ctx context.Context) error {
		DoJitter(
			func() {
				fmt.Println("do something after random jitter")
			}, 1000,
		)

		return nil
	},
)

t2 := AddJitterT(
	NewTask(
		func(ctx context.Context) (int, error) {
			fmt.Println("return 1 after random jitter")
			return 1, nil
		},
	),
	1000,
).Run(context.Background())

t3 := AddJitterST(
	NewSilentTask(
		func(ctx context.Context) error {
			fmt.Println("return nil after random jitter")
			return nil
		},
	),
	1000,
).Execute(context.Background())

WaitAll([]SilentTask{t1, t2, t3})
Output:

func ForkJoin

func ForkJoin[T SilentTask](ctx context.Context, tasks []T)

ForkJoin executes given tasks in parallel and waits for ALL to complete before returning.

Note: task cannot be nil

Example
first := NewTask(
	func(context.Context) (int, error) {
		return 1, nil
	},
)

second := NewTask(
	func(context.Context) (interface{}, error) {
		return nil, errors.New("some error")
	},
)

ForkJoin(context.Background(), []SilentTask{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:

1 <nil>
<nil> some error

func ForkJoinFailFast added in v1.0.2

func ForkJoinFailFast[T SilentTask](ctx context.Context, tasks []T) error

ForkJoinFailFast executes the given tasks in parallel and returns immediately when the 1st task fails, or waits for ALL tasks to complete successfully before returning.

Note: task cannot be nil
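
A minimal usage sketch (not an official example; the error values are illustrative):

first := NewSilentTask(func(context.Context) error {
	return nil
})

second := NewSilentTask(func(context.Context) error {
	return errors.New("some error")
})

// Returns as soon as one task fails, or nil once all tasks have completed successfully
err := ForkJoinFailFast(context.Background(), []SilentTask{first, second})
fmt.Println(err)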

func WaitAll

func WaitAll[T SilentTask](tasks []T)

WaitAll waits for all executed tasks to finish.

Note: task cannot be nil

Types

type Batcher

type Batcher[P any, T any] interface {

	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract the result.
	Append(payload P) Task[T]
	// contains filtered or unexported methods
}

Batcher is a batch processor which is suitable for sitting in the background to receive tasks from callers, execute them in one go and then return an individual result to each caller.

Example
b := NewBatcher(
	func(input []int) ([]int, error) {
		fmt.Println(input)

		result := make([]int, len(input))
		for idx, number := range input {
			result[idx] = number * 2
		}

		return result, nil
	},
	nil,
)

defer b.Shutdown()

t1 := b.Append(1)
t2 := b.Append(2)

b.Process(context.Background())

ContinueWithNoResult(
	t1, func(_ context.Context, v int, err error) error {
		fmt.Println(v)

		return nil
	},
).ExecuteSync(context.Background())

ContinueWithNoResult(
	t2, func(_ context.Context, v int, err error) error {
		fmt.Println(v)

		return nil
	},
).ExecuteSync(context.Background())
Output:

[1 2]
2
4

func NewBatcher

func NewBatcher[P any, T any](
	processFn func([]P) ([]T, error),
	payloadKeyExtractor func(P) any,
	options ...BatcherOption,
) Batcher[P, T]

NewBatcher returns a new Batcher

type BatcherOption

type BatcherOption func(*batcherConfigs)

func WithAutoProcessInterval

func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption

WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessIntervalInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.

Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() to clean up the timer goroutine properly in order to avoid memory leak.

func WithAutoProcessSize

func WithAutoProcessSize(autoProcessSize int) BatcherOption

WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.

func WithShutdownGraceDuration

func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption

WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.

type MockSilentTask added in v1.0.3

type MockSilentTask struct {
	mock.Mock
}

MockSilentTask is an autogenerated mock type for the SilentTask type

func NewMockSilentTask added in v1.0.3

func NewMockSilentTask(t mockConstructorTestingTNewMockSilentTask) *MockSilentTask

NewMockSilentTask creates a new instance of MockSilentTask. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockSilentTask) Cancel added in v1.0.3

func (_m *MockSilentTask) Cancel()

Cancel provides a mock function with given fields:

func (*MockSilentTask) Duration added in v1.0.3

func (_m *MockSilentTask) Duration() time.Duration

Duration provides a mock function with given fields:

func (*MockSilentTask) Error added in v1.0.3

func (_m *MockSilentTask) Error() error

Error provides a mock function with given fields:

func (*MockSilentTask) Execute added in v1.0.3

func (_m *MockSilentTask) Execute(ctx context.Context) SilentTask

Execute provides a mock function with given fields: ctx

func (*MockSilentTask) ExecuteSync added in v1.0.3

func (_m *MockSilentTask) ExecuteSync(ctx context.Context) SilentTask

ExecuteSync provides a mock function with given fields: ctx

func (*MockSilentTask) State added in v1.0.3

func (_m *MockSilentTask) State() State

State provides a mock function with given fields:

func (*MockSilentTask) Wait added in v1.0.3

func (_m *MockSilentTask) Wait()

Wait provides a mock function with given fields:

func (*MockSilentTask) WithRecoverAction added in v1.0.3

func (_m *MockSilentTask) WithRecoverAction(recoverAction PanicRecoverWork)

WithRecoverAction provides a mock function with given fields: recoverAction

type MockTask added in v1.0.3

type MockTask[T interface{}] struct {
	mock.Mock
}

MockTask is an autogenerated mock type for the Task type

func NewMockTask added in v1.0.3

func NewMockTask[T interface{}](t mockConstructorTestingTNewMockTask) *MockTask[T]

NewMockTask creates a new instance of MockTask. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockTask[T]) Cancel added in v1.0.3

func (_m *MockTask[T]) Cancel()

Cancel provides a mock function with given fields:

func (*MockTask[T]) Duration added in v1.0.3

func (_m *MockTask[T]) Duration() time.Duration

Duration provides a mock function with given fields:

func (*MockTask[T]) Error added in v1.0.3

func (_m *MockTask[T]) Error() error

Error provides a mock function with given fields:

func (*MockTask[T]) Execute added in v1.0.3

func (_m *MockTask[T]) Execute(ctx context.Context) SilentTask

Execute provides a mock function with given fields: ctx

func (*MockTask[T]) ExecuteSync added in v1.0.3

func (_m *MockTask[T]) ExecuteSync(ctx context.Context) SilentTask

ExecuteSync provides a mock function with given fields: ctx

func (*MockTask[T]) Outcome added in v1.0.3

func (_m *MockTask[T]) Outcome() (T, error)

Outcome provides a mock function with given fields:

func (*MockTask[T]) ResultOrDefault added in v1.0.3

func (_m *MockTask[T]) ResultOrDefault(_a0 T) T

ResultOrDefault provides a mock function with given fields: _a0

func (*MockTask[T]) Run added in v1.0.3

func (_m *MockTask[T]) Run(ctx context.Context) Task[T]

Run provides a mock function with given fields: ctx

func (*MockTask[T]) RunSync added in v1.0.3

func (_m *MockTask[T]) RunSync(ctx context.Context) Task[T]

RunSync provides a mock function with given fields: ctx

func (*MockTask[T]) State added in v1.0.3

func (_m *MockTask[T]) State() State

State provides a mock function with given fields:

func (*MockTask[T]) Wait added in v1.0.3

func (_m *MockTask[T]) Wait()

Wait provides a mock function with given fields:

func (*MockTask[T]) WithRecoverAction added in v1.0.3

func (_m *MockTask[T]) WithRecoverAction(recoverAction PanicRecoverWork)

WithRecoverAction provides a mock function with given fields: recoverAction

type PanicRecoverWork added in v1.0.2

type PanicRecoverWork func(any)

PanicRecoverWork represents a unit of work to be executed when a panic occurs.

type PartitionFunc

type PartitionFunc[K comparable, V any] func(data V) (key K, ok bool)

PartitionFunc takes in data and then returns a key and whether the key can be used to route data into a partition.

type Partitioner

type Partitioner[K comparable, V any] interface {
	// Take items and divide them into separate partitions asynchronously.
	Take(items ...V)
	// Outcome returns items divided into separate partitions.
	Outcome() map[K][]V
}

Partitioner divides items into separate partitions.

Example
partitionFunc := func(a animal) (string, bool) {
	if a.species == "" {
		return "", false
	}

	return a.species, true
}

p := NewPartitioner(context.Background(), partitionFunc)

input := []animal{
	{"dog", "name1"},
	{"snail", "name2"},
	{"dog", "name4"},
	{"cat", "name5"},
}

p.Take(input...)

res := p.Outcome()
fmt.Println(res)

first := res["dog"]
fmt.Println(first[0])
fmt.Println(first[1])
Output:

map[cat:[{cat name5}] dog:[{dog name1} {dog name4}] snail:[{snail name2}]]
{dog name1}
{dog name4}

func NewPartitioner

func NewPartitioner[K comparable, V any](ctx context.Context, partitionFn PartitionFunc[K, V]) Partitioner[K, V]

NewPartitioner creates a new partitioner.

type SilentBatcher added in v1.0.2

type SilentBatcher[P any] interface {

	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract the result.
	Append(payload P) SilentTask
	// contains filtered or unexported methods
}

SilentBatcher is a batch processor which is suitable for sitting in the background to accumulate tasks and then execute them all in one go silently.

Example
var wg sync.WaitGroup
wg.Add(2)

b := NewSilentBatcher(
	func(input []interface{}) error {
		fmt.Println(input)
		return nil
	},
)

ContinueInSilence(
	b.Append(1), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(context.Background())

ContinueInSilence(
	b.Append(2), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(context.Background())

b.Process(context.Background())

wg.Wait()
Output:

[1 2]

func NewSilentBatcher added in v1.0.2

func NewSilentBatcher[P any](processFn func([]P) error, options ...BatcherOption) SilentBatcher[P]

NewSilentBatcher returns a new SilentBatcher

type SilentTask

type SilentTask interface {
	// WithRecoverAction attaches the given recover action with task so that
	// it can be executed when a panic occurs.
	WithRecoverAction(recoverAction PanicRecoverWork)
	// Execute starts this task asynchronously.
	Execute(ctx context.Context) SilentTask
	// ExecuteSync starts this task synchronously.
	ExecuteSync(ctx context.Context) SilentTask
	// Wait waits for this task to complete.
	Wait()
	// Cancel changes the state of this task to `Cancelled`.
	Cancel()
	// Error returns the error that occurred when this task was executed.
	Error() error
	// State returns the current state of this task. This operation is non-blocking.
	State() State
	// Duration returns the duration of this task.
	Duration() time.Duration
}

SilentTask represents a unit of work to complete in silence, like background jobs that return no values.

func AddJitterST

func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask

AddJitterST adds a random jitter before executing the given SilentTask. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

func ContinueInSilence

func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask

ContinueInSilence proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func ContinueWithNoResult

func ContinueWithNoResult[T any](currentTask Task[T], nextAction func(context.Context, T, error) error) SilentTask

ContinueWithNoResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func InvokeInSilence

func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask

InvokeInSilence creates a new SilentTask and runs it asynchronously.

func NewSilentTask

func NewSilentTask(action SilentWork) SilentTask

NewSilentTask creates a new SilentTask.

func NewSilentTasks

func NewSilentTasks(actions ...SilentWork) []SilentTask

NewSilentTasks creates a group of new SilentTask.

func Repeat

func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask

Repeat executes the given SilentWork asynchronously on a pre-determined interval.

Example
out := make(chan bool, 1)
task := Repeat(
	context.Background(), time.Nanosecond*10, func(context.Context) error {
		defer func() {
			recover()
		}()

		out <- true
		return nil
	},
)

<-out
v := <-out

fmt.Println(v)

task.Cancel()
close(out)
Output:

true

func RunWithConcurrencyLevelC

func RunWithConcurrencyLevelC[T SilentTask](ctx context.Context, concurrencyLevel int, tasks <-chan T) SilentTask

RunWithConcurrencyLevelC runs the given tasks up to the max concurrency level.

Note: When `ctx` is cancelled, we spawn a new goroutine to cancel all remaining tasks in the given channel. To avoid memory leak, client MUST make sure new tasks will eventually stop arriving once `ctx` is cancelled so that the new goroutine can return.
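
A minimal usage sketch (not an official example; it assumes the returned coordinator task completes once the channel is closed and all received tasks have finished):

tasks := make(chan SilentTask)

go func() {
	// Closing the channel signals that no more tasks will arrive
	defer close(tasks)

	for i := 0; i < 10; i++ {
		tasks <- NewSilentTask(func(context.Context) error {
			// do some work
			return nil
		})
	}
}()

// At most 3 tasks run concurrently
coordinator := RunWithConcurrencyLevelC[SilentTask](context.Background(), 3, tasks)
coordinator.Wait()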

func RunWithConcurrencyLevelS

func RunWithConcurrencyLevelS[T SilentTask](ctx context.Context, concurrencyLevel int, tasks []T) SilentTask

RunWithConcurrencyLevelS runs the given tasks up to the max concurrency level.

Example
resChan := make(chan int, 6)
works := make([]Work[struct{}], 6, 6)

for i := range works {
	j := i

	works[j] = func(context.Context) (struct{}, error) {
		fmt.Println(j / 2)
		time.Sleep(time.Millisecond * 10)

		return struct{}{}, nil
	}
}

tasks := NewTasks(works...)
RunWithConcurrencyLevelS(context.Background(), 2, tasks)

WaitAll(tasks)
close(resChan)

var res []int
for r := range resChan {
	res = append(res, r)
}
Output:

0
0
1
1
2
2

func Spread

func Spread[T SilentTask](ctx context.Context, tasks []T, within time.Duration) SilentTask

Spread evenly spreads the tasks within the specified duration.

Example
tasks := newTasks()
within := 200 * time.Millisecond

// Spread
task := Spread(context.Background(), tasks, within)
task.Wait()

// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:

1
1
1
1
1

func Throttle

func Throttle[T SilentTask](ctx context.Context, tasks []T, rateLimit int, every time.Duration) SilentTask

Throttle runs the given tasks at the specified rate.

type SilentWork

type SilentWork func(context.Context) error

SilentWork represents a unit of work to execute in silence, like background jobs that return no values.

type State

type State byte

State represents the state enumeration for a task.

const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)

Various task states.

type Task

type Task[T any] interface {
	SilentTask
	// Run starts this task asynchronously.
	Run(ctx context.Context) Task[T]
	// RunSync starts this task synchronously.
	RunSync(ctx context.Context) Task[T]
	// Outcome waits for this task to complete and returns the final result & error.
	Outcome() (T, error)
	// ResultOrDefault waits for this task to complete and returns the final result if
	// there's no error or the default result if there's an error.
	ResultOrDefault(T) T
}

Task represents a unit of work that is expected to return a value of a particular type.

func AddJitterT

func AddJitterT[T any](t Task[T], maxJitterDurationInMilliseconds int) Task[T]

AddJitterT adds a random jitter before executing the given Task. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

func Completed

func Completed[T any](result T, err error) Task[T]

Completed returns a completed task with the given result and error.

func ContinueWith

func ContinueWith[T any, S any](currentTask Task[T], nextAction func(context.Context, T, error) (S, error)) Task[S]

ContinueWith proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func ContinueWithResult

func ContinueWithResult[T any](currentTask SilentTask, nextAction func(context.Context, error) (T, error)) Task[T]

ContinueWithResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func Invoke

func Invoke[T any](ctx context.Context, action Work[T]) Task[T]

Invoke creates a new Task and runs it asynchronously.

func NewTask

func NewTask[T any](action Work[T]) Task[T]

NewTask creates a new Task.

func NewTasks

func NewTasks[T any](actions ...Work[T]) []Task[T]

NewTasks creates a group of new Task.

type Work

type Work[T any] func(context.Context) (T, error)

Work represents a unit of work to execute that is expected to return a value of a particular type.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is similar to a thread pool in Java, where the number of concurrent goroutines processing requests does not exceed the configured maximum.

func NewWorkerPool

func NewWorkerPool(options ...WorkerPoolOption) *WorkerPool

NewWorkerPool creates and starts a pool of worker goroutines.

`maxSize` specifies the maximum number of workers that can execute tasks concurrently. When there are no incoming tasks, workers are killed one by one until none remain.

func (*WorkerPool) Pause

func (p *WorkerPool) Pause(ctx context.Context)

Pause causes all workers to wait on the given context, thereby making them unavailable to run tasks. Pause returns when all workers are waiting. Tasks can still be queued but won't get executed until `ctx` is cancelled or times out.

Calling Pause when the worker pool is already paused causes Pause to wait until previous pauses are cancelled. This allows a goroutine to take control of pausing and un-pausing the pool as soon as other goroutines have un-paused it.

When this worker pool is stopped, workers are un-paused and queued tasks may be executed during StopWait.

func (*WorkerPool) Size

func (p *WorkerPool) Size() int

Size returns the maximum number of concurrent workers.

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop stops this worker pool and waits for the running tasks to complete. Pending tasks that are still in the queue will get cancelled. New tasks must not be submitted to the worker pool after calling Stop().

Note: to avoid memory leak, clients MUST always call Stop() or StopWait() when this worker pool is no longer needed.

func (*WorkerPool) StopWait

func (p *WorkerPool) StopWait()

StopWait stops this worker pool and waits for the running tasks as well as all queued tasks to complete. New tasks must not be submitted to the worker pool after calling StopWait().

Note: to avoid memory leak, clients MUST always call Stop() or StopWait() when this worker pool is no longer needed.

func (*WorkerPool) Stopped

func (p *WorkerPool) Stopped() bool

Stopped returns true if this worker pool has been stopped.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(ctx context.Context, task SilentTask)

Submit enqueues a function for a worker to execute.

Submit will not block regardless of the number of tasks submitted. Each task is given to an available worker or to a newly started worker. If there are no available workers, and no new workers can be created due to the configured maximum, then the task is put onto a waiting queue.

When the waiting queue is not empty, incoming tasks will go into the queue immediately. Tasks are removed from the waiting queue as workers become available.

As long as no new tasks arrive, one idle worker will get killed periodically until no more workers are left. Since starting new goroutines is cheap & quick, there's no need to retain idle workers indefinitely.

func (*WorkerPool) WaitingQueueSize

func (p *WorkerPool) WaitingQueueSize() int

WaitingQueueSize returns the count of tasks in the waiting queue.

type WorkerPoolOption

type WorkerPoolOption func(*workerPoolConfigs)

func WithBurst

func WithBurst(burstQueueThreshold int, burstCapacity int) WorkerPoolOption

WithBurst sets the threshold for the waiting queue at which point the maximum size of the worker pool will be increased by the given capacity.
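
A configuration sketch based on the description above (the sizes are illustrative):

// With a base pool of at most 5 concurrent workers, the maximum pool size is
// increased by another 5 once the waiting queue reaches the threshold of 10 tasks
wp := NewWorkerPool(
	WithMaxSize(5),
	WithBurst(10, 5),
)
defer wp.Stop()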

func WithIdleTimeout

func WithIdleTimeout(idleTimeout time.Duration) WorkerPoolOption

WithIdleTimeout sets the maximum duration that workers can stay idle before one of them gets killed.

func WithMaxSize

func WithMaxSize(maxSize int) WorkerPoolOption

WithMaxSize sets the maximum size of the worker pool under normal condition.
