etl

package
v0.0.0-...-38a333d
Warning

This package is not in the latest version of its module.

Published: Oct 22, 2024 | License: GPL-3.0 | Imports: 5 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func Do

func Do[T any](producerDef Producer[T], stageDefs ...Stage[T]) error

Do runs the parallel pipeline defined by the specified Producer and Stages. Work items of type T are produced by the Producer, then handled by each Stage in the provided order.

func SequentialDo

func SequentialDo[T any](producerDef Producer[T], stageDefs ...Stage[T]) error

SequentialDo behaves like Do, but runs sequentially on a single goroutine. Stage input buffer sizes and concurrency options are ignored, but recycling Producers still recycle their (single) work item.
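A minimal sketch of the sequential model, assuming SequentialDo drives each item through all stages before producing the next one: here the put callback runs every stage inline on the caller's goroutine, with no channels or buffers. runSequential is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// runSequential is a hypothetical stand-in for etl.SequentialDo: put pushes
// each item through every stage inline before the producer continues.
func runSequential[T any](produce func(put func(T)) error, stages ...func(T) (T, error)) ([]T, error) {
	var results []T
	var firstErr error
	put := func(item T) {
		if firstErr != nil {
			return // an earlier item failed; ignore the rest
		}
		for _, stage := range stages {
			var err error
			if item, err = stage(item); err != nil {
				firstErr = err
				return
			}
		}
		results = append(results, item)
	}
	if err := produce(put); err != nil {
		return results, err
	}
	return results, firstErr
}

func main() {
	var order []string // records the interleaving of producer and stage calls
	produce := func(put func(int)) error {
		for i := 1; i <= 3; i++ {
			order = append(order, fmt.Sprint("produce ", i))
			put(i)
		}
		return nil
	}
	square := func(n int) (int, error) {
		order = append(order, fmt.Sprint("square ", n))
		return n * n, nil
	}

	results, err := runSequential(produce, square)
	fmt.Println(results, err) // [1 4 9] <nil>
	fmt.Println(order)        // [produce 1 square 1 produce 2 square 2 produce 3 square 3]
}
```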

Types

type Metrics

type Metrics struct {
	WallDuration    time.Duration
	ProducerMetrics []*StageMetrics
	StageMetrics    [][]*StageMetrics
}

Metrics defines a set of performance metrics collected for an entire pipeline.
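The following sketch copies the documented Metrics and StageMetrics field layouts so it compiles on its own, and sums the item counts for each stage. It assumes, without confirmation from the documentation, that StageMetrics[i] holds one entry per concurrent instance of stage i. itemsPerStage is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented field layout, so this sketch compiles alone.
type StageMetrics struct {
	StageName                   string
	StageInstance               uint
	WorkDuration, StageDuration time.Duration
	Items                       uint
}

type Metrics struct {
	WallDuration    time.Duration
	ProducerMetrics []*StageMetrics
	StageMetrics    [][]*StageMetrics
}

// itemsPerStage sums Items across instances, assuming StageMetrics[i] holds
// one entry per concurrent instance of stage i.
func itemsPerStage(m *Metrics) []uint {
	totals := make([]uint, len(m.StageMetrics))
	for i, instances := range m.StageMetrics {
		for _, sm := range instances {
			totals[i] += sm.Items
		}
	}
	return totals
}

func main() {
	m := &Metrics{
		WallDuration: 2 * time.Second,
		StageMetrics: [][]*StageMetrics{
			{{StageName: "parse", StageInstance: 0, Items: 60}, {StageName: "parse", StageInstance: 1, Items: 40}},
			{{StageName: "write", StageInstance: 0, Items: 100}},
		},
	}
	fmt.Println(itemsPerStage(m)) // [100 100]
}
```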

func Measure

func Measure[T any](producerDef Producer[T], stageDefs ...Stage[T]) (*Metrics, error)

Measure behaves like Do, running the parallel pipeline defined by the specified Producer and Stages, but also measures the time spent in each stage and returns the collected Metrics.
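One plausible way to collect per-stage timings is to wrap each stage function with a timer, as in this sketch. The stageStats type and timed wrapper are hypothetical and only loosely modelled on StageMetrics; nothing here reflects how Measure is actually implemented.

```go
package main

import (
	"fmt"
	"time"
)

// stageStats is a hypothetical per-instance accumulator, loosely modelled on
// the documented StageMetrics fields WorkDuration and Items.
type stageStats struct {
	WorkDuration time.Duration
	Items        uint
}

// timed wraps a stage function so every call, failed ones included, adds its
// elapsed time and one item to stats. A stats value must only be updated by
// one goroutine; a concurrent stage would keep one per instance.
func timed[T any](fn func(T) (T, error), stats *stageStats) func(T) (T, error) {
	return func(in T) (T, error) {
		start := time.Now()
		out, err := fn(in)
		stats.WorkDuration += time.Since(start)
		stats.Items++
		return out, err
	}
}

func main() {
	var stats stageStats
	slow := timed(func(s string) (string, error) {
		time.Sleep(10 * time.Millisecond) // simulated work
		return s + "!", nil
	}, &stats)

	for _, s := range []string{"a", "b", "c"} {
		if _, err := slow(s); err != nil {
			panic(err)
		}
	}
	fmt.Printf("%d items, %v of work\n", stats.Items, stats.WorkDuration)
}
```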

func (*Metrics) String

func (pm *Metrics) String() string

type Producer

type Producer[T any] func(index uint) (*producer[T], error)

Producer defines a function building a pipeline producer.

func NewProducer

func NewProducer[T any](fn ProducerFn[T], optFns ...StageOptionFn) Producer[T]

NewProducer defines an initial stage in a pipeline, in which work items of type T are prepared for processing.
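As an example of a ProducerFn, the hypothetical lineProducer below emits one work item per input line and returns any read error to its caller. ProducerFn is redeclared locally with the documented signature so the sketch runs without the package.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// ProducerFn mirrors the documented signature so this sketch compiles alone.
type ProducerFn[T any] func(put func(T)) error

// lineProducer is a hypothetical ProducerFn that emits one work item per
// input line and reports any scanner error.
func lineProducer(input string) ProducerFn[string] {
	return func(put func(string)) error {
		sc := bufio.NewScanner(strings.NewReader(input))
		for sc.Scan() {
			put(sc.Text())
		}
		return sc.Err()
	}
}

func main() {
	var got []string
	err := lineProducer("alpha\nbeta\ngamma")(func(s string) { got = append(got, s) })
	fmt.Println(got, err) // [alpha beta gamma] <nil>
}
```

Against the package itself, such a function would be declared as returning etl.ProducerFn[string] and passed to etl.NewProducer; the module's import path is not given on this page, so that step is left out.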

func NewRecyclingProducer

func NewRecyclingProducer[T any](fn RecyclingProducerFn[T], optFns ...StageOptionFn) Producer[T]

NewRecyclingProducer defines an initial stage in a pipeline, in which work items of type T are prepared for processing. The provided RecyclingProducerFn should call its `get` argument to obtain a previously allocated work item, constructing a new work item only if `get` returns false.
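The sketch below shows the get/put contract from the producer's side: the hypothetical batchProducer reuses a buffer whenever get offers one and allocates only when get returns false. The recycling mechanism in main is a toy stand-in, a buffered channel into which put returns each item after "processing" it; in the real pipeline, items are presumably recycled only after the last Stage has finished with them.

```go
package main

import "fmt"

// RecyclingProducerFn mirrors the documented signature.
type RecyclingProducerFn[T any] func(get func() (T, bool), put func(T)) error

// batchProducer is a hypothetical RecyclingProducerFn: it reuses a
// previously allocated []int when get offers one and allocates otherwise.
func batchProducer(batches, size int, allocs *int) RecyclingProducerFn[[]int] {
	return func(get func() ([]int, bool), put func([]int)) error {
		for b := 0; b < batches; b++ {
			buf, ok := get()
			if !ok {
				buf = make([]int, size)
				*allocs++
			}
			for i := range buf {
				buf[i] = b*size + i
			}
			put(buf)
		}
		return nil
	}
}

func main() {
	// Toy recycling mechanism: consumed items go back on a buffered channel,
	// and get takes one without blocking if any is available.
	pool := make(chan []int, 2)
	get := func() ([]int, bool) {
		select {
		case buf := <-pool:
			return buf, true
		default:
			return nil, false
		}
	}
	put := func(buf []int) {
		fmt.Println(buf) // "process" the item...
		pool <- buf      // ...then hand it back for reuse
	}

	allocs := 0
	if err := batchProducer(3, 4, &allocs)(get, put); err != nil {
		panic(err)
	}
	fmt.Println("allocations:", allocs) // allocations: 1
}
```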

type ProducerFn

type ProducerFn[T any] func(put func(T)) error

type RecyclingProducerFn

type RecyclingProducerFn[T any] func(get func() (T, bool), put func(T)) error

type Stage

type Stage[T any] func(index uint) (*stage[T], error)

Stage defines a function building a pipeline stage.

func NewStage

func NewStage[T any](fn StageFn[T], optFns ...StageOptionFn) Stage[T]

NewStage defines an intermediate stage in a pipeline, in which work items of type T are operated upon.
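Because a StageFn takes and returns the same type T, a work item is typically a struct that carries the raw input together with the fields later stages fill in. The record type and parseValue stage below are hypothetical; StageFn is redeclared with the documented signature.

```go
package main

import (
	"fmt"
	"strconv"
)

// record is a hypothetical work-item type holding both the raw input and a
// field that this stage derives from it.
type record struct {
	Raw   string
	Value int
}

// StageFn mirrors the documented signature.
type StageFn[T any] func(in T) (out T, err error)

// parseValue is a hypothetical StageFn that fills Value from Raw and
// reports malformed input as an error.
var parseValue StageFn[record] = func(in record) (record, error) {
	v, err := strconv.Atoi(in.Raw)
	if err != nil {
		return in, fmt.Errorf("parse %q: %w", in.Raw, err)
	}
	in.Value = v
	return in, nil
}

func main() {
	out, err := parseValue(record{Raw: "42"})
	fmt.Println(out.Value, err) // 42 <nil>
	_, err = parseValue(record{Raw: "x"})
	fmt.Println(err != nil) // true
}
```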

type StageFn

type StageFn[T any] func(in T) (out T, err error)

type StageMetrics

type StageMetrics struct {
	StageName                   string
	StageInstance               uint
	WorkDuration, StageDuration time.Duration
	Items                       uint
}

StageMetrics defines a set of performance metrics collected for a particular pipeline stage.

type StageOptionFn

type StageOptionFn func(so *stageOptions) error

func Concurrency

func Concurrency(concurrency uint) StageOptionFn

Concurrency specifies the desired concurrency of a Stage or Producer. A Stage's concurrency is the number of worker goroutines performing that Stage.
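A stage with concurrency n can be modelled as n worker goroutines reading one shared input channel, as in this sketch. runConcurrent is hypothetical. In particular, it does not preserve item order, and whether this package preserves order across concurrent workers is not documented.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runConcurrent is a hypothetical sketch of a stage with concurrency n:
// n workers read from one shared input channel and write to one shared
// output channel. Output order is not guaranteed when n > 1.
func runConcurrent[T any](n int, in <-chan T, fn func(T) T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(n)
	for w := 0; w < n; w++ {
		go func() {
			defer wg.Done()
			for item := range in {
				out <- fn(item)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // close only after every worker has finished sending
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()

	var got []int
	for v := range runConcurrent[int](3, in, func(n int) int { return n * n }) {
		got = append(got, v)
	}
	sort.Ints(got)   // arrival order depends on scheduling
	fmt.Println(got) // [1 4 9 16 25 36]
}
```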

func InputBufferSize

func InputBufferSize(inputBufferSize uint) StageOptionFn

InputBufferSize defines the size of the input buffer of a Stage. For recycling Producers, it defines the size of the input buffer of the recycling mechanism; for non-recycling Producers, it has no effect. Defaults to 1.
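To see what a larger buffer buys, the sketch below models a stage's input as a Go channel whose capacity is the buffer size (an assumption about the implementation) and counts how many items a producer could hand off before blocking while the stage has not yet read anything. queuedBeforeBlocking is a hypothetical helper.

```go
package main

import "fmt"

// queuedBeforeBlocking reports how many items could be sent to a stage input
// channel of the given capacity before a send would block, assuming the
// stage has not started reading.
func queuedBeforeBlocking(inputBufferSize uint, items int) int {
	in := make(chan int, inputBufferSize)
	for i := 0; i < items; i++ {
		select {
		case in <- i:
		default:
			return i // the next send would wait for the stage to read
		}
	}
	return items
}

func main() {
	fmt.Println(queuedBeforeBlocking(1, 10)) // 1 (the documented default)
	fmt.Println(queuedBeforeBlocking(4, 10)) // 4
	fmt.Println(queuedBeforeBlocking(0, 10)) // 0: unbuffered, every send waits
}
```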

func Name

func Name(name string) StageOptionFn

Name specifies the name of a Stage or a Producer, for debugging. If unspecified, the Stage number (0 for the Producer) will be used.
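StageOptionFn follows Go's functional-options pattern. The sketch below guesses at the unexported stageOptions struct (its fields are not documented) and applies options over defaults. The default name equal to the stage number and the InputBufferSize default of 1 come from the documentation above; the default concurrency of 1 and the validation in withConcurrency are assumptions. withName, withConcurrency and resolveOptions are hypothetical stand-ins, not this package's Name and Concurrency.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// stageOptions is a hypothetical guess at the unexported options struct.
type stageOptions struct {
	name            string
	concurrency     uint
	inputBufferSize uint
}

// StageOptionFn mirrors the documented signature.
type StageOptionFn func(so *stageOptions) error

func withName(name string) StageOptionFn {
	return func(so *stageOptions) error { so.name = name; return nil }
}

func withConcurrency(concurrency uint) StageOptionFn {
	return func(so *stageOptions) error {
		if concurrency == 0 {
			return errors.New("concurrency must be at least 1") // assumed validation
		}
		so.concurrency = concurrency
		return nil
	}
}

// resolveOptions applies optFns in order over the defaults and stops at the
// first option that returns an error.
func resolveOptions(stageIndex uint, optFns ...StageOptionFn) (*stageOptions, error) {
	so := &stageOptions{
		name:            strconv.FormatUint(uint64(stageIndex), 10),
		concurrency:     1,
		inputBufferSize: 1,
	}
	for _, fn := range optFns {
		if err := fn(so); err != nil {
			return nil, err
		}
	}
	return so, nil
}

func main() {
	so, _ := resolveOptions(2)
	fmt.Println(so.name, so.concurrency) // 2 1
	so, _ = resolveOptions(2, withName("enrich"), withConcurrency(8))
	fmt.Println(so.name, so.concurrency) // enrich 8
	_, err := resolveOptions(3, withConcurrency(0))
	fmt.Println(err) // concurrency must be at least 1
}
```

Returning an error from each option lets invalid settings surface when the pipeline is built rather than while it runs, which is presumably why StageOptionFn has that signature.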
