lineworker

v0.2.1
Published: Nov 1, 2024 License: MIT Imports: 2 Imported by: 2

README

lineworker provides worker pools that perform work in parallel, but output the work results in the order the work was given.

Take a look at the documentation for more info: https://godocs.io/github.com/codesoap/lineworker

Example

slowSprint := func(a int) (string, error) {
	delay := rand.Int()
	time.Sleep(time.Duration(delay%6) * time.Millisecond)
	return fmt.Sprint(a), nil
}

// Start the worker goroutines:
pool := lineworker.NewWorkerPool(runtime.NumCPU(), slowSprint)

// Put in work:
go func() {
	for i := 0; i < 10; i++ {
		workAccepted := pool.Process(i)
		if !workAccepted {
			// Cannot happen in this example, because pool.Stop is not called
			// outside this goroutine, but is handled for demonstration
			// purposes.
			return
		}
	}
	pool.Stop()
}()

// Retrieve the results:
for {
	res, err := pool.Next()
	if err == lineworker.EOS {
		break
	} else if err != nil {
		panic(err)
	}
	fmt.Println(res)
}

Documentation

Overview

Package lineworker provides a worker pool with a fixed number of workers. It outputs work results in the order the work was given. The package is designed for serial data input and output; the functions Process and Next must never be called in parallel with themselves, although one goroutine may call Process while another calls Next.

Each worker caches at most one result. Once as many results are waiting to be consumed as there are workers, no new work is processed until a result is consumed.

Variables

var EOS = fmt.Errorf("no more results available")

EOS is the error returned by Next when no more results are available.

Types

type WorkFunc

type WorkFunc[IN, OUT any] func(in IN) (OUT, error)

type WorkerPool

type WorkerPool[IN, OUT any] struct {
	// contains filtered or unexported fields
}

Example

package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"time"

	"github.com/codesoap/lineworker"
)

func main() {
	slowSprint := func(a int) (string, error) {
		delay := rand.Int()
		time.Sleep(time.Duration(delay%6) * time.Millisecond)
		return fmt.Sprint(a), nil
	}
	pool := lineworker.NewWorkerPool(runtime.NumCPU(), slowSprint)
	go func() {
		for i := 0; i < 10; i++ {
			workAccepted := pool.Process(i)
			if !workAccepted {
				// Cannot happen in this example, because pool.Stop is not called
				// outside this goroutine, but is handled for demonstration
				// purposes.
				return
			}
		}
		pool.Stop()
	}()
	for {
		res, err := pool.Next()
		if err == lineworker.EOS {
			break
		} else if err != nil {
			panic(err)
		}
		fmt.Println(res)
	}
}

Output:

0
1
2
3
4
5
6
7
8
9

func NewWorkerPool

func NewWorkerPool[IN, OUT any](workerCount int, f WorkFunc[IN, OUT]) *WorkerPool[IN, OUT]

NewWorkerPool creates a new worker pool with workerCount workers waiting to process data of type IN to results of type OUT via f.

func (*WorkerPool[IN, OUT]) DiscardWork added in v0.2.0

func (w *WorkerPool[IN, OUT]) DiscardWork()

DiscardWork receives and discards all pending work results, so that workers can quit after Stop has been called. It will block until all workers have quit.

DiscardWork must only be called after Stop has been called.

func (*WorkerPool[IN, OUT]) Next

func (w *WorkerPool[IN, OUT]) Next() (OUT, error)

Next will return the next result with its error. If the next result is not yet ready, it will block. If no more results are available, the EOS error will be returned.

func (*WorkerPool[IN, OUT]) Process

func (w *WorkerPool[IN, OUT]) Process(input IN) bool

Process queues a new input for processing. If all workers are currently busy, Process will block.

Process will return true if the input has been accepted. If Stop has been called previously, Process will discard the given input and return false.

func (*WorkerPool[IN, OUT]) Stop

func (w *WorkerPool[IN, OUT]) Stop()

Stop should be called after all calls to Process have been made. It stops the workers from accepting new work and allows their resources to be released after all results have been consumed via Next or discarded with DiscardWork.

Further calls to Stop after the first call will do nothing.
