processor

package
v0.26.2
Published: Aug 27, 2018 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Package processor contains implementations of types.Processor, each of which performs an arbitrary operation on a message and returns either >0 messages to be propagated towards a sink or a response to be sent back to the message source.

Constants

const (
	TypeArchive      = "archive"
	TypeBatch        = "batch"
	TypeBoundsCheck  = "bounds_check"
	TypeCombine      = "combine"
	TypeCompress     = "compress"
	TypeConditional  = "conditional"
	TypeDecode       = "decode"
	TypeDecompress   = "decompress"
	TypeDedupe       = "dedupe"
	TypeEncode       = "encode"
	TypeFilter       = "filter"
	TypeFilterParts  = "filter_parts"
	TypeGrok         = "grok"
	TypeHash         = "hash"
	TypeHashSample   = "hash_sample"
	TypeHTTP         = "http"
	TypeInsertPart   = "insert_part"
	TypeJMESPath     = "jmespath"
	TypeJSON         = "json"
	TypeMergeJSON    = "merge_json"
	TypeMetadata     = "metadata"
	TypeMetric       = "metric"
	TypeNoop         = "noop"
	TypeProcessField = "process_field"
	TypeProcessMap   = "process_map"
	TypeSample       = "sample"
	TypeSelectParts  = "select_parts"
	TypeSplit        = "split"
	TypeText         = "text"
	TypeThrottle     = "throttle"
	TypeUnarchive    = "unarchive"
)

String constants representing each processor type.

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all processor types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
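
As a rough illustration of what sanitising could involve, the sketch below keeps only the type field and the one section that the type selects. The exampleConfig, combineConfig and splitConfig types and the sanitise function are hypothetical stand-ins built on encoding/json; this is not the package's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConfig is a hypothetical, cut-down stand-in for processor.Config.
type exampleConfig struct {
	Type    string        `json:"type"`
	Combine combineConfig `json:"combine"`
	Split   splitConfig   `json:"split"`
}

type combineConfig struct {
	Parts int `json:"parts"`
}

type splitConfig struct {
	Size int `json:"size"`
}

// sanitise keeps the "type" field plus the single section that the type
// selects, dropping every other section.
func sanitise(conf exampleConfig) (map[string]interface{}, error) {
	raw, err := json.Marshal(conf)
	if err != nil {
		return nil, err
	}
	var all map[string]interface{}
	if err := json.Unmarshal(raw, &all); err != nil {
		return nil, err
	}
	out := map[string]interface{}{"type": conf.Type}
	if section, ok := all[conf.Type]; ok {
		out[conf.Type] = section
	}
	return out, nil
}

func main() {
	conf := exampleConfig{
		Type:    "combine",
		Combine: combineConfig{Parts: 2},
		Split:   splitConfig{Size: 1},
	}
	out, err := sanitise(conf)
	if err != nil {
		panic(err)
	}
	b, _ := json.Marshal(out)
	fmt.Println(string(b))
}
```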

Types

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a processor that can selectively archive parts of a message into a single part using a chosen archive type.

func (*Archive) ProcessMessage

func (d *Archive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type ArchiveConfig

type ArchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Path   string `json:"path" yaml:"path"`
}

ArchiveConfig contains configuration fields for the Archive processor.

func NewArchiveConfig

func NewArchiveConfig() ArchiveConfig

NewArchiveConfig returns an ArchiveConfig with default values.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a processor that combines messages into a batch until a size limit or other condition is reached, at which point the batch is sent out. When a message is combined without yet producing a batch, a NoAck response is returned, which source types interpret as an instruction to send another message through but hold off on acknowledging this one.

Eventually, when the batch reaches its target size, the batch is sent through the pipeline as a single message, and the acknowledgement for that message determines whether the whole batch of messages is acknowledged.
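
The buffering behaviour described above can be sketched with a small stand-in type. The batcher type and its add method are hypothetical and only model the byte-size limit; conditions, periods and real acknowledgements are left out.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for the Batch processor: it buffers
// message parts until their combined size reaches byteSize.
type batcher struct {
	byteSize int
	parts    [][]byte
	size     int
}

// add buffers one part. It returns the completed batch and true once the size
// limit is reached. Otherwise it returns nil and false, which corresponds to
// the case where acknowledgement of the message is held back.
func (b *batcher) add(part []byte) ([][]byte, bool) {
	b.parts = append(b.parts, part)
	b.size += len(part)
	if b.size < b.byteSize {
		return nil, false
	}
	batch := b.parts
	b.parts, b.size = nil, 0
	return batch, true
}

func main() {
	b := &batcher{byteSize: 10}
	for _, p := range []string{"hello", "wor", "ld!!"} {
		if batch, ok := b.add([]byte(p)); ok {
			fmt.Printf("flushed %d parts\n", len(batch))
		} else {
			fmt.Println("buffered, ack deferred")
		}
	}
}
```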

func (*Batch) ProcessMessage

func (c *Batch) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type BatchConfig

type BatchConfig struct {
	ByteSize  int              `json:"byte_size" yaml:"byte_size"`
	Condition condition.Config `json:"condition" yaml:"condition"`
	PeriodMS  int              `json:"period_ms" yaml:"period_ms"`
}

BatchConfig contains configuration fields for the Batch processor.

func NewBatchConfig

func NewBatchConfig() BatchConfig

NewBatchConfig returns a BatchConfig with default values.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
	MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}

BoundsCheckConfig contains configuration fields for the BoundsCheck processor.
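
A minimal sketch of how the four limits might be applied to a message is shown below. The bounds type and check method are hypothetical, and treating every limit as inclusive is an assumption of this example.

```go
package main

import "fmt"

// bounds mirrors the four fields of BoundsCheckConfig.
type bounds struct {
	MaxParts, MinParts, MaxPartSize, MinPartSize int
}

// check returns an error describing the first bound that the message parts
// violate, or nil when the message is within bounds (limits are inclusive).
func (b bounds) check(parts [][]byte) error {
	if n := len(parts); n < b.MinParts || n > b.MaxParts {
		return fmt.Errorf("part count %d outside [%d, %d]", n, b.MinParts, b.MaxParts)
	}
	for i, p := range parts {
		if s := len(p); s < b.MinPartSize || s > b.MaxPartSize {
			return fmt.Errorf("part %d size %d outside [%d, %d]", i, s, b.MinPartSize, b.MaxPartSize)
		}
	}
	return nil
}

func main() {
	b := bounds{MaxParts: 2, MinParts: 1, MaxPartSize: 8, MinPartSize: 1}
	fmt.Println(b.check([][]byte{[]byte("hello")}))
	fmt.Println(b.check([][]byte{[]byte("a"), []byte("b"), []byte("c")}))
}
```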

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that combines messages into a batch until a target number of message parts is reached, at which point the batch is sent out. When a message is combined without yet producing a batch, a NoAck response is returned, which source types interpret as an instruction to send another message through but hold off on acknowledging this one.

Eventually, when the batch reaches its target size, the batch is sent through the pipeline as a single message, and the acknowledgement for that message determines whether the whole batch of messages is acknowledged.

func (*Combine) ProcessMessage

func (c *Combine) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type CombineConfig

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration fields for the Combine processor.

func NewCombineConfig

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Compress

type Compress struct {
	// contains filtered or unexported fields
}

Compress is a processor that can selectively compress parts of a message using a chosen compression algorithm.

func (*Compress) ProcessMessage

func (c *Compress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type CompressConfig

type CompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Level     int    `json:"level" yaml:"level"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

CompressConfig contains configuration fields for the Compress processor.
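
To make the Level and Parts fields concrete, here is a sketch that gzips selected parts using the standard library. The compressParts and decompress helpers are hypothetical; choosing gzip as the algorithm and treating an empty index list as "all parts" are assumptions of this example, not statements about the package.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressParts gzips the parts at the given indexes in place. An empty
// index list is treated as "all parts" (an assumption for this sketch).
func compressParts(parts [][]byte, level int, indexes []int) error {
	if len(indexes) == 0 {
		for i := range parts {
			indexes = append(indexes, i)
		}
	}
	for _, i := range indexes {
		if i < 0 || i >= len(parts) {
			return fmt.Errorf("part index %d out of range", i)
		}
		var buf bytes.Buffer
		w, err := gzip.NewWriterLevel(&buf, level)
		if err != nil {
			return err
		}
		if _, err := w.Write(parts[i]); err != nil {
			return err
		}
		if err := w.Close(); err != nil {
			return err
		}
		parts[i] = buf.Bytes()
	}
	return nil
}

// decompress reverses compressParts for a single part.
func decompress(part []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(part))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	parts := [][]byte{[]byte("header"), []byte("a long body a long body a long body")}
	if err := compressParts(parts, gzip.BestSpeed, []int{1}); err != nil {
		panic(err)
	}
	body, err := decompress(parts[1])
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s / %s\n", parts[0], body)
}
```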

func NewCompressConfig

func NewCompressConfig() CompressConfig

NewCompressConfig returns a CompressConfig with default values.

type Conditional

type Conditional struct {
	// contains filtered or unexported fields
}

Conditional is a processor that only applies child processors under a certain condition.

func (*Conditional) ProcessMessage

func (c *Conditional) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type ConditionalConfig

type ConditionalConfig struct {
	Condition      condition.Config `json:"condition" yaml:"condition"`
	Processors     []Config         `json:"processors" yaml:"processors"`
	ElseProcessors []Config         `json:"else_processors" yaml:"else_processors"`
}

ConditionalConfig is a config struct containing fields for the Conditional processor.

func NewConditionalConfig

func NewConditionalConfig() ConditionalConfig

NewConditionalConfig returns a default ConditionalConfig.

type Config

type Config struct {
	Type         string             `json:"type" yaml:"type"`
	Archive      ArchiveConfig      `json:"archive" yaml:"archive"`
	Batch        BatchConfig        `json:"batch" yaml:"batch"`
	BoundsCheck  BoundsCheckConfig  `json:"bounds_check" yaml:"bounds_check"`
	Combine      CombineConfig      `json:"combine" yaml:"combine"`
	Compress     CompressConfig     `json:"compress" yaml:"compress"`
	Conditional  ConditionalConfig  `json:"conditional" yaml:"conditional"`
	Decode       DecodeConfig       `json:"decode" yaml:"decode"`
	Decompress   DecompressConfig   `json:"decompress" yaml:"decompress"`
	Dedupe       DedupeConfig       `json:"dedupe" yaml:"dedupe"`
	Encode       EncodeConfig       `json:"encode" yaml:"encode"`
	Filter       FilterConfig       `json:"filter" yaml:"filter"`
	FilterParts  FilterPartsConfig  `json:"filter_parts" yaml:"filter_parts"`
	Grok         GrokConfig         `json:"grok" yaml:"grok"`
	Hash         HashConfig         `json:"hash" yaml:"hash"`
	HashSample   HashSampleConfig   `json:"hash_sample" yaml:"hash_sample"`
	HTTP         HTTPConfig         `json:"http" yaml:"http"`
	InsertPart   InsertPartConfig   `json:"insert_part" yaml:"insert_part"`
	JMESPath     JMESPathConfig     `json:"jmespath" yaml:"jmespath"`
	JSON         JSONConfig         `json:"json" yaml:"json"`
	MergeJSON    MergeJSONConfig    `json:"merge_json" yaml:"merge_json"`
	Metadata     MetadataConfig     `json:"metadata" yaml:"metadata"`
	Metric       MetricConfig       `json:"metric" yaml:"metric"`
	ProcessField ProcessFieldConfig `json:"process_field" yaml:"process_field"`
	ProcessMap   ProcessMapConfig   `json:"process_map" yaml:"process_map"`
	Sample       SampleConfig       `json:"sample" yaml:"sample"`
	SelectParts  SelectPartsConfig  `json:"select_parts" yaml:"select_parts"`
	Split        SplitConfig        `json:"split" yaml:"split"`
	Text         TextConfig         `json:"text" yaml:"text"`
	Throttle     ThrottleConfig     `json:"throttle" yaml:"throttle"`
	Unarchive    UnarchiveConfig    `json:"unarchive" yaml:"unarchive"`
}

Config is the all-encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.
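
A common Go idiom for this is to seed the value with defaults before decoding, using a local alias type to avoid recursion. The sketch below shows that idiom with hypothetical stepConfig, newStepConfig and parseSteps names; it is not taken from the package source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stepConfig is a hypothetical stand-in for processor.Config.
type stepConfig struct {
	Type  string `json:"type"`
	Parts int    `json:"parts"`
}

// newStepConfig returns a stepConfig with default values.
func newStepConfig() stepConfig {
	return stepConfig{Type: "noop", Parts: 2}
}

// UnmarshalJSON seeds the value with defaults before decoding, so fields
// missing from the input keep their defaults even for slice elements.
func (c *stepConfig) UnmarshalJSON(data []byte) error {
	// The alias type has no UnmarshalJSON method, which avoids infinite recursion.
	type alias stepConfig
	seeded := alias(newStepConfig())
	if err := json.Unmarshal(data, &seeded); err != nil {
		return err
	}
	*c = stepConfig(seeded)
	return nil
}

// parseSteps decodes a JSON array of configs.
func parseSteps(data []byte) ([]stepConfig, error) {
	var steps []stepConfig
	err := json.Unmarshal(data, &steps)
	return steps, err
}

func main() {
	steps, err := parseSteps([]byte(`[{"type":"combine"},{"parts":5}]`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", steps)
}
```

Without the custom method, each slice element would start from Go's zero values, so an omitted field would end up empty or zero rather than at its default.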

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.

type Decode

type Decode struct {
	// contains filtered or unexported fields
}

Decode is a processor that can selectively decode parts of a message following a chosen scheme.

func (*Decode) ProcessMessage

func (c *Decode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type DecodeConfig

type DecodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

DecodeConfig contains configuration fields for the Decode processor.

func NewDecodeConfig

func NewDecodeConfig() DecodeConfig

NewDecodeConfig returns a DecodeConfig with default values.

type Decompress

type Decompress struct {
	// contains filtered or unexported fields
}

Decompress is a processor that can decompress parts of a message following a chosen compression algorithm.

func (*Decompress) ProcessMessage

func (d *Decompress) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type DecompressConfig

type DecompressConfig struct {
	Algorithm string `json:"algorithm" yaml:"algorithm"`
	Parts     []int  `json:"parts" yaml:"parts"`
}

DecompressConfig contains configuration fields for the Decompress processor.

func NewDecompressConfig

func NewDecompressConfig() DecompressConfig

NewDecompressConfig returns a DecompressConfig with default values.

type Dedupe

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a processor that deduplicates messages either by hashing the full contents of message parts or by hashing the value of an interpolated string.

func (*Dedupe) ProcessMessage

func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type DedupeConfig

type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	HashType       string `json:"hash" yaml:"hash"`
	Parts          []int  `json:"parts" yaml:"parts"` // message parts to hash
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

DedupeConfig contains configuration fields for the Dedupe processor.
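
The sketch below illustrates deduplication by content hash. The deduper type is hypothetical: it uses SHA-256 and an in-memory map purely to stay self-contained, whereas the Cache and HashType fields suggest the real processor makes both configurable.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// deduper is a hypothetical stand-in that remembers content hashes in a map.
type deduper struct {
	seen map[[sha256.Size]byte]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: map[[sha256.Size]byte]struct{}{}}
}

// isDuplicate hashes the given parts and reports whether the same content has
// been seen before, recording it if not. Each part is length-prefixed so that
// ["ab", "c"] and ["a", "bc"] produce different hashes.
func (d *deduper) isDuplicate(parts [][]byte) bool {
	h := sha256.New()
	for _, p := range parts {
		fmt.Fprintf(h, "%d:", len(p))
		h.Write(p)
	}
	var key [sha256.Size]byte
	copy(key[:], h.Sum(nil))
	if _, ok := d.seen[key]; ok {
		return true
	}
	d.seen[key] = struct{}{}
	return false
}

func main() {
	d := newDeduper()
	for _, m := range []string{"a", "b", "a"} {
		fmt.Println(m, d.isDuplicate([][]byte{[]byte(m)}))
	}
}
```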

func NewDedupeConfig

func NewDedupeConfig() DedupeConfig

NewDedupeConfig returns a DedupeConfig with default values.

type Encode

type Encode struct {
	// contains filtered or unexported fields
}

Encode is a processor that can selectively encode parts of a message following a chosen scheme.

func (*Encode) ProcessMessage

func (c *Encode) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type EncodeConfig

type EncodeConfig struct {
	Scheme string `json:"scheme" yaml:"scheme"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

EncodeConfig contains configuration fields for the Encode processor.

func NewEncodeConfig

func NewEncodeConfig() EncodeConfig

NewEncodeConfig returns an EncodeConfig with default values.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a processor that checks each message against a condition and rejects the message if the condition returns false.

func (*Filter) ProcessMessage

func (c *Filter) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type FilterConfig

type FilterConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterConfig contains configuration fields for the Filter processor.

func NewFilterConfig

func NewFilterConfig() FilterConfig

NewFilterConfig returns a FilterConfig with default values.

type FilterParts

type FilterParts struct {
	// contains filtered or unexported fields
}

FilterParts is a processor that checks each part from a message against a condition and removes the part if the condition returns false.
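
Part-level filtering can be pictured as follows. The condition function type and filterParts helper are hypothetical stand-ins for a configured condition and the processor itself.

```go
package main

import (
	"bytes"
	"fmt"
)

// condition is a hypothetical stand-in for a configured condition.
type condition func(part []byte) bool

// filterParts returns the parts for which cond returns true, preserving order.
func filterParts(parts [][]byte, cond condition) [][]byte {
	kept := make([][]byte, 0, len(parts))
	for _, p := range parts {
		if cond(p) {
			kept = append(kept, p)
		}
	}
	return kept
}

func main() {
	parts := [][]byte{[]byte("keep: a"), []byte("drop: b"), []byte("keep: c")}
	kept := filterParts(parts, func(p []byte) bool {
		return bytes.HasPrefix(p, []byte("keep"))
	})
	fmt.Println(len(kept))
}
```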

func (*FilterParts) ProcessMessage

func (c *FilterParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type FilterPartsConfig

type FilterPartsConfig struct {
	condition.Config `json:",inline" yaml:",inline"`
}

FilterPartsConfig contains configuration fields for the FilterParts processor.

func NewFilterPartsConfig

func NewFilterPartsConfig() FilterPartsConfig

NewFilterPartsConfig returns a FilterPartsConfig with default values.

type Grok

type Grok struct {
	// contains filtered or unexported fields
}

Grok is a processor that executes Grok queries on a message part and replaces the contents with the result.

func (*Grok) ProcessMessage

func (g *Grok) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type GrokConfig

type GrokConfig struct {
	Parts       []int    `json:"parts" yaml:"parts"`
	Patterns    []string `json:"patterns" yaml:"patterns"`
	RemoveEmpty bool     `json:"remove_empty_values" yaml:"remove_empty_values"`
	NamedOnly   bool     `json:"named_captures_only" yaml:"named_captures_only"`
	UseDefaults bool     `json:"use_default_patterns" yaml:"use_default_patterns"`
	To          string   `json:"output_format" yaml:"output_format"`
}

GrokConfig contains configuration fields for the Grok processor.

func NewGrokConfig

func NewGrokConfig() GrokConfig

NewGrokConfig returns a GrokConfig with default values.

type HTTP

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP is a processor that performs an HTTP request using the message as the request body, and returns the response.

func (*HTTP) ProcessMessage

func (h *HTTP) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type HTTPConfig

type HTTPConfig struct {
	Client      client.Config `json:"request" yaml:"request"`
	Parallel    bool          `json:"parallel" yaml:"parallel"`
	MaxParallel int           `json:"max_parallel" yaml:"max_parallel"`
}

HTTPConfig contains configuration fields for the HTTP processor.

func NewHTTPConfig

func NewHTTPConfig() HTTPConfig

NewHTTPConfig returns an HTTPConfig with default values.

type Hash

type Hash struct {
	// contains filtered or unexported fields
}

Hash is a processor that can selectively hash parts of a message following a chosen algorithm.

func (*Hash) ProcessMessage

func (c *Hash) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type HashConfig

type HashConfig struct {
	Parts     []int  `json:"parts" yaml:"parts"`
	Algorithm string `json:"algorithm" yaml:"algorithm"`
}

HashConfig contains configuration fields for the Hash processor.

func NewHashConfig

func NewHashConfig() HashConfig

NewHashConfig returns a HashConfig with default values.

type HashSample

type HashSample struct {
	// contains filtered or unexported fields
}

HashSample is a processor that removes messages based on a sample factor by hashing its contents.

func (*HashSample) ProcessMessage

func (s *HashSample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type HashSampleConfig

type HashSampleConfig struct {
	RetainMin float64 `json:"retain_min" yaml:"retain_min"`
	RetainMax float64 `json:"retain_max" yaml:"retain_max"`
	Parts     []int   `json:"parts" yaml:"parts"` // message parts to hash
}

HashSampleConfig contains configuration fields for the HashSample processor.
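
Hash-based sampling is deterministic: the same content always gets the same decision. The sketch below assumes that RetainMin and RetainMax are percentages and that a message is kept when its hash falls in the half-open window between them. The hashPercent and retain functions are hypothetical, and FNV-1a is an arbitrary choice that may differ from the hash the package uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPercent maps content to a deterministic value in [0, 100).
func hashPercent(content []byte) float64 {
	h := fnv.New64a()
	h.Write(content)
	return float64(h.Sum64()%10000) / 100
}

// retain reports whether content falls inside the [retainMin, retainMax) window.
func retain(content []byte, retainMin, retainMax float64) bool {
	v := hashPercent(content)
	return v >= retainMin && v < retainMax
}

func main() {
	for _, id := range []string{"user-1", "user-2", "user-3"} {
		fmt.Println(id, retain([]byte(id), 0, 10))
	}
}
```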

func NewHashSampleConfig

func NewHashSampleConfig() HashSampleConfig

NewHashSampleConfig returns a HashSampleConfig with default values.

type InsertPart

type InsertPart struct {
	// contains filtered or unexported fields
}

InsertPart is a processor that inserts a new message part at a specific index.

func (*InsertPart) ProcessMessage

func (p *InsertPart) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type InsertPartConfig

type InsertPartConfig struct {
	Index   int    `json:"index" yaml:"index"`
	Content string `json:"content" yaml:"content"`
}

InsertPartConfig contains configuration fields for the InsertPart processor.

func NewInsertPartConfig

func NewInsertPartConfig() InsertPartConfig

NewInsertPartConfig returns an InsertPartConfig with default values.

type JMESPath

type JMESPath struct {
	// contains filtered or unexported fields
}

JMESPath is a processor that executes JMESPath queries on a message part and replaces the contents with the result.

func (*JMESPath) ProcessMessage

func (p *JMESPath) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type JMESPathConfig

type JMESPathConfig struct {
	Parts []int  `json:"parts" yaml:"parts"`
	Query string `json:"query" yaml:"query"`
}

JMESPathConfig contains configuration fields for the JMESPath processor.

func NewJMESPathConfig

func NewJMESPathConfig() JMESPathConfig

NewJMESPathConfig returns a JMESPathConfig with default values.

type JSON

type JSON struct {
	// contains filtered or unexported fields
}

JSON is a processor that performs an operation on a JSON payload.

func (*JSON) ProcessMessage

func (p *JSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type JSONConfig

type JSONConfig struct {
	Parts    []int        `json:"parts" yaml:"parts"`
	Operator string       `json:"operator" yaml:"operator"`
	Path     string       `json:"path" yaml:"path"`
	Value    rawJSONValue `json:"value" yaml:"value"`
}

JSONConfig contains configuration fields for the JSON processor.

func NewJSONConfig

func NewJSONConfig() JSONConfig

NewJSONConfig returns a JSONConfig with default values.

type MergeJSON

type MergeJSON struct {
	// contains filtered or unexported fields
}

MergeJSON is a processor that merges JSON parsed message parts into a single value.

func (*MergeJSON) ProcessMessage

func (p *MergeJSON) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type MergeJSONConfig

type MergeJSONConfig struct {
	Parts       []int `json:"parts" yaml:"parts"`
	RetainParts bool  `json:"retain_parts" yaml:"retain_parts"`
}

MergeJSONConfig contains configuration fields for the MergeJSON processor.

func NewMergeJSONConfig

func NewMergeJSONConfig() MergeJSONConfig

NewMergeJSONConfig returns a MergeJSONConfig with default values.

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata is a processor that performs an operation on the Metadata of a message.

func (*Metadata) ProcessMessage

func (p *Metadata) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type MetadataConfig

type MetadataConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Key      string `json:"key" yaml:"key"`
	Value    string `json:"value" yaml:"value"`
}

MetadataConfig contains configuration fields for the Metadata processor.

func NewMetadataConfig

func NewMetadataConfig() MetadataConfig

NewMetadataConfig returns a MetadataConfig with default values.

type Metric

type Metric struct {
	// contains filtered or unexported fields
}

Metric is a processor that creates a metric from extracted values from a message part.

func (*Metric) ProcessMessage

func (m *Metric) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message.

type MetricConfig

type MetricConfig struct {
	Type   string            `json:"type" yaml:"type"`
	Path   string            `json:"path" yaml:"path"`
	Labels map[string]string `json:"labels" yaml:"labels"`
	Value  string            `json:"value" yaml:"value"`
}

MetricConfig contains configuration fields for the Metric processor.

func NewMetricConfig

func NewMetricConfig() MetricConfig

NewMetricConfig returns a MetricConfig with default values.

type Noop

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage

func (c *Noop) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage does nothing and returns the message unchanged.

type ProcessField

type ProcessField struct {
	// contains filtered or unexported fields
}

ProcessField is a processor that applies a list of child processors to a field extracted from the original payload.

func (*ProcessField) ProcessMessage

func (p *ProcessField) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type ProcessFieldConfig

type ProcessFieldConfig struct {
	Parts      []int    `json:"parts" yaml:"parts"`
	Path       string   `json:"path" yaml:"path"`
	Processors []Config `json:"processors" yaml:"processors"`
}

ProcessFieldConfig is a config struct containing fields for the ProcessField processor.

func NewProcessFieldConfig

func NewProcessFieldConfig() ProcessFieldConfig

NewProcessFieldConfig returns a default ProcessFieldConfig.

type ProcessMap

type ProcessMap struct {
	// contains filtered or unexported fields
}

ProcessMap is a processor that applies a list of child processors to a new payload mapped from the original, and after processing attempts to overlay the results back onto the original payloads according to more mappings.

func (*ProcessMap) ProcessMessage

func (p *ProcessMap) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type ProcessMapConfig

type ProcessMapConfig struct {
	Parts           []int              `json:"parts" yaml:"parts"`
	Conditions      []condition.Config `json:"conditions" yaml:"conditions"`
	Premap          map[string]string  `json:"premap" yaml:"premap"`
	PremapOptional  map[string]string  `json:"premap_optional" yaml:"premap_optional"`
	Postmap         map[string]string  `json:"postmap" yaml:"postmap"`
	PostmapOptional map[string]string  `json:"postmap_optional" yaml:"postmap_optional"`
	Processors      []Config           `json:"processors" yaml:"processors"`
}

ProcessMapConfig is a config struct containing fields for the ProcessMap processor.

func NewProcessMapConfig

func NewProcessMapConfig() ProcessMapConfig

NewProcessMapConfig returns a default ProcessMapConfig.

type Sample

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that drops messages based on a random sample.

func (*Sample) ProcessMessage

func (s *Sample) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type SampleConfig

type SampleConfig struct {
	Retain     float64 `json:"retain" yaml:"retain"`
	RandomSeed int64   `json:"seed" yaml:"seed"`
}

SampleConfig contains configuration fields for the Sample processor.
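
A seeded random sampler could look like the sketch below. The sampler type is hypothetical, and interpreting Retain as a percentage between 0 and 100 is an assumption of this example.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler is a hypothetical stand-in for the Sample processor.
type sampler struct {
	retain float64 // percentage of messages to keep, assumed to be 0-100
	gen    *rand.Rand
}

// newSampler creates a sampler whose decisions are reproducible for a seed.
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep draws a number in [0, 1) and keeps the message when it falls below
// the retain fraction.
func (s *sampler) keep() bool {
	return s.gen.Float64() < s.retain/100
}

func main() {
	s := newSampler(10, 42)
	kept := 0
	for i := 0; i < 1000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Println("kept", kept, "of 1000")
}
```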

func NewSampleConfig

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that selects parts from a message to append to a new message.

func (*SelectParts) ProcessMessage

func (m *SelectParts) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type SelectPartsConfig

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains configuration fields for the SelectParts processor.

func NewSelectPartsConfig

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Split

type Split struct {
	// contains filtered or unexported fields
}

Split is a processor that splits messages into a message per part.

func (*Split) ProcessMessage

func (s *Split) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type SplitConfig

type SplitConfig struct {
	Size int `json:"size" yaml:"size"`
}

SplitConfig is a configuration struct containing fields for the Split processor, which breaks message batches down into batches of a smaller size.
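
Breaking a batch into smaller batches amounts to chunking a slice. The splitBatch function below is a hypothetical sketch; treating a size below 1 as 1 is an assumption made to keep the loop well defined.

```go
package main

import "fmt"

// splitBatch breaks parts into consecutive batches of at most size parts.
// A size below 1 is treated as 1 (an assumption for this sketch).
func splitBatch(parts [][]byte, size int) [][][]byte {
	if size < 1 {
		size = 1
	}
	var batches [][][]byte
	for start := 0; start < len(parts); start += size {
		end := start + size
		if end > len(parts) {
			end = len(parts)
		}
		batches = append(batches, parts[start:end])
	}
	return batches
}

func main() {
	parts := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}
	for i, b := range splitBatch(parts, 2) {
		fmt.Printf("batch %d has %d parts\n", i, len(b))
	}
}
```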

func NewSplitConfig

func NewSplitConfig() SplitConfig

NewSplitConfig returns a SplitConfig with default values.

type Text

type Text struct {
	// contains filtered or unexported fields
}

Text is a processor that performs a text based operation on a payload.

func (*Text) ProcessMessage

func (t *Text) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type TextConfig

type TextConfig struct {
	Parts    []int  `json:"parts" yaml:"parts"`
	Operator string `json:"operator" yaml:"operator"`
	Arg      string `json:"arg" yaml:"arg"`
	Value    string `json:"value" yaml:"value"`
}

TextConfig contains configuration fields for the Text processor.

func NewTextConfig

func NewTextConfig() TextConfig

NewTextConfig returns a TextConfig with default values.

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle is a processor that limits the stream of a pipeline to one message batch per period specified.

func (*Throttle) ProcessMessage

func (m *Throttle) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type ThrottleConfig

type ThrottleConfig struct {
	Period string `json:"period" yaml:"period"`
}

ThrottleConfig contains configuration fields for the Throttle processor.
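
The sketch below shows one way to enforce "one batch per period". The throttle type, newThrottle, delay and simulate are hypothetical, and parsing the Period string with time.ParseDuration is an assumption of this example. The clock is passed in explicitly so the behaviour can be checked without sleeping.

```go
package main

import (
	"fmt"
	"time"
)

// throttle is a hypothetical stand-in that allows one batch per period.
type throttle struct {
	period time.Duration
	next   time.Time // earliest time the next batch may pass
}

// newThrottle parses a period string such as "100ms".
func newThrottle(period string) (*throttle, error) {
	d, err := time.ParseDuration(period)
	if err != nil {
		return nil, err
	}
	return &throttle{period: d}, nil
}

// delay returns how long a batch arriving at now must wait, and reserves the
// slot that follows it.
func (t *throttle) delay(now time.Time) time.Duration {
	wait := time.Duration(0)
	if now.Before(t.next) {
		wait = t.next.Sub(now)
		now = t.next
	}
	t.next = now.Add(t.period)
	return wait
}

// simulate replays arrival offsets (milliseconds from an arbitrary start) and
// returns the wait imposed on each arrival, also in milliseconds.
func simulate(period string, arrivalsMS []int) ([]int64, error) {
	t, err := newThrottle(period)
	if err != nil {
		return nil, err
	}
	start := time.Unix(0, 0)
	waits := make([]int64, 0, len(arrivalsMS))
	for _, ms := range arrivalsMS {
		now := start.Add(time.Duration(ms) * time.Millisecond)
		waits = append(waits, t.delay(now).Milliseconds())
	}
	return waits, nil
}

func main() {
	waits, err := simulate("100ms", []int{0, 30, 250})
	fmt.Println(waits, err)
}
```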

func NewThrottleConfig

func NewThrottleConfig() ThrottleConfig

NewThrottleConfig returns a ThrottleConfig with default values.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns either a slice of messages in case of success or a
	// response in case of failure. If the slice of messages is empty the
	// response should be returned to the source.
	ProcessMessage(msg types.Message) ([]types.Message, types.Response)
}

Type reads a message, performs a processing operation, and returns either a slice of messages resulting from the process to be propagated through the pipeline, or a response that should be sent back to the source instead.
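
The contract can be illustrated with stand-in types. In the sketch below, message, response, processor, upper, rejectEmpty and run are all hypothetical; the real interface uses types.Message and types.Response. As a simplification, run stops the whole chain as soon as any processor yields no messages.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// message and response are hypothetical stand-ins for types.Message and
// types.Response.
type message [][]byte
type response error

// processor mirrors the shape of the Type interface.
type processor interface {
	ProcessMessage(msg message) ([]message, response)
}

// upper rewrites every part in upper case and always yields one message.
type upper struct{}

func (upper) ProcessMessage(msg message) ([]message, response) {
	out := make(message, len(msg))
	for i, p := range msg {
		out[i] = bytes.ToUpper(p)
	}
	return []message{out}, nil
}

// rejectEmpty yields no messages for an empty input and returns a response instead.
type rejectEmpty struct{}

func (rejectEmpty) ProcessMessage(msg message) ([]message, response) {
	if len(msg) == 0 {
		return nil, errors.New("message has no parts")
	}
	return []message{msg}, nil
}

// run feeds msg through each processor in turn. If a processor yields no
// messages, its response is returned for the source and processing stops.
func run(procs []processor, msg message) ([]message, response) {
	pending := []message{msg}
	for _, p := range procs {
		var next []message
		for _, m := range pending {
			out, res := p.ProcessMessage(m)
			if len(out) == 0 {
				return nil, res
			}
			next = append(next, out...)
		}
		pending = next
	}
	return pending, nil
}

func main() {
	procs := []processor{rejectEmpty{}, upper{}}
	out, res := run(procs, message{[]byte("hello")})
	fmt.Printf("%s %v\n", out[0][0], res)
	_, res = run(procs, message{})
	fmt.Println(res)
}
```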

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error)

New creates a processor type based on a processor configuration.
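
One plausible way for a constructor like this to work is to look up the configured type name in a registry such as Constructors. This reference does not state how New is implemented or what fields TypeSpec has, so the processor, config, typeSpec, constructors and newProcessor names below are hypothetical.

```go
package main

import "fmt"

// processor, config and typeSpec are hypothetical stand-ins for Type, Config
// and TypeSpec.
type processor interface {
	Name() string
}

type config struct {
	Type string
}

type typeSpec struct {
	constructor func(conf config) (processor, error)
}

type noop struct{}

func (noop) Name() string { return "noop" }

// constructors plays the role of a registry mapping type names to specs.
var constructors = map[string]typeSpec{
	"noop": {constructor: func(config) (processor, error) { return noop{}, nil }},
}

// newProcessor looks up conf.Type and delegates to the registered constructor.
func newProcessor(conf config) (processor, error) {
	spec, ok := constructors[conf.Type]
	if !ok {
		return nil, fmt.Errorf("unrecognised processor type %q", conf.Type)
	}
	return spec.constructor(conf)
}

func main() {
	p, err := newProcessor(config{Type: "noop"})
	fmt.Println(p.Name(), err)
	_, err = newProcessor(config{Type: "missing"})
	fmt.Println(err)
}
```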

func NewArchive

func NewArchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewArchive returns an Archive processor.

func NewBatch

func NewBatch(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBatch returns a Batch processor.

func NewBoundsCheck

func NewBoundsCheck(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine

func NewCombine(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCombine returns a Combine processor.

func NewCompress

func NewCompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewCompress returns a Compress processor.

func NewConditional

func NewConditional(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewConditional returns a Conditional processor.

func NewDecode

func NewDecode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecode returns a Decode processor.

func NewDecompress

func NewDecompress(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDecompress returns a Decompress processor.

func NewDedupe

func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewDedupe returns a Dedupe processor.

func NewEncode

func NewEncode(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewEncode returns an Encode processor.

func NewFilter

func NewFilter(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilter returns a Filter processor.

func NewFilterParts

func NewFilterParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewFilterParts returns a FilterParts processor.

func NewGrok

func NewGrok(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewGrok returns a Grok processor.

func NewHTTP

func NewHTTP(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHTTP returns an HTTP processor.

func NewHash

func NewHash(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHash returns a Hash processor.

func NewHashSample

func NewHashSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewHashSample returns a HashSample processor.

func NewInsertPart

func NewInsertPart(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewInsertPart returns an InsertPart processor.

func NewJMESPath

func NewJMESPath(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJMESPath returns a JMESPath processor.

func NewJSON

func NewJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewJSON returns a JSON processor.

func NewMergeJSON

func NewMergeJSON(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMergeJSON returns a MergeJSON processor.

func NewMetadata

func NewMetadata(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetadata returns a Metadata processor.

func NewMetric

func NewMetric(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewMetric returns a Metric processor.

func NewNoop

func NewNoop(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewNoop returns a Noop processor.

func NewProcessField

func NewProcessField(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessField returns a ProcessField processor.

func NewProcessMap

func NewProcessMap(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewProcessMap returns a ProcessMap processor.

func NewSample

func NewSample(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts

func NewSelectParts(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSelectParts returns a SelectParts processor.

func NewSplit

func NewSplit(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewSplit returns a Split processor.

func NewText

func NewText(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewText returns a Text processor.

func NewThrottle

func NewThrottle(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewThrottle returns a Throttle processor.

func NewUnarchive

func NewUnarchive(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error)

NewUnarchive returns an Unarchive processor.

type TypeSpec

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec holds a constructor and a usage description for each processor type.

type Unarchive

type Unarchive struct {
	// contains filtered or unexported fields
}

Unarchive is a processor that can selectively unarchive parts of a message following a chosen archive type.

func (*Unarchive) ProcessMessage

func (d *Unarchive) ProcessMessage(msg types.Message) ([]types.Message, types.Response)

ProcessMessage applies the processor to a message, either creating >0 resulting messages or a response to be sent back to the message source.

type UnarchiveConfig

type UnarchiveConfig struct {
	Format string `json:"format" yaml:"format"`
	Parts  []int  `json:"parts" yaml:"parts"`
}

UnarchiveConfig contains configuration fields for the Unarchive processor.
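The Parts field suggests that the processor targets message parts by index. The sketch below shows one way such a list could be resolved against a message. Its rules are assumptions not confirmed by this page: an empty list selects every part, negative indices count back from the end, and out-of-range indices are skipped. The targets helper is hypothetical.

```go
package main

import "fmt"

// targets resolves a Parts-style index list against a message of n parts.
// Assumptions: an empty list selects all parts, negative indices count from
// the end (-1 is the last part), and out-of-range indices are ignored.
func targets(parts []int, n int) []int {
	if len(parts) == 0 {
		all := make([]int, n)
		for i := range all {
			all[i] = i
		}
		return all
	}
	var out []int
	for _, p := range parts {
		if p < 0 {
			p += n
		}
		if p >= 0 && p < n {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	fmt.Println(targets([]int{0, -1}, 3)) // prints [0 2]
	fmt.Println(targets(nil, 2))          // prints [0 1]
}
```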

func NewUnarchiveConfig

func NewUnarchiveConfig() UnarchiveConfig

NewUnarchiveConfig returns an UnarchiveConfig with default values.

Directories

Path Synopsis
Package condition contains logical operators that, based on their configuration, return boolean values from messages under certain circumstances.