package parallel
v0.1.361 (Latest)
Warning

This package is not in the latest version of its module.

Published: Feb 9, 2024 License: GPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Package parallel implements helpers for fast processing of line-oriented inputs. Basic usage example:

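r := strings.NewReader("1\n2\n3\n")
// The transformer must match TransformerFunc, so the line number is an int64.
f := func(ln int64, b []byte) ([]byte, error) {
    result := fmt.Sprintf("#%d %s", ln, string(b))
    return []byte(result), nil
}

// Read lines from r, apply f to each one, and write the results to stdout.
p := parallel.NewProcessor(r, os.Stdout, f)
if err := p.Run(); err != nil {
    log.Fatal(err)
}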

This would print out:

#0 1
#1 2
#2 3

Note that the output is not guaranteed to be in the same order as the input. If the exact position matters, use the originating line number passed to the transformer function.
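Restoring input order is therefore left to the caller. A minimal sketch of one way to do it, assuming all results fit in memory: keep each result together with its originating line number and sort by that number before writing. The `result` type and the `restoreOrder` helper are hypothetical and not part of package parallel.

```go
package main

import (
	"os"
	"sort"
)

// result (hypothetical) pairs a transformed line with its originating line number.
type result struct {
	lineno int64
	b      []byte
}

// restoreOrder (hypothetical) sorts results by originating line number, so the
// output follows input order no matter in which order the workers finished.
func restoreOrder(rs []result) []result {
	sort.Slice(rs, func(i, j int) bool { return rs[i].lineno < rs[j].lineno })
	return rs
}

func main() {
	// Results as they might arrive from concurrent workers: out of order.
	rs := []result{
		{2, []byte("#2 3\n")},
		{0, []byte("#0 1\n")},
		{1, []byte("#1 2\n")},
	}
	for _, r := range restoreOrder(rs) {
		os.Stdout.Write(r.b)
	}
}
```

Buffering every result gives up streaming. For large inputs it may be cheaper to emit the line number as a leading column and sort the output afterwards.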

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BytesBatch

type BytesBatch struct {
	// contains filtered or unexported fields
}

BytesBatch is a batch of Records, that is, byte slices paired with their line numbers.

func NewBytesBatch

func NewBytesBatch() *BytesBatch

NewBytesBatch creates a new, empty BytesBatch with a default capacity.

func NewBytesBatchCapacity

func NewBytesBatchCapacity(cap int) *BytesBatch

NewBytesBatchCapacity creates a new BytesBatch with a given capacity.

func (*BytesBatch) Add

func (bb *BytesBatch) Add(b Record)

Add adds an element to the batch.

func (*BytesBatch) Reset

func (bb *BytesBatch) Reset()

Reset empties this batch.

func (*BytesBatch) Size

func (bb *BytesBatch) Size() int

Size returns the number of elements in the batch.

func (*BytesBatch) Slice

func (bb *BytesBatch) Slice() []Record

Slice returns the records in the batch.
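The fields of BytesBatch and Record are unexported, so the following is only a minimal stand-in that mirrors the documented method set, to illustrate how a batch is filled, read, and reused. The field names `lineno` and `value`, and the way `Reset` reuses memory, are assumptions of this sketch.

```go
package main

import "fmt"

// Record mirrors the documented parallel.Record: a value plus its line number.
// The field names lineno and value are hypothetical.
type Record struct {
	lineno int64
	value  []byte
}

// BytesBatch is a minimal stand-in for parallel.BytesBatch.
type BytesBatch struct {
	b []Record
}

// NewBytesBatchCapacity preallocates room for capacity records.
func NewBytesBatchCapacity(capacity int) *BytesBatch {
	return &BytesBatch{b: make([]Record, 0, capacity)}
}

// Add appends a record to the batch.
func (bb *BytesBatch) Add(r Record) { bb.b = append(bb.b, r) }

// Size returns the number of records in the batch.
func (bb *BytesBatch) Size() int { return len(bb.b) }

// Slice returns the records currently held by the batch.
func (bb *BytesBatch) Slice() []Record { return bb.b }

// Reset empties the batch but keeps the backing array for reuse.
func (bb *BytesBatch) Reset() { bb.b = bb.b[:0] }

func main() {
	bb := NewBytesBatchCapacity(2)
	bb.Add(Record{lineno: 0, value: []byte("1\n")})
	bb.Add(Record{lineno: 1, value: []byte("2\n")})
	fmt.Println(bb.Size())
	for _, r := range bb.Slice() {
		fmt.Printf("%d %q\n", r.lineno, r.value)
	}
	bb.Reset()
	fmt.Println(bb.Size())
}
```

Because this sketch's Reset keeps the backing array, a slice returned by Slice should not be retained across a Reset; whether the real type behaves the same way is not documented here.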

type Processor

type Processor struct {
	BatchSize        int
	RecordSeparator  byte
	NumWorkers       int
	SkipEmptyLines   bool
	BatchMemoryLimit int64
	// contains filtered or unexported fields
}

Processor can process lines in parallel.

func NewProcessor

func NewProcessor(r io.Reader, w io.Writer, f TransformerFunc) *Processor

NewProcessor creates a new line processor that reads lines from r, applies f to each line, and writes the results to w.

func (*Processor) Run

func (p *Processor) Run() error

Run starts the workers and processes the input.

func (*Processor) RunWorkers

func (p *Processor) RunWorkers(numWorkers int) error

RunWorkers is a shortcut that sets the number of workers and then runs the processor.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record groups a value and a corresponding line number.

type TransformerFunc

type TransformerFunc func(lineno int64, b []byte) ([]byte, error)

TransformerFunc takes a line number and a slice of bytes and returns a slice of bytes and an error. It is the common signature shared by functions that transform data.
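Any function with this signature can serve as a transformer. As an illustration, here is a hypothetical transformer, `double`, that parses each line as an integer and outputs twice its value, using the line number to make errors easy to locate. TransformerFunc is redeclared locally so the example compiles on its own.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// TransformerFunc mirrors the signature of parallel.TransformerFunc.
type TransformerFunc func(lineno int64, b []byte) ([]byte, error)

// double (hypothetical) parses a line as an integer and returns twice its
// value, newline-terminated. The error names the offending line number.
var double TransformerFunc = func(lineno int64, b []byte) ([]byte, error) {
	n, err := strconv.Atoi(string(bytes.TrimSpace(b)))
	if err != nil {
		return nil, fmt.Errorf("line %d: %w", lineno, err)
	}
	return []byte(strconv.Itoa(2*n) + "\n"), nil
}

func main() {
	for i, line := range []string{"1\n", "21\n", "x\n"} {
		out, err := double(int64(i), []byte(line))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Print(string(out))
	}
}
```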
