pipeline

package
v0.8.0-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2022 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

The pipeline package manages segments in Pipeline objects.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SegmentsFromRepr

func SegmentsFromRepr(segmentReprs *[]SegmentRepr) []segments.Segment

Creates a list of Segments from their config representations. Handles recursive definitions found in Segments.

Types

type Pipeline

type Pipeline struct {
	In   chan *flow.FlowMessage
	Out  <-chan *flow.FlowMessage
	Drop chan *flow.FlowMessage

	SegmentList []segments.Segment
	// contains filtered or unexported fields
}

Basically a list of segments. It further exposes the In and Out channels of the Pipeline as a whole, i.e. the ingress channel of the first and the egress channel of the last segment in its SegmentList.

func New

func New(segmentList ...segments.Segment) *Pipeline

Initializes a new Pipeline object and then starts all segment goroutines therein. Initialization includes creating any intermediate channels and wiring up the segments in the segmentList with them.

func NewFromConfig

func NewFromConfig(config []byte) *Pipeline

Builds a list of Segment objects from raw configuration bytes and initializes a Pipeline with them.

func (*Pipeline) AutoDrain

func (pipeline *Pipeline) AutoDrain()

Starts up a goroutine specific to this Pipeline which reads any message from the Out channel and discards it. This is a convenience function to enable having a segment at the end of the pipeline handle all results, i.e. having no post-pipeline processing.

func (*Pipeline) Close

func (pipeline *Pipeline) Close()

Closes down a Pipeline by closing its In channel and waiting for all segments to propagate this close event through the full pipeline, terminating all segment goroutines and thus releasing the waitgroup. Blocking.

func (*Pipeline) GetDrop

func (pipeline *Pipeline) GetDrop() <-chan *flow.FlowMessage

func (*Pipeline) GetInput

func (pipeline *Pipeline) GetInput() chan *flow.FlowMessage

func (*Pipeline) GetOutput

func (pipeline *Pipeline) GetOutput() <-chan *flow.FlowMessage

func (*Pipeline) Start

func (pipeline *Pipeline) Start()

Starts the Pipeline by starting all segment goroutines therein.

type SegmentRepr

type SegmentRepr struct {
	Name   string            `yaml:"segment"`             // to be looked up with a registry
	Config map[string]string `yaml:"config"`              // to be expanded by our instance
	If     []SegmentRepr     `yaml:"if,omitempty,flow"`   // only used by group segment
	Then   []SegmentRepr     `yaml:"then,omitempty,flow"` // only used by group segment
	Else   []SegmentRepr     `yaml:"else,omitempty,flow"` // only used by group segment
}

A config representation of a segment. It is intended to look like this:

  • segment: pass config: key: value foo: bar

This struct has the appropriate yaml tags inline.

func (*SegmentRepr) ExpandedConfig

func (s *SegmentRepr) ExpandedConfig() map[string]string

Returns the SegmentRepr's Config with all its variables expanded. It tries to match numeric variables such as '$1' to the corresponding command line argument not matched by flags, or else uses regular environment variable expansion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL