worker

package
v0.5.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2024 License: MIT Imports: 2 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FanOut

type FanOut[I any] struct {
	// contains filtered or unexported fields
}

FanOut will fan out to `workerCount` total coroutines for processing.

func NewFanOut

func NewFanOut[I any](workerCount, bufferSize uint) *FanOut[I]
Example
package main

import (
	"fmt"
	"sort"

	"github.com/bir/iken/worker"
)

func main() {
	type Request struct {
		Name  string
		Index int
	}

	type Reply struct {
		Name  string
		Index int
		Size  int
	}

	inputs := []Request{{"A", 0}, {"BBBB", 1}, {"CCCCCCCCCCC", 2}}

	w := worker.NewFanOut[Request](10, 0)

	go func() {
		// Call invoke once per input data.
		for _, i := range inputs {
			w.Invoke(i)
		}

		// Call worker.FanOut.Close when all inputs are loaded.
		w.Close()
	}()

	// Unbuffered reply channel, buffer size is a tunable parameter available to the implementation
	replies := make(chan Reply)

	go func() {
		// Process and close must be executed in a separate go routine, unless the reply channel
		// is sufficiently buffered.

		w.Process(func(r Request) {
			// Do the "work".  In this example just get the size of the name.
			replies <- Reply{
				Name:  r.Name,
				Index: r.Index,
				Size:  len(r.Name),
			}
		})

		// When Process returns, all inputs have been handled.
		close(replies)
	}()

	var out []Reply
	for r := range replies {
		out = append(out, r)
	}

	// Sort the results in descending size
	sort.Slice(out, func(i int, j int) bool {
		return out[i].Size > out[j].Size
	})

	fmt.Println(out)

}
Output:

[{CCCCCCCCCCC 2 11} {BBBB 1 4} {A 0 1}]

func (*FanOut[I]) Close

func (f *FanOut[I]) Close()

Close closes the input channels. Invoke can not be called again after this call.

func (*FanOut[I]) Invoke

func (f *FanOut[I]) Invoke(input I)

Invoke adds the data to the worker for processing.

func (*FanOut[I]) Process

func (f *FanOut[I]) Process(p ProcessorFunc[I])

Process handles all inputs until the input channel is closed.

type HashFunc

type HashFunc[I any] func(I) uint

HashFunc converts an input to a uint value. It must be deterministic. See examples below.

func StringHasher added in v0.1.5

func StringHasher[I any](keyFunc KeyFunc[I, string]) HashFunc[I]

StringHasher given a KeyFunc that returns a string for a given input, returns a consistent hash for the string.

type HashedFanOut

type HashedFanOut[I any] struct {
	// contains filtered or unexported fields
}

HashedFanOut is a special purpose fan out that hashes the input so that the same keys are always processed by the same worker. This is used to ensure guard against race conditions in non-reentrant processors.

func NewHashedFanOut

func NewHashedFanOut[I any](workerCount, bufferSize uint, hasher HashFunc[I]) *HashedFanOut[I]

func (*HashedFanOut[I]) Close

func (f *HashedFanOut[I]) Close()

func (*HashedFanOut[I]) Invoke

func (f *HashedFanOut[I]) Invoke(input I)

func (*HashedFanOut[I]) Process

func (f *HashedFanOut[I]) Process(p ProcessorFunc[I])

type KeyFunc added in v0.1.5

type KeyFunc[I, K any] func(I) K

type ProcessorFunc

type ProcessorFunc[I any] func(I)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL