processor

package
v0.6.5
Warning

This package is not in the latest version of its module.

Published: Jan 5, 2018 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package processor contains logical message processors that can be pipelined within benthos using the pipeline.Processor type.


Constants

This section is empty.

Variables

This section is empty.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

Types

type BlobToMulti

type BlobToMulti struct {
	// contains filtered or unexported fields
}

BlobToMulti is a processor that takes messages with a single part in a benthos multiple part blob format and decodes them into multiple part messages.

func (*BlobToMulti) ProcessMessage

func (m *BlobToMulti) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage takes a message with 1 part in multiple part blob format and returns a multiple part message by decoding it.

type BoundsCheck

type BoundsCheck struct {
	// contains filtered or unexported fields
}

BoundsCheck is a processor that checks each message against a set of bounds and rejects messages if they aren't within them.

func (*BoundsCheck) ProcessMessage

func (m *BoundsCheck) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage checks each message against a set of bounds.

type BoundsCheckConfig

type BoundsCheckConfig struct {
	MaxParts    int `json:"max_parts" yaml:"max_parts"`
	MinParts    int `json:"min_parts" yaml:"min_parts"`
	MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
}

BoundsCheckConfig contains any bounds configuration for the BoundsCheck processor.
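The check these three fields imply can be sketched as follows. This is a minimal illustration, not the package's implementation: `boundsConf` and `checkBounds` are hypothetical names, and the sketch assumes that the part counts are inclusive limits and that MaxPartSize is measured in bytes.

```go
package main

import "fmt"

// boundsConf is a hypothetical stand-in mirroring the fields of
// BoundsCheckConfig.
type boundsConf struct {
	MaxParts    int
	MinParts    int
	MaxPartSize int
}

// checkBounds returns an error describing the first violated bound, or nil
// if the parts are within bounds. Inclusive limits and byte sizes are
// assumptions of this sketch.
func checkBounds(conf boundsConf, parts [][]byte) error {
	if n := len(parts); n < conf.MinParts || n > conf.MaxParts {
		return fmt.Errorf("part count %d outside [%d, %d]", n, conf.MinParts, conf.MaxParts)
	}
	for i, p := range parts {
		if len(p) > conf.MaxPartSize {
			return fmt.Errorf("part %d is %d bytes, max is %d", i, len(p), conf.MaxPartSize)
		}
	}
	return nil
}

func main() {
	conf := boundsConf{MaxParts: 2, MinParts: 1, MaxPartSize: 3}
	fmt.Println(checkBounds(conf, [][]byte{[]byte("abc")}))
	fmt.Println(checkBounds(conf, [][]byte{[]byte("abcd")}))
}
```

A processor built on such a check would return the message on a nil error and a failure response otherwise.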

func NewBoundsCheckConfig

func NewBoundsCheckConfig() BoundsCheckConfig

NewBoundsCheckConfig returns a BoundsCheckConfig with default values.

type Combine added in v0.3.1

type Combine struct {
	// contains filtered or unexported fields
}

Combine is a processor that buffers incoming messages until it has a configured number of them (CombineConfig.Parts), then combines them into a single multiple part message.

func (*Combine) ProcessMessage added in v0.3.1

func (c *Combine) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage buffers each incoming message and drops it from the pipeline, returning a NoAck response, until N messages have been buffered. It then combines those messages into one multiple part message, which is sent on.
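The buffering behaviour described here can be sketched roughly as below. `Message`, `combiner`, and `add` are hypothetical stand-ins rather than the package's types, the sketch assumes n is at least 1, and the boolean return stands in for the choice between propagating a message and answering with a NoAck response.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for types.Message.
type Message struct{ Parts [][]byte }

// combiner buffers parts until it holds n of them. n is assumed to be >= 1.
type combiner struct {
	n      int
	buffer [][]byte
}

// add buffers the parts of msg. It returns (nil, false) while fewer than n
// parts are buffered, which is where a real processor would answer with a
// NoAck response. Once n parts are available it returns them as one message.
func (c *combiner) add(msg *Message) (*Message, bool) {
	c.buffer = append(c.buffer, msg.Parts...)
	if len(c.buffer) < c.n {
		return nil, false
	}
	out := &Message{Parts: c.buffer[:c.n:c.n]}
	// Copy any remainder so the emitted message does not share storage
	// with later appends.
	c.buffer = append([][]byte(nil), c.buffer[c.n:]...)
	return out, true
}

func main() {
	c := &combiner{n: 3}
	for _, s := range []string{"a", "b", "c"} {
		out, ok := c.add(&Message{Parts: [][]byte{[]byte(s)}})
		if ok {
			fmt.Printf("emitted %d parts\n", len(out.Parts))
		} else {
			fmt.Println("buffered, no message emitted")
		}
	}
}
```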

type CombineConfig added in v0.3.1

type CombineConfig struct {
	Parts int `json:"parts" yaml:"parts"`
}

CombineConfig contains configuration for the Combine processor.

func NewCombineConfig added in v0.3.1

func NewCombineConfig() CombineConfig

NewCombineConfig returns a CombineConfig with default values.

type Config

type Config struct {
	Type        string            `json:"type" yaml:"type"`
	BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
	SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
	BlobToMulti struct{}          `json:"blob_to_multi" yaml:"blob_to_multi"`
	MultiToBlob struct{}          `json:"multi_to_blob" yaml:"multi_to_blob"`
	Sample      SampleConfig      `json:"sample" yaml:"sample"`
	Combine     CombineConfig     `json:"combine" yaml:"combine"`
}

Config is the all-encompassing configuration struct for all processor types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (*Config) UnmarshalJSON

func (m *Config) UnmarshalJSON(bytes []byte) error

UnmarshalJSON ensures that when parsing configs that are in a slice the default values are still applied.

func (*Config) UnmarshalYAML

func (m *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that when parsing configs that are in a slice the default values are still applied.
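One common way to get this behaviour in Go is to populate defaults first and then decode over them through a method-less alias type. The JSON sketch below illustrates that pattern only; it is not taken from the package source. `conf`, `newConf`, and `parseAll` are hypothetical names, and the default values are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// conf is a hypothetical, cut-down stand-in for processor.Config.
type conf struct {
	Type  string `json:"type"`
	Parts int    `json:"parts"`
}

// newConf returns illustrative defaults (not the package's real defaults).
func newConf() conf { return conf{Type: "noop", Parts: 2} }

// UnmarshalJSON starts from the defaults and decodes over them, so fields
// missing from the input keep their default values.
func (c *conf) UnmarshalJSON(b []byte) error {
	// confAlias has the same fields but no UnmarshalJSON method, which
	// prevents infinite recursion.
	type confAlias conf
	aliased := confAlias(newConf())
	if err := json.Unmarshal(b, &aliased); err != nil {
		return err
	}
	*c = conf(aliased)
	return nil
}

// parseAll decodes a JSON array of configs.
func parseAll(raw string) ([]conf, error) {
	var out []conf
	err := json.Unmarshal([]byte(raw), &out)
	return out, err
}

func main() {
	confs, err := parseAll(`[{"type":"combine"},{"parts":5}]`)
	fmt.Println(confs, err)
}
```

Without the custom method, each slice element would be decoded into a zero-valued struct, so omitted fields would end up as zero values rather than defaults.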

type MultiToBlob

type MultiToBlob struct {
	// contains filtered or unexported fields
}

MultiToBlob is a processor that takes messages with potentially multiple parts and converts them into a single part message using the benthos binary format. This message can be converted back to multiple parts using the BlobToMulti processor.

func (MultiToBlob) ProcessMessage

func (m MultiToBlob) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage takes a message with one or more parts and returns a single part message that can later be converted back to the original parts.
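The round trip between multiple parts and a single blob can be illustrated with a length-prefixed encoding. The layout below (a big-endian uint32 part count, then each part preceded by a big-endian uint32 length) is an assumption made for this sketch; this page does not specify the benthos binary format. `toBlob` and `fromBlob` are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// toBlob encodes parts as: uint32 part count, then for each part a uint32
// length followed by the part's bytes (all big endian). Assumed layout.
func toBlob(parts [][]byte) []byte {
	blob := make([]byte, 4)
	binary.BigEndian.PutUint32(blob, uint32(len(parts)))
	for _, p := range parts {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(p)))
		blob = append(blob, size[:]...)
		blob = append(blob, p...)
	}
	return blob
}

// fromBlob reverses toBlob, returning an error if the blob is truncated.
func fromBlob(blob []byte) ([][]byte, error) {
	if len(blob) < 4 {
		return nil, errors.New("blob too short for part count")
	}
	n := binary.BigEndian.Uint32(blob)
	blob = blob[4:]
	var parts [][]byte
	for i := uint32(0); i < n; i++ {
		if len(blob) < 4 {
			return nil, errors.New("blob truncated before part size")
		}
		size := binary.BigEndian.Uint32(blob)
		blob = blob[4:]
		if uint64(len(blob)) < uint64(size) {
			return nil, errors.New("blob truncated inside part")
		}
		parts = append(parts, blob[:size])
		blob = blob[size:]
	}
	return parts, nil
}

func main() {
	blob := toBlob([][]byte{[]byte("foo"), []byte("bar")})
	parts, err := fromBlob(blob)
	fmt.Printf("%q %v\n", parts, err)
}
```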

type Noop added in v0.6.5

type Noop struct {
}

Noop is a no-op processor that does nothing.

func (*Noop) ProcessMessage added in v0.6.5

func (c *Noop) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage does nothing and returns the message unchanged.

type Sample added in v0.3.2

type Sample struct {
	// contains filtered or unexported fields
}

Sample is a processor that retains a randomly sampled portion of messages and drops the rest, as configured by SampleConfig.

func (*Sample) ProcessMessage added in v0.3.2

func (s *Sample) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage decides at random, according to the configured retention, whether each message is passed on or dropped.

type SampleConfig added in v0.3.2

type SampleConfig struct {
	Retain     float64 `json:"retain"`
	RandomSeed int64   `json:"seed"`
}

SampleConfig contains any configuration for the Sample processor.
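A seeded sampler driven by fields like Retain and RandomSeed might look like the sketch below. This illustrates the general technique and is not the package's code: `sampler`, `newSampler`, and `keep` are hypothetical names, and the sketch assumes the retention value is a probability in the range 0 to 1, which this page does not confirm.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampler decides whether to keep each message. retain is treated as a
// probability in [0, 1]; that interpretation is an assumption of this sketch.
type sampler struct {
	retain float64
	gen    *rand.Rand
}

// newSampler seeds its own generator so that the keep/drop sequence is
// reproducible for a given seed.
func newSampler(retain float64, seed int64) *sampler {
	return &sampler{retain: retain, gen: rand.New(rand.NewSource(seed))}
}

// keep reports whether the next message should be retained.
func (s *sampler) keep() bool {
	return s.gen.Float64() < s.retain
}

func main() {
	s := newSampler(0.5, 42)
	kept := 0
	for i := 0; i < 1000; i++ {
		if s.keep() {
			kept++
		}
	}
	fmt.Printf("kept %d of 1000 messages\n", kept)
}
```

Using a fixed seed makes the sample deterministic between runs, while distinct seeds give independent samples.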

func NewSampleConfig added in v0.3.2

func NewSampleConfig() SampleConfig

NewSampleConfig returns a SampleConfig with default values.

type SelectParts added in v0.4.1

type SelectParts struct {
	// contains filtered or unexported fields
}

SelectParts is a processor that extracts a configured set of parts from each message, as given by SelectPartsConfig.Parts.

func (*SelectParts) ProcessMessage added in v0.4.1

func (m *SelectParts) ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)

ProcessMessage extracts a set of parts from each message.
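Part extraction of this kind can be sketched as below. `selectParts` is a hypothetical helper; the sketch assumes the configured values are zero-based indexes and that out-of-range indexes are skipped, neither of which is stated on this page.

```go
package main

import "fmt"

// selectParts returns the parts at the given indexes, in the order the
// indexes are listed. Zero-based indexing and skipping out-of-range
// indexes are assumptions of this sketch.
func selectParts(indexes []int, parts [][]byte) [][]byte {
	out := make([][]byte, 0, len(indexes))
	for _, i := range indexes {
		if i >= 0 && i < len(parts) {
			out = append(out, parts[i])
		}
	}
	return out
}

func main() {
	parts := [][]byte{[]byte("zero"), []byte("one"), []byte("two")}
	fmt.Printf("%q\n", selectParts([]int{2, 0}, parts))
}
```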

type SelectPartsConfig added in v0.4.1

type SelectPartsConfig struct {
	Parts []int `json:"parts" yaml:"parts"`
}

SelectPartsConfig contains any configuration for the SelectParts processor.

func NewSelectPartsConfig added in v0.4.1

func NewSelectPartsConfig() SelectPartsConfig

NewSelectPartsConfig returns a SelectPartsConfig with default values.

type Type

type Type interface {
	// ProcessMessage attempts to process a message. Since processing can fail
	// this call returns a message in case of success, a response in case of
	// failure, and a bool flag (true == success) indicating which of the two
	// should be used.
	ProcessMessage(msg *types.Message) (*types.Message, types.Response, bool)
}

Type reads a message, performs a processing operation, and returns either the resulting message or a response, along with a flag indicating whether the message should be propagated (true) or the response used instead (false).
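How a caller might chain processors and interpret the three return values is sketched below. All names here (`Message`, `Response`, `Processor`, `rejectEmpty`, `runAll`) are hypothetical stand-ins defined for the example; the real message and response types live in the benthos types package and may differ in shape.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Response are hypothetical stand-ins for types.Message and
// types.Response.
type Message struct{ Parts [][]byte }

type Response interface{ Error() error }

// Processor mirrors the method signature of processor.Type.
type Processor interface {
	ProcessMessage(msg *Message) (*Message, Response, bool)
}

type errResponse struct{ err error }

func (r errResponse) Error() error { return r.err }

// rejectEmpty is an illustrative processor that fails messages with no parts.
type rejectEmpty struct{}

func (rejectEmpty) ProcessMessage(msg *Message) (*Message, Response, bool) {
	if len(msg.Parts) == 0 {
		return nil, errResponse{errors.New("empty message")}, false
	}
	return msg, nil, true
}

// runAll feeds msg through each processor in order. On the first failure it
// stops and returns that processor's response with a false flag.
func runAll(procs []Processor, msg *Message) (*Message, Response, bool) {
	for _, p := range procs {
		next, res, ok := p.ProcessMessage(msg)
		if !ok {
			return nil, res, false
		}
		msg = next
	}
	return msg, nil, true
}

func main() {
	procs := []Processor{rejectEmpty{}}
	_, res, ok := runAll(procs, &Message{})
	fmt.Println(ok, res.Error())
}
```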

func New

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates a processor type based on a processor configuration.

func NewBlobToMulti

func NewBlobToMulti(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBlobToMulti returns a BlobToMulti processor.

func NewBoundsCheck

func NewBoundsCheck(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewBoundsCheck returns a BoundsCheck processor.

func NewCombine added in v0.3.1

func NewCombine(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewCombine returns a Combine processor.

func NewMultiToBlob

func NewMultiToBlob(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewMultiToBlob returns a MultiToBlob processor.

func NewNoop added in v0.6.5

func NewNoop(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewNoop returns a Noop processor.

func NewSample added in v0.3.2

func NewSample(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSample returns a Sample processor.

func NewSelectParts added in v0.4.1

func NewSelectParts(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSelectParts returns a SelectParts processor.
