Fluent Forward Receiver

This receiver runs a TCP server that accepts events via the Fluent Forward protocol.

This receiver is usually used Collector and Fluent Bit are used in chained configuration. For more details see FluentBit Subprocess Extension docs.

This receiver:

  • Does not support TLS or the handshake portion of the Forward protocol.
  • Does support acknowledgments of events that have the chunk option, as per the spec.
  • Supports all three event types (message, forward, packed forward, including compressed packed forward)
  • Supports listening on a Unix domain socket by making the listenAddress option of the form unix://<path to socket>.
  • If using TCP, it will start a UDP server on the same port to deliver heartbeat echos, as per the spec.

Here is a basic example config that makes the receiver listen on all interfaces on port 8006:



If you are working on this receiver and need to regenerate any of the message pack autogenerated code, just run go generate on this package and its subpackages. You can get the msgp binary by just running go get -u -t, and make sure the Go binary path is on your shell's PATH.

Expand ▾ Collapse ▴




This section is empty.


This section is empty.


func NewFactory

func NewFactory() component.ReceiverFactory


type AckResponse

type AckResponse struct {
	Ack string `msg:"ack"`

func (AckResponse) EncodeMsg

func (z AckResponse) EncodeMsg(en *msgp.Writer) error

type Collector

type Collector struct {
	// contains filtered or unexported fields

    Collector acts as an aggregator of LogRecords so that we don't have to generate as many pdata.Logs instances...we can pre-batch the LogRecord instances from several Forward events into one to hopefully reduce allocations and GC overhead.

    func (*Collector) Start

    func (c *Collector) Start(ctx context.Context)

    type Config

    type Config struct {
    	configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
    	// The address to listen on for incoming Fluent Forward events.  Should be
    	// of the form `<ip addr>:<port>` (TCP) or `unix://<socket_path>` (Unix
    	// domain socket).
    	ListenAddress string `mapstructure:"endpoint"`

      Config defines configuration for the SignalFx receiver.

      type Event

      type Event interface {
      	DecodeMsg(dc *msgp.Reader) error
      	LogRecords() pdata.LogSlice
      	Chunk() string
      	Compressed() string

      type EventMode

      type EventMode int
      const (
      	UnknownMode EventMode = iota

      func DetermineNextEventMode

      func DetermineNextEventMode(peeker Peeker) (EventMode, error)

        DetermineNextEventMode inspects the next bit of data from the given peeker reader to determine which type of event mode it is. According to the forward protocol spec: "Server MUST detect the carrier mode by inspecting the second element of the array." It is assumed that peeker is aligned at the start of a new event, otherwise the result is undefined and will probably error.

        func (EventMode) String

        func (em EventMode) String() string

        type EventTimeExt

        type EventTimeExt time.Time

        func (*EventTimeExt) ExtensionType

        func (*EventTimeExt) ExtensionType() int8

        func (*EventTimeExt) Len

        func (e *EventTimeExt) Len() int

        func (*EventTimeExt) MarshalBinaryTo

        func (e *EventTimeExt) MarshalBinaryTo(b []byte) error

        func (*EventTimeExt) UnmarshalBinary

        func (e *EventTimeExt) UnmarshalBinary(b []byte) error

        type ForwardEventLogRecords

        type ForwardEventLogRecords struct {

        func (*ForwardEventLogRecords) DecodeMsg

        func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error)

        func (*ForwardEventLogRecords) LogRecords

        func (fe *ForwardEventLogRecords) LogRecords() pdata.LogSlice

        type MessageEventLogRecord

        type MessageEventLogRecord struct {

        func (*MessageEventLogRecord) DecodeMsg

        func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error

        func (*MessageEventLogRecord) LogRecords

        func (melr *MessageEventLogRecord) LogRecords() pdata.LogSlice

        type OptionsMap

        type OptionsMap map[string]interface{}

        func (OptionsMap) Chunk

        func (om OptionsMap) Chunk() string

          Chunk returns the `chunk` option or blank string if it was not set.

          func (OptionsMap) Compressed

          func (om OptionsMap) Compressed() string

          type PackedForwardEventLogRecords

          type PackedForwardEventLogRecords struct {

          func (*PackedForwardEventLogRecords) DecodeMsg

          func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error

            DecodeMsg implements msgp.Decodable. This was originally code generated but then manually copied here in order to handle the optional Options field.

            func (*PackedForwardEventLogRecords) LogRecords

            func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice

            type Peeker

            type Peeker interface {
            	Peek(n int) ([]byte, error)


            Path Synopsis
            package observ contains logic pertaining to the internal observation of the fluent forward receiver.
            package observ contains logic pertaining to the internal observation of the fluent forward receiver.