Documentation ¶
Overview ¶
Package goroutines is an efficient, flexible, and lightweight goroutine pool written in Go. It provides an easy way to deal with several kinds of concurrent tasks with limited resources.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrQueueComplete indicates no more incoming tasks are allowed to be put into the pool.
	ErrQueueComplete = errors.New("queue has completed already")
	// ErrQueueCTXDone indicates the context in the queue is done due to timeout or cancellation.
	ErrQueueCTXDone = errors.New("context in queue is done")
)
var (
	// ErrPoolRelease indicates the pool is released and closed.
	ErrPoolRelease = errors.New("pool released")
	// ErrScheduleTimeout indicates there is no resource to handle this task within the specified period.
	ErrScheduleTimeout = errors.New("schedule timeout")
)
var (
	// ErrStateCorrupted indicates the worker state is corrupted.
	ErrStateCorrupted = errors.New("state corrupted")
)
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch is the struct containing all Batch operations
func NewBatch ¶
func NewBatch(size int, options ...BatchOption) *Batch
NewBatch creates an asynchronous goroutine pool with the given size indicating the total number of workers, and registers consumers to deal with the tasks passed by producers.
func (*Batch) Close ¶
func (b *Batch) Close()
Close will terminate all workers and close the job channel of this pool.
func (*Batch) GracefulClose ¶
func (b *Batch) GracefulClose()
GracefulClose will terminate all workers and close the job channel of this pool in the background.
func (*Batch) Queue ¶
Queue plays as a producer to queue a task into the pool, and starts processing immediately.
HINT: make sure not to call QueueComplete concurrently.
Example (Default) ¶
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 14

	// allocate a one-time batch job with 3 goroutines to deal with those tasks.
	// need to spawn an extra goroutine to prevent deadlocks.
	b := goroutines.NewBatch(3)
	// don't forget to close the batch job in the end
	defer b.Close()

	// need an extra goroutine to play as a producer
	go func() {
		for i := 0; i < taskN; i++ {
			num := i
			b.Queue(func() (interface{}, error) {
				// sleep and return the index
				time.Sleep(10 * time.Millisecond)
				return num, nil
			})
		}
		b.QueueComplete()
	}()

	for ret := range b.Results() {
		if ret.Error() != nil {
			panic("not expected")
		}
		fmt.Println("index:", ret.Value().(int))
	}
}
Output:

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 10
index: 7
index: 9
index: 8
index: 0
index: 11
index: 12
index: 13
Example (WithBatchSize) ¶
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 11

	// allocate a one-time batch job with 3 goroutines to deal with those tasks.
	// no need to spawn an extra goroutine, by specifying a batch size equal to the number of tasks.
	b := goroutines.NewBatch(3, goroutines.WithBatchSize(taskN))
	// don't forget to close the batch job in the end
	defer b.Close()

	// push all tasks into this batch queue
	for i := 0; i < taskN; i++ {
		idx := i
		b.Queue(func() (interface{}, error) {
			// sleep and return the index
			time.Sleep(10 * time.Millisecond)
			return idx, nil
		})
	}

	// tell the batch that's all it needs to do
	// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
	b.QueueComplete()

	for ret := range b.Results() {
		if ret.Error() != nil {
			panic("not expected")
		}
		fmt.Println("index:", ret.Value().(int))
	}
}
Output:

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 10
index: 7
index: 9
index: 8
index: 0
func (*Batch) QueueComplete ¶
func (b *Batch) QueueComplete()
QueueComplete means finishing queuing tasks.
HINT: make sure not to call Queue concurrently.
func (*Batch) QueueWithContext ¶ added in v1.1.0
QueueWithContext plays as a producer to queue a task into the pool, or returns ErrQueueCTXDone if ctx is done (timeout or cancellation).
HINT: make sure not to call QueueComplete concurrently.
type BatchFunc ¶
type BatchFunc func() (interface{}, error)
BatchFunc is the task function assigned by the caller, running in the goroutine pool.
type BatchOption ¶
type BatchOption func(*batchOption)
BatchOption is an alias for functional argument in Batch
func WithBatchSize ¶
func WithBatchSize(size int) BatchOption
WithBatchSize specifies the batch size used to forward tasks. If it is large enough, there is no need to fork another goroutine to trigger Queue(). defaultBatchSize is 10.
type Metric ¶
type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() uint64
}
Metric represents the contract that it must report corresponding metrics.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is the struct handling the interaction with asynchronous goroutines.
func NewPool ¶
func NewPool(size int, options ...PoolOption) *Pool
NewPool creates an instance of an asynchronous goroutine pool with the given size, which indicates the total number of workers.
Example (WithAutoScaledSize) ¶
package main

import (
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize 2 goroutines.
	// if necessary, the number of goroutines increases to 5.
	// if not busy (checked every 10 seconds), the number goes back down to 2.
	p := goroutines.NewPool(
		5,
		goroutines.WithPreAllocWorkers(2),
		goroutines.WithWorkerAdjustPeriod(10*time.Second),
	)
	// don't forget to release the resources in the end
	defer p.Release()
}
Output:
Example (WithFixedSize) ¶
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize all goroutines at the beginning.
	p := goroutines.NewPool(5)
	// don't forget to release the resources in the end
	defer p.Release()
}
Output:
Example (WithFixedSizeAndQueues) ¶
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize all goroutines at the beginning.
	// at the same time, prepare a queue for buffering the tasks before sending them to goroutines.
	p := goroutines.NewPool(5, goroutines.WithTaskQueueLength(2))
	// don't forget to release the resources in the end
	defer p.Release()
}
Output:
Example (WithIncreasingSize) ¶
package main

import (
	"github.com/viney-shih/goroutines"
)

func main() {
	// allocate a pool with maximum size 5, and initialize 2 goroutines.
	// if necessary, the number of goroutines increases to 5 and never goes down.
	p := goroutines.NewPool(5, goroutines.WithPreAllocWorkers(2))
	// don't forget to release the resources in the end
	defer p.Release()
}
Output:
func (*Pool) Release ¶
func (p *Pool) Release()
Release will terminate all workers, forcing them to finish what they are working on ASAP.
func (*Pool) Schedule ¶
Schedule schedules the task to be executed by a worker (goroutine) in the Pool. It blocks until a worker accepts the request.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)

	// allocate a pool with 5 goroutines to deal with those tasks
	p := goroutines.NewPool(5)
	// don't forget to release the pool in the end
	defer p.Release()

	// assign tasks to the asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		p.Schedule(func() {
			// sleep and return the index
			time.Sleep(20 * time.Millisecond)
			rets <- idx
		})
	}

	// wait until all tasks are done
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}
}
Output:

index: 3
index: 1
index: 2
index: 4
index: 5
index: 6
index: 0
func (*Pool) ScheduleWithContext ¶ added in v1.1.0
ScheduleWithContext schedules the task to be executed by a worker (goroutine) in the Pool. It blocks until a worker accepts the request, or returns ErrScheduleTimeout because ctx is done (timeout or cancellation).
func (*Pool) ScheduleWithTimeout ¶
ScheduleWithTimeout schedules the task to be executed by a worker (goroutine) in the Pool within the specified period, or returns ErrScheduleTimeout.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/viney-shih/goroutines"
)

func main() {
	totalN, taskN := 5, 5
	pause := make(chan struct{})
	rets := make(chan int, taskN)

	// allocate a pool with 5 goroutines to deal with those 5 tasks
	p := goroutines.NewPool(totalN)
	// don't forget to release the pool in the end
	defer p.Release()

	// fill up the workers, all blocked on `pause`
	for i := 0; i < taskN; i++ {
		idx := i
		p.ScheduleWithTimeout(50*time.Millisecond, func() {
			<-pause
			rets <- idx
		})
	}

	// no more chance to add any task to the Pool, so it returns `ErrScheduleTimeout`
	if err := p.ScheduleWithTimeout(50*time.Millisecond, func() {
		<-pause
		rets <- taskN
	}); err != nil {
		fmt.Println(err.Error())
	}

	close(pause)
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}
}
Output:

schedule timeout
index: 0
index: 3
index: 2
index: 4
index: 1
type PoolOption ¶
type PoolOption func(opts *poolOption)
PoolOption is an alias for a functional argument.
func WithPreAllocWorkers ¶
func WithPreAllocWorkers(size int) PoolOption
WithPreAllocWorkers sets up the number of workers to spawn when initializing Pool.
func WithTaskQueueLength ¶
func WithTaskQueueLength(length int) PoolOption
WithTaskQueueLength sets up the length of task queue.
func WithWorkerAdjustPeriod ¶
func WithWorkerAdjustPeriod(period time.Duration) PoolOption
WithWorkerAdjustPeriod sets up the duration to adjust the worker size.