kafkareceiver

Version: v0.77.0-dev1
Published: May 11, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

Kafka Receiver

Status
Stability: beta (metrics, logs, traces)
Distributions: contrib

Kafka receiver receives traces, metrics, and logs from Kafka. Message payload encoding is configurable.

Note that metrics support only the otlp_proto encoding; logs support otlp_proto and raw.

Getting Started

The following settings are required:

  • protocol_version (no default): Kafka protocol version, e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of Kafka brokers
  • topic (default = otlp_spans): The name of the Kafka topic to read from
  • encoding (default = otlp_proto): The encoding of the payload received from Kafka. Available encodings:
    • otlp_proto: the payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest, or ExportMetricsServiceRequest, depending on the signal type.
    • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
    • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
    • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
    • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
    • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
    • raw: (logs only) the payload's bytes are inserted as the body of a log record.
  • group_id (default = otel-collector): The consumer group the receiver consumes messages from
  • client_id (default = otel-collector): The consumer client ID the receiver uses
  • initial_offset (default = latest): The initial offset to use if no offset was previously committed. Must be latest or earliest.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use.
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file is used for authentication instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • autocommit
    • enable (default = true): Whether or not to auto-commit updated offsets back to the broker
    • interval (default = 1s): How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
  • message_marking
    • after (default = false): If true, the messages are marked after the pipeline execution
    • on_error (default = false): If false, only the successfully processed messages are marked. Has no effect if after is false. Note: this can block the entire partition if processing a message returns a permanent error

Example:

receivers:
  kafka:
    protocol_version: 2.0.0
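
The settings list above names one required option and one option with a fixed set of allowed values. The following Go sketch checks those two documented constraints on top of the documented defaults. The settings type and the defaultSettings and check functions are hypothetical names introduced for illustration; they are not part of this package, and this is not a description of what Config.Validate does.

```go
package main

import (
	"errors"
	"fmt"
)

// settings is a hypothetical local mirror of a few documented options.
// It is not the package's Config type.
type settings struct {
	Brokers         []string
	ProtocolVersion string
	Topic           string
	Encoding        string
	GroupID         string
	ClientID        string
	InitialOffset   string
}

// defaultSettings returns the defaults listed in the README.
// protocol_version has no default, so it is left empty.
func defaultSettings() settings {
	return settings{
		Brokers:       []string{"localhost:9092"},
		Topic:         "otlp_spans",
		Encoding:      "otlp_proto",
		GroupID:       "otel-collector",
		ClientID:      "otel-collector",
		InitialOffset: "latest",
	}
}

// check enforces the two documented constraints: protocol_version is
// required, and initial_offset must be "latest" or "earliest".
func check(s settings) error {
	if s.ProtocolVersion == "" {
		return errors.New("protocol_version is required")
	}
	switch s.InitialOffset {
	case "latest", "earliest":
		return nil
	default:
		return fmt.Errorf("initial_offset must be latest or earliest, got %q", s.InitialOffset)
	}
}

func main() {
	s := defaultSettings()
	fmt.Println(check(s)) // reports an error: protocol_version is not set

	s.ProtocolVersion = "2.0.0"
	fmt.Println(check(s))
}
```

This mirrors the minimal YAML example: with the defaults in place, protocol_version is the only value that has to be supplied.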

Documentation

Overview

Package kafkareceiver receives traces, metrics, and logs from Kafka.

Constants

This section is empty.

Variables

This section is empty.

Functions

func MetricViews

func MetricViews() []*view.View

MetricViews returns the metric views for the Kafka receiver.

func NewFactory

func NewFactory(options ...FactoryOption) receiver.Factory

NewFactory creates a Kafka receiver factory.

Types

type AutoCommit

type AutoCommit struct {
	// Whether or not to auto-commit updated offsets back to the broker.
	// (default enabled).
	Enable bool `mapstructure:"enable"`
	// How frequently to commit updated offsets. Ineffective unless
	// auto-commit is enabled (default 1s)
	Interval time.Duration `mapstructure:"interval"`
}

type Config

type Config struct {
	// The list of kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`
	// Kafka protocol version
	ProtocolVersion string `mapstructure:"protocol_version"`
	// The name of the kafka topic to consume from (default "otlp_spans")
	Topic string `mapstructure:"topic"`
	// Encoding of the messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// The consumer group that receiver will be consuming messages from (default "otel-collector")
	GroupID string `mapstructure:"group_id"`
	// The consumer client ID that receiver will use (default "otel-collector")
	ClientID string `mapstructure:"client_id"`
	// The initial offset to use if no offset was previously committed.
	// Must be `latest` or `earliest` (default "latest").
	InitialOffset string `mapstructure:"initial_offset"`

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata kafkaexporter.Metadata `mapstructure:"metadata"`

	Authentication kafkaexporter.Authentication `mapstructure:"auth"`

	// Controls the auto-commit functionality
	AutoCommit AutoCommit `mapstructure:"autocommit"`

	// Controls the way the messages are marked as consumed
	MessageMarking MessageMarking `mapstructure:"message_marking"`
}

Config defines the configuration for the Kafka receiver.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks whether the receiver configuration is valid.

type FactoryOption

type FactoryOption func(factory *kafkaReceiverFactory)

FactoryOption applies changes to kafkaReceiverFactory.

func WithLogsUnmarshalers

func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption

WithLogsUnmarshalers adds LogsUnmarshalers.

func WithMetricsUnmarshalers

func WithMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption

WithMetricsUnmarshalers adds MetricsUnmarshalers.

func WithTracesUnmarshalers

func WithTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption

WithTracesUnmarshalers adds TracesUnmarshalers.
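
FactoryOption and the three With...Unmarshalers functions follow Go's functional options pattern. The sketch below shows the general shape of that pattern using local stand-in types only. The factory, option, and named types are hypothetical, and keying the registry by the value of Encoding() is an assumption made for this sketch, not something the documentation above states.

```go
package main

import "fmt"

// unmarshaler is a stand-in for the package's unmarshaler interfaces,
// reduced to the one method this sketch needs.
type unmarshaler interface {
	Encoding() string
}

// factory is a hypothetical stand-in for the unexported kafkaReceiverFactory.
type factory struct {
	unmarshalers map[string]unmarshaler
}

// option has the same shape as FactoryOption: a function that mutates
// the factory it is given.
type option func(*factory)

// withUnmarshalers returns an option that registers each unmarshaler
// under its encoding name (an assumption of this sketch).
func withUnmarshalers(us ...unmarshaler) option {
	return func(f *factory) {
		for _, u := range us {
			f.unmarshalers[u.Encoding()] = u
		}
	}
}

// newFactory builds a factory and applies the options in order.
func newFactory(opts ...option) *factory {
	f := &factory{unmarshalers: map[string]unmarshaler{}}
	for _, o := range opts {
		o(f)
	}
	return f
}

// named is a trivial unmarshaler used only to exercise the registry.
type named string

func (n named) Encoding() string { return string(n) }

func main() {
	f := newFactory(withUnmarshalers(named("custom_a"), named("custom_b")))
	_, found := f.unmarshalers["custom_a"]
	fmt.Println(len(f.unmarshalers), found)
}
```

The pattern keeps the NewFactory signature stable: callers that need no custom encodings pass no options at all.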

type LogsUnmarshaler

type LogsUnmarshaler interface {
	// Unmarshal deserializes the message body into logs.
	Unmarshal([]byte) (plog.Logs, error)

	// Encoding of the serialized messages.
	Encoding() string
}

LogsUnmarshaler deserializes the message body.
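
A custom logs encoding is a type with the two methods shown above. The sketch below illustrates that shape without external modules, so it replaces plog.Logs with a hypothetical logs struct. The logs, logsUnmarshaler, and bodyUnmarshaler names and the "example_body" encoding are all invented for this illustration. A real implementation would return plog.Logs and be passed to WithLogsUnmarshalers.

```go
package main

import "fmt"

// logs is a hypothetical stand-in for plog.Logs so that the sketch
// compiles with the standard library alone.
type logs struct {
	Bodies [][]byte
}

// logsUnmarshaler mirrors the two-method shape of LogsUnmarshaler.
type logsUnmarshaler interface {
	Unmarshal([]byte) (logs, error)
	Encoding() string
}

// bodyUnmarshaler stores the payload bytes unchanged as one record body,
// similar in spirit to the documented raw encoding.
type bodyUnmarshaler struct{}

func (bodyUnmarshaler) Unmarshal(b []byte) (logs, error) {
	// Copy the payload so later reuse of the caller's buffer cannot
	// alter the stored record.
	body := append([]byte(nil), b...)
	return logs{Bodies: [][]byte{body}}, nil
}

// Encoding returns the name a configuration would use to select this
// unmarshaler.
func (bodyUnmarshaler) Encoding() string { return "example_body" }

func main() {
	var u logsUnmarshaler = bodyUnmarshaler{}
	out, err := u.Unmarshal([]byte("hello"))
	fmt.Println(u.Encoding(), string(out.Bodies[0]), err)
}
```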

type MessageMarking

type MessageMarking struct {
	// If true, the messages are marked after the pipeline execution
	After bool `mapstructure:"after"`

	// If false, only the successfully processed messages are marked.
	// It has no effect if After is set to false.
	// Note: this can block the entire partition in case a message processing returns
	// a permanent error.
	OnError bool `mapstructure:"on_error"`
}

type MetricsUnmarshaler

type MetricsUnmarshaler interface {
	// Unmarshal deserializes the message body into metrics.
	Unmarshal([]byte) (pmetric.Metrics, error)

	// Encoding of the serialized messages
	Encoding() string
}

MetricsUnmarshaler deserializes the message body.

type TracesUnmarshaler

type TracesUnmarshaler interface {
	// Unmarshal deserializes the message body into traces.
	Unmarshal([]byte) (ptrace.Traces, error)

	// Encoding of the serialized messages.
	Encoding() string
}

TracesUnmarshaler deserializes the message body.

Directories

Path Synopsis
internal
