goroutines

v1.1.4

Warning: This package is not in the latest version of its module.
Published: Jun 18, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

goroutines


Package goroutines is an efficient, flexible, and lightweight goroutine pool written in Go. It provides an easy way to handle several kinds of concurrent tasks with limited resources.

Inspired by fastsocket, the implementation is based on channels. It uses a pubsub model to dispatch tasks, and holds surplus tasks in a queue when more tasks are submitted than the pool can run at once.

Features

  • Spawn and manage an arbitrary number of asynchronous goroutines as a worker pool.
  • Dispatch tasks to workers through a pubsub model with a specified queue size.
  • Periodically adjust the number of workers based on usage.
  • Easy to use for concurrent one-time batch jobs.
  • Monitor the current status through metrics.


Installation

go get github.com/viney-shih/goroutines

Get Started

Basic usage of Pool in blocking mode

Schedule() hands a task to a worker goroutine in the Pool. The call blocks until a worker accepts the task.

taskN := 7
rets := make(chan int, taskN)

// allocate a pool with 5 goroutines to deal with those tasks
p := goroutines.NewPool(5)
// don't forget to release the pool in the end
defer p.Release()

// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
	idx := i
	p.Schedule(func() {
		// sleep and return the index
		time.Sleep(20 * time.Millisecond)
		rets <- idx
	})
}

// wait until all tasks done
for i := 0; i < taskN; i++ {
	fmt.Println("index:", <-rets)
}

// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 0

Basic usage of Pool in nonblocking mode

ScheduleWithTimeout() hands a task to a worker goroutine in the Pool within the specified period. If no worker accepts the task in time, it returns ErrScheduleTimeout.

totalN, taskN := 5, 5
pause := make(chan struct{})
rets := make(chan int, taskN)

// allocate a pool with 5 goroutines to deal with those 5 tasks
p := goroutines.NewPool(totalN)
// don't forget to release the pool in the end
defer p.Release()

// fill every worker with a task that blocks on `pause`
for i := 0; i < taskN; i++ {
	idx := i
	p.ScheduleWithTimeout(50*time.Millisecond, func() {
		<-pause
		rets <- idx
	})
}

// every worker is busy, so this call returns `ErrScheduleTimeout`
if err := p.ScheduleWithTimeout(50*time.Millisecond, func() {
	<-pause
	rets <- taskN
}); err != nil {
	fmt.Println(err.Error())
}

close(pause)
for i := 0; i < taskN; i++ {
	fmt.Println("index:", <-rets)
}

// Unordered output:
// schedule timeout
// index: 0
// index: 3
// index: 2
// index: 4
// index: 1

Advanced usage of Batch jobs

Batch jobs usually need to run tasks concurrently for performance, but they typically run only once, so there is no need to keep a Pool around for reuse. Batch wraps this pattern. Here is an example.

taskN := 11

// allocate a one-time batch job with 3 goroutines to deal with those tasks.
// no extra goroutine is needed because the batch size matches the number of tasks.
b := goroutines.NewBatch(3, goroutines.WithBatchSize(taskN))
// don't forget to close batch job in the end
defer b.Close()

// push all tasks into this batch queue
for i := 0; i < taskN; i++ {
	idx := i
	b.Queue(func() (interface{}, error) {
		// sleep and return the index
		time.Sleep(10 * time.Millisecond)
		return idx, nil
	})
}

// tell the batch that no more tasks are coming
// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
b.QueueComplete()

for ret := range b.Results() {
	if ret.Error() != nil {
		panic("not expected")
	}

	fmt.Println("index:", ret.Value().(int))
}

// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 10
// index: 7
// index: 9
// index: 8
// index: 0

See the examples, documentation and article for more details.

Options

PoolOption

A PoolOption is passed to NewPool when creating a Pool.

• WithTaskQueueLength( length int )

It sets the length of the task queue, which buffers tasks before they are sent to goroutines. The default queue length is 0.

• WithPreAllocWorkers( size int )

It sets the number of workers to spawn when initializing the Pool. If it is not specified, the Pool spawns all of its goroutines (as many as the Pool size) at the beginning.

• WithWorkerAdjustPeriod( period time.Duration )

It sets the period for adjusting the number of workers and must be used together with WithPreAllocWorkers. Specifying both enables dynamic adjustment of the number of goroutines according to usage.

BatchOption

A BatchOption is passed to NewBatch when creating a Batch.

• WithBatchSize( size int )

It specifies the batch size used to forward tasks. By default, the producer has to run in an extra goroutine to prevent deadlocks. Setting the batch size to match the number of tasks removes the need for that extra goroutine (see the example). The default batch size is 10.


License

Apache-2.0


Documentation


Constants

This section is empty.

Variables

var (
	// ErrQueueComplete indicates no more incoming tasks allowed to put in the pool
	ErrQueueComplete = errors.New("queue has completed already")
	// ErrQueueCTXDone indicates context in queue is done due to timeout or cancellation.
	ErrQueueCTXDone = errors.New("context in queue is done")
)
var (
	// ErrPoolRelease indicates the pool is released and closed.
	ErrPoolRelease = errors.New("pool released")
	// ErrScheduleTimeout indicates there is no resource to handle this task within specified period.
	ErrScheduleTimeout = errors.New("schedule timeout")
)
var (
	// ErrStateCorrupted indicates the worker state is corrupted.
	ErrStateCorrupted = errors.New("state corrupted")
)

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is the struct containing all Batch operations

func NewBatch

func NewBatch(size int, options ...BatchOption) *Batch

NewBatch creates an asynchronous goroutine pool whose size is the total number of workers, and registers consumers to handle the tasks passed in by producers.

func (*Batch) Close

func (b *Batch) Close()

Close will terminate all workers and close the job channel of this pool.

func (*Batch) GracefulClose

func (b *Batch) GracefulClose()

GracefulClose will terminate all workers and close the job channel of this pool in the background.

func (*Batch) Queue

func (b *Batch) Queue(fn BatchFunc) error

Queue acts as a producer: it queues a task into the pool, where processing starts immediately.

HINT: do not call QueueComplete concurrently with Queue.

Example (Default)
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 14

	// allocate a one-time batch job with 3 goroutines to deal with those tasks.
	// need to spawn an extra goroutine to prevent deadlocks.
	b := goroutines.NewBatch(3)
	// don't forget to close batch job in the end
	defer b.Close()

	// an extra goroutine acts as the producer
	go func() {
		for i := 0; i < taskN; i++ {
			num := i
			b.Queue(func() (interface{}, error) {
				// sleep and return the index
				time.Sleep(10 * time.Millisecond)
				return num, nil
			})
		}

		b.QueueComplete()
	}()

	for ret := range b.Results() {
		if ret.Error() != nil {
			panic("not expected")
		}

		fmt.Println("index:", ret.Value().(int))
	}

}
Output (unordered):

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 10
index: 7
index: 9
index: 8
index: 0
index: 11
index: 12
index: 13
Example (WithBatchSize)
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 11

	// allocate a one-time batch job with 3 goroutines to deal with those tasks.
	// no extra goroutine is needed because the batch size matches the number of tasks.
	b := goroutines.NewBatch(3, goroutines.WithBatchSize(taskN))
	// don't forget to close batch job in the end
	defer b.Close()

	// push all tasks into this batch queue
	for i := 0; i < taskN; i++ {
		idx := i
		b.Queue(func() (interface{}, error) {
			// sleep and return the index
			time.Sleep(10 * time.Millisecond)
			return idx, nil
		})
	}

	// tell the batch that no more tasks are coming
	// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
	b.QueueComplete()

	for ret := range b.Results() {
		if ret.Error() != nil {
			panic("not expected")
		}

		fmt.Println("index:", ret.Value().(int))
	}

}
Output (unordered):

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 10
index: 7
index: 9
index: 8
index: 0

func (*Batch) QueueComplete

func (b *Batch) QueueComplete()

QueueComplete signals that no more tasks will be queued.

HINT: do not call Queue concurrently with QueueComplete.

func (*Batch) QueueWithContext added in v1.1.0

func (b *Batch) QueueWithContext(ctx context.Context, fn BatchFunc) error

QueueWithContext acts as a producer: it queues a task into the pool, or returns ErrQueueCTXDone if ctx is done (timeout or cancellation).

HINT: do not call QueueComplete concurrently with QueueWithContext.

func (*Batch) Results

func (b *Batch) Results() <-chan Result

Results returns a Result channel that will output all completed tasks.

func (*Batch) WaitAll

func (b *Batch) WaitAll()

WaitAll is an alternative to Results() for when you need to wait until all work has been processed but don't need to check the results.

type BatchFunc

type BatchFunc func() (interface{}, error)

BatchFunc is the task function assigned by caller, running in the goroutine pool

type BatchOption

type BatchOption func(*batchOption)

BatchOption is a functional option for configuring a Batch.

func WithBatchSize

func WithBatchSize(size int) BatchOption

WithBatchSize specifies the batch size used to forward tasks. If it is large enough, there is no need to start another goroutine to call Queue(). The default batch size (defaultBatchSize) is 10.

type Metric

type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() uint64
}

Metric represents the contract an implementation must satisfy to report the number of busy workers.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is the struct handling the interaction with asynchronous goroutines.

func NewPool

func NewPool(size int, options ...PoolOption) *Pool

NewPool creates an asynchronous goroutine pool with the given size, which is the total number of workers.

Example (WithAutoScaledSize)
package main

import (
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize 2 goroutines.
	// if necessary, the number of goroutines increases to 5.
	// if not busy (checked every 10 seconds), the number goes back down to 2.
	p := goroutines.NewPool(
		5,
		goroutines.WithPreAllocWorkers(2),
		goroutines.WithWorkerAdjustPeriod(10*time.Second),
	)
	// don't forget to release the resource in the end
	defer p.Release()
}
Output:

Example (WithFixedSize)
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize all goroutines at the beginning.
	p := goroutines.NewPool(5)
	// don't forget to release the resource in the end
	defer p.Release()
}
Output:

Example (WithFixedSizeAndQueues)
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize all goroutines at the beginning.
	// at the same time, prepare a queue for buffering the tasks before sending to goroutines.
	p := goroutines.NewPool(5, goroutines.WithTaskQueueLength(2))
	// don't forget to release the resource in the end
	defer p.Release()
}
Output:

Example (WithIncreasingSize)
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize 2 goroutines.
	// if necessary, the number of goroutines increases to 5 and never goes down.
	p := goroutines.NewPool(5, goroutines.WithPreAllocWorkers(2))
	// don't forget to release the resource in the end
	defer p.Release()
}
Output:

func (*Pool) Release

func (p *Pool) Release()

Release terminates all workers and forces them to finish what they are working on as soon as possible.

func (*Pool) Running

func (p *Pool) Running() int

Running returns the number of workers currently running tasks.

func (*Pool) Schedule

func (p *Pool) Schedule(task TaskFunc) error

Schedule schedules the task to be executed by a worker (goroutine) in the Pool. It blocks until a worker accepts the request.

Example
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)

	// allocate a pool with 5 goroutines to deal with those tasks
	p := goroutines.NewPool(5)
	// don't forget to release the pool in the end
	defer p.Release()

	// assign tasks to asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		p.Schedule(func() {
			// sleep and return the index
			time.Sleep(20 * time.Millisecond)
			rets <- idx
		})
	}

	// wait until all tasks done
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}

}
Output (unordered):

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 0

func (*Pool) ScheduleWithContext added in v1.1.0

func (p *Pool) ScheduleWithContext(ctx context.Context, task TaskFunc) error

ScheduleWithContext schedules the task to be executed by a worker (goroutine) in the Pool. It blocks until a worker accepts the request, or returns ErrScheduleTimeout once ctx is done (timeout or cancellation).

func (*Pool) ScheduleWithTimeout

func (p *Pool) ScheduleWithTimeout(timeout time.Duration, task TaskFunc) error

ScheduleWithTimeout schedules the task to be executed by a worker (goroutine) in the Pool within the specified period, or returns ErrScheduleTimeout.

Example
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	totalN, taskN := 5, 5
	pause := make(chan struct{})
	rets := make(chan int, taskN)

	// allocate a pool with 5 goroutines to deal with those 5 tasks
	p := goroutines.NewPool(totalN)
	// don't forget to release the pool in the end
	defer p.Release()

	// fill every worker with a task that blocks on `pause`
	for i := 0; i < taskN; i++ {
		idx := i
		p.ScheduleWithTimeout(50*time.Millisecond, func() {
			<-pause
			rets <- idx
		})
	}

	// every worker is busy, so this call returns `ErrScheduleTimeout`
	if err := p.ScheduleWithTimeout(50*time.Millisecond, func() {
		<-pause
		rets <- taskN
	}); err != nil {
		fmt.Println(err.Error())
	}

	close(pause)
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}

}
Output (unordered):

schedule timeout
index: 0
index: 3
index: 2
index: 4
index: 1

func (*Pool) Workers

func (p *Pool) Workers() int

Workers returns the number of workers created.

type PoolOption

type PoolOption func(opts *poolOption)

PoolOption is a functional option for configuring a Pool.

func WithPreAllocWorkers

func WithPreAllocWorkers(size int) PoolOption

WithPreAllocWorkers sets up the number of workers to spawn when initializing Pool.

func WithTaskQueueLength

func WithTaskQueueLength(length int) PoolOption

WithTaskQueueLength sets the length of the task queue.

func WithWorkerAdjustPeriod

func WithWorkerAdjustPeriod(period time.Duration) PoolOption

WithWorkerAdjustPeriod sets up the duration to adjust the worker size.

type Result

type Result interface {
	// Value returns the value
	Value() interface{}
	// Error returns the error
	Error() error
}

Result is the type of the values delivered on the channel returned by Results().

type TaskFunc

type TaskFunc func()

TaskFunc is the task function assigned by caller, running in the goroutine pool
