pipeline

package
v1.3.0

Warning: This package is not in the latest version of its module.
Published: Jun 13, 2023 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

The pipeline package manages segments in Pipeline objects.

Constants

This section is empty.

Variables

This section is empty.

Functions

func SegmentsFromRepr

func SegmentsFromRepr(segmentReprs *[]SegmentRepr) []segments.Segment

SegmentsFromRepr creates a list of Segments from their config representations. It recurses into any nested segment definitions found in a representation, such as the If, Then and Else lists of a SegmentRepr.
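The recursion can be pictured with a small self-contained sketch. The repr type below is a local stand-in for SegmentRepr, names is a hypothetical helper that is not part of this package, and "outer" is a made-up segment name. The real function builds segments.Segment values rather than strings, so this only illustrates the traversal, not the actual implementation.

```go
package main

import "fmt"

// repr is a local stand-in for SegmentRepr, reduced to the fields
// needed to show the nesting.
type repr struct {
	Name string
	If   []repr
	Then []repr
	Else []repr
}

// names walks a list of representations depth-first and returns the
// segment names in the order they are encountered. It is a hypothetical
// helper that only illustrates the recursion.
func names(reprs []repr) []string {
	var out []string
	for _, r := range reprs {
		out = append(out, r.Name)
		out = append(out, names(r.If)...)
		out = append(out, names(r.Then)...)
		out = append(out, names(r.Else)...)
	}
	return out
}

func main() {
	cfg := []repr{
		{Name: "pass"},
		{
			Name: "outer", // hypothetical segment carrying nested lists
			Then: []repr{{Name: "pass"}},
			Else: []repr{{Name: "pass"}},
		},
	}
	fmt.Println(len(names(cfg))) // prints 4: two top-level and two nested entries
}
```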

Types

type Pipeline

type Pipeline struct {
	In   chan *pb.EnrichedFlow
	Out  <-chan *pb.EnrichedFlow
	Drop chan *pb.EnrichedFlow

	SegmentList []segments.Segment
	// contains filtered or unexported fields
}

A Pipeline is essentially a list of segments. It also exposes the In and Out channels of the pipeline as a whole, i.e. the ingress channel of the first segment and the egress channel of the last segment in its SegmentList.

func New

func New(segmentList ...segments.Segment) *Pipeline

Initializes a new Pipeline object and then starts all segment goroutines therein. Initialization includes creating any intermediate channels and wiring up the segments in the segmentList with them.
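The wiring step can be sketched without this package. Everything in the block below is an assumption made for illustration: flow stands in for *pb.EnrichedFlow, stage stands in for a segment, and wire and double are hypothetical helpers. It shows one plausible way to chain stages with intermediate channels so that the first ingress and the last egress become the ends of the whole chain.

```go
package main

import "fmt"

// flow is a stand-in for *pb.EnrichedFlow.
type flow struct{ Bytes int }

// stage is a hypothetical stand-in for a segment: it reads from in,
// writes to out, and closes out once in has been closed.
type stage func(in <-chan *flow, out chan<- *flow)

// wire connects the stages with intermediate channels, starts one
// goroutine per stage, and returns the ingress channel of the first
// stage and the egress channel of the last one.
func wire(stages ...stage) (chan<- *flow, <-chan *flow) {
	first := make(chan *flow)
	var in <-chan *flow = first
	for _, s := range stages {
		out := make(chan *flow)
		go s(in, out) // arguments are evaluated now, before in is reassigned
		in = out
	}
	return first, in
}

// double is an example stage that doubles the byte count of each flow.
func double(in <-chan *flow, out chan<- *flow) {
	defer close(out)
	for f := range in {
		f.Bytes *= 2
		out <- f
	}
}

func main() {
	in, out := wire(double, double)
	go func() {
		in <- &flow{Bytes: 1}
		close(in)
	}()
	for f := range out {
		fmt.Println(f.Bytes) // prints 4
	}
}
```

Because each stage closes its output when its input closes, closing the first channel eventually closes the last one, which is what ends the range loop in main.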

func NewFromConfig

func NewFromConfig(config []byte) *Pipeline

Builds a list of Segment objects from raw configuration bytes and initializes a Pipeline with them.

func (*Pipeline) AutoDrain

func (pipeline *Pipeline) AutoDrain()

Starts a goroutine specific to this Pipeline which reads every message from the Out channel and discards it. This is a convenience function for setups in which a segment at the end of the pipeline handles all results, i.e. where no post-pipeline processing takes place.
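A drain of this kind can be sketched as follows. The drain function is a hypothetical stand-in written for this example, and the done channel it returns exists only so the sketch can wait for the goroutine; AutoDrain itself returns nothing.

```go
package main

import "fmt"

// drain starts a goroutine that reads from out until it is closed and
// discards every value. The returned channel is closed when the
// goroutine has finished; it exists only for this sketch.
func drain(out <-chan int) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range out {
			// discard
		}
	}()
	return done
}

func main() {
	out := make(chan int) // unbuffered: a send blocks until someone reads
	done := drain(out)
	for i := 0; i < 3; i++ {
		out <- i // completes because the drain goroutine is reading
	}
	close(out)
	<-done
	fmt.Println("drained")
}
```

Without a reader on the final channel, the last stage would block on its first send and stall everything upstream, which is why a drain is needed when nothing else consumes the output.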

func (*Pipeline) Close

func (pipeline *Pipeline) Close()

Shuts down a Pipeline by closing its In channel and waiting for all segments to propagate this close event through the full pipeline. Once every segment goroutine has terminated, the waitgroup is released. This call blocks until then.
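The close-and-wait pattern described here can be modelled in a few lines. The miniPipeline type, newMini and shutdown are hypothetical names for this sketch and do not mirror the package's unexported fields; the sketch assumes each stage closes its output once its input is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPipeline is a hypothetical, reduced model of a pipeline: an input
// channel, an output channel, and a WaitGroup tracking stage goroutines.
type miniPipeline struct {
	in  chan int
	out chan int
	wg  sync.WaitGroup
}

// newMini builds a chain of pass-through stages and starts them.
func newMini(stages int) *miniPipeline {
	p := &miniPipeline{in: make(chan int)}
	src := p.in
	for i := 0; i < stages; i++ {
		dst := make(chan int)
		p.wg.Add(1)
		go func(src <-chan int, dst chan<- int) {
			defer p.wg.Done()
			defer close(dst) // runs first: propagate the close downstream
			for v := range src {
				dst <- v
			}
		}(src, dst)
		src = dst
	}
	p.out = src
	return p
}

// shutdown closes the input and blocks until every stage has returned.
func (p *miniPipeline) shutdown() {
	close(p.in)
	p.wg.Wait()
}

func main() {
	p := newMini(3)
	go func() {
		for range p.out { // keep the output drained so stages can finish
		}
	}()
	p.in <- 1
	p.shutdown()
	_, open := <-p.out
	fmt.Println(open) // prints false: the close reached the last stage
}
```

Note that in this model shutdown would block forever if nobody read from the output while values were still in flight, since the stages could not finish their pending sends.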

func (*Pipeline) GetDrop

func (pipeline *Pipeline) GetDrop() <-chan *pb.EnrichedFlow

func (*Pipeline) GetInput

func (pipeline *Pipeline) GetInput() chan *pb.EnrichedFlow

func (*Pipeline) GetOutput

func (pipeline *Pipeline) GetOutput() <-chan *pb.EnrichedFlow

func (*Pipeline) Start

func (pipeline *Pipeline) Start()

Starts the Pipeline by starting all segment goroutines therein.

type SegmentRepr

type SegmentRepr struct {
	Name   string            `yaml:"segment"`             // to be looked up with a registry
	Config map[string]string `yaml:"config"`              // to be expanded by our instance
	If     []SegmentRepr     `yaml:"if,omitempty,flow"`   // only used by group segment
	Then   []SegmentRepr     `yaml:"then,omitempty,flow"` // only used by group segment
	Else   []SegmentRepr     `yaml:"else,omitempty,flow"` // only used by group segment
}

A config representation of a segment. It is intended to look like this:

    - segment: pass
      config:
        key: value
        foo: bar

This struct has the appropriate yaml tags inline.

func (*SegmentRepr) ExpandedConfig

func (s *SegmentRepr) ExpandedConfig() map[string]string

Returns the SegmentRepr's Config with all its variables expanded. It first tries to match numeric variables such as '$1' to the corresponding command line argument that was not consumed by flags, and otherwise uses regular environment variable expansion.
