Documentation ¶
Overview ¶
Package pipeline provides a pipeline for processing data.
The pipeline package offers a convenient way to process data using a series of stages. Each stage in the pipeline performs a specific operation on the data and passes it to the next stage. This allows for a modular and flexible approach to data processing.
A key benefit of the pipeline package is that it moves data between stages over channels, so stages hand values to one another without explicit locks or other hand-written synchronisation. Channels also let stages run concurrently, which can significantly improve performance for computationally intensive tasks.
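The channel-based flow described above can be illustrated without the package itself. The following is a minimal sketch using only the standard library; `generate`, `square`, `collect`, and `runStages` are hypothetical names, and this is not the package's implementation.

```go
package main

import "fmt"

// generate emits the given values on a channel and closes it when done.
func generate(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// square reads from in, squares each value, and forwards the result.
// It runs in its own goroutine, so it overlaps with the producer.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

// runStages wires the stages together and returns the collected output.
func runStages() []int {
	return collect(square(generate(1, 2, 3)))
}

func main() {
	fmt.Println(runStages()) // prints [1 4 9]
}
```

Each stage owns its output channel and closes it when its input is exhausted, which is what lets the next stage's `range` loop terminate.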
Another advantage of the pipeline package is its error handling. The pipeline stops at the first error it encounters and does no further processing, which makes it easier to identify and debug issues in the data processing pipeline.
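One common way to get stop-on-first-error behaviour with plain channels is to cancel a shared context when a stage fails. The sketch below assumes that approach; it is not taken from the package's source, and `produce`, `process`, `runUntilError`, and `errNegative` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errNegative is a hypothetical error used only in this sketch.
var errNegative = errors.New("negative value")

// produce emits values until the slice is exhausted or ctx is cancelled.
func produce(ctx context.Context, values []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			select {
			case out <- v:
			case <-ctx.Done():
				return // a downstream stage failed; stop producing
			}
		}
	}()
	return out
}

// process consumes values and stops at the first invalid one. It returns
// how many values were processed before the error.
func process(cancel context.CancelFunc, in <-chan int) (int, error) {
	processed := 0
	for v := range in {
		if v < 0 {
			cancel() // tell upstream stages to stop
			return processed, fmt.Errorf("value %d: %w", v, errNegative)
		}
		processed++
	}
	return processed, nil
}

// runUntilError runs the two stages over a fixed input containing one bad value.
func runUntilError() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return process(cancel, produce(ctx, []int{1, 2, -3, 4}))
}

func main() {
	n, err := runUntilError()
	fmt.Println(n, err) // prints 2 value -3: negative value
}
```

The value `4` is never processed: cancelling the context unblocks the producer, which returns instead of sending it.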
Overall, the pipeline package processes data in modular stages, using channels to manage data flow, support concurrency, and stop on errors.
Index ¶
- Variables
- func AddMerger[I any](pipe *Pipeline, name string, steps ...*model.Step[I]) (*model.Step[I], error)
- func AddRootStep[O any](pipe *Pipeline, name string, ...) (*model.Step[O], error)
- func AddSink[I any](pipe *Pipeline, name string, input *model.Step[I], ...) error
- func AddSinkFromChan[I any](pipe *Pipeline, name string, input *model.Step[I], ...) error
- func AddStepFromChan[I any, O any](pipe *Pipeline, name string, input *model.Step[I], ...) (*model.Step[O], error)
- func AddStepOneToMany[I any, O any](pipe *Pipeline, name string, input *model.Step[I], oneToMany OneToManyFn[I, O], ...) (*model.Step[O], error)
- func AddStepOneToOne[I any, O any](pipe *Pipeline, name string, input *model.Step[I], oneToOne OneToOneFn[I, O], ...) (*model.Step[O], error)
- func AddStepOneToOneOrZero[I any, O any](pipe *Pipeline, name string, input *model.Step[I], oneToOne OneToOneFn[I, O], ...) (*model.Step[O], error)
- type OneToManyFn
- type OneToOneFn
- type Pipeline
- type Splitter
- type SplitterFn
- type SplitterOption
- type StepFromChanFn
- type StepOption
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrPipelineMustBeSet is returned when the pipeline is not set.
	ErrPipelineMustBeSet = errors.New("pipe must be set")
	// ErrInputMustBeSet is returned when the input is not set.
	ErrInputMustBeSet = errors.New("input must be set")
	// ErrSplitterTotal is returned when the total is not set.
	ErrSplitterTotal = errors.New("total must be greater than 0")
)
Functions ¶
func AddMerger ¶
AddMerger adds a merger step to the pipeline. It will merge the output of the steps into a single channel.
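Merging several outputs into a single channel is the classic fan-in pattern. The following standalone sketch shows that pattern with the standard library only; `merge`, `fromSlice`, and `mergedSorted` are hypothetical helpers and do not reflect how AddMerger is implemented.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards every value from the inputs to one output channel and
// closes the output once all inputs are drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice returns a closed, pre-filled channel holding the given values.
func fromSlice[T any](values []T) <-chan T {
	out := make(chan T, len(values))
	for _, v := range values {
		out <- v
	}
	close(out)
	return out
}

// mergedSorted merges two sources and sorts the result, because the
// arrival order of merged values is not deterministic.
func mergedSorted() []int {
	var got []int
	for v := range merge(fromSlice([]int{1, 3}), fromSlice([]int{2, 4})) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergedSorted()) // prints [1 2 3 4]
}
```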
func AddRootStep ¶
func AddRootStep[O any](
	pipe *Pipeline,
	name string,
	stepFn func(ctx context.Context, rootChan chan<- O) error,
	opts ...StepOption[O],
) (*model.Step[O], error)
AddRootStep adds a root step to the pipeline. It runs stepFn, which produces the pipeline's initial values by sending them on rootChan.
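A function with the shape of the stepFn parameter might look like the sketch below, with O set to string. The driver `runRoot` is a hypothetical stand-in that calls the function directly and closes the channel itself; how the package actually invokes stepFn and who closes rootChan are assumptions not confirmed here.

```go
package main

import (
	"context"
	"fmt"
)

// emitLines matches the stepFn shape: it sends values on rootChan and
// returns an error if the context is cancelled before it finishes.
func emitLines(ctx context.Context, rootChan chan<- string) error {
	for _, line := range []string{"alpha", "beta", "gamma"} {
		select {
		case rootChan <- line:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// runRoot is a stand-in driver for this sketch only: it runs emitLines in a
// goroutine, closes the channel afterwards, and collects what was sent.
func runRoot() ([]string, error) {
	ch := make(chan string)
	errCh := make(chan error, 1)
	go func() {
		defer close(ch)
		errCh <- emitLines(context.Background(), ch)
	}()
	var got []string
	for s := range ch {
		got = append(got, s)
	}
	return got, <-errCh
}

func main() {
	lines, err := runRoot()
	fmt.Println(lines, err) // prints [alpha beta gamma] <nil>
}
```

Selecting on `ctx.Done()` while sending keeps the function from blocking forever if the consumer stops reading.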
func AddSink ¶
func AddSink[I any](pipe *Pipeline, name string, input *model.Step[I], sinkFn func(ctx context.Context, input I) error) error
AddSink adds a sink step to the pipeline. It will consume the input channel and run the sink function.
func AddSinkFromChan ¶
func AddSinkFromChan[I any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	stepFn func(ctx context.Context, input <-chan I) error,
) error
AddSinkFromChan adds a sink step to the pipeline. It will consume the input channel.
func AddStepFromChan ¶
func AddStepFromChan[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	stepFromChan StepFromChanFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)
AddStepFromChan adds a step that takes an input channel and produces an output channel.
func AddStepOneToMany ¶
func AddStepOneToMany[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToMany OneToManyFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)
AddStepOneToMany adds a step that takes one input and produces many outputs.
func AddStepOneToOne ¶
func AddStepOneToOne[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToOne OneToOneFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)
AddStepOneToOne adds a step that takes one input and produces one output.
func AddStepOneToOneOrZero ¶
func AddStepOneToOneOrZero[I any, O any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	oneToOne OneToOneFn[I, O],
	opts ...StepOption[O],
) (*model.Step[O], error)
AddStepOneToOneOrZero adds a step that takes one input and produces at most one output. If the output is a zero value, it is ignored and not passed on.
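The "ignore zero values" rule can be sketched as a filter applied after a mapping function. The example below is a standalone illustration under two assumptions: the mapping function has the hypothetical shape `func(I) (O, error)`, and zero values are detected with `==`, which requires `O` to be `comparable`. The package declares `O any`, so its own check must work differently.

```go
package main

import (
	"fmt"
	"strings"
)

// mapSkipZero applies fn to each input and drops any result equal to the
// zero value of O. It stops and returns the error if fn fails.
func mapSkipZero[I any, O comparable](in <-chan I, fn func(I) (O, error)) ([]O, error) {
	var zero O
	var out []O
	for v := range in {
		res, err := fn(v)
		if err != nil {
			return out, err
		}
		if res == zero {
			continue // zero value: ignored
		}
		out = append(out, res)
	}
	return out, nil
}

// nonBlank trims a fixed set of strings and keeps only the non-empty results.
func nonBlank() ([]string, error) {
	inputs := []string{" a ", "   ", "b"}
	in := make(chan string, len(inputs))
	for _, s := range inputs {
		in <- s
	}
	close(in)
	trim := func(s string) (string, error) { return strings.TrimSpace(s), nil }
	return mapSkipZero(in, trim)
}

func main() {
	got, err := nonBlank()
	fmt.Println(got, err) // prints [a b] <nil>
}
```

One consequence of this rule is that a legitimate zero result, such as the integer 0 or an empty string, cannot be distinguished from "no output".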
Types ¶
type OneToManyFn ¶
OneToManyFn is a function that takes an input and produces many outputs.
type OneToOneFn ¶
OneToOneFn is a function that takes an input and produces an output.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a pipeline of steps.
type Splitter ¶
Splitter is a step that splits the input into multiple outputs.
func AddSplitter ¶
func AddSplitter[I any](pipe *Pipeline, name string, input *model.Step[I], total int, opts ...SplitterOption[I]) (*Splitter[I], error)
AddSplitter adds a splitter step to the pipeline. It will split the input into multiple outputs based on the total.
func AddSplitterFn ¶
func AddSplitterFn[I any](
	pipe *Pipeline,
	name string,
	input *model.Step[I],
	fns []SplitterFn[I],
	opts ...SplitterOption[I],
) (*Splitter[I], error)
AddSplitterFn adds a splitter step to the pipeline. It will split the input into multiple outputs based on the provided functions.
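Splitting by functions can be pictured as one output channel per predicate, where each output receives the inputs its predicate accepts. The sketch below assumes that reading and a hypothetical predicate shape `func(T) bool`; `splitBy` and `splitDemo` are illustrative names, not part of the package.

```go
package main

import "fmt"

// splitBy creates one buffered output per predicate. Each input value is
// sent to every output whose predicate returns true for it.
func splitBy[T any](in <-chan T, bufferSize int, keep ...func(T) bool) []<-chan T {
	outs := make([]chan T, len(keep))
	result := make([]<-chan T, len(keep))
	for i := range keep {
		outs[i] = make(chan T, bufferSize)
		result[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range in {
			for i, fn := range keep {
				if fn(v) {
					outs[i] <- v
				}
			}
		}
	}()
	return result
}

// splitDemo splits 1..6 into even values and values greater than 3.
func splitDemo() (evens, large []int) {
	in := make(chan int, 6)
	for v := 1; v <= 6; v++ {
		in <- v
	}
	close(in)
	// The buffer holds every value, so the outputs can be drained one
	// after the other without blocking the splitter.
	outs := splitBy(in, 6,
		func(v int) bool { return v%2 == 0 },
		func(v int) bool { return v > 3 },
	)
	for v := range outs[0] {
		evens = append(evens, v)
	}
	for v := range outs[1] {
		large = append(large, v)
	}
	return evens, large
}

func main() {
	evens, large := splitDemo()
	fmt.Println(evens, large) // prints [2 4 6] [4 5 6]
}
```

With a smaller buffer, the outputs would have to be consumed concurrently; otherwise a full output would stall the splitter and starve the others.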
type SplitterFn ¶
SplitterFn is a function that returns whether to keep the input or not.
type SplitterOption ¶
SplitterOption is a function that modifies a Splitter.
func SplitterBufferSize ¶
func SplitterBufferSize[I any](bufferSize int) SplitterOption[I]
SplitterBufferSize sets the buffer size of the Splitter. Each split step will have a buffer of this size.
type StepFromChanFn ¶
StepFromChanFn is a function that takes an input channel and produces an output channel.
type StepOption ¶
StepOption is a function that modifies a Step.
func StepConcurrency ¶
func StepConcurrency[O any](concurrent int) StepOption[O]
StepConcurrency sets the concurrency of the step.
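Step concurrency is commonly implemented as several workers reading from the same input channel. The sketch below assumes that model; `mapConcurrent` and `sumSquares` are hypothetical names, and the package's own scheduling may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// mapConcurrent starts the given number of workers. Each worker reads from
// in, applies fn, and writes to the shared output, which is closed once
// every worker has finished.
func mapConcurrent[I, O any](in <-chan I, workers int, fn func(I) O) <-chan O {
	out := make(chan O)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumSquares squares 1..5 with the given number of workers and sums the
// results. The sum does not depend on the order in which results arrive.
func sumSquares(workers int) int {
	in := make(chan int, 5)
	for v := 1; v <= 5; v++ {
		in <- v
	}
	close(in)
	total := 0
	for sq := range mapConcurrent(in, workers, func(v int) int { return v * v }) {
		total += sq
	}
	return total
}

func main() {
	fmt.Println(sumSquares(3)) // prints 55
}
```

With more than one worker, output order is no longer guaranteed to match input order, so downstream steps should not rely on it.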
func StepKeepOpen ¶
func StepKeepOpen[O any]() StepOption[O]
StepKeepOpen does not close the input channel.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
drawer | Package drawer provides a way to draw a pipeline graph. |
measure | Package measure provides a way to measure the performance of a pipeline. |