buffer

package
v3.58.0
Warning

This package is not in the latest version of its module.
Published: Nov 2, 2021 License: MIT Imports: 19 Imported by: 5

Documentation

Overview

Package buffer is both a types.Consumer and types.Producer implementation that is able to sit between other stream components, effectively decoupling their transaction channels by storing messages in a buffer implementation.

Buffers are optional within Benthos and should not be used unless there is a specific problem that one solves.

Constants

const (
	TypeMemory = "memory"
	TypeNone   = "none"
)

String constants representing each buffer type.

Variables

var Constructors = map[string]TypeSpec{}

Constructors is a map of all buffer types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

func WalkConstructors added in v3.43.0

func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))

WalkConstructors iterates each component constructor.
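The registry-and-walk pattern behind Constructors and WalkConstructors can be sketched with the standard library alone. Everything below is a stand-in: `spec`, `registry` and `walk` are hypothetical names, and the sorted iteration order is an assumption of this sketch rather than documented behaviour of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// spec is a hypothetical stand-in for TypeSpec: a constructor plus a summary.
type spec struct {
	summary     string
	constructor func() string
}

// registry plays the role of the Constructors map in this sketch.
var registry = map[string]spec{
	"memory": {summary: "Stores messages in memory.", constructor: func() string { return "memory buffer" }},
	"none":   {summary: "Forwards messages directly.", constructor: func() string { return "empty buffer" }},
}

// walk calls fn once per registered type. Names are sorted first because
// Go map iteration order is not deterministic.
func walk(fn func(name string, s spec)) {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fn(name, registry[name])
	}
}

func main() {
	walk(func(name string, s spec) {
		fmt.Printf("%s: %s (%s)\n", name, s.summary, s.constructor())
	})
}
```

A callback-based walk keeps the map itself private to the registry while still letting callers build documentation or lint configs from every registered component.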

Types

type Config

type Config struct {
	Type   string       `json:"type" yaml:"type"`
	Memory MemoryConfig `json:"memory" yaml:"memory"`
	None   struct{}     `json:"none" yaml:"none"`
	Plugin interface{}  `json:"plugin,omitempty" yaml:"plugin,omitempty"`
}

Config is the all-encompassing configuration struct for all buffer types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

func (Config) Sanitised added in v3.24.0

func (conf Config) Sanitised(removeDeprecated bool) (interface{}, error)

Sanitised returns a sanitised version of the config, meaning sections that aren't relevant to behaviour are removed. Also optionally removes deprecated fields.

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that default values are still applied when parsing configs that are held in a map or slice.

type ConstructorFunc added in v3.43.0

type ConstructorFunc func(Config, types.Manager, log.Modular, metrics.Type) (Type, error)

ConstructorFunc is a func signature able to construct a buffer.

type Empty

type Empty struct {
	// contains filtered or unexported fields
}

Empty is an empty buffer that simply forwards messages on directly.

func (*Empty) CloseAsync

func (e *Empty) CloseAsync()

CloseAsync shuts down the Empty buffer and stops processing messages.

func (*Empty) Consume

func (e *Empty) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the buffer to read.

func (*Empty) ErrorsChan

func (e *Empty) ErrorsChan() <-chan []error

ErrorsChan returns the errors channel.

func (*Empty) StopConsuming

func (e *Empty) StopConsuming()

StopConsuming instructs the buffer to no longer consume data.

func (*Empty) TransactionChan

func (e *Empty) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*Empty) WaitForClose

func (e *Empty) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Empty buffer has closed down.

type EnabledBatchPolicyConfig

type EnabledBatchPolicyConfig struct {
	Enabled            bool `json:"enabled" yaml:"enabled"`
	batch.PolicyConfig `json:",inline" yaml:",inline"`
}

EnabledBatchPolicyConfig is a batch.PolicyConfig with an added Enabled field.

type MemoryConfig

type MemoryConfig struct {
	Limit       int                      `json:"limit" yaml:"limit"`
	BatchPolicy EnabledBatchPolicyConfig `json:"batch_policy" yaml:"batch_policy"`
}

MemoryConfig holds the config values for a purely memory-based ring buffer type.

func NewMemoryConfig

func NewMemoryConfig() MemoryConfig

NewMemoryConfig creates a new MemoryConfig with default values.

type Parallel

type Parallel interface {
	// NextMessage reads the next oldest message. The message is preserved
	// until the returned AckFunc is called.
	NextMessage() (types.Message, parallel.AckFunc, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once it has been emptied. This is a
	// way for a writer to signal to a reader that it has finished writing
	// messages, so the reader can close once it has caught up. This call
	// blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Parallel represents a method of buffering messages such that they can be consumed by any number of parallel consumers, and can be acknowledged in any order.
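The acknowledgement model described above can be sketched with a small in-memory type. This is not the package's implementation: `parallelBuf` and `ackFunc` are hypothetical stand-ins, messages are plain strings, and NextMessage returns an error when empty rather than blocking, which is a simplification.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackFunc is a hypothetical stand-in for parallel.AckFunc. In this sketch,
// true releases the message and false puts it back at the front of the queue.
type ackFunc func(ack bool)

var errParallelEmpty = errors.New("buffer empty")

type parallelBuf struct {
	mu      sync.Mutex
	queue   []string // messages not yet handed to a consumer
	backlog int      // bytes held, including unacknowledged messages
}

// PushMessage appends a message and returns the backlog in bytes.
func (b *parallelBuf) PushMessage(msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queue = append(b.queue, msg)
	b.backlog += len(msg)
	return b.backlog
}

// NextMessage hands out the oldest queued message. Its bytes stay in the
// backlog until the returned ackFunc is called with true.
func (b *parallelBuf) NextMessage() (string, ackFunc, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.queue) == 0 {
		return "", nil, errParallelEmpty
	}
	msg := b.queue[0]
	b.queue = b.queue[1:]
	var once sync.Once // guards against double acknowledgement
	return msg, func(ack bool) {
		once.Do(func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			if ack {
				b.backlog -= len(msg)
			} else {
				b.queue = append([]string{msg}, b.queue...)
			}
		})
	}, nil
}

func main() {
	b := &parallelBuf{}
	b.PushMessage("first")
	b.PushMessage("second")
	_, ackA, _ := b.NextMessage()
	_, ackB, _ := b.NextMessage()
	ackB(true)  // acknowledged out of order
	ackA(false) // rejected, so "first" is requeued
	msg, _, _ := b.NextMessage()
	fmt.Println(msg, b.backlog)
}
```

Because each message carries its own acknowledgement function, consumers do not need to coordinate with each other about ordering.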

type ParallelBatcher

type ParallelBatcher struct {
	// contains filtered or unexported fields
}

ParallelBatcher wraps a buffer with a Producer/Consumer interface.

func (*ParallelBatcher) CloseAsync

func (m *ParallelBatcher) CloseAsync()

CloseAsync shuts down the ParallelBatcher and stops processing messages.

func (*ParallelBatcher) Consume

func (m *ParallelBatcher) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the buffer to read.

func (*ParallelBatcher) StopConsuming

func (m *ParallelBatcher) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*ParallelBatcher) TransactionChan

func (m *ParallelBatcher) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*ParallelBatcher) WaitForClose

func (m *ParallelBatcher) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the ParallelBatcher output has closed down.

type ParallelWrapper

type ParallelWrapper struct {
	// contains filtered or unexported fields
}

ParallelWrapper wraps a buffer with a Producer/Consumer interface.

func (*ParallelWrapper) CloseAsync

func (m *ParallelWrapper) CloseAsync()

CloseAsync shuts down the ParallelWrapper and stops processing messages.

func (*ParallelWrapper) Consume

func (m *ParallelWrapper) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the buffer to read.

func (*ParallelWrapper) StopConsuming

func (m *ParallelWrapper) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*ParallelWrapper) TransactionChan

func (m *ParallelWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*ParallelWrapper) WaitForClose

func (m *ParallelWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the ParallelWrapper output has closed down.

type Single

type Single interface {
	// ShiftMessage removes the oldest message from the stack. Returns the
	// backlog in bytes.
	ShiftMessage() (int, error)

	// NextMessage reads the oldest message. The message is preserved until
	// ShiftMessage is called.
	NextMessage() (types.Message, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once it has been emptied. This is a
	// way for a writer to signal to a reader that it has finished writing
	// messages, so the reader can close once it has caught up. This call
	// blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Single represents a method of buffering sequential messages, supporting only a single, sequential consumer.
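The peek-then-shift contract of Single can be sketched as follows. `singleBuf` is a hypothetical in-memory stand-in that uses strings for messages and returns an error when empty instead of blocking; it is not safe for concurrent use and is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

var errSingleEmpty = errors.New("buffer empty")

type singleBuf struct {
	queue   []string
	backlog int // bytes currently held
}

// PushMessage appends a message and returns the backlog in bytes.
func (b *singleBuf) PushMessage(msg string) int {
	b.queue = append(b.queue, msg)
	b.backlog += len(msg)
	return b.backlog
}

// NextMessage peeks at the oldest message without removing it.
func (b *singleBuf) NextMessage() (string, error) {
	if len(b.queue) == 0 {
		return "", errSingleEmpty
	}
	return b.queue[0], nil
}

// ShiftMessage removes the oldest message and returns the remaining backlog.
func (b *singleBuf) ShiftMessage() (int, error) {
	if len(b.queue) == 0 {
		return 0, errSingleEmpty
	}
	b.backlog -= len(b.queue[0])
	b.queue = b.queue[1:]
	return b.backlog, nil
}

func main() {
	b := &singleBuf{}
	b.PushMessage("first")
	b.PushMessage("second")
	msg, _ := b.NextMessage()
	again, _ := b.NextMessage() // still "first": nothing has been shifted yet
	remaining, _ := b.ShiftMessage()
	next, _ := b.NextMessage()
	fmt.Println(msg, again, remaining, next)
}
```

Separating the read from the removal lets a consumer retry delivery of the same message until it succeeds, at the cost of supporting only one consumer at a time.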

type SingleWrapper

type SingleWrapper struct {
	// contains filtered or unexported fields
}

SingleWrapper wraps a buffer with a Producer/Consumer interface.

func (*SingleWrapper) CloseAsync

func (m *SingleWrapper) CloseAsync()

CloseAsync shuts down the SingleWrapper and stops processing messages.

func (*SingleWrapper) Consume

func (m *SingleWrapper) Consume(msgs <-chan types.Transaction) error

Consume assigns a messages channel for the buffer to read.

func (*SingleWrapper) StopConsuming

func (m *SingleWrapper) StopConsuming()

StopConsuming instructs the buffer to stop consuming messages and close once the buffer is empty.

func (*SingleWrapper) TransactionChan

func (m *SingleWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*SingleWrapper) WaitForClose

func (m *SingleWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the SingleWrapper output has closed down.

type Type

type Type interface {
	types.Producer
	types.Consumer
	types.Closable

	// StopConsuming instructs the buffer to cut off the producer it is
	// consuming from. It will then enter a mode whereby messages can only be
	// read, and when the buffer is empty it will shut down.
	StopConsuming()
}

Type is an interface implemented by all buffer types.

func New

func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)

New creates a buffer type based on a buffer configuration.

func NewEmpty

func NewEmpty(config Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)

NewEmpty creates a buffer that satisfies the Type interface but does not buffer messages.

func NewMemory

func NewMemory(config Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)

NewMemory creates a buffer held in memory.

func NewParallelBatcher

func NewParallelBatcher(
	batcher *batch.Policy,
	child Type,
	log log.Modular,
	stats metrics.Type,
) Type

NewParallelBatcher creates a new Producer/Consumer around a buffer.

func NewParallelWrapper

func NewParallelWrapper(
	conf Config,
	buffer Parallel,
	log log.Modular,
	stats metrics.Type,
) Type

NewParallelWrapper creates a new Producer/Consumer around a buffer.

func NewSingleWrapper

func NewSingleWrapper(
	conf Config,
	buffer Single,
	log log.Modular,
	stats metrics.Type,
) Type

NewSingleWrapper creates a new Producer/Consumer around a buffer.

type TypeSpec

type TypeSpec struct {
	Summary     string
	Description string
	Footnotes   string

	FieldSpecs docs.FieldSpecs
	Status     docs.Status
	Version    string
	// contains filtered or unexported fields
}

TypeSpec is a constructor and usage description for each buffer type.

Directories

Path      Synopsis
parallel  Package parallel contains implementations of various buffer types where the buffer can be consumed by any number of parallel consumer threads.
single    Package single contains implementations of various buffer types where the buffer can only be consumed by a single thread (but any number of writers).
