README

Kafka Receiver

The Kafka receiver receives traces and logs from Kafka. The message payload encoding is configurable.

Supported pipeline types: traces, logs

Getting Started

The following settings are required:

  • protocol_version (no default): Kafka protocol version, e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of Kafka brokers
  • topic (default = otlp_spans): The name of the Kafka topic to read from
  • encoding (default = otlp_proto): The encoding of the payload sent to Kafka. Available encodings:
    • otlp_proto: the payload is deserialized to ExportTraceServiceRequest.
    • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
    • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
    • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
    • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
    • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
  • group_id (default = otel-collector): The consumer group that the receiver consumes messages from
  • client_id (default = otel-collector): The consumer client ID that the receiver uses
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use.
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file is used for authentication instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial metadata request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries

Example:

receivers:
  kafka:
    protocol_version: 2.0.0

Documentation

Functions

func MetricViews

func MetricViews() []*view.View

MetricViews returns the metric views for the Kafka receiver.

func NewFactory

func NewFactory(options ...FactoryOption) component.ReceiverFactory

NewFactory creates Kafka receiver factory.

Types

type Config

type Config struct {
	config.ReceiverSettings `mapstructure:",squash"`
	// The list of kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`
	// Kafka protocol version
	ProtocolVersion string `mapstructure:"protocol_version"`
	// The name of the kafka topic to consume from (default "otlp_spans")
	Topic string `mapstructure:"topic"`
	// Encoding of the messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// The consumer group that receiver will be consuming messages from (default "otel-collector")
	GroupID string `mapstructure:"group_id"`
	// The consumer client ID that receiver will use (default "otel-collector")
	ClientID string `mapstructure:"client_id"`

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata kafkaexporter.Metadata `mapstructure:"metadata"`

	Authentication kafkaexporter.Authentication `mapstructure:"auth"`
}

Config defines configuration for Kafka receiver.
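
To make the documented defaults concrete, here is a small sketch that fills unset optional fields with the default values listed in the README. It assumes a simplified, hypothetical receiverConfig struct that mirrors only the scalar fields of Config. The withDefaults helper is illustrative and is not how the package itself builds its default configuration.

```go
package main

import "fmt"

// receiverConfig is a simplified, hypothetical mirror of the documented
// Config fields; it omits ReceiverSettings, Metadata and Authentication.
type receiverConfig struct {
	Brokers         []string
	ProtocolVersion string
	Topic           string
	Encoding        string
	GroupID         string
	ClientID        string
}

// withDefaults fills unset optional fields with the documented defaults.
// ProtocolVersion has no default and is left untouched.
func withDefaults(c receiverConfig) receiverConfig {
	if len(c.Brokers) == 0 {
		c.Brokers = []string{"localhost:9092"}
	}
	if c.Topic == "" {
		c.Topic = "otlp_spans"
	}
	if c.Encoding == "" {
		c.Encoding = "otlp_proto"
	}
	if c.GroupID == "" {
		c.GroupID = "otel-collector"
	}
	if c.ClientID == "" {
		c.ClientID = "otel-collector"
	}
	return c
}

func main() {
	// Only the required setting is supplied, as in the README example.
	cfg := withDefaults(receiverConfig{ProtocolVersion: "2.0.0"})
	fmt.Printf("%+v\n", cfg)
}
```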

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks that the receiver configuration is valid.
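
This page does not say which checks Validate performs. As an illustration only, the sketch below shows the kind of check the README implies: protocol_version is required, and encoding should name a known encoding. The checkSettings helper and the documentedEncodings set are hypothetical. The set covers only the trace encodings listed in the README, so encodings for logs pipelines or those added through factory options would need to be added to it.

```go
package main

import (
	"errors"
	"fmt"
)

// documentedEncodings lists the trace encodings named in the README settings.
var documentedEncodings = map[string]bool{
	"otlp_proto":    true,
	"jaeger_proto":  true,
	"jaeger_json":   true,
	"zipkin_proto":  true,
	"zipkin_json":   true,
	"zipkin_thrift": true,
}

// checkSettings is a hypothetical helper, not the package's Validate method.
// It rejects a missing protocol version and an encoding outside the documented set.
func checkSettings(protocolVersion, encoding string) error {
	if protocolVersion == "" {
		return errors.New("protocol_version is required")
	}
	if !documentedEncodings[encoding] {
		return fmt.Errorf("unknown encoding %q", encoding)
	}
	return nil
}

func main() {
	fmt.Println(checkSettings("2.0.0", "otlp_proto"))
	fmt.Println(checkSettings("", "otlp_proto"))
	fmt.Println(checkSettings("2.0.0", "avro"))
}
```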

type FactoryOption

type FactoryOption func(factory *kafkaReceiverFactory)

FactoryOption applies changes to kafkaReceiverFactory.
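
FactoryOption follows the functional options pattern: each option is a function that mutates the factory, and the constructor applies the options in order. The sketch below shows that pattern in isolation. Because kafkaReceiverFactory is unexported, the factory, option, withEncoding and newFactory names here are hypothetical stand-ins, and the encodings field is an assumption about what such a factory might hold.

```go
package main

import "fmt"

// factory is a hypothetical stand-in for the unexported kafkaReceiverFactory.
type factory struct {
	encodings map[string]bool
}

// option mirrors the shape of FactoryOption: a function that mutates the factory.
type option func(f *factory)

// withEncoding is an illustrative option that registers one extra encoding name.
func withEncoding(name string) option {
	return func(f *factory) {
		f.encodings[name] = true
	}
}

// newFactory starts from a default state and applies each option in order.
func newFactory(opts ...option) *factory {
	f := &factory{encodings: map[string]bool{"otlp_proto": true}}
	for _, o := range opts {
		o(f)
	}
	return f
}

func main() {
	f := newFactory(withEncoding("custom_json"))
	fmt.Println(len(f.encodings)) // 2: the default plus the one added by the option
}
```

Passing behavior as options keeps the constructor signature stable: new customizations can be added later without changing existing callers.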

func WithAddLogsUnmarshallers

func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) FactoryOption

WithAddLogsUnmarshallers adds LogsUnmarshallers.

func WithAddTracesUnmarshallers

func WithAddTracesUnmarshallers(encodingMarshaller map[string]TracesUnmarshaller) FactoryOption

WithAddTracesUnmarshallers adds TracesUnmarshallers.

type LogsUnmarshaller

type LogsUnmarshaller interface {
	// Unmarshal deserializes the message body into logs.
	Unmarshal([]byte) (pdata.Logs, error)

	// Encoding of the serialized messages.
	Encoding() string
}

LogsUnmarshaller deserializes the message body.

type TracesUnmarshaller

type TracesUnmarshaller interface {
	// Unmarshal deserializes the message body into traces.
	Unmarshal([]byte) (pdata.Traces, error)

	// Encoding of the serialized messages.
	Encoding() string
}

TracesUnmarshaller deserializes the message body.