Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ProcessingQueue ¶
type ProcessingQueue chan emptyStruct
ProcessingQueue is a simple utility type for limiting the number of goroutines in execution. This is convenient especially in the context of heavy concurrent computations.
ProcessingQueue is implemented as a queue of slots, where each slot represents a busy concurrent job. The queue is initialized with a size of your choice (see New) and is initially empty. Each time Run is called, the function first waits for one job-slot to be available; then it marks one free slot as busy and simply calls the given callback. Once the callback is fully executed (even in case of panics), it frees a busy slot, making it available for further usages.
Run is suitable for being called from a goroutine. Go simply calls Run directly as a goroutine.
func New ¶
func New(size int) ProcessingQueue
New returns a new ProcessingQueue initialized with the given size. It panics if size is lower than 1.
func (ProcessingQueue) Go ¶
func (pq ProcessingQueue) Go(f func())
Go simply executes Run as a goroutine.
Example ¶
package main import ( "fmt" "github.com/nlpodyssey/spago/pkg/utils/processingqueue" "sync" "time" ) func main() { pq := processingqueue.New(2) var wg sync.WaitGroup wg.Add(4) for i := 0; i < 4; i++ { time.Sleep(100 * time.Millisecond) ii := i pq.Go(func() { fmt.Printf("Processing %d\n", ii) // Do something computationally heavy... time.Sleep(500 * time.Millisecond) fmt.Printf("Processed %d\n", ii) wg.Done() }) } wg.Wait() }
Output: Processing 0 Processing 1 Processed 0 Processing 2 Processed 1 Processing 3 Processed 2 Processed 3
func (ProcessingQueue) Run ¶
func (pq ProcessingQueue) Run(f func())
Run waits for a free job-slot to be available in the queue, than marks one slot as busy and calls f, eventually releasing the slot.
Example ¶
package main import ( "fmt" "github.com/nlpodyssey/spago/pkg/utils/processingqueue" "sync" "time" ) func main() { pq := processingqueue.New(2) var wg sync.WaitGroup wg.Add(4) for i := 0; i < 4; i++ { time.Sleep(100 * time.Millisecond) go func(i int) { fmt.Printf("Before %d\n", i) // Do something computationally light... pq.Run(func() { fmt.Printf("Processing %d\n", i) // Do something computationally heavy... time.Sleep(500 * time.Millisecond) fmt.Printf("Processed %d\n", i) }) fmt.Printf("After %d\n", i) // Do something computationally light... wg.Done() }(i) } wg.Wait() }
Output: Before 0 Processing 0 Before 1 Processing 1 Before 2 Before 3 Processed 0 After 0 Processing 2 Processed 1 After 1 Processing 3 Processed 2 After 2 Processed 3 After 3
func (ProcessingQueue) Size ¶
func (pq ProcessingQueue) Size() int
Size returns the size of the ProcessingQueue.