concurrent

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2021 License: BSD-3-Clause Imports: 6 Imported by: 0

Documentation

Overview

Package to apply a function over an array or stream of data.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Lazily

func Lazily(f Evaluator, lookahead int, reaper <-chan struct{}, init ...interface{}) func() interface{}

Lazily is function to generate a lazy evaluator.

Lazy functions are terminated by closing the reaper channel. nil should be passed as a reaper for perpetual lazy functions.

Example
package main

import (
	"github.com/biogo/biogo/concurrent"

	"fmt"
)

func main() {
	sentence := "A sentence to be slowly ROT'ed."

	ROT13 := func(b byte) byte {
		c := b & ('a' - 'A')
		i := b&^c - 'A'
		if i < 26 {
			return (i+13)%26 + 'A' | c
		}
		return b
	}

	mutator := concurrent.Lazily(
		func(state ...interface{}) (interface{}, concurrent.State) {
			b, c := []byte(state[0].(string)), state[1].(int)
			b[c] = ROT13(b[c])
			ms := string(b)
			return state[0], concurrent.State{ms, (c + 1) % len(ms)}
		},
		0,           // No lookahead
		nil,         // Perpetual evaluator
		sentence, 0, // Initial state
	)

	var r string
	for i := 0; i < len(sentence)*2; i++ {
		r = mutator().(string)
		if i%10 == 0 {
			fmt.Println(r)
		}
	}
	fmt.Println(r)
}
Output:

A sentence to be slowly ROT'ed.
N fragrapr to be slowly ROT'ed.
N fragrapr gb or fybwly ROT'ed.
N fragrapr gb or fybjyl EBG'rq.
A sentencr gb or fybjyl EBG'rq.
A sentence to be slbjyl EBG'rq.
A sentence to be slowly ROT'eq.
A sentence to be slowly ROT'ed.

func Map

func Map(set Mapper, threads, maxChunkSize int) (results []interface{}, err error)

Map routines to iterate a function over an array, potentially splitting the array slice into chunks so that each chunk is processed concurrently. When using concurrent processing the Chunk size is either the nearest even division of the total array over the chosen concurrent processing goroutines or a specified maximum chunk size, whichever is smaller. Reducing chunk size can reduce the impact of divergence in time for processing chunks, but may add to overhead.

Example
package main

import (
	"github.com/biogo/biogo/concurrent"

	"fmt"
)

type CountConsumer []int

func (c CountConsumer) Slice(i, j int) concurrent.Mapper { return c[i:j] }
func (c CountConsumer) Len() int                         { return len(c) }

func (c CountConsumer) Operation() (r interface{}, err error) {
	var sum int
	for i, v := range c {
		sum += v
		c[i] = 0
	}
	return sum, nil
}

func main() {
	c := CountConsumer{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(c)

	for c.Len() > 1 {
		result, err := concurrent.Map(c, 1, 2)
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Println(result)
			c = c[:0]
			for _, r := range result {
				c = append(c, r.(int))
			}
		}
	}

}
Output:

[1 2 3 4 5 6 7 8 9 10]
[3 7 11 15 19]
[10 26 19]
[36 19]
[55]

Types

type Concurrent

type Concurrent interface {
	Process(...interface{})
	Result() (interface{}, error)
}

The Concurrent interface represents a processor that allows adding jobs and retrieving results

type Evaluator

type Evaluator func(...interface{}) (interface{}, State)

Evaluator is a function for lazy evaluation.

type Mapper

type Mapper interface {
	Operator
	Slice(i, j int) Mapper
	Len() int
}

A Mapper is an Operator that can subdivide itself.

type Operator

type Operator interface {
	Operation() (interface{}, error)
}

Interface is a type that performs an operation on itself, returning any error.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

The Processor type manages a number of concurrent Processes.

func NewProcessor

func NewProcessor(queue chan Operator, buffer int, threads int) (p *Processor)

Return a new Processor to operate the function f over the number of threads specified taking input from queue and placing the result in buffer. Threads is limited by GOMAXPROCS, if threads is greater GOMAXPROCS or less than 1 then threads is set to GOMAXPROCS.

func (*Processor) Close

func (p *Processor) Close()

Close the queue.

func (*Processor) Process

func (p *Processor) Process(value ...Operator)

Submit values for processing.

func (*Processor) Result

func (p *Processor) Result() (interface{}, error)

Get the next available result.

func (*Processor) Stop

func (p *Processor) Stop()

Terminate the goroutines.

func (*Processor) Wait

func (p *Processor) Wait()

Wait for all running processes to finish.

func (*Processor) Working

func (p *Processor) Working() int

Return the number of working goroutines.

type Promise

type Promise struct {
	// contains filtered or unexported fields
}

Implementation of a promise multiple goroutine synchronisation and communication system based on the approach used in Alice. Promises will safely allow multiple promisers to interact with multiple promisees.

New or non-error Broken Promises can be Fulfilled or Failed. Fulfilled or Failed Promises can be Broken and any state of Promise can be Recovered if specified at creation.

Promises can be mutable or not, recoverable or not and may relay internal error states to other listeners. Mutable promises may have their value state changed with subsequence Fulfill calls. Recoverable promises may be recovered after a Fail call. Promises created with relay set to true will relay an error generated by attempting to fulfill an immutable fulfilled promise.

func NewPromise

func NewPromise(mutable, recoverable, relay bool) *Promise

Create a new promise

func PromiseMap

func PromiseMap(set Mapper, threads, maxChunkSize int) *Promise

A future Map function - synchronisation is via a Promise.

Example
package main

import (
	"github.com/biogo/biogo/concurrent"

	"fmt"
	"time"
)

type SlowCounter []int

func (c SlowCounter) Slice(i, j int) concurrent.Mapper { return c[i:j] }
func (c SlowCounter) Len() int                         { return len(c) }

func (c SlowCounter) Operation() (r interface{}, err error) {
	var sum int
	for _, v := range c {
		sum += v
		time.Sleep(1e8)
	}
	return sum, nil
}

func main() {
	c := SlowCounter{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

	p := concurrent.PromiseMap(c, 1, 2)
	fmt.Println("Waiting...")
	request1 := <-p.Wait()
	if request1.Err != nil {
		fmt.Println(request1.Err)
	} else {
		fmt.Println(request1.Value)
	}
	request2 := <-p.Wait()
	if request2.Err != nil {
		fmt.Println(request2.Err)
	} else {
		fmt.Println(request2.Value)
	}

}
Output:

Waiting...
[3 7 11 15 19]
[3 7 11 15 19]

func (*Promise) Break

func (p *Promise) Break()

Break an already fulfilled or failed promise, blocking all listeners.

func (*Promise) Fail

func (p *Promise) Fail(value interface{}, err error) (ok bool)

Fail a promise allowing listeners to unblock, but sending an error state.

func (*Promise) Fulfill

func (p *Promise) Fulfill(value interface{}) error

Fulfill a promise, allowing listeners to unblock.

func (*Promise) Recover

func (p *Promise) Recover(value interface{}) (ok bool)

Recover a failed promise, setting the error state to nil. Promise must be recoverable.

func (*Promise) Wait

func (p *Promise) Wait() <-chan Result

Wait for a promise to be fulfilled, failed or recovered.

type Result

type Result struct {
	Value interface{}
	Err   error
}

type State

type State []interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL