v1.19.1 Latest

This package is not in the latest version of its module.

Published: Jan 27, 2021 License: Apache-2.0 Imports: 21 Imported by: 0


Fluent Forward Receiver

This receiver runs a TCP server that accepts events via the Fluent Forward protocol.

This receiver is typically used when the Collector and Fluent Bit are run in a chained configuration. For more details, see the FluentBit Subprocess Extension docs.

This receiver:

  • Does not support TLS or the handshake portion of the Forward protocol.
  • Does support acknowledgments of events that have the chunk option, as per the spec.
  • Supports all three event types (message, forward, packed forward, including compressed packed forward).
  • Supports listening on a Unix domain socket by setting the endpoint option (the ListenAddress config field) to the form unix://<path to socket>.
  • If using TCP, it will start a UDP server on the same port to deliver heartbeat echoes, as per the spec.
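The two address forms and the paired UDP heartbeat listener can be sketched with the standard library alone. This is a minimal illustration, not the receiver's actual code: splitEndpoint and listen are hypothetical helper names, and the real implementation may validate addresses differently.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitEndpoint is a hypothetical helper: it maps an endpoint string to the
// network and address arguments expected by net.Listen.
func splitEndpoint(endpoint string) (network, address string) {
	const unixPrefix = "unix://"
	if strings.HasPrefix(endpoint, unixPrefix) {
		return "unix", strings.TrimPrefix(endpoint, unixPrefix)
	}
	return "tcp", endpoint
}

// listen is a hypothetical helper: it opens the stream listener and, for TCP
// only, a UDP socket on the same address for heartbeat echoes.
func listen(endpoint string) (net.Listener, net.PacketConn, error) {
	network, address := splitEndpoint(endpoint)
	ln, err := net.Listen(network, address)
	if err != nil {
		return nil, nil, err
	}
	if network != "tcp" {
		return ln, nil, nil // no heartbeat socket for Unix domain sockets
	}
	// Reuse the resolved TCP address so both sockets share one port number.
	pc, err := net.ListenPacket("udp", ln.Addr().String())
	if err != nil {
		ln.Close()
		return nil, nil, err
	}
	return ln, pc, nil
}

func main() {
	for _, ep := range []string{"0.0.0.0:8006", "unix:///var/run/fluent.sock"} {
		network, address := splitEndpoint(ep)
		fmt.Println(network, address)
	}
}
```

The socket path in main is only an example value; any path writable by the process would do.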

Here is a basic example config that makes the receiver listen on all interfaces on port 8006:
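A minimal sketch of such a config, assuming the receiver is registered under the fluentforward key and that the listen address is set through the endpoint option (the mapstructure tag on Config.ListenAddress):

```yaml
receivers:
  fluentforward:
    # Assumed values: all interfaces, port 8006.
    endpoint: 0.0.0.0:8006
```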



If you are working on this receiver and need to regenerate any of the MessagePack autogenerated code, run go generate on this package and its subpackages. To get the msgp binary, run go get -u -t github.com/tinylib/msgp and make sure the Go binary path is on your shell's PATH.






func NewFactory

func NewFactory() component.ReceiverFactory


type AckResponse

type AckResponse struct {
	Ack string `msg:"ack"`
}

func (AckResponse) EncodeMsg

func (z AckResponse) EncodeMsg(en *msgp.Writer) error

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector acts as an aggregator of LogRecords so that fewer pdata.Logs instances have to be generated. The LogRecord instances from several Forward events are pre-batched into a single pdata.Logs, which should reduce allocations and GC overhead.

func (*Collector) Start

func (c *Collector) Start(ctx context.Context)

type Config

type Config struct {
	configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

	// The address to listen on for incoming Fluent Forward events.  Should be
	// of the form `<ip addr>:<port>` (TCP) or `unix://<socket_path>` (Unix
	// domain socket).
	ListenAddress string `mapstructure:"endpoint"`
}

Config defines configuration for the Fluent Forward receiver.

type Event

type Event interface {
	DecodeMsg(dc *msgp.Reader) error
	LogRecords() pdata.LogSlice
	Chunk() string
	Compressed() string
}

type EventMode

type EventMode int
const (
	UnknownMode EventMode = iota
)

func DetermineNextEventMode

func DetermineNextEventMode(peeker Peeker) (EventMode, error)

DetermineNextEventMode inspects the next bit of data from the given peeker reader to determine which type of event mode it is. According to the forward protocol spec: "Server MUST detect the carrier mode by inspecting the second element of the array." The peeker is assumed to be aligned at the start of a new event; otherwise the result is undefined and will probably be an error.

func (EventMode) String

func (em EventMode) String() string

type EventTimeExt

type EventTimeExt time.Time

func (*EventTimeExt) ExtensionType

func (*EventTimeExt) ExtensionType() int8

func (*EventTimeExt) Len

func (e *EventTimeExt) Len() int

func (*EventTimeExt) MarshalBinaryTo

func (e *EventTimeExt) MarshalBinaryTo(b []byte) error

func (*EventTimeExt) UnmarshalBinary

func (e *EventTimeExt) UnmarshalBinary(b []byte) error

type ForwardEventLogRecords

type ForwardEventLogRecords struct {

func (*ForwardEventLogRecords) DecodeMsg

func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error)

func (*ForwardEventLogRecords) LogRecords

func (fe *ForwardEventLogRecords) LogRecords() pdata.LogSlice

type MessageEventLogRecord

type MessageEventLogRecord struct {

func (*MessageEventLogRecord) DecodeMsg

func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error

func (*MessageEventLogRecord) LogRecords

func (melr *MessageEventLogRecord) LogRecords() pdata.LogSlice

type OptionsMap

type OptionsMap map[string]interface{}

func (OptionsMap) Chunk

func (om OptionsMap) Chunk() string

Chunk returns the `chunk` option or blank string if it was not set.

func (OptionsMap) Compressed

func (om OptionsMap) Compressed() string
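Accessors of this kind can be sketched with a checked type assertion, which yields a blank string both when the key is missing and when the value is not a string. optionsMap and its methods are hypothetical stand-ins, and the real methods may accept additional value types.

```go
package main

import "fmt"

// optionsMap is a hypothetical stand-in for OptionsMap.
type optionsMap map[string]interface{}

// chunk returns the "chunk" option, or "" if it is absent or not a string.
func (om optionsMap) chunk() string {
	s, _ := om["chunk"].(string)
	return s
}

// compressed returns the "compressed" option (for example "gzip"), or "".
func (om optionsMap) compressed() string {
	s, _ := om["compressed"].(string)
	return s
}

func main() {
	om := optionsMap{"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==", "size": 3}
	fmt.Printf("%q %q\n", om.chunk(), om.compressed())
}
```

Calling the methods on a nil map is also safe, since indexing a nil map returns the zero value.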

type PackedForwardEventLogRecords

type PackedForwardEventLogRecords struct {

func (*PackedForwardEventLogRecords) DecodeMsg

func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error

DecodeMsg implements msgp.Decodable. This was originally code generated but then manually copied here in order to handle the optional Options field.

func (*PackedForwardEventLogRecords) LogRecords

func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice

type Peeker

type Peeker interface {
	Peek(n int) ([]byte, error)
}


Path Synopsis
package observ contains logic pertaining to the internal observation of the fluent forward receiver.
